// commonware_storage/journal/authenticated.rs
//! Authenticated journal implementation.
//!
//! An authenticated journal maintains a contiguous journal of items alongside a Merkle-family
//! structure. The item at index i in the journal corresponds to the leaf at Location i in the
//! Merkle structure. This structure enables efficient proofs that an item is included in the
//! journal at a specific location.
7
8use crate::{
9    journal::{
10        contiguous::{fixed, variable, Contiguous, Many, Mutable, Reader},
11        Error as JournalError,
12    },
13    merkle::{
14        self, batch,
15        hasher::{Hasher as _, Standard as StandardHasher},
16        journaled::Journaled,
17        Family, Location, Position, Proof, Readable,
18    },
19    Context, Persistable,
20};
21use alloc::{
22    sync::{Arc, Weak},
23    vec::Vec,
24};
25use commonware_codec::{CodecFixedShared, CodecShared, Encode, EncodeShared};
26use commonware_cryptography::{Digest, Hasher};
27use core::num::NonZeroU64;
28use futures::{future::try_join_all, try_join, TryFutureExt as _};
29use thiserror::Error;
30use tracing::{debug, warn};
31
/// Errors that can occur when interacting with an authenticated journal.
#[derive(Error, Debug)]
pub enum Error<F: Family> {
    /// An error surfaced by the Merkle structure.
    #[error("merkle error: {0}")]
    Merkle(#[from] merkle::Error<F>),

    /// An error surfaced by the underlying journal.
    #[error("journal error: {0}")]
    Journal(#[from] super::Error),
}
41
/// A speculative batch whose root digest has not yet been computed,
/// in contrast to [`MerkleizedBatch`].
pub struct UnmerkleizedBatch<F: Family, H: Hasher, Item: Send + Sync> {
    // The inner batch of Merkle leaf digests (one leaf per appended item).
    inner: batch::UnmerkleizedBatch<F, H::Digest>,
    // The hasher used to derive leaf digests from encoded items.
    hasher: StandardHasher<H>,
    // The items appended via this batch, in append order.
    items: Vec<Item>,
    // This batch's parent, or None if the parent is the journal itself.
    // Holding a strong `Arc` keeps the parent alive until this batch is merkleized.
    parent: Option<Arc<MerkleizedBatch<F, H::Digest, Item>>>,
}
54
impl<F: Family, H: Hasher, Item: Encode + Send + Sync> UnmerkleizedBatch<F, H, Item> {
    /// Add an item to the batch.
    ///
    /// The item is encoded, its digest is appended to the inner Merkle batch, and the item
    /// itself is retained so it can be written to the journal when the batch is applied.
    #[allow(clippy::should_implement_trait)]
    pub fn add(mut self, item: Item) -> Self {
        let encoded = item.encode();
        self.inner = self.inner.add(&self.hasher, &encoded);
        self.items.push(item);
        self
    }

    /// Collect ancestor items from the parent chain before downgrading.
    ///
    /// Returns the item batches of all live ancestors in root-to-tip order. Ancestors
    /// beyond the immediate parent are held via `Weak`, so already-dropped ancestors are
    /// silently skipped here (the resulting gap is detected later, at `apply_batch` time).
    fn collect_ancestor_items(
        parent: &Option<Arc<MerkleizedBatch<F, H::Digest, Item>>>,
    ) -> Vec<Arc<Vec<Item>>> {
        let Some(parent) = parent else {
            return Vec::new();
        };
        let mut items = Vec::new();
        // Empty batches contribute no items and are skipped.
        if !parent.items.is_empty() {
            items.push(Arc::clone(&parent.items));
        }
        // Walk the chain tip-to-root via Weak upgrades...
        let mut current = parent.parent.as_ref().and_then(Weak::upgrade);
        while let Some(batch) = current {
            if !batch.items.is_empty() {
                items.push(Arc::clone(&batch.items));
            }
            current = batch.parent.as_ref().and_then(Weak::upgrade);
        }
        // ...then reverse so the caller receives root-to-tip order.
        items.reverse();
        items
    }

    /// Merkleize the batch, computing the root digest.
    /// `base` provides committed node data as fallback during hash computation.
    pub fn merkleize(
        self,
        base: &merkle::mem::Mem<F, H::Digest>,
    ) -> Arc<MerkleizedBatch<F, H::Digest, Item>> {
        let merkle = self.inner.merkleize(base, &self.hasher);
        let ancestor_items = Self::collect_ancestor_items(&self.parent);
        Arc::new(MerkleizedBatch {
            inner: merkle,
            items: Arc::new(self.items),
            // Downgrade the strong parent handle: the merkleized batch only observes
            // ancestors that callers themselves keep alive.
            parent: self.parent.as_ref().map(Arc::downgrade),
            ancestor_items,
        })
    }

    /// Like [`merkleize`](Self::merkleize), but the caller supplies the items instead of
    /// accumulating them with [`add`](Self::add). The two approaches must not be mixed: do
    /// not call [`add`](Self::add) before this method.
    ///
    /// The items are encoded and hashed into the Merkle structure, and the `Arc` is stored
    /// directly in the resulting [`MerkleizedBatch`] without copying.
    ///
    /// # Panics
    ///
    /// Panics if items were previously added via [`add`](Self::add).
    pub(crate) fn merkleize_with(
        mut self,
        base: &merkle::mem::Mem<F, H::Digest>,
        items: Arc<Vec<Item>>,
    ) -> Arc<MerkleizedBatch<F, H::Digest, Item>> {
        assert!(
            self.items.is_empty(),
            "merkleize_with expects no items added via add"
        );

        #[cfg(feature = "std")]
        if let Some(pool) = self
            .inner
            .pool()
            .filter(|_| items.len() >= batch::MIN_TO_PARALLELIZE)
        {
            // Parallel path: encode items and compute leaf digests on the thread pool,
            // then feed the pre-computed digests sequentially into the MMR batch.
            use rayon::prelude::*;

            let starting_leaves = self.inner.leaves();
            let digests: Vec<H::Digest> = pool.install(|| {
                items
                    .par_iter()
                    .enumerate()
                    .map_init(
                        // Each rayon worker gets its own hasher clone to avoid contention.
                        || self.hasher.clone(),
                        |h, (i, item)| {
                            // Leaf digests are position-dependent: compute each item's
                            // location relative to the batch's starting leaf count.
                            let loc = Location::<F>::new(*starting_leaves + i as u64);
                            let pos = Position::try_from(loc).expect("valid leaf location");
                            h.leaf_digest(pos, &item.encode())
                        },
                    )
                    .collect()
            });
            for digest in digests {
                self.inner = self.inner.add_leaf_digest(digest);
            }
        } else {
            // Sequential fallback: small batches, or no thread pool configured.
            for item in &*items {
                let encoded = item.encode();
                self.inner = self.inner.add(&self.hasher, &encoded);
            }
        }

        // Without `std` there is no thread pool; always encode sequentially.
        #[cfg(not(feature = "std"))]
        for item in &*items {
            let encoded = item.encode();
            self.inner = self.inner.add(&self.hasher, &encoded);
        }

        let merkle = self.inner.merkleize(base, &self.hasher);
        let ancestor_items = Self::collect_ancestor_items(&self.parent);
        Arc::new(MerkleizedBatch {
            inner: merkle,
            items,
            parent: self.parent.as_ref().map(Arc::downgrade),
            ancestor_items,
        })
    }
}
174
/// A speculative batch whose root digest has been computed, in contrast to [`UnmerkleizedBatch`].
#[derive(Clone, Debug)]
pub struct MerkleizedBatch<F: Family, D: Digest, Item: Send + Sync> {
    /// The inner batch of Merkle leaf digests.
    pub(crate) inner: Arc<batch::MerkleizedBatch<F, D>>,
    /// The items to append from this batch (excludes ancestor items).
    items: Arc<Vec<Item>>,
    /// This batch's parent, or None if the parent is the journal itself.
    /// Weak: merkleized batches do not keep their ancestors alive.
    parent: Option<Weak<Self>>,
    /// Ancestor item batches collected at merkleize time (root-to-tip order).
    pub(crate) ancestor_items: Vec<Arc<Vec<Item>>>,
}
187
188impl<F: Family, D: Digest, Item: Send + Sync> MerkleizedBatch<F, D, Item> {
189    /// Return the root digest of the authenticated journal after this batch is applied.
190    pub fn root(&self) -> D {
191        self.inner.root()
192    }
193
194    /// The number of items visible through this batch, including ancestors.
195    pub(crate) fn size(&self) -> u64 {
196        *self.inner.leaves()
197    }
198
199    /// The items added in this batch.
200    pub(crate) const fn items(&self) -> &Arc<Vec<Item>> {
201        &self.items
202    }
203
204    /// Create a new speculative batch of operations with this batch as its parent.
205    ///
206    /// All uncommitted ancestors in the chain must be kept alive until the child (or any
207    /// descendant) is merkleized. Dropping an uncommitted ancestor causes data
208    /// loss detected at `apply_batch` time.
209    pub fn new_batch<H: Hasher<Digest = D>>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, Item>
210    where
211        Item: Encode,
212    {
213        UnmerkleizedBatch {
214            inner: self.inner.new_batch(),
215            hasher: StandardHasher::new(),
216            items: Vec::new(),
217            parent: Some(Arc::clone(self)),
218        }
219    }
220}
221
/// Read-only Merkle access for a merkleized batch: every method forwards to the inner
/// [`batch::MerkleizedBatch`], which overlays this batch's nodes atop committed state.
impl<F: Family, D: Digest, Item: Send + Sync> Readable for MerkleizedBatch<F, D, Item> {
    type Family = F;
    type Digest = D;
    type Error = merkle::Error<F>;

    // Size of the Merkle structure (in positions, not leaves).
    fn size(&self) -> Position<F> {
        self.inner.size()
    }

    // Node digest at `pos`, if present in this batch or its base.
    fn get_node(&self, pos: Position<F>) -> Option<D> {
        self.inner.get_node(pos)
    }

    fn root(&self) -> D {
        self.inner.root()
    }

    fn pruning_boundary(&self) -> Location<F> {
        self.inner.pruning_boundary()
    }

    // Inclusion proof for the leaf at `loc`.
    fn proof(
        &self,
        hasher: &impl crate::merkle::hasher::Hasher<F, Digest = D>,
        loc: Location<F>,
    ) -> Result<Proof<F, D>, merkle::Error<F>> {
        self.inner.proof(hasher, loc)
    }

    // Inclusion proof for all leaves in `range`.
    fn range_proof(
        &self,
        hasher: &impl crate::merkle::hasher::Hasher<F, Digest = D>,
        range: core::ops::Range<Location<F>>,
    ) -> Result<Proof<F, D>, merkle::Error<F>> {
        self.inner.range_proof(hasher, range)
    }
}
259
/// An append-only data structure that maintains a sequential journal of items alongside a
/// Merkle-family structure. The item at index i in the journal corresponds to the leaf at Location
/// i in the Merkle structure. This structure enables efficient proofs that an item is included in
/// the journal at a specific location.
pub struct Journal<F, E, C, H>
where
    F: Family,
    E: Context,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
    /// Merkle structure where each leaf is an item digest.
    /// Invariant: leaf i corresponds to item i in the journal.
    pub(crate) merkle: Journaled<F, E, H::Digest>,

    /// Journal of items.
    /// Invariant: item i corresponds to leaf i in the Merkle structure.
    pub(crate) journal: C,

    /// Hasher used to derive leaf digests from encoded items.
    pub(crate) hasher: StandardHasher<H>,
}
281
282impl<F, E, C, H> Journal<F, E, C, H>
283where
284    F: Family,
285    E: Context,
286    C: Contiguous<Item: EncodeShared>,
287    H: Hasher,
288{
289    /// Returns the Location of the next item appended to the journal.
290    pub async fn size(&self) -> Location<F> {
291        Location::new(self.journal.size().await)
292    }
293
294    /// Return the root of the Merkle structure.
295    pub fn root(&self) -> H::Digest {
296        self.merkle.root()
297    }
298
299    /// Create a speculative batch atop this journal.
300    pub fn new_batch(&self) -> UnmerkleizedBatch<F, H, C::Item>
301    where
302        C::Item: Encode,
303    {
304        let root = self.merkle.to_batch();
305        UnmerkleizedBatch {
306            inner: root.new_batch(),
307            hasher: StandardHasher::new(),
308            items: Vec::new(),
309            parent: None,
310        }
311    }
312
313    /// Borrow the committed Mem through the read lock.
314    pub(crate) fn with_mem<R>(&self, f: impl FnOnce(&merkle::mem::Mem<F, H::Digest>) -> R) -> R {
315        self.merkle.with_mem(f)
316    }
317
318    /// Create an owned [`MerkleizedBatch`] representing the current committed state.
319    ///
320    /// The batch has no items (the committed items are on disk, not in memory).
321    /// This is the starting point for building owned batch chains.
322    pub(crate) fn to_merkleized_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, C::Item>> {
323        Arc::new(MerkleizedBatch {
324            inner: self.merkle.to_batch(),
325            items: Arc::new(Vec::new()),
326            parent: None,
327            ancestor_items: Vec::new(),
328        })
329    }
330}
331
332impl<F, E, C, H> Journal<F, E, C, H>
333where
334    F: Family,
335    E: Context,
336    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
337    H: Hasher,
338{
339    /// Durably persist the journal. This is faster than `sync()` but does not persist the Merkle
340    /// structure, meaning recovery will be required on startup if we crash before `sync()`.
341    pub async fn commit(&self) -> Result<(), Error<F>> {
342        self.journal.commit().await.map_err(Error::Journal)
343    }
344}
345
impl<F, E, C, H> Journal<F, E, C, H>
where
    F: Family,
    E: Context,
    C: Mutable<Item: EncodeShared>,
    H: Hasher,
{
    /// Create a new [Journal] from the given components after aligning the Merkle structure with
    /// the journal.
    pub async fn from_components(
        mut merkle: Journaled<F, E, H::Digest>,
        journal: C,
        hasher: StandardHasher<H>,
        apply_batch_size: u64,
    ) -> Result<Self, Error<F>> {
        Self::align(&mut merkle, &journal, &hasher, apply_batch_size).await?;

        // Sync the Merkle structure to disk to avoid having to repeat any recovery that may have
        // been performed on next startup.
        merkle.sync().await?;

        Ok(Self {
            merkle,
            journal,
            hasher,
        })
    }

    /// Align the Merkle structure to be consistent with the journal. Any items in the structure
    /// that are not in the journal are popped, and any items in the journal that are not in the
    /// structure are added. Items are added in batches of size `apply_batch_size` to avoid memory
    /// bloat.
    async fn align(
        merkle: &mut Journaled<F, E, H::Digest>,
        journal: &C,
        hasher: &StandardHasher<H>,
        apply_batch_size: u64,
    ) -> Result<(), Error<F>> {
        // Rewind Merkle structure elements that are ahead of the journal.
        let journal_size = journal.size().await;
        let mut merkle_leaves = merkle.leaves();
        if merkle_leaves > journal_size {
            let rewind_count = merkle_leaves - journal_size;
            warn!(
                journal_size,
                ?rewind_count,
                "rewinding Merkle structure to match journal"
            );
            merkle.rewind(*rewind_count as usize, hasher).await?;
            merkle_leaves = Location::new(journal_size);
        }

        // If the Merkle structure is behind, replay journal items to catch up.
        if merkle_leaves < journal_size {
            let replay_count = journal_size - *merkle_leaves;
            warn!(
                ?journal_size,
                replay_count, "Merkle structure lags behind journal, replaying journal to catch up"
            );

            let reader = journal.reader().await;
            while merkle_leaves < journal_size {
                // Build at most `apply_batch_size` leaves per batch to bound memory use.
                let batch = {
                    let mut batch = merkle.new_batch();
                    let mut count = 0u64;
                    while count < apply_batch_size && merkle_leaves < journal_size {
                        let op = reader.read(*merkle_leaves).await?;
                        batch = batch.add(hasher, &op.encode());
                        merkle_leaves += 1;
                        count += 1;
                    }
                    batch
                };
                let batch = merkle.with_mem(|mem| batch.merkleize(mem, hasher));
                merkle.apply_batch(&batch)?;
            }
            return Ok(());
        }

        // At this point the Merkle structure and journal should be consistent.
        assert_eq!(journal.size().await, *merkle.leaves());

        Ok(())
    }

    /// Append an item to the journal and update the Merkle structure.
    ///
    /// Returns the Location at which the item was appended.
    pub async fn append(&mut self, item: &C::Item) -> Result<Location<F>, Error<F>> {
        let encoded_item = item.encode();

        // Append item to the journal, then update the Merkle structure state.
        let loc = self.journal.append(item).await?;
        let unmerkleized_batch = self.merkle.new_batch().add(&self.hasher, &encoded_item);
        let batch = self
            .merkle
            .with_mem(|mem| unmerkleized_batch.merkleize(mem, &self.hasher));
        self.merkle.apply_batch(&batch)?;

        Ok(Location::new(loc))
    }

    /// Apply a batch to the journal.
    ///
    /// A batch is valid if the journal has not been modified since the batch
    /// chain was created, or if only ancestors of this batch have been applied.
    /// Already-committed ancestors are skipped automatically.
    /// Applying a batch from a different fork returns an error.
    pub async fn apply_batch(
        &mut self,
        batch: &MerkleizedBatch<F, H::Digest, C::Item>,
    ) -> Result<(), Error<F>> {
        let merkle_size = self.merkle.size();
        let base_size = batch.inner.base_size();

        // Determine whether ancestors have already been committed.
        // `base_size` is the merkle size when the batch chain was forked.
        // If the merkle has advanced past the fork point, ancestors are
        // already on disk; check that the current size is reachable from
        // the batch chain before skipping them.
        let skip_ancestors = if merkle_size == base_size {
            false
        } else if merkle_size > base_size && merkle_size < batch.inner.size() {
            true
        } else {
            // Merkle is at an incompatible position (a sibling or unrelated
            // fork was committed). Eagerly reject to avoid mutating the journal.
            return Err(merkle::Error::StaleBatch {
                expected: base_size,
                actual: merkle_size,
            }
            .into());
        };

        // Apply ancestor item batches in root-to-tip order. Already-committed
        // batches are skipped by tracking cumulative leaf count.
        // Batches are collected into a single append_many call to acquire the
        // journal's write lock once instead of per-batch.
        let committed_leaves = self.journal.size().await;
        let base_leaves = *Location::<F>::try_from(base_size)?;
        let mut batch_leaf_end = base_leaves;
        let mut batches: Vec<&[C::Item]> = Vec::with_capacity(batch.ancestor_items.len() + 1);
        for ancestor in &batch.ancestor_items {
            batch_leaf_end += ancestor.len() as u64;
            // An ancestor whose leaves are fully covered by committed leaves is on disk.
            if skip_ancestors && batch_leaf_end <= committed_leaves {
                continue;
            }
            batches.push(ancestor);
        }
        if !batch.items.is_empty() {
            batches.push(&batch.items);
        }
        if !batches.is_empty() {
            self.journal.append_many(Many::Nested(&batches)).await?;
        }

        self.merkle.apply_batch(&batch.inner)?;
        // Invariant: journal and Merkle structure stay in lockstep.
        assert_eq!(*self.merkle.leaves(), self.journal.size().await);
        Ok(())
    }

    /// Prune both the Merkle structure and journal to the given location.
    ///
    /// # Returns
    /// The new pruning boundary, which may be less than the requested `prune_loc`.
    pub async fn prune(&mut self, prune_loc: Location<F>) -> Result<Location<F>, Error<F>> {
        if self.merkle.size() == 0 {
            // DB is empty, nothing to prune.
            return Ok(Location::new(self.reader().await.bounds().start));
        }

        // Sync the Merkle structure before pruning the journal, otherwise its last element could
        // end up behind the journal's first element after a crash, and there would be no way to
        // replay the items between the structure's last element and the journal's first element.
        self.merkle.sync().await?;

        // Prune the journal and check if anything was actually pruned
        if !self.journal.prune(*prune_loc).await? {
            return Ok(Location::new(self.reader().await.bounds().start));
        }

        let bounds = self.reader().await.bounds();
        debug!(size = ?bounds.end, ?prune_loc, boundary = ?bounds.start, "pruned inactive ops");

        // Prune Merkle structure to match the journal's actual boundary
        self.merkle.prune(Location::from(bounds.start)).await?;

        Ok(Location::new(bounds.start))
    }
}
534
535impl<F, E, C, H> Journal<F, E, C, H>
536where
537    F: Family,
538    E: Context,
539    C: Contiguous<Item: EncodeShared>,
540    H: Hasher,
541{
542    /// Generate a proof of inclusion for items starting at `start_loc`.
543    ///
544    /// Returns a proof and the items corresponding to the leaves in the range `start_loc..end_loc`,
545    /// where `end_loc` is the minimum of the current item count and `start_loc + max_ops`.
546    ///
547    /// # Errors
548    ///
549    /// - Returns [Error::Merkle] with [merkle::Error::LocationOverflow] if `start_loc` >
550    ///   [Family::MAX_LEAVES].
551    /// - Returns [Error::Merkle] with [merkle::Error::RangeOutOfBounds] if `start_loc` >= current
552    ///   item count.
553    /// - Returns [Error::Journal] with [crate::journal::Error::ItemPruned] if `start_loc` has been
554    ///   pruned.
555    pub async fn proof(
556        &self,
557        start_loc: Location<F>,
558        max_ops: NonZeroU64,
559    ) -> Result<(Proof<F, H::Digest>, Vec<C::Item>), Error<F>> {
560        self.historical_proof(self.size().await, start_loc, max_ops)
561            .await
562    }
563
564    /// Generate a historical proof with respect to the state of the Merkle structure when it had
565    /// `historical_leaves` leaves.
566    ///
567    /// Returns a proof and the items corresponding to the leaves in the range `start_loc..end_loc`,
568    /// where `end_loc` is the minimum of `historical_leaves` and `start_loc + max_ops`.
569    ///
570    /// # Errors
571    ///
572    /// - Returns [Error::Merkle] with [merkle::Error::RangeOutOfBounds] if `start_loc` >=
573    ///   `historical_leaves` or `historical_leaves` > number of items in the journal.
574    /// - Returns [Error::Journal] with [crate::journal::Error::ItemPruned] if `start_loc` has been
575    ///   pruned.
576    pub async fn historical_proof(
577        &self,
578        historical_leaves: Location<F>,
579        start_loc: Location<F>,
580        max_ops: NonZeroU64,
581    ) -> Result<(Proof<F, H::Digest>, Vec<C::Item>), Error<F>> {
582        // Acquire a reader guard to prevent pruning from advancing while we read.
583        let reader = self.journal.reader().await;
584        let bounds = reader.bounds();
585
586        if *historical_leaves > bounds.end {
587            return Err(merkle::Error::RangeOutOfBounds(Location::new(bounds.end)).into());
588        }
589        if start_loc >= historical_leaves {
590            return Err(merkle::Error::RangeOutOfBounds(start_loc).into());
591        }
592
593        let end_loc = std::cmp::min(historical_leaves, start_loc.saturating_add(max_ops.get()));
594
595        let hasher = self.hasher.clone();
596        let proof = self
597            .merkle
598            .historical_range_proof(&hasher, historical_leaves, start_loc..end_loc)
599            .await?;
600
601        let mut ops = Vec::with_capacity((*end_loc - *start_loc) as usize);
602        let futures = (*start_loc..*end_loc)
603            .map(|i| reader.read(i))
604            .collect::<Vec<_>>();
605        try_join_all(futures)
606            .await?
607            .into_iter()
608            .for_each(|op| ops.push(op));
609
610        Ok((proof, ops))
611    }
612}
613
614impl<F, E, C, H> Journal<F, E, C, H>
615where
616    F: Family,
617    E: Context,
618    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
619    H: Hasher,
620{
621    /// Destroy the authenticated journal, removing all data from disk.
622    pub async fn destroy(self) -> Result<(), Error<F>> {
623        try_join!(
624            self.journal.destroy().map_err(Error::Journal),
625            self.merkle.destroy().map_err(Error::Merkle),
626        )?;
627
628        Ok(())
629    }
630
631    /// Durably persist the journal, ensuring no recovery is required on startup.
632    pub async fn sync(&self) -> Result<(), Error<F>> {
633        try_join!(
634            self.journal.sync().map_err(Error::Journal),
635            self.merkle.sync().map_err(Error::Merkle)
636        )?;
637
638        Ok(())
639    }
640}
641
/// The number of items to apply to the Merkle structure in a single batch during startup
/// alignment, bounding peak memory use while replaying the journal.
const APPLY_BATCH_SIZE: u64 = 1 << 16;
644
/// Generate a `new()` constructor for an authenticated journal backed by a specific contiguous
/// journal type.
///
/// `$journal_mod` is the backing journal module (e.g. `fixed`/`variable`), `$cfg_ty` its
/// configuration type, and `$codec_bound` the codec bound required of item type `O`.
macro_rules! impl_journal_new {
    ($journal_mod:ident, $cfg_ty:ty, $codec_bound:path) => {
        impl<F, E, O, H> Journal<F, E, $journal_mod::Journal<E, O>, H>
        where
            F: Family,
            E: Context,
            O: $codec_bound,
            H: Hasher,
        {
            /// Create a new authenticated [Journal].
            ///
            /// The inner journal will be rewound to the last item matching `rewind_predicate`,
            /// and the merkle structure will be aligned to match.
            pub async fn new(
                context: E,
                merkle_cfg: merkle::journaled::Config,
                journal_cfg: $cfg_ty,
                rewind_predicate: fn(&O) -> bool,
            ) -> Result<Self, Error<F>> {
                // Initialize and rewind the backing journal first; it is the source of
                // truth the Merkle structure is aligned against.
                let mut journal =
                    $journal_mod::Journal::init(context.with_label("journal"), journal_cfg).await?;
                journal.rewind_to(rewind_predicate).await?;

                let hasher = StandardHasher::<H>::new();
                let mut merkle =
                    Journaled::init(context.with_label("merkle"), &hasher, merkle_cfg).await?;
                Self::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE).await?;

                // Persist both components so the rewind/alignment work is not repeated.
                journal.sync().await?;
                merkle.sync().await?;

                Ok(Self {
                    merkle,
                    journal,
                    hasher,
                })
            }
        }
    };
}
687
// Provide `Journal::new` for both fixed-size-item and variable-size-item journal backends.
impl_journal_new!(fixed, fixed::Config, CodecFixedShared);
impl_journal_new!(variable, variable::Config<O::Cfg>, CodecShared);
690
/// Pass-through [Contiguous] implementation: reads and size queries are served directly by
/// the inner journal (the Merkle structure is not consulted).
impl<F, E, C, H> Contiguous for Journal<F, E, C, H>
where
    F: Family,
    E: Context,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
    type Item = C::Item;

    // Acquire a reader over the inner journal.
    async fn reader(&self) -> impl Reader<Item = C::Item> + '_ {
        self.journal.reader().await
    }

    // Number of items in the inner journal.
    async fn size(&self) -> u64 {
        self.journal.size().await
    }
}
708
impl<F, E, C, H> Mutable for Journal<F, E, C, H>
where
    F: Family,
    E: Context,
    C: Mutable<Item: EncodeShared>,
    H: Hasher,
{
    async fn append(&mut self, item: &Self::Item) -> Result<u64, JournalError> {
        // Resolves to the inherent `append` (which also updates the Merkle structure);
        // inherent methods take precedence over this trait method. Merkle errors are
        // flattened into JournalError via anyhow.
        let res = self.append(item).await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Merkle(inner) => JournalError::Merkle(anyhow::Error::from(inner)),
        })?;

        Ok(*res)
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
        // NOTE(review): this prunes only the inner journal; unlike the inherent `prune`,
        // the Merkle structure's boundary is not advanced here — confirm intended.
        self.journal.prune(min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
        // Rewind the journal first, then bring the Merkle structure back in line.
        self.journal.rewind(size).await?;

        let leaves = *self.merkle.leaves();
        if leaves > size {
            self.merkle
                .rewind((leaves - size) as usize, &self.hasher)
                .await
                .map_err(|error| JournalError::Merkle(anyhow::Error::from(error)))?;
        }

        Ok(())
    }
}
743
/// A [Mutable] journal that can serve as the inner journal of an authenticated [Journal].
pub trait Inner<E: Context>: Mutable + Persistable<Error = JournalError> {
    /// The configuration needed to initialize this journal.
    type Config: Clone + Send;

    /// Initialize an authenticated [Journal] backed by this journal type.
    ///
    /// `rewind_predicate` identifies the last item to keep on startup (see the generated
    /// `Journal::new` constructors).
    fn init<F: Family, H: Hasher>(
        context: E,
        merkle_cfg: merkle::journaled::Config,
        journal_cfg: Self::Config,
        rewind_predicate: fn(&Self::Item) -> bool,
    ) -> impl core::future::Future<Output = Result<Journal<F, E, Self, H>, Error<F>>> + Send
    where
        Self: Sized,
        Self::Item: EncodeShared;
}
760
761impl<F, E, C, H> Persistable for Journal<F, E, C, H>
762where
763    F: Family,
764    E: Context,
765    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
766    H: Hasher,
767{
768    type Error = JournalError;
769
770    async fn commit(&self) -> Result<(), JournalError> {
771        self.commit().await.map_err(|e| match e {
772            Error::Journal(inner) => inner,
773            Error::Merkle(inner) => JournalError::Merkle(anyhow::Error::from(inner)),
774        })
775    }
776
777    async fn sync(&self) -> Result<(), JournalError> {
778        self.sync().await.map_err(|e| match e {
779            Error::Journal(inner) => inner,
780            Error::Merkle(inner) => JournalError::Merkle(anyhow::Error::from(inner)),
781        })
782    }
783
784    async fn destroy(self) -> Result<(), JournalError> {
785        self.destroy().await.map_err(|e| match e {
786            Error::Journal(inner) => inner,
787            Error::Merkle(inner) => JournalError::Merkle(anyhow::Error::from(inner)),
788        })
789    }
790}
791
#[cfg(test)]
impl<F, E, C, H> Journal<F, E, C, H>
where
    F: Family,
    E: Context,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
    /// Test helper: Read the item at the given location.
    ///
    /// Holds a reader guard only for the duration of the read.
    pub(crate) async fn read(&self, loc: Location<F>) -> Result<C::Item, Error<F>> {
        self.journal
            .reader()
            .await
            .read(*loc)
            .await
            .map_err(Error::Journal)
    }
}
810
811#[cfg(test)]
812mod tests {
813    use super::*;
814    use crate::{
815        journal::contiguous::fixed::{Config as JConfig, Journal as ContiguousJournal},
816        merkle::{
817            journaled::{Config as MerkleConfig, Journaled},
818            mmb, mmr,
819        },
820        qmdb::{
821            any::{
822                operation::{update::Unordered as Update, Unordered as Op},
823                value::FixedEncoding,
824            },
825            operation::Committable,
826        },
827    };
828    use commonware_codec::Encode;
829    use commonware_cryptography::{sha256::Digest, Sha256};
830    use commonware_macros::test_traced;
831    use commonware_runtime::{
832        buffer::paged::CacheRef,
833        deterministic::{self, Context},
834        BufferPooler, Metrics, Runner as _,
835    };
836    use commonware_utils::{NZUsize, NZU16, NZU64};
837    use futures::StreamExt as _;
838    use std::num::{NonZeroU16, NonZeroUsize};
839
    // Deliberately small, odd-sized page geometry to exercise paging edge cases in tests.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
842
843    /// Generic operation type for testing, parameterized by Merkle family.
844    type TestOp<F> = Op<F, Digest, FixedEncoding<Digest>>;
845
846    /// Generic authenticated journal type for testing, parameterized by Merkle family.
847    type TestJournal<F> = Journal<
848        F,
849        deterministic::Context,
850        ContiguousJournal<deterministic::Context, TestOp<F>>,
851        Sha256,
852    >;
853
854    /// Create Merkle configuration for tests.
855    fn merkle_config(suffix: &str, pooler: &impl BufferPooler) -> MerkleConfig {
856        MerkleConfig {
857            journal_partition: format!("mmr-journal-{suffix}"),
858            metadata_partition: format!("mmr-metadata-{suffix}"),
859            items_per_blob: NZU64!(11),
860            write_buffer: NZUsize!(1024),
861            thread_pool: None,
862            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
863        }
864    }
865
866    /// Create journal configuration for tests.
867    fn journal_config(suffix: &str, pooler: &impl BufferPooler) -> JConfig {
868        JConfig {
869            partition: format!("journal-{suffix}"),
870            items_per_blob: NZU64!(7),
871            write_buffer: NZUsize!(1024),
872            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
873        }
874    }
875
876    /// Create a new empty authenticated journal.
877    async fn create_empty_journal<F: Family + PartialEq>(
878        context: Context,
879        suffix: &str,
880    ) -> TestJournal<F> {
881        let merkle_cfg = merkle_config(suffix, &context);
882        let journal_cfg = journal_config(suffix, &context);
883        TestJournal::<F>::new(context, merkle_cfg, journal_cfg, |op: &TestOp<F>| {
884            op.is_commit()
885        })
886        .await
887        .unwrap()
888    }
889
890    /// Create a test operation with predictable values based on index.
891    fn create_operation<F: Family + PartialEq>(index: u8) -> TestOp<F> {
892        TestOp::<F>::Update(Update(
893            Sha256::fill(index),
894            Sha256::fill(index.wrapping_add(1)),
895        ))
896    }
897
898    /// Create an authenticated journal with N committed operations.
899    ///
900    /// Operations are added and then synced to ensure they are committed.
901    async fn create_journal_with_ops<F: Family + PartialEq>(
902        context: Context,
903        suffix: &str,
904        count: usize,
905    ) -> TestJournal<F> {
906        let mut journal = create_empty_journal::<F>(context, suffix).await;
907
908        for i in 0..count {
909            let op = create_operation::<F>(i as u8);
910            let loc = journal.append(&op).await.unwrap();
911            assert_eq!(loc, Location::<F>::new(i as u64));
912        }
913
914        journal.sync().await.unwrap();
915        journal
916    }
917
918    /// Create separate Merkle and journal components for testing alignment.
919    ///
920    /// These components are created independently and can be manipulated separately to test
921    /// scenarios where the Merkle structure and journal are out of sync (e.g., one ahead of the
922    /// other).
923    async fn create_components<F: Family + PartialEq>(
924        context: Context,
925        suffix: &str,
926    ) -> (
927        Journaled<F, deterministic::Context, Digest>,
928        ContiguousJournal<deterministic::Context, TestOp<F>>,
929        StandardHasher<Sha256>,
930    ) {
931        let hasher = StandardHasher::new();
932        let merkle = Journaled::<F, _, Digest>::init(
933            context.with_label("mmr"),
934            &hasher,
935            merkle_config(suffix, &context),
936        )
937        .await
938        .unwrap();
939        let journal = ContiguousJournal::init(
940            context.with_label("journal"),
941            journal_config(suffix, &context),
942        )
943        .await
944        .unwrap();
945        (merkle, journal, hasher)
946    }
947
948    /// Verify that a proof correctly proves the given operations are included in the Merkle
949    /// structure.
950    fn verify_proof<F: Family + PartialEq>(
951        proof: &Proof<F, <Sha256 as commonware_cryptography::Hasher>::Digest>,
952        operations: &[TestOp<F>],
953        start_loc: Location<F>,
954        root: &<Sha256 as commonware_cryptography::Hasher>::Digest,
955        hasher: &StandardHasher<Sha256>,
956    ) -> bool {
957        let encoded_ops: Vec<_> = operations.iter().map(|op| op.encode()).collect();
958        proof.verify_range_inclusion(hasher, &encoded_ops, start_loc, root)
959    }
960
961    /// Verify that new() creates an empty authenticated journal.
962    async fn test_new_creates_empty_journal_inner<F: Family + PartialEq>(context: Context) {
963        let journal = create_empty_journal::<F>(context, "new-empty").await;
964
965        let bounds = journal.reader().await.bounds();
966        assert_eq!(bounds.end, 0);
967        assert_eq!(bounds.start, 0);
968        assert!(bounds.is_empty());
969    }
970
971    #[test_traced("INFO")]
972    fn test_new_creates_empty_journal_mmr() {
973        let executor = deterministic::Runner::default();
974        executor.start(test_new_creates_empty_journal_inner::<mmr::Family>);
975    }
976
977    #[test_traced("INFO")]
978    fn test_new_creates_empty_journal_mmb() {
979        let executor = deterministic::Runner::default();
980        executor.start(test_new_creates_empty_journal_inner::<mmb::Family>);
981    }
982
983    /// Verify that align() correctly handles empty Merkle and journal components.
984    async fn test_align_with_empty_mmr_and_journal_inner<F: Family + PartialEq>(context: Context) {
985        let (mut merkle, journal, hasher) = create_components::<F>(context, "align-empty").await;
986
987        TestJournal::<F>::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE)
988            .await
989            .unwrap();
990
991        assert_eq!(merkle.leaves(), Location::<F>::new(0));
992        assert_eq!(journal.size().await, 0);
993    }
994
995    #[test_traced("INFO")]
996    fn test_align_with_empty_mmr_and_journal_mmr() {
997        let executor = deterministic::Runner::default();
998        executor.start(test_align_with_empty_mmr_and_journal_inner::<mmr::Family>);
999    }
1000
1001    #[test_traced("INFO")]
1002    fn test_align_with_empty_mmr_and_journal_mmb() {
1003        let executor = deterministic::Runner::default();
1004        executor.start(test_align_with_empty_mmr_and_journal_inner::<mmb::Family>);
1005    }
1006
    /// Verify that align() pops Merkle elements when Merkle is ahead of the journal.
    ///
    /// NOTE(review): the scenario below actually ends with the journal one item
    /// ahead (21 operations vs. 20 Merkle leaves) after the commit is appended to
    /// the journal only, and align() is asserted to bring the Merkle structure up
    /// to 21 leaves — confirm the summary above against align()'s implementation.
    async fn test_align_when_mmr_ahead_inner<F: Family + PartialEq>(context: Context) {
        let (mut merkle, journal, hasher) = create_components::<F>(context, "mmr-ahead").await;

        // Add 20 operations to both Merkle and journal
        {
            let batch = {
                let mut batch = merkle.new_batch();
                for i in 0..20 {
                    let op = create_operation::<F>(i as u8);
                    let encoded = op.encode();
                    // The Merkle batch receives the encoded operation; the
                    // journal stores the operation itself.
                    batch = batch.add(&hasher, &encoded);
                    journal.append(&op).await.unwrap();
                }
                batch
            };
            // Merkleize the batch against the in-memory Merkle state, then apply it.
            let batch = merkle.with_mem(|mem| batch.merkleize(mem, &hasher));
            merkle.apply_batch(&batch).unwrap();
        }

        // Add commit operation to journal only (making journal ahead)
        let commit_op = TestOp::<F>::CommitFloor(None, Location::<F>::new(0));
        journal.append(&commit_op).await.unwrap();
        journal.sync().await.unwrap();

        // Merkle has 20 leaves, journal has 21 operations (20 ops + 1 commit)
        TestJournal::<F>::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE)
            .await
            .unwrap();

        // Merkle should have been aligned to match journal
        assert_eq!(merkle.leaves(), Location::<F>::new(21));
        assert_eq!(journal.size().await, 21);
    }
1041
1042    #[test_traced("WARN")]
1043    fn test_align_when_mmr_ahead_mmr() {
1044        let executor = deterministic::Runner::default();
1045        executor.start(test_align_when_mmr_ahead_inner::<mmr::Family>);
1046    }
1047
1048    #[test_traced("WARN")]
1049    fn test_align_when_mmr_ahead_mmb() {
1050        let executor = deterministic::Runner::default();
1051        executor.start(test_align_when_mmr_ahead_inner::<mmb::Family>);
1052    }
1053
1054    /// Verify that align() replays journal operations when journal is ahead of Merkle.
1055    async fn test_align_when_journal_ahead_inner<F: Family + PartialEq>(context: Context) {
1056        let (mut merkle, journal, hasher) = create_components::<F>(context, "journal-ahead").await;
1057
1058        // Add 20 operations to journal only
1059        for i in 0..20 {
1060            let op = create_operation::<F>(i as u8);
1061            journal.append(&op).await.unwrap();
1062        }
1063
1064        // Add commit
1065        let commit_op = TestOp::<F>::CommitFloor(None, Location::<F>::new(0));
1066        journal.append(&commit_op).await.unwrap();
1067        journal.sync().await.unwrap();
1068
1069        // Journal has 21 operations, Merkle has 0 leaves
1070        TestJournal::<F>::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE)
1071            .await
1072            .unwrap();
1073
1074        // Merkle should have been replayed to match journal
1075        assert_eq!(merkle.leaves(), Location::<F>::new(21));
1076        assert_eq!(journal.size().await, 21);
1077    }
1078
1079    #[test_traced("WARN")]
1080    fn test_align_when_journal_ahead_mmr() {
1081        let executor = deterministic::Runner::default();
1082        executor.start(test_align_when_journal_ahead_inner::<mmr::Family>);
1083    }
1084
1085    #[test_traced("WARN")]
1086    fn test_align_when_journal_ahead_mmb() {
1087        let executor = deterministic::Runner::default();
1088        executor.start(test_align_when_journal_ahead_inner::<mmb::Family>);
1089    }
1090
    /// Verify that align() discards uncommitted operations.
    async fn test_align_with_mismatched_committed_ops_inner<F: Family + PartialEq>(
        context: Context,
    ) {
        let mut journal =
            create_empty_journal::<F>(context.with_label("first"), "mismatched").await;

        // Add 20 uncommitted operations
        for i in 0..20 {
            let loc = journal
                .append(&create_operation::<F>(i as u8))
                .await
                .unwrap();
            assert_eq!(loc, Location::<F>::new(i as u64));
        }

        // No commit operation was appended, so all 20 operations are
        // uncommitted; alignment on reopen should discard them.
        let size_before = journal.size().await;
        assert_eq!(size_before, 20);

        // Persist the (still uncommitted) operations, then drop and recreate to
        // simulate restart (which calls align internally). Syncing first shows
        // the operations are discarded because they are uncommitted, not merely
        // because they were never written to storage.
        journal.sync().await.unwrap();
        drop(journal);
        let journal = create_empty_journal::<F>(context.with_label("second"), "mismatched").await;

        // Uncommitted operations should be gone
        assert_eq!(journal.size().await, 0);
    }
1120
1121    #[test_traced("INFO")]
1122    fn test_align_with_mismatched_committed_ops_mmr() {
1123        let executor = deterministic::Runner::default();
1124        executor.start(|context| {
1125            test_align_with_mismatched_committed_ops_inner::<mmr::Family>(context)
1126        });
1127    }
1128
1129    #[test_traced("INFO")]
1130    fn test_align_with_mismatched_committed_ops_mmb() {
1131        let executor = deterministic::Runner::default();
1132        executor.start(|context| {
1133            test_align_with_mismatched_committed_ops_inner::<mmb::Family>(context)
1134        });
1135    }
1136
    /// Exercise rewind behavior end-to-end: predicate-based `rewind_to` on a bare
    /// contiguous journal (with/without matching commits, before/after pruning)
    /// and position-based `rewind` on the authenticated journal.
    async fn test_rewind_inner<F: Family + PartialEq>(context: Context) {
        // Test 1: Matching operation is kept
        {
            let mut journal = ContiguousJournal::init(
                context.with_label("rewind_match"),
                journal_config("rewind-match", &context),
            )
            .await
            .unwrap();

            // Add operations where operation 3 is a commit
            for i in 0..3 {
                journal.append(&create_operation::<F>(i)).await.unwrap();
            }
            journal
                .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
                .await
                .unwrap();
            for i in 4..7 {
                journal.append(&create_operation::<F>(i)).await.unwrap();
            }

            // Rewind to last commit
            let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
            assert_eq!(final_size, 4);
            assert_eq!(journal.size().await, 4);

            // Verify the commit operation is still there
            let op = journal.read(3).await.unwrap();
            assert!(op.is_commit());
        }

        // Test 2: Last matching operation is chosen when multiple match
        {
            let mut journal = ContiguousJournal::init(
                context.with_label("rewind_multiple"),
                journal_config("rewind-multiple", &context),
            )
            .await
            .unwrap();

            // Add multiple commits
            journal.append(&create_operation::<F>(0)).await.unwrap();
            journal
                .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
                .await
                .unwrap(); // pos 1
            journal.append(&create_operation::<F>(2)).await.unwrap();
            journal
                .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(1)))
                .await
                .unwrap(); // pos 3
            journal.append(&create_operation::<F>(4)).await.unwrap();

            // Should rewind to last commit (pos 3)
            let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
            assert_eq!(final_size, 4);

            // Verify the last commit is still there
            let op = journal.read(3).await.unwrap();
            assert!(op.is_commit());

            // Verify we can't read pos 4
            assert!(journal.read(4).await.is_err());
        }

        // Test 3: Rewind to pruning boundary when no match
        {
            let mut journal = ContiguousJournal::init(
                context.with_label("rewind_no_match"),
                journal_config("rewind-no-match", &context),
            )
            .await
            .unwrap();

            // Add operations with no commits
            for i in 0..10 {
                journal.append(&create_operation::<F>(i)).await.unwrap();
            }

            // Rewind should go to pruning boundary (0 for unpruned)
            let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
            assert_eq!(final_size, 0, "Should rewind to pruning boundary (0)");
            assert_eq!(journal.size().await, 0);
        }

        // Test 4: Rewind with existing pruning boundary
        {
            let mut journal = ContiguousJournal::init(
                context.with_label("rewind_with_pruning"),
                journal_config("rewind-with-pruning", &context),
            )
            .await
            .unwrap();

            // Add operations and a commit at position 10 (past first section boundary of 7)
            for i in 0..10 {
                journal.append(&create_operation::<F>(i)).await.unwrap();
            }
            journal
                .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
                .await
                .unwrap(); // pos 10
            for i in 11..15 {
                journal.append(&create_operation::<F>(i)).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Prune up to position 8 (this will prune section 0, items 0-6, keeping 7+)
            journal.prune(8).await.unwrap();
            assert_eq!(journal.reader().await.bounds().start, 7);

            // Add more uncommitted operations
            for i in 15..20 {
                journal.append(&create_operation::<F>(i)).await.unwrap();
            }

            // Rewind should keep the commit at position 10
            let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
            assert_eq!(final_size, 11);

            // Verify commit is still there
            let op = journal.read(10).await.unwrap();
            assert!(op.is_commit());
        }

        // Test 5: Rewind with no matches after pruning boundary
        {
            let mut journal = ContiguousJournal::init(
                context.with_label("rewind_no_match_pruned"),
                journal_config("rewind-no-match-pruned", &context),
            )
            .await
            .unwrap();

            // Add operations with a commit at position 5 (in section 0: 0-6)
            for i in 0..5 {
                journal.append(&create_operation::<F>(i)).await.unwrap();
            }
            journal
                .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
                .await
                .unwrap(); // pos 5
            for i in 6..10 {
                journal.append(&create_operation::<F>(i)).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Prune up to position 8 (this prunes section 0, including the commit at pos 5)
            // Pruning boundary will be at position 7 (start of section 1)
            journal.prune(8).await.unwrap();
            assert_eq!(journal.reader().await.bounds().start, 7);

            // Add uncommitted operations with no commits (in section 1: 7-13)
            for i in 10..14 {
                journal.append(&create_operation::<F>(i)).await.unwrap();
            }

            // Rewind with no matching commits after the pruning boundary
            // Should rewind to the pruning boundary at position 7
            let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
            assert_eq!(final_size, 7);
        }

        // Test 6: Empty journal
        {
            let mut journal = ContiguousJournal::init(
                context.with_label("rewind_empty"),
                journal_config("rewind-empty", &context),
            )
            .await
            .unwrap();

            // Rewind empty journal should be no-op
            let final_size = journal
                .rewind_to(|op: &TestOp<F>| op.is_commit())
                .await
                .unwrap();
            assert_eq!(final_size, 0);
            assert_eq!(journal.size().await, 0);
        }

        // Test 7: Position based authenticated journal rewind.
        {
            let merkle_cfg = merkle_config("rewind", &context);
            let journal_cfg = journal_config("rewind", &context);
            let mut journal =
                TestJournal::<F>::new(context, merkle_cfg, journal_cfg, |op| op.is_commit())
                    .await
                    .unwrap();

            // Add operations with a commit at position 5 (in section 0: 0-6)
            for i in 0..5 {
                journal.append(&create_operation::<F>(i)).await.unwrap();
            }
            journal
                .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
                .await
                .unwrap(); // pos 5
            for i in 6..10 {
                journal.append(&create_operation::<F>(i)).await.unwrap();
            }
            assert_eq!(journal.size().await, 10);

            // Rewinding to 2 items also trims the Merkle structure: 2 leaves
            // occupy 3 positions.
            journal.rewind(2).await.unwrap();
            assert_eq!(journal.size().await, 2);
            assert_eq!(journal.merkle.leaves(), 2);
            assert_eq!(journal.merkle.size(), 3);
            let bounds = journal.reader().await.bounds();
            assert_eq!(bounds.start, 0);
            assert!(!bounds.is_empty());

            // Rewinding to a size larger than the current one is invalid.
            assert!(matches!(
                journal.rewind(3).await,
                Err(JournalError::InvalidRewind(_))
            ));

            journal.rewind(0).await.unwrap();
            assert_eq!(journal.size().await, 0);
            assert_eq!(journal.merkle.leaves(), 0);
            assert_eq!(journal.merkle.size(), 0);
            let bounds = journal.reader().await.bounds();
            assert_eq!(bounds.start, 0);
            assert!(bounds.is_empty());

            // Test rewinding after pruning.
            for i in 0..255 {
                journal.append(&create_operation::<F>(i)).await.unwrap();
            }
            journal.prune(Location::<F>::new(100)).await.unwrap();
            assert_eq!(journal.reader().await.bounds().start, 98);
            // Rewinding below the (section-aligned) pruning boundary must fail...
            let res = journal.rewind(97).await;
            assert!(matches!(res, Err(JournalError::InvalidRewind(97))));
            // ...while rewinding exactly to the boundary leaves an empty range.
            journal.rewind(98).await.unwrap();
            let bounds = journal.reader().await.bounds();
            assert_eq!(bounds.end, 98);
            assert_eq!(journal.merkle.leaves(), 98);
            assert_eq!(bounds.start, 98);
            assert!(bounds.is_empty());
        }
    }
1378
1379    #[test_traced("INFO")]
1380    fn test_rewind_mmr() {
1381        let executor = deterministic::Runner::default();
1382        executor.start(test_rewind_inner::<mmr::Family>);
1383    }
1384
1385    #[test_traced("INFO")]
1386    fn test_rewind_mmb() {
1387        let executor = deterministic::Runner::default();
1388        executor.start(test_rewind_inner::<mmb::Family>);
1389    }
1390
1391    /// Verify that append() increments the operation count, returns correct locations, and
1392    /// operations can be read back correctly.
1393    async fn test_apply_op_and_read_operations_inner<F: Family + PartialEq>(context: Context) {
1394        let mut journal = create_empty_journal::<F>(context, "apply_op").await;
1395
1396        assert_eq!(journal.size().await, 0);
1397
1398        // Add 50 operations
1399        let expected_ops: Vec<_> = (0..50).map(|i| create_operation::<F>(i as u8)).collect();
1400        for (i, op) in expected_ops.iter().enumerate() {
1401            let loc = journal.append(op).await.unwrap();
1402            assert_eq!(loc, Location::<F>::new(i as u64));
1403            assert_eq!(journal.size().await, (i + 1) as u64);
1404        }
1405
1406        assert_eq!(journal.size().await, 50);
1407
1408        // Verify all operations can be read back correctly
1409        journal.sync().await.unwrap();
1410        for (i, expected_op) in expected_ops.iter().enumerate() {
1411            let read_op = journal.read(Location::<F>::new(i as u64)).await.unwrap();
1412            assert_eq!(read_op, *expected_op);
1413        }
1414    }
1415
1416    #[test_traced("INFO")]
1417    fn test_apply_op_and_read_operations_mmr() {
1418        let executor = deterministic::Runner::default();
1419        executor.start(test_apply_op_and_read_operations_inner::<mmr::Family>);
1420    }
1421
1422    #[test_traced("INFO")]
1423    fn test_apply_op_and_read_operations_mmb() {
1424        let executor = deterministic::Runner::default();
1425        executor.start(test_apply_op_and_read_operations_inner::<mmb::Family>);
1426    }
1427
1428    /// Verify that read() returns correct operations at various positions.
1429    async fn test_read_operations_at_various_positions_inner<F: Family + PartialEq>(
1430        context: Context,
1431    ) {
1432        let journal = create_journal_with_ops::<F>(context, "read", 50).await;
1433
1434        // Verify reading first operation
1435        let first_op = journal.read(Location::<F>::new(0)).await.unwrap();
1436        assert_eq!(first_op, create_operation::<F>(0));
1437
1438        // Verify reading middle operation
1439        let middle_op = journal.read(Location::<F>::new(25)).await.unwrap();
1440        assert_eq!(middle_op, create_operation::<F>(25));
1441
1442        // Verify reading last operation
1443        let last_op = journal.read(Location::<F>::new(49)).await.unwrap();
1444        assert_eq!(last_op, create_operation::<F>(49));
1445
1446        // Verify all operations match expected values
1447        for i in 0..50 {
1448            let op = journal.read(Location::<F>::new(i)).await.unwrap();
1449            assert_eq!(op, create_operation::<F>(i as u8));
1450        }
1451    }
1452
1453    #[test_traced("INFO")]
1454    fn test_read_operations_at_various_positions_mmr() {
1455        let executor = deterministic::Runner::default();
1456        executor.start(|context| {
1457            test_read_operations_at_various_positions_inner::<mmr::Family>(context)
1458        });
1459    }
1460
1461    #[test_traced("INFO")]
1462    fn test_read_operations_at_various_positions_mmb() {
1463        let executor = deterministic::Runner::default();
1464        executor.start(|context| {
1465            test_read_operations_at_various_positions_inner::<mmb::Family>(context)
1466        });
1467    }
1468
1469    /// Verify that read() returns an error for pruned operations.
1470    async fn test_read_pruned_operation_returns_error_inner<F: Family + PartialEq>(
1471        context: Context,
1472    ) {
1473        let mut journal = create_journal_with_ops::<F>(context, "read_pruned", 100).await;
1474
1475        // Add commit and prune
1476        journal
1477            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1478            .await
1479            .unwrap();
1480        journal.sync().await.unwrap();
1481        let pruned_boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1482
1483        // Try to read an operation before the pruned boundary
1484        let read_loc = Location::<F>::new(0);
1485        if read_loc < pruned_boundary {
1486            let result = journal.read(read_loc).await;
1487            assert!(matches!(
1488                result,
1489                Err(Error::Journal(crate::journal::Error::ItemPruned(_)))
1490            ));
1491        }
1492    }
1493
1494    #[test_traced("INFO")]
1495    fn test_read_pruned_operation_returns_error_mmr() {
1496        let executor = deterministic::Runner::default();
1497        executor.start(|context| {
1498            test_read_pruned_operation_returns_error_inner::<mmr::Family>(context)
1499        });
1500    }
1501
1502    #[test_traced("INFO")]
1503    fn test_read_pruned_operation_returns_error_mmb() {
1504        let executor = deterministic::Runner::default();
1505        executor.start(|context| {
1506            test_read_pruned_operation_returns_error_inner::<mmb::Family>(context)
1507        });
1508    }
1509
1510    /// Verify that read() returns an error for out-of-range locations.
1511    async fn test_read_out_of_range_returns_error_inner<F: Family + PartialEq>(context: Context) {
1512        let journal = create_journal_with_ops::<F>(context, "read_oob", 3).await;
1513
1514        // Try to read beyond the end
1515        let result = journal.read(Location::<F>::new(10)).await;
1516        assert!(matches!(
1517            result,
1518            Err(Error::Journal(crate::journal::Error::ItemOutOfRange(_)))
1519        ));
1520    }
1521
1522    #[test_traced("INFO")]
1523    fn test_read_out_of_range_returns_error_mmr() {
1524        let executor = deterministic::Runner::default();
1525        executor.start(test_read_out_of_range_returns_error_inner::<mmr::Family>);
1526    }
1527
1528    #[test_traced("INFO")]
1529    fn test_read_out_of_range_returns_error_mmb() {
1530        let executor = deterministic::Runner::default();
1531        executor.start(test_read_out_of_range_returns_error_inner::<mmb::Family>);
1532    }
1533
1534    /// Verify that we can read all operations back correctly.
1535    async fn test_read_all_operations_back_correctly_inner<F: Family + PartialEq>(
1536        context: Context,
1537    ) {
1538        let journal = create_journal_with_ops::<F>(context, "read_all", 50).await;
1539
1540        assert_eq!(journal.size().await, 50);
1541
1542        // Verify all operations can be read back and match expected values
1543        for i in 0..50 {
1544            let op = journal.read(Location::<F>::new(i)).await.unwrap();
1545            assert_eq!(op, create_operation::<F>(i as u8));
1546        }
1547    }
1548
1549    #[test_traced("INFO")]
1550    fn test_read_all_operations_back_correctly_mmr() {
1551        let executor = deterministic::Runner::default();
1552        executor.start(test_read_all_operations_back_correctly_inner::<mmr::Family>);
1553    }
1554
1555    #[test_traced("INFO")]
1556    fn test_read_all_operations_back_correctly_mmb() {
1557        let executor = deterministic::Runner::default();
1558        executor.start(test_read_all_operations_back_correctly_inner::<mmb::Family>);
1559    }
1560
1561    /// Verify that sync() persists operations.
1562    async fn test_sync_inner<F: Family + PartialEq>(context: Context) {
1563        let mut journal =
1564            create_empty_journal::<F>(context.with_label("first"), "close_pending").await;
1565
1566        // Add 20 operations
1567        let expected_ops: Vec<_> = (0..20).map(|i| create_operation::<F>(i as u8)).collect();
1568        for (i, op) in expected_ops.iter().enumerate() {
1569            let loc = journal.append(op).await.unwrap();
1570            assert_eq!(loc, Location::<F>::new(i as u64),);
1571        }
1572
1573        // Add commit operation to commit the operations
1574        let commit_loc = journal
1575            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1576            .await
1577            .unwrap();
1578        assert_eq!(
1579            commit_loc,
1580            Location::<F>::new(20),
1581            "commit should be at location 20"
1582        );
1583        journal.sync().await.unwrap();
1584
1585        // Reopen and verify the operations persisted
1586        drop(journal);
1587        let journal =
1588            create_empty_journal::<F>(context.with_label("second"), "close_pending").await;
1589        assert_eq!(journal.size().await, 21);
1590
1591        // Verify all operations can be read back
1592        for (i, expected_op) in expected_ops.iter().enumerate() {
1593            let read_op = journal.read(Location::<F>::new(i as u64)).await.unwrap();
1594            assert_eq!(read_op, *expected_op);
1595        }
1596    }
1597
1598    #[test_traced("INFO")]
1599    fn test_sync_mmr() {
1600        let executor = deterministic::Runner::default();
1601        executor.start(test_sync_inner::<mmr::Family>);
1602    }
1603
1604    #[test_traced("INFO")]
1605    fn test_sync_mmb() {
1606        let executor = deterministic::Runner::default();
1607        executor.start(test_sync_inner::<mmb::Family>);
1608    }
1609
1610    /// Verify that pruning an empty journal returns the boundary.
1611    async fn test_prune_empty_journal_inner<F: Family + PartialEq>(context: Context) {
1612        let mut journal = create_empty_journal::<F>(context, "prune_empty").await;
1613
1614        let boundary = journal.prune(Location::<F>::new(0)).await.unwrap();
1615
1616        assert_eq!(boundary, Location::<F>::new(0));
1617    }
1618
1619    #[test_traced("INFO")]
1620    fn test_prune_empty_journal_mmr() {
1621        let executor = deterministic::Runner::default();
1622        executor.start(test_prune_empty_journal_inner::<mmr::Family>);
1623    }
1624
1625    #[test_traced("INFO")]
1626    fn test_prune_empty_journal_mmb() {
1627        let executor = deterministic::Runner::default();
1628        executor.start(test_prune_empty_journal_inner::<mmb::Family>);
1629    }
1630
1631    /// Verify that pruning to a specific location works correctly.
1632    async fn test_prune_to_location_inner<F: Family + PartialEq>(context: Context) {
1633        let mut journal = create_journal_with_ops::<F>(context, "prune_to", 100).await;
1634
1635        // Add commit at position 50
1636        journal
1637            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1638            .await
1639            .unwrap();
1640        journal.sync().await.unwrap();
1641
1642        let boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1643
1644        // Boundary should be <= requested location (may align to section boundary)
1645        assert!(boundary <= Location::<F>::new(50));
1646    }
1647
1648    #[test_traced("INFO")]
1649    fn test_prune_to_location_mmr() {
1650        let executor = deterministic::Runner::default();
1651        executor.start(test_prune_to_location_inner::<mmr::Family>);
1652    }
1653
1654    #[test_traced("INFO")]
1655    fn test_prune_to_location_mmb() {
1656        let executor = deterministic::Runner::default();
1657        executor.start(test_prune_to_location_inner::<mmb::Family>);
1658    }
1659
1660    /// Verify that prune() returns the actual boundary (which may differ from requested).
1661    async fn test_prune_returns_actual_boundary_inner<F: Family + PartialEq>(context: Context) {
1662        let mut journal = create_journal_with_ops::<F>(context, "prune_boundary", 100).await;
1663
1664        journal
1665            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1666            .await
1667            .unwrap();
1668        journal.sync().await.unwrap();
1669
1670        let requested = Location::<F>::new(50);
1671        let actual = journal.prune(requested).await.unwrap();
1672
1673        // Actual boundary should match bounds.start
1674        let bounds = journal.reader().await.bounds();
1675        assert!(!bounds.is_empty());
1676        assert_eq!(actual, bounds.start);
1677
1678        // Actual may be <= requested due to section alignment
1679        assert!(actual <= requested);
1680    }
1681
1682    #[test_traced("INFO")]
1683    fn test_prune_returns_actual_boundary_mmr() {
1684        let executor = deterministic::Runner::default();
1685        executor.start(test_prune_returns_actual_boundary_inner::<mmr::Family>);
1686    }
1687
1688    #[test_traced("INFO")]
1689    fn test_prune_returns_actual_boundary_mmb() {
1690        let executor = deterministic::Runner::default();
1691        executor.start(test_prune_returns_actual_boundary_inner::<mmb::Family>);
1692    }
1693
1694    /// Verify that pruning doesn't change the operation count.
1695    async fn test_prune_preserves_operation_count_inner<F: Family + PartialEq>(context: Context) {
1696        let mut journal = create_journal_with_ops::<F>(context, "prune_count", 100).await;
1697
1698        journal
1699            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1700            .await
1701            .unwrap();
1702        journal.sync().await.unwrap();
1703
1704        let count_before = journal.size().await;
1705        journal.prune(Location::<F>::new(50)).await.unwrap();
1706        let count_after = journal.size().await;
1707
1708        assert_eq!(count_before, count_after);
1709    }
1710
1711    #[test_traced("INFO")]
1712    fn test_prune_preserves_operation_count_mmr() {
1713        let executor = deterministic::Runner::default();
1714        executor.start(test_prune_preserves_operation_count_inner::<mmr::Family>);
1715    }
1716
1717    #[test_traced("INFO")]
1718    fn test_prune_preserves_operation_count_mmb() {
1719        let executor = deterministic::Runner::default();
1720        executor.start(test_prune_preserves_operation_count_inner::<mmb::Family>);
1721    }
1722
1723    /// Verify bounds() for empty journal, no pruning, and after pruning.
1724    async fn test_bounds_empty_and_pruned_inner<F: Family + PartialEq>(context: Context) {
1725        // Test empty journal
1726        let journal = create_empty_journal::<F>(context.with_label("empty"), "oldest").await;
1727        assert!(journal.reader().await.bounds().is_empty());
1728        journal.destroy().await.unwrap();
1729
1730        // Test no pruning
1731        let journal =
1732            create_journal_with_ops::<F>(context.with_label("no_prune"), "oldest", 100).await;
1733        let bounds = journal.reader().await.bounds();
1734        assert!(!bounds.is_empty());
1735        assert_eq!(bounds.start, 0);
1736        journal.destroy().await.unwrap();
1737
1738        // Test after pruning
1739        let mut journal =
1740            create_journal_with_ops::<F>(context.with_label("pruned"), "oldest", 100).await;
1741        journal
1742            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1743            .await
1744            .unwrap();
1745        journal.sync().await.unwrap();
1746
1747        let pruned_boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1748
1749        // Should match the pruned boundary (may be <= 50 due to section alignment)
1750        let bounds = journal.reader().await.bounds();
1751        assert!(!bounds.is_empty());
1752        assert_eq!(bounds.start, pruned_boundary);
1753        // Should be <= requested location (50)
1754        assert!(pruned_boundary <= 50);
1755        journal.destroy().await.unwrap();
1756    }
1757
1758    #[test_traced("INFO")]
1759    fn test_bounds_empty_and_pruned_mmr() {
1760        let executor = deterministic::Runner::default();
1761        executor.start(test_bounds_empty_and_pruned_inner::<mmr::Family>);
1762    }
1763
1764    #[test_traced("INFO")]
1765    fn test_bounds_empty_and_pruned_mmb() {
1766        let executor = deterministic::Runner::default();
1767        executor.start(test_bounds_empty_and_pruned_inner::<mmb::Family>);
1768    }
1769
1770    /// Verify bounds().start for empty journal, no pruning, and after pruning.
1771    async fn test_bounds_start_after_prune_inner<F: Family + PartialEq>(context: Context) {
1772        // Test empty journal
1773        let journal = create_empty_journal::<F>(context.with_label("empty"), "boundary").await;
1774        assert_eq!(journal.reader().await.bounds().start, 0);
1775
1776        // Test no pruning
1777        let journal =
1778            create_journal_with_ops::<F>(context.with_label("no_prune"), "boundary", 100).await;
1779        assert_eq!(journal.reader().await.bounds().start, 0);
1780
1781        // Test after pruning
1782        let mut journal =
1783            create_journal_with_ops::<F>(context.with_label("pruned"), "boundary", 100).await;
1784        journal
1785            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1786            .await
1787            .unwrap();
1788        journal.sync().await.unwrap();
1789
1790        let pruned_boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1791
1792        assert_eq!(journal.reader().await.bounds().start, pruned_boundary);
1793    }
1794
1795    #[test_traced("INFO")]
1796    fn test_bounds_start_after_prune_mmr() {
1797        let executor = deterministic::Runner::default();
1798        executor.start(test_bounds_start_after_prune_inner::<mmr::Family>);
1799    }
1800
1801    #[test_traced("INFO")]
1802    fn test_bounds_start_after_prune_mmb() {
1803        let executor = deterministic::Runner::default();
1804        executor.start(test_bounds_start_after_prune_inner::<mmb::Family>);
1805    }
1806
1807    /// Verify that Merkle prunes to the journal's actual boundary, not the requested location.
1808    async fn test_mmr_prunes_to_journal_boundary_inner<F: Family + PartialEq>(context: Context) {
1809        let mut journal = create_journal_with_ops::<F>(context, "mmr_boundary", 50).await;
1810
1811        journal
1812            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(25)))
1813            .await
1814            .unwrap();
1815        journal.sync().await.unwrap();
1816
1817        let pruned_boundary = journal.prune(Location::<F>::new(25)).await.unwrap();
1818
1819        // Verify Merkle and journal remain in sync
1820        let bounds = journal.reader().await.bounds();
1821        assert!(!bounds.is_empty());
1822        assert_eq!(pruned_boundary, bounds.start);
1823
1824        // Verify boundary is at or before requested (due to section alignment)
1825        assert!(pruned_boundary <= Location::<F>::new(25));
1826
1827        // Verify operation count is unchanged
1828        assert_eq!(journal.size().await, 51);
1829    }
1830
1831    #[test_traced("INFO")]
1832    fn test_mmr_prunes_to_journal_boundary_mmr() {
1833        let executor = deterministic::Runner::default();
1834        executor.start(test_mmr_prunes_to_journal_boundary_inner::<mmr::Family>);
1835    }
1836
1837    #[test_traced("INFO")]
1838    fn test_mmr_prunes_to_journal_boundary_mmb() {
1839        let executor = deterministic::Runner::default();
1840        executor.start(test_mmr_prunes_to_journal_boundary_inner::<mmb::Family>);
1841    }
1842
1843    /// Verify proof() for multiple operations.
1844    async fn test_proof_multiple_operations_inner<F: Family + PartialEq>(context: Context) {
1845        let journal = create_journal_with_ops::<F>(context, "proof_multi", 50).await;
1846
1847        let (proof, ops) = journal
1848            .proof(Location::<F>::new(0), NZU64!(50))
1849            .await
1850            .unwrap();
1851
1852        assert_eq!(ops.len(), 50);
1853        for (i, op) in ops.iter().enumerate() {
1854            assert_eq!(*op, create_operation::<F>(i as u8));
1855        }
1856
1857        // Verify the proof is valid
1858        let hasher = StandardHasher::new();
1859        let root = journal.root();
1860        assert!(verify_proof(
1861            &proof,
1862            &ops,
1863            Location::<F>::new(0),
1864            &root,
1865            &hasher
1866        ));
1867    }
1868
1869    #[test_traced("INFO")]
1870    fn test_proof_multiple_operations_mmr() {
1871        let executor = deterministic::Runner::default();
1872        executor.start(test_proof_multiple_operations_inner::<mmr::Family>);
1873    }
1874
1875    #[test_traced("INFO")]
1876    fn test_proof_multiple_operations_mmb() {
1877        let executor = deterministic::Runner::default();
1878        executor.start(test_proof_multiple_operations_inner::<mmb::Family>);
1879    }
1880
1881    /// Verify that historical_proof() respects the max_ops limit.
1882    async fn test_historical_proof_limited_by_max_ops_inner<F: Family + PartialEq>(
1883        context: Context,
1884    ) {
1885        let journal = create_journal_with_ops::<F>(context, "proof_limit", 50).await;
1886
1887        let size = journal.size().await;
1888        let (proof, ops) = journal
1889            .historical_proof(size, Location::<F>::new(0), NZU64!(20))
1890            .await
1891            .unwrap();
1892
1893        // Should return only 20 operations despite 50 being available
1894        assert_eq!(ops.len(), 20);
1895        for (i, op) in ops.iter().enumerate() {
1896            assert_eq!(*op, create_operation::<F>(i as u8));
1897        }
1898
1899        // Verify the proof is valid
1900        let hasher = StandardHasher::new();
1901        let root = journal.root();
1902        assert!(verify_proof(
1903            &proof,
1904            &ops,
1905            Location::<F>::new(0),
1906            &root,
1907            &hasher
1908        ));
1909    }
1910
1911    #[test_traced("INFO")]
1912    fn test_historical_proof_limited_by_max_ops_mmr() {
1913        let executor = deterministic::Runner::default();
1914        executor.start(|context| {
1915            test_historical_proof_limited_by_max_ops_inner::<mmr::Family>(context)
1916        });
1917    }
1918
1919    #[test_traced("INFO")]
1920    fn test_historical_proof_limited_by_max_ops_mmb() {
1921        let executor = deterministic::Runner::default();
1922        executor.start(|context| {
1923            test_historical_proof_limited_by_max_ops_inner::<mmb::Family>(context)
1924        });
1925    }
1926
1927    /// Verify historical_proof() at the end of the journal.
1928    async fn test_historical_proof_at_end_of_journal_inner<F: Family + PartialEq>(
1929        context: Context,
1930    ) {
1931        let journal = create_journal_with_ops::<F>(context, "proof_end", 50).await;
1932
1933        let size = journal.size().await;
1934        // Request proof starting near the end
1935        let (proof, ops) = journal
1936            .historical_proof(size, Location::<F>::new(40), NZU64!(20))
1937            .await
1938            .unwrap();
1939
1940        // Should return only 10 operations (positions 40-49)
1941        assert_eq!(ops.len(), 10);
1942        for (i, op) in ops.iter().enumerate() {
1943            assert_eq!(*op, create_operation::<F>((40 + i) as u8));
1944        }
1945
1946        // Verify the proof is valid
1947        let hasher = StandardHasher::new();
1948        let root = journal.root();
1949        assert!(verify_proof(
1950            &proof,
1951            &ops,
1952            Location::<F>::new(40),
1953            &root,
1954            &hasher
1955        ));
1956    }
1957
1958    #[test_traced("INFO")]
1959    fn test_historical_proof_at_end_of_journal_mmr() {
1960        let executor = deterministic::Runner::default();
1961        executor.start(test_historical_proof_at_end_of_journal_inner::<mmr::Family>);
1962    }
1963
1964    #[test_traced("INFO")]
1965    fn test_historical_proof_at_end_of_journal_mmb() {
1966        let executor = deterministic::Runner::default();
1967        executor.start(test_historical_proof_at_end_of_journal_inner::<mmb::Family>);
1968    }
1969
1970    /// Verify that historical_proof() returns an error for invalid size.
1971    async fn test_historical_proof_out_of_range_returns_error_inner<F: Family + PartialEq>(
1972        context: Context,
1973    ) {
1974        let journal = create_journal_with_ops::<F>(context, "proof_oob", 5).await;
1975
1976        // Request proof with size > actual journal size
1977        let result = journal
1978            .historical_proof(Location::<F>::new(10), Location::<F>::new(0), NZU64!(1))
1979            .await;
1980
1981        assert!(matches!(
1982            result,
1983            Err(Error::Merkle(merkle::Error::RangeOutOfBounds(_)))
1984        ));
1985    }
1986
1987    #[test_traced("INFO")]
1988    fn test_historical_proof_out_of_range_returns_error_mmr() {
1989        let executor = deterministic::Runner::default();
1990        executor.start(|context| {
1991            test_historical_proof_out_of_range_returns_error_inner::<mmr::Family>(context)
1992        });
1993    }
1994
1995    #[test_traced("INFO")]
1996    fn test_historical_proof_out_of_range_returns_error_mmb() {
1997        let executor = deterministic::Runner::default();
1998        executor.start(|context| {
1999            test_historical_proof_out_of_range_returns_error_inner::<mmb::Family>(context)
2000        });
2001    }
2002
2003    /// Verify that historical_proof() returns an error when start_loc >= size.
2004    async fn test_historical_proof_start_too_large_returns_error_inner<F: Family + PartialEq>(
2005        context: Context,
2006    ) {
2007        let journal = create_journal_with_ops::<F>(context, "proof_start_oob", 5).await;
2008
2009        let size = journal.size().await;
2010        // Request proof starting at size (should fail)
2011        let result = journal.historical_proof(size, size, NZU64!(1)).await;
2012
2013        assert!(matches!(
2014            result,
2015            Err(Error::Merkle(merkle::Error::RangeOutOfBounds(_)))
2016        ));
2017    }
2018
2019    #[test_traced("INFO")]
2020    fn test_historical_proof_start_too_large_returns_error_mmr() {
2021        let executor = deterministic::Runner::default();
2022        executor.start(|context| {
2023            test_historical_proof_start_too_large_returns_error_inner::<mmr::Family>(context)
2024        });
2025    }
2026
2027    #[test_traced("INFO")]
2028    fn test_historical_proof_start_too_large_returns_error_mmb() {
2029        let executor = deterministic::Runner::default();
2030        executor.start(|context| {
2031            test_historical_proof_start_too_large_returns_error_inner::<mmb::Family>(context)
2032        });
2033    }
2034
2035    /// Verify historical_proof() for a truly historical state (before more operations added).
2036    async fn test_historical_proof_truly_historical_inner<F: Family + PartialEq>(context: Context) {
2037        // Create journal with initial operations
2038        let mut journal = create_journal_with_ops::<F>(context, "proof_historical", 50).await;
2039
2040        // Capture root at historical state
2041        let hasher = StandardHasher::new();
2042        let historical_root = journal.root();
2043        let historical_size = journal.size().await;
2044
2045        // Add more operations after the historical state
2046        for i in 50..100 {
2047            journal
2048                .append(&create_operation::<F>(i as u8))
2049                .await
2050                .unwrap();
2051        }
2052        journal.sync().await.unwrap();
2053
2054        // Generate proof for the historical state
2055        let (proof, ops) = journal
2056            .historical_proof(historical_size, Location::<F>::new(0), NZU64!(50))
2057            .await
2058            .unwrap();
2059
2060        // Verify operations match expected historical operations
2061        assert_eq!(ops.len(), 50);
2062        for (i, op) in ops.iter().enumerate() {
2063            assert_eq!(*op, create_operation::<F>(i as u8));
2064        }
2065
2066        // Verify the proof is valid against the historical root
2067        assert!(verify_proof(
2068            &proof,
2069            &ops,
2070            Location::<F>::new(0),
2071            &historical_root,
2072            &hasher
2073        ));
2074    }
2075
2076    #[test_traced("INFO")]
2077    fn test_historical_proof_truly_historical_mmr() {
2078        let executor = deterministic::Runner::default();
2079        executor.start(test_historical_proof_truly_historical_inner::<mmr::Family>);
2080    }
2081
2082    #[test_traced("INFO")]
2083    fn test_historical_proof_truly_historical_mmb() {
2084        let executor = deterministic::Runner::default();
2085        executor.start(test_historical_proof_truly_historical_inner::<mmb::Family>);
2086    }
2087
2088    /// Verify that historical_proof() returns an error when start_loc is pruned.
2089    async fn test_historical_proof_pruned_location_returns_error_inner<F: Family + PartialEq>(
2090        context: Context,
2091    ) {
2092        let mut journal = create_journal_with_ops::<F>(context, "proof_pruned", 50).await;
2093
2094        journal
2095            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(25)))
2096            .await
2097            .unwrap();
2098        journal.sync().await.unwrap();
2099        let pruned_boundary = journal.prune(Location::<F>::new(25)).await.unwrap();
2100
2101        // Try to get proof starting at a location before the pruned boundary
2102        let size = journal.size().await;
2103        let start_loc = Location::<F>::new(0);
2104        if start_loc < pruned_boundary {
2105            let result = journal.historical_proof(size, start_loc, NZU64!(1)).await;
2106
2107            // Should fail when trying to read pruned operations
2108            assert!(result.is_err());
2109        }
2110    }
2111
2112    #[test_traced("INFO")]
2113    fn test_historical_proof_pruned_location_returns_error_mmr() {
2114        let executor = deterministic::Runner::default();
2115        executor.start(|context| {
2116            test_historical_proof_pruned_location_returns_error_inner::<mmr::Family>(context)
2117        });
2118    }
2119
2120    #[test_traced("INFO")]
2121    fn test_historical_proof_pruned_location_returns_error_mmb() {
2122        let executor = deterministic::Runner::default();
2123        executor.start(|context| {
2124            test_historical_proof_pruned_location_returns_error_inner::<mmb::Family>(context)
2125        });
2126    }
2127
2128    /// Verify replay() with empty journal and multiple operations.
2129    async fn test_replay_operations_inner<F: Family + PartialEq>(context: Context) {
2130        // Test empty journal
2131        let journal = create_empty_journal::<F>(context.with_label("empty"), "replay").await;
2132        let reader = journal.reader().await;
2133        let stream = reader.replay(NZUsize!(10), 0).await.unwrap();
2134        futures::pin_mut!(stream);
2135        assert!(stream.next().await.is_none());
2136
2137        // Test replaying all operations
2138        let journal =
2139            create_journal_with_ops::<F>(context.with_label("with_ops"), "replay", 50).await;
2140        let reader = journal.reader().await;
2141        let stream = reader.replay(NZUsize!(100), 0).await.unwrap();
2142        futures::pin_mut!(stream);
2143
2144        for i in 0..50 {
2145            let (pos, op) = stream.next().await.unwrap().unwrap();
2146            assert_eq!(pos, i);
2147            assert_eq!(op, create_operation::<F>(i as u8));
2148        }
2149
2150        assert!(stream.next().await.is_none());
2151    }
2152
2153    #[test_traced("INFO")]
2154    fn test_replay_operations_mmr() {
2155        let executor = deterministic::Runner::default();
2156        executor.start(test_replay_operations_inner::<mmr::Family>);
2157    }
2158
2159    #[test_traced("INFO")]
2160    fn test_replay_operations_mmb() {
2161        let executor = deterministic::Runner::default();
2162        executor.start(test_replay_operations_inner::<mmb::Family>);
2163    }
2164
2165    /// Verify replay() starting from a middle location.
2166    async fn test_replay_from_middle_inner<F: Family + PartialEq>(context: Context) {
2167        let journal = create_journal_with_ops::<F>(context, "replay_middle", 50).await;
2168        let reader = journal.reader().await;
2169        let stream = reader.replay(NZUsize!(100), 25).await.unwrap();
2170        futures::pin_mut!(stream);
2171
2172        let mut count = 0;
2173        while let Some(result) = stream.next().await {
2174            let (pos, op) = result.unwrap();
2175            assert_eq!(pos, 25 + count);
2176            assert_eq!(op, create_operation::<F>((25 + count) as u8));
2177            count += 1;
2178        }
2179
2180        // Should have replayed positions 25-49 (25 operations)
2181        assert_eq!(count, 25);
2182    }
2183
2184    #[test_traced("INFO")]
2185    fn test_replay_from_middle_mmr() {
2186        let executor = deterministic::Runner::default();
2187        executor.start(test_replay_from_middle_inner::<mmr::Family>);
2188    }
2189
2190    #[test_traced("INFO")]
2191    fn test_replay_from_middle_mmb() {
2192        let executor = deterministic::Runner::default();
2193        executor.start(test_replay_from_middle_inner::<mmb::Family>);
2194    }
2195
2196    /// Verify the speculative batch API: fork two batches, verify independent roots, apply one.
2197    async fn test_speculative_batch_inner<F: Family + PartialEq>(context: Context) {
2198        let mut journal = create_journal_with_ops::<F>(context, "speculative_batch", 10).await;
2199        let original_root = journal.root();
2200
2201        // Fork two independent speculative batches.
2202        let b1 = journal.new_batch();
2203        let b2 = journal.new_batch();
2204
2205        // Add different items to each batch.
2206        let op_a = create_operation::<F>(100);
2207        let op_b = create_operation::<F>(200);
2208        let b1 = b1.add(op_a.clone());
2209        let b2 = b2.add(op_b);
2210
2211        // Merkleize and verify independent roots.
2212        let m1 = journal.merkle.with_mem(|mem| b1.merkleize(mem));
2213        let m2 = journal.merkle.with_mem(|mem| b2.merkleize(mem));
2214        assert_ne!(m1.root(), m2.root());
2215        assert_ne!(m1.root(), original_root);
2216        assert_ne!(m2.root(), original_root);
2217
2218        // Journal root should be unchanged (batches are speculative).
2219        assert_eq!(journal.root(), original_root);
2220
2221        // Apply batch 1.
2222        let expected_root = m1.root();
2223        journal.apply_batch(&m1).await.unwrap();
2224
2225        // Journal should now match the applied batch's root.
2226        assert_eq!(journal.root(), expected_root);
2227        assert_eq!(*journal.size().await, 11);
2228    }
2229
2230    #[test_traced("INFO")]
2231    fn test_speculative_batch_mmr() {
2232        let executor = deterministic::Runner::default();
2233        executor.start(test_speculative_batch_inner::<mmr::Family>);
2234    }
2235
2236    #[test_traced("INFO")]
2237    fn test_speculative_batch_mmb() {
2238        let executor = deterministic::Runner::default();
2239        executor.start(test_speculative_batch_inner::<mmb::Family>);
2240    }
2241
2242    /// Verify stacking: create batch A, merkleize, create batch B from merkleized A,
2243    /// merkleize, and apply. Verify root and items.
2244    async fn test_speculative_batch_stacking_inner<F: Family + PartialEq>(context: Context) {
2245        let mut journal = create_journal_with_ops::<F>(context, "batch_stacking", 10).await;
2246
2247        let op_a = create_operation::<F>(100);
2248        let op_b = create_operation::<F>(200);
2249
2250        let merkleized_b = {
2251            let batch_a = journal.new_batch().add(op_a.clone());
2252            let merkleized_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2253
2254            let batch_b = merkleized_a.new_batch::<Sha256>().add(op_b.clone());
2255            journal.merkle.with_mem(|mem| batch_b.merkleize(mem))
2256        };
2257
2258        let expected_root = merkleized_b.root();
2259        journal.apply_batch(&merkleized_b).await.unwrap();
2260
2261        assert_eq!(journal.root(), expected_root);
2262        assert_eq!(*journal.size().await, 12);
2263
2264        // Verify both items were appended correctly.
2265        let read_a = journal.read(Location::<F>::new(10)).await.unwrap();
2266        assert_eq!(read_a, op_a);
2267        let read_b = journal.read(Location::<F>::new(11)).await.unwrap();
2268        assert_eq!(read_b, op_b);
2269    }
2270
2271    #[test_traced("INFO")]
2272    fn test_speculative_batch_stacking_mmr() {
2273        let executor = deterministic::Runner::default();
2274        executor.start(test_speculative_batch_stacking_inner::<mmr::Family>);
2275    }
2276
2277    #[test_traced("INFO")]
2278    fn test_speculative_batch_stacking_mmb() {
2279        let executor = deterministic::Runner::default();
2280        executor.start(test_speculative_batch_stacking_inner::<mmb::Family>);
2281    }
2282
2283    /// Verify sequential batch application: apply batch A, then build and apply batch B
2284    /// from the committed state. Verify root and items.
2285    async fn test_speculative_batch_sequential_inner<F: Family + PartialEq>(context: Context) {
2286        let mut journal = create_journal_with_ops::<F>(context, "batch_sequential", 10).await;
2287
2288        let op_a = create_operation::<F>(100);
2289        let op_b = create_operation::<F>(200);
2290
2291        // Apply batch A.
2292        let batch_a = journal.new_batch().add(op_a.clone());
2293        let merkleized_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2294        journal.apply_batch(&merkleized_a).await.unwrap();
2295        assert_eq!(*journal.size().await, 11);
2296
2297        // Apply batch B (built on top of the committed A).
2298        let batch_b = journal.new_batch().add(op_b.clone());
2299        let merkleized_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem));
2300        let expected_root = merkleized_b.root();
2301        journal.apply_batch(&merkleized_b).await.unwrap();
2302
2303        assert_eq!(journal.root(), expected_root);
2304        assert_eq!(*journal.size().await, 12);
2305
2306        // Verify both items were appended correctly.
2307        let read_a = journal.read(Location::<F>::new(10)).await.unwrap();
2308        assert_eq!(read_a, op_a);
2309        let read_b = journal.read(Location::<F>::new(11)).await.unwrap();
2310        assert_eq!(read_b, op_b);
2311    }
2312
2313    #[test_traced("INFO")]
2314    fn test_speculative_batch_sequential_mmr() {
2315        let executor = deterministic::Runner::default();
2316        executor.start(test_speculative_batch_sequential_inner::<mmr::Family>);
2317    }
2318
2319    #[test_traced("INFO")]
2320    fn test_speculative_batch_sequential_mmb() {
2321        let executor = deterministic::Runner::default();
2322        executor.start(test_speculative_batch_sequential_inner::<mmb::Family>);
2323    }
2324
2325    async fn test_stale_batch_sibling_inner<F: Family + PartialEq>(context: Context) {
2326        let mut journal = create_empty_journal::<F>(context, "stale-sibling").await;
2327        let op_a = create_operation::<F>(1);
2328        let op_b = create_operation::<F>(2);
2329
2330        // Create two batches from the same base.
2331        let batch_a = journal.new_batch().add(op_a.clone());
2332        let merkleized_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2333        let batch_b = journal.new_batch().add(op_b);
2334        let merkleized_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem));
2335
2336        // Apply A -- should succeed.
2337        journal.apply_batch(&merkleized_a).await.unwrap();
2338        let expected_root = journal.root();
2339        let expected_size = journal.size().await;
2340
2341        // Apply B -- should fail (stale).
2342        let result = journal.apply_batch(&merkleized_b).await;
2343        assert!(
2344            matches!(
2345                result,
2346                Err(super::Error::Merkle(merkle::Error::StaleBatch { .. }))
2347            ),
2348            "expected StaleBatch, got {result:?}"
2349        );
2350
2351        // The stale batch must not mutate the journal or desync it from the Merkle.
2352        assert_eq!(journal.root(), expected_root);
2353        assert_eq!(journal.size().await, expected_size);
2354        let (_, ops) = journal
2355            .proof(Location::<F>::new(0), NZU64!(1))
2356            .await
2357            .unwrap();
2358        assert_eq!(ops, vec![op_a]);
2359    }
2360
2361    #[test_traced("INFO")]
2362    fn test_stale_batch_sibling_mmr() {
2363        let executor = deterministic::Runner::default();
2364        executor.start(test_stale_batch_sibling_inner::<mmr::Family>);
2365    }
2366
2367    #[test_traced("INFO")]
2368    fn test_stale_batch_sibling_mmb() {
2369        let executor = deterministic::Runner::default();
2370        executor.start(test_stale_batch_sibling_inner::<mmb::Family>);
2371    }
2372
2373    async fn test_stale_batch_chained_inner<F: Family + PartialEq>(context: Context) {
2374        let mut journal = create_journal_with_ops::<F>(context, "stale-chained", 5).await;
2375
2376        // Parent batch, then fork two children.
2377        let parent_batch = journal.new_batch().add(create_operation::<F>(10));
2378        let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2379        let batch_a = parent.new_batch::<Sha256>().add(create_operation::<F>(20));
2380        let child_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2381        let batch_b = parent.new_batch::<Sha256>().add(create_operation::<F>(30));
2382        let child_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem));
2383        drop(parent);
2384
2385        // Apply child_a, then child_b should be stale.
2386        journal.apply_batch(&child_a).await.unwrap();
2387        let result = journal.apply_batch(&child_b).await;
2388        assert!(
2389            matches!(
2390                result,
2391                Err(super::Error::Merkle(merkle::Error::StaleBatch { .. }))
2392            ),
2393            "expected StaleBatch for sibling, got {result:?}"
2394        );
2395    }
2396
2397    #[test_traced("INFO")]
2398    fn test_stale_batch_chained_mmr() {
2399        let executor = deterministic::Runner::default();
2400        executor.start(test_stale_batch_chained_inner::<mmr::Family>);
2401    }
2402
2403    #[test_traced("INFO")]
2404    fn test_stale_batch_chained_mmb() {
2405        let executor = deterministic::Runner::default();
2406        executor.start(test_stale_batch_chained_inner::<mmb::Family>);
2407    }
2408
2409    async fn test_stale_batch_parent_before_child_inner<F: Family + PartialEq>(context: Context) {
2410        let mut journal = create_empty_journal::<F>(context, "stale-parent-first").await;
2411
2412        // Create parent, then child.
2413        let parent_batch = journal.new_batch().add(create_operation::<F>(1));
2414        let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2415        let child_batch = parent.new_batch::<Sha256>().add(create_operation::<F>(2));
2416        let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2417
2418        let expected_root = child.root();
2419
2420        // Apply parent, then child (sequential commit).
2421        journal.apply_batch(&parent).await.unwrap();
2422        journal.apply_batch(&child).await.unwrap();
2423
2424        assert_eq!(journal.root(), expected_root);
2425        assert_eq!(*journal.size().await, 2);
2426    }
2427
2428    #[test_traced("INFO")]
2429    fn test_stale_batch_parent_before_child_mmr() {
2430        let executor = deterministic::Runner::default();
2431        executor.start(test_stale_batch_parent_before_child_inner::<mmr::Family>);
2432    }
2433
2434    #[test_traced("INFO")]
2435    fn test_stale_batch_parent_before_child_mmb() {
2436        let executor = deterministic::Runner::default();
2437        executor.start(test_stale_batch_parent_before_child_inner::<mmb::Family>);
2438    }
2439
2440    async fn test_stale_batch_child_before_parent_inner<F: Family + PartialEq>(context: Context) {
2441        let mut journal = create_empty_journal::<F>(context, "stale-child-first").await;
2442
2443        // Create parent, then child.
2444        let parent_batch = journal.new_batch().add(create_operation::<F>(1));
2445        let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2446        let child_batch = parent.new_batch::<Sha256>().add(create_operation::<F>(2));
2447        let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2448
2449        // Apply child first (full chain) -- parent should now be stale.
2450        journal.apply_batch(&child).await.unwrap();
2451        let result = journal.apply_batch(&parent).await;
2452        assert!(
2453            matches!(
2454                result,
2455                Err(super::Error::Merkle(merkle::Error::StaleBatch { .. }))
2456            ),
2457            "expected StaleBatch for parent after child applied, got {result:?}"
2458        );
2459    }
2460
2461    #[test_traced("INFO")]
2462    fn test_stale_batch_child_before_parent_mmr() {
2463        let executor = deterministic::Runner::default();
2464        executor.start(test_stale_batch_child_before_parent_inner::<mmr::Family>);
2465    }
2466
2467    #[test_traced("INFO")]
2468    fn test_stale_batch_child_before_parent_mmb() {
2469        let executor = deterministic::Runner::default();
2470        executor.start(test_stale_batch_child_before_parent_inner::<mmb::Family>);
2471    }
2472
2473    /// Apply parent then child: child skips already-committed ancestor items.
2474    async fn test_apply_batch_skip_ancestor_items_inner<F: Family + PartialEq>(context: Context) {
2475        let mut journal = create_journal_with_ops::<F>(context, "rp-skip", 3).await;
2476
2477        // Parent: 2 items.
2478        let parent_batch = journal
2479            .new_batch()
2480            .add(create_operation::<F>(10))
2481            .add(create_operation::<F>(11));
2482        let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2483
2484        // Child: 3 more items.
2485        let child_batch = parent
2486            .new_batch::<Sha256>()
2487            .add(create_operation::<F>(20))
2488            .add(create_operation::<F>(21))
2489            .add(create_operation::<F>(22));
2490        let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2491
2492        // Apply parent.
2493        journal.apply_batch(&parent).await.unwrap();
2494
2495        // Apply child (ancestor items already committed, skipped automatically).
2496        journal.apply_batch(&child).await.unwrap();
2497
2498        // Verify all items are present.
2499        let (_, ops) = journal
2500            .proof(Location::<F>::new(3), NZU64!(5))
2501            .await
2502            .unwrap();
2503        assert_eq!(ops.len(), 5);
2504    }
2505
2506    #[test_traced("INFO")]
2507    fn test_apply_batch_skip_ancestor_items_mmr() {
2508        let executor = deterministic::Runner::default();
2509        executor.start(test_apply_batch_skip_ancestor_items_inner::<mmr::Family>);
2510    }
2511
2512    #[test_traced("INFO")]
2513    fn test_apply_batch_skip_ancestor_items_mmb() {
2514        let executor = deterministic::Runner::default();
2515        executor.start(test_apply_batch_skip_ancestor_items_inner::<mmb::Family>);
2516    }
2517
2518    /// `apply_batch` works correctly across a 3-level chain.
2519    async fn test_apply_batch_cross_batch_inner<F: Family + PartialEq>(context: Context) {
2520        let mut journal = create_journal_with_ops::<F>(context, "rp-cross", 2).await;
2521
2522        // Grandparent: 3 items.
2523        let grandparent_batch = journal
2524            .new_batch()
2525            .add(create_operation::<F>(3))
2526            .add(create_operation::<F>(4))
2527            .add(create_operation::<F>(5));
2528        let grandparent = journal
2529            .merkle
2530            .with_mem(|mem| grandparent_batch.merkleize(mem));
2531
2532        // Parent: 2 items.
2533        let parent_batch = grandparent
2534            .new_batch::<Sha256>()
2535            .add(create_operation::<F>(6))
2536            .add(create_operation::<F>(7));
2537        let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2538
2539        // Child: 1 item.
2540        let child_batch = parent.new_batch::<Sha256>().add(create_operation::<F>(8));
2541        let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2542
2543        // Apply grandparent, then parent, then child sequentially.
2544        journal.apply_batch(&grandparent).await.unwrap();
2545
2546        // Apply parent (ancestor items already committed, skipped automatically).
2547        journal.apply_batch(&parent).await.unwrap();
2548
2549        // Apply child (ancestor items already committed, skipped automatically).
2550        journal.apply_batch(&child).await.unwrap();
2551
2552        // All 8 items (2 base + 3 + 2 + 1) should be present.
2553        assert_eq!(*journal.size().await, 8);
2554
2555        // Verify the actual items at each location.
2556        let (_, ops) = journal
2557            .proof(Location::<F>::new(2), NZU64!(6))
2558            .await
2559            .unwrap();
2560        for (i, op) in ops.iter().enumerate() {
2561            assert_eq!(*op, create_operation::<F>((i + 3) as u8));
2562        }
2563    }
2564
2565    #[test_traced("INFO")]
2566    fn test_apply_batch_cross_batch_mmr() {
2567        let executor = deterministic::Runner::default();
2568        executor.start(test_apply_batch_cross_batch_inner::<mmr::Family>);
2569    }
2570
2571    #[test_traced("INFO")]
2572    fn test_apply_batch_cross_batch_mmb() {
2573        let executor = deterministic::Runner::default();
2574        executor.start(test_apply_batch_cross_batch_inner::<mmb::Family>);
2575    }
2576
2577    /// merkleize_with produces the same root as add + merkleize.
2578    async fn test_merkleize_with_matches_add_inner<F: Family + PartialEq>(context: Context) {
2579        let journal = create_journal_with_ops::<F>(context, "mw-matches", 5).await;
2580
2581        let ops = vec![
2582            create_operation::<F>(10),
2583            create_operation::<F>(11),
2584            create_operation::<F>(12),
2585        ];
2586
2587        // add + merkleize
2588        let mut batch = journal.new_batch();
2589        for op in &ops {
2590            batch = batch.add(op.clone());
2591        }
2592        let expected = journal.merkle.with_mem(|mem| batch.merkleize(mem));
2593
2594        // merkleize_with
2595        let batch = journal.new_batch();
2596        let actual = journal
2597            .merkle
2598            .with_mem(|mem| batch.merkleize_with(mem, Arc::new(ops)));
2599
2600        assert_eq!(actual.root(), expected.root());
2601    }
2602
2603    #[test_traced("INFO")]
2604    fn test_merkleize_with_matches_add_mmr() {
2605        let executor = deterministic::Runner::default();
2606        executor.start(test_merkleize_with_matches_add_inner::<mmr::Family>);
2607    }
2608
2609    #[test_traced("INFO")]
2610    fn test_merkleize_with_matches_add_mmb() {
2611        let executor = deterministic::Runner::default();
2612        executor.start(test_merkleize_with_matches_add_inner::<mmb::Family>);
2613    }
2614
2615    /// merkleize_with items are readable after apply.
2616    async fn test_merkleize_with_apply_inner<F: Family + PartialEq>(context: Context) {
2617        let mut journal = create_journal_with_ops::<F>(context, "mw-apply", 5).await;
2618
2619        let ops = vec![create_operation::<F>(10), create_operation::<F>(11)];
2620        let batch = journal.new_batch();
2621        let merkleized = journal
2622            .merkle
2623            .with_mem(|mem| batch.merkleize_with(mem, Arc::new(ops.clone())));
2624
2625        let expected_root = merkleized.root();
2626        journal.apply_batch(&merkleized).await.unwrap();
2627
2628        assert_eq!(journal.root(), expected_root);
2629        assert_eq!(*journal.size().await, 7);
2630
2631        let reader = journal.reader().await;
2632        assert_eq!(reader.read(5).await.unwrap(), ops[0]);
2633        assert_eq!(reader.read(6).await.unwrap(), ops[1]);
2634    }
2635
2636    #[test_traced("INFO")]
2637    fn test_merkleize_with_apply_mmr() {
2638        let executor = deterministic::Runner::default();
2639        executor.start(test_merkleize_with_apply_inner::<mmr::Family>);
2640    }
2641
2642    #[test_traced("INFO")]
2643    fn test_merkleize_with_apply_mmb() {
2644        let executor = deterministic::Runner::default();
2645        executor.start(test_merkleize_with_apply_inner::<mmb::Family>);
2646    }
2647
2648    /// merkleize_with stores the caller's Arc directly (no deep copy).
2649    async fn test_merkleize_with_shares_arc_inner<F: Family + PartialEq>(context: Context) {
2650        let journal = create_journal_with_ops::<F>(context, "mw-arc", 3).await;
2651
2652        let ops = Arc::new(vec![create_operation::<F>(20), create_operation::<F>(21)]);
2653        let ops_clone = Arc::clone(&ops);
2654        let batch = journal.new_batch();
2655        let merkleized = journal
2656            .merkle
2657            .with_mem(|mem| batch.merkleize_with(mem, ops_clone));
2658
2659        // The batch should hold the same Arc allocation, not a copy.
2660        assert!(Arc::ptr_eq(&merkleized.items, &ops));
2661    }
2662
2663    #[test_traced("INFO")]
2664    fn test_merkleize_with_shares_arc_mmr() {
2665        let executor = deterministic::Runner::default();
2666        executor.start(test_merkleize_with_shares_arc_inner::<mmr::Family>);
2667    }
2668
2669    #[test_traced("INFO")]
2670    fn test_merkleize_with_shares_arc_mmb() {
2671        let executor = deterministic::Runner::default();
2672        executor.start(test_merkleize_with_shares_arc_inner::<mmb::Family>);
2673    }
2674
2675    /// Apply C (grandchild of A) after only A is committed. B's journal items
2676    /// must still be applied -- skip only A's items.
2677    async fn test_apply_batch_skips_only_committed_ancestor_items_inner<F: Family + PartialEq>(
2678        context: Context,
2679    ) {
2680        let mut journal = create_empty_journal::<F>(context.clone(), "skip-partial").await;
2681
2682        // Build chain: A -> B -> C
2683        let a_batch = journal.new_batch().add(create_operation::<F>(1));
2684        let a = journal.merkle.with_mem(|mem| a_batch.merkleize(mem));
2685        let b_batch = a.new_batch::<Sha256>().add(create_operation::<F>(2));
2686        let b = journal.merkle.with_mem(|mem| b_batch.merkleize(mem));
2687        let c_batch = b.new_batch::<Sha256>().add(create_operation::<F>(3));
2688        let c = journal.merkle.with_mem(|mem| c_batch.merkleize(mem));
2689
2690        // Apply A, then apply C directly (skipping B's apply_batch).
2691        journal.apply_batch(&a).await.unwrap();
2692        journal.apply_batch(&c).await.unwrap();
2693
2694        // All 3 items should be in the journal.
2695        assert_eq!(*journal.size().await, 3);
2696
2697        // Build a reference that applies all three sequentially.
2698        let mut reference =
2699            create_empty_journal::<F>(context.with_label("ref"), "skip-partial-ref").await;
2700        for i in 1..=3u8 {
2701            reference.append(&create_operation::<F>(i)).await.unwrap();
2702        }
2703        assert_eq!(journal.root(), reference.root());
2704    }
2705
2706    #[test_traced("INFO")]
2707    fn test_apply_batch_skips_only_committed_ancestor_items_mmr() {
2708        let executor = deterministic::Runner::default();
2709        executor.start(test_apply_batch_skips_only_committed_ancestor_items_inner::<mmr::Family>);
2710    }
2711
2712    #[test_traced("INFO")]
2713    fn test_apply_batch_skips_only_committed_ancestor_items_mmb() {
2714        let executor = deterministic::Runner::default();
2715        executor.start(test_apply_batch_skips_only_committed_ancestor_items_inner::<mmb::Family>);
2716    }
2717}