Skip to main content

commonware_storage/journal/
authenticated.rs

1//! Authenticated journal implementation.
2//!
3//! An authenticated journal maintains a contiguous journal of items alongside a Merkle-family
4//! structure. The item at index i in the journal corresponds to the leaf at Location i in the
5//! Merkle structure. This structure enables efficient proofs that an item is included in the
6//! journal at a specific location.
7
8use crate::{
9    journal::{
10        contiguous::{fixed, variable, Contiguous, Many, Mutable, Reader},
11        Error as JournalError,
12    },
13    merkle::{
14        self, batch,
15        full::Merkle,
16        hasher::{Hasher as _, Standard as StandardHasher},
17        mem::Mem,
18        Bagging, Family, Location, Position, Proof, Readable,
19    },
20    Context, Persistable,
21};
22use alloc::{
23    sync::{Arc, Weak},
24    vec::Vec,
25};
26use commonware_codec::{CodecFixedShared, CodecShared, Encode, EncodeShared};
27use commonware_cryptography::{Digest, Hasher};
28use commonware_parallel::Strategy;
29use core::num::NonZeroU64;
30use futures::{try_join, TryFutureExt as _};
31use thiserror::Error;
32use tracing::{debug, warn};
33
/// Errors that can occur when interacting with an authenticated journal.
///
/// Both variants carry `#[from]`, so the `?` operator converts the underlying error into this
/// type automatically.
#[derive(Error, Debug)]
pub enum Error<F: Family> {
    /// A failure reported by the Merkle structure.
    #[error("merkle error: {0}")]
    Merkle(#[from] merkle::Error<F>),

    /// A failure reported by the underlying journal.
    #[error("journal error: {0}")]
    Journal(#[from] super::Error),
}
43
/// Strong ref to an ancestor [`MerkleizedBatch`] in the journal-batch chain.
///
/// The digest type is taken from the hasher `H`.
type MerkleizedParent<F, H, Item, S> = Arc<MerkleizedBatch<F, <H as Hasher>::Digest, Item, S>>;
46
/// A speculative batch whose root digest has not yet been computed,
/// in contrast to [`MerkleizedBatch`].
///
/// Items are accumulated with `add`; `merkleize` or `merkleize_with` consumes the batch and
/// produces the merkleized form.
pub struct UnmerkleizedBatch<F: Family, H: Hasher, Item: Send + Sync, S: Strategy> {
    // The inner batch of Merkle leaf digests.
    inner: batch::UnmerkleizedBatch<F, H::Digest, S>,
    // The hasher to use for hashing the items.
    hasher: StandardHasher<H>,
    // The items to append from this batch, in the order they were added.
    items: Vec<Item>,
    // This batch's parent, or None if the parent is the journal itself. Held as a strong
    // reference here; merkleizing downgrades it to a weak one.
    parent: Option<MerkleizedParent<F, H, Item, S>>,
}
59
/// Shared handle to a [`MerkleizedBatch`], as returned by `merkleize` and `merkleize_with`.
type MerkleizedBatchArc<F, H, Item, S> = Arc<MerkleizedBatch<F, <H as Hasher>::Digest, Item, S>>;
61
62impl<F: Family, H: Hasher, Item: Encode + Send + Sync, S: Strategy>
63    UnmerkleizedBatch<F, H, Item, S>
64{
65    /// Add an item to the batch.
66    #[allow(clippy::should_implement_trait)]
67    pub fn add(mut self, item: Item) -> Self {
68        let encoded = item.encode();
69        self.inner = self.inner.add(&self.hasher, &encoded);
70        self.items.push(item);
71        self
72    }
73
74    /// Collect ancestor items from the parent chain before downgrading.
75    fn collect_ancestor_items(
76        parent: &Option<MerkleizedParent<F, H, Item, S>>,
77    ) -> Vec<Arc<Vec<Item>>> {
78        let Some(parent) = parent else {
79            return Vec::new();
80        };
81        let mut items = Vec::new();
82        if !parent.items.is_empty() {
83            items.push(Arc::clone(&parent.items));
84        }
85        let mut current = parent.parent.as_ref().and_then(Weak::upgrade);
86        while let Some(batch) = current {
87            if !batch.items.is_empty() {
88                items.push(Arc::clone(&batch.items));
89            }
90            current = batch.parent.as_ref().and_then(Weak::upgrade);
91        }
92        items.reverse();
93        items
94    }
95
96    /// Merkleize the batch.
97    /// `base` provides committed node data as fallback during hash computation.
98    pub fn merkleize(self, base: &Mem<F, H::Digest>) -> MerkleizedBatchArc<F, H, Item, S> {
99        let Self {
100            inner,
101            hasher,
102            items,
103            parent,
104        } = self;
105
106        let items = Arc::new(items);
107        let merkle = inner.merkleize(base, &hasher);
108        let ancestor_items = Self::collect_ancestor_items(&parent);
109        Arc::new(MerkleizedBatch {
110            inner: merkle,
111            bagging: hasher.root_bagging(),
112            items,
113            parent: parent.as_ref().map(Arc::downgrade),
114            ancestor_items,
115        })
116    }
117
118    /// Like [`merkleize`](Self::merkleize), but the caller supplies the items instead of
119    /// accumulating them with [`add`](Self::add). The two approaches must not be mixed: do
120    /// not call [`add`](Self::add) before this method.
121    ///
122    /// The items are encoded and hashed into the Merkle structure, and the `Arc` is stored
123    /// directly in the resulting [`MerkleizedBatch`] without copying.
124    ///
125    /// # Panics
126    ///
127    /// Panics if items were previously added via [`add`](Self::add).
128    pub(crate) fn merkleize_with(
129        mut self,
130        base: &Mem<F, H::Digest>,
131        items: Arc<Vec<Item>>,
132    ) -> MerkleizedBatchArc<F, H, Item, S> {
133        assert!(
134            self.items.is_empty(),
135            "merkleize_with expects no items added via add"
136        );
137
138        let starting_leaves = self.inner.leaves();
139        let digests: Vec<H::Digest> = self.inner.strategy().map_init_collect_vec(
140            items.iter().enumerate(),
141            || self.hasher.clone(),
142            |h, (i, item)| {
143                let loc = Location::<F>::new(*starting_leaves + i as u64);
144                let pos = Position::try_from(loc).expect("valid leaf location");
145                h.leaf_digest(pos, &item.encode())
146            },
147        );
148        for digest in digests {
149            self.inner = self.inner.add_leaf_digest(digest);
150        }
151
152        let merkle = self.inner.merkleize(base, &self.hasher);
153        let ancestor_items = Self::collect_ancestor_items(&self.parent);
154        Arc::new(MerkleizedBatch {
155            inner: merkle,
156            bagging: self.hasher.root_bagging(),
157            items,
158            parent: self.parent.as_ref().map(Arc::downgrade),
159            ancestor_items,
160        })
161    }
162}
163
/// A speculative batch whose root digest has been computed, in contrast to [`UnmerkleizedBatch`].
///
/// Instances are shared behind an `Arc`; child batches are created from it with `new_batch`.
#[derive(Clone, Debug)]
pub struct MerkleizedBatch<F: Family, D: Digest, Item: Send + Sync, S: Strategy> {
    /// The inner batch of Merkle leaf digests.
    pub(crate) inner: Arc<batch::MerkleizedBatch<F, D, S>>,
    /// The peak bagging policy inherited from the parent journal or batch.
    bagging: Bagging,
    /// The items to append from this batch.
    items: Arc<Vec<Item>>,
    /// This batch's parent, or None if the parent is the journal itself. Stored as a weak
    /// reference, so it may fail to upgrade once the parent has been dropped.
    parent: Option<Weak<Self>>,
    /// Ancestor item batches collected at merkleize time (root-to-tip order). Ancestors with
    /// no items are not recorded.
    pub(crate) ancestor_items: Vec<Arc<Vec<Item>>>,
}
178
179impl<F: Family, D: Digest, Item: Send + Sync, S: Strategy> MerkleizedBatch<F, D, Item, S> {
180    /// The number of items visible through this batch, including ancestors.
181    pub(crate) fn size(&self) -> u64 {
182        *self.inner.leaves()
183    }
184
185    /// Compute the root digest after this batch is applied using `inactive_peaks` and the bagging
186    /// carried by `hasher`.
187    ///
188    /// This recomputes the root rather than reading a cache.
189    pub fn root(
190        &self,
191        base: &Mem<F, D>,
192        hasher: &impl merkle::hasher::Hasher<F, Digest = D>,
193        inactive_peaks: usize,
194    ) -> Result<D, merkle::Error<F>> {
195        self.inner.root(base, hasher, inactive_peaks)
196    }
197
198    /// Inclusion proof for the element at `loc`.
199    pub fn proof(
200        &self,
201        hasher: &impl merkle::hasher::Hasher<F, Digest = D>,
202        loc: Location<F>,
203        inactive_peaks: usize,
204    ) -> Result<Proof<F, D>, merkle::Error<F>> {
205        self.inner.proof(hasher, loc, inactive_peaks)
206    }
207
208    /// Inclusion proof for all elements in `range`.
209    pub fn range_proof(
210        &self,
211        hasher: &impl merkle::hasher::Hasher<F, Digest = D>,
212        range: core::ops::Range<Location<F>>,
213        inactive_peaks: usize,
214    ) -> Result<Proof<F, D>, merkle::Error<F>> {
215        self.inner.range_proof(hasher, range, inactive_peaks)
216    }
217
218    /// The items added in this batch.
219    pub(crate) const fn items(&self) -> &Arc<Vec<Item>> {
220        &self.items
221    }
222
223    /// Create a new speculative batch of operations with this batch as its parent.
224    ///
225    /// The batch becomes invalid if any ancestor is dropped before being applied, or a sibling
226    /// fork has been applied.
227    pub fn new_batch<H: Hasher<Digest = D>>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, Item, S>
228    where
229        Item: Encode,
230    {
231        UnmerkleizedBatch {
232            inner: self.inner.new_batch(),
233            hasher: StandardHasher::new(self.bagging),
234            items: Vec::new(),
235            parent: Some(Arc::clone(self)),
236        }
237    }
238}
239
// Read access to the batch's Merkle nodes. Every method forwards to the inner Merkle batch;
// the item lists held by this type play no part here.
impl<F: Family, D: Digest, Item: Send + Sync, S: Strategy> Readable
    for MerkleizedBatch<F, D, Item, S>
{
    type Family = F;
    type Digest = D;
    type Error = merkle::Error<F>;

    // Size as a `Position`, unlike the inherent `size()`, which returns a leaf count as `u64`.
    fn size(&self) -> Position<F> {
        self.inner.size()
    }

    // Digest of the node at `pos`, or `None` if the inner batch does not provide it.
    fn get_node(&self, pos: Position<F>) -> Option<D> {
        self.inner.get_node(pos)
    }

    // Pruning boundary as reported by the inner batch.
    fn pruning_boundary(&self) -> Location<F> {
        self.inner.pruning_boundary()
    }
}
259
/// An append-only data structure that maintains a sequential journal of items alongside a
/// Merkle-family structure. The item at index i in the journal corresponds to the leaf at Location
/// i in the Merkle structure. This structure enables efficient proofs that an item is included in
/// the journal at a specific location.
///
/// Type parameters: `F` is the Merkle family, `E` the runtime context, `C` the underlying
/// contiguous journal, `H` the hasher, and `S` the merkleization strategy.
pub struct Journal<F, E, C, H, S>
where
    F: Family,
    E: Context,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
    S: Strategy,
{
    /// Merkle structure where each leaf is an item digest.
    /// Invariant: leaf i corresponds to item i in the journal.
    pub(crate) merkle: Merkle<F, E, H::Digest, S>,

    /// Journal of items.
    /// Invariant: item i corresponds to leaf i in the Merkle structure.
    pub(crate) journal: C,

    /// Hasher used for leaf and root computation; it also carries the root bagging policy
    /// handed to batches created from this journal.
    pub(crate) hasher: StandardHasher<H>,
}
282
283impl<F, E, C, H, S> Journal<F, E, C, H, S>
284where
285    F: Family,
286    E: Context,
287    C: Contiguous<Item: EncodeShared>,
288    H: Hasher,
289    S: Strategy,
290{
291    /// Returns the Location of the next item appended to the journal.
292    pub async fn size(&self) -> Location<F> {
293        Location::new(self.journal.size().await)
294    }
295
296    /// Compute the root of the Merkle structure using `inactive_peaks` and the bagging carried by
297    /// the journal's hasher.
298    pub fn root(&self, inactive_peaks: usize) -> Result<H::Digest, Error<F>> {
299        self.merkle
300            .root(&self.hasher, inactive_peaks)
301            .map_err(Into::into)
302    }
303
304    /// Convert authenticated-journal errors to the contiguous journal trait error type.
305    fn map_error(error: Error<F>) -> JournalError {
306        match error {
307            Error::Journal(inner) => inner,
308            Error::Merkle(inner) => JournalError::Merkle(anyhow::Error::from(inner)),
309        }
310    }
311
312    /// Return a reference to the merkleization strategy.
313    pub const fn strategy(&self) -> &S {
314        self.merkle.strategy()
315    }
316
317    /// Create a speculative batch atop this journal.
318    pub fn new_batch(&self) -> UnmerkleizedBatch<F, H, C::Item, S>
319    where
320        C::Item: Encode,
321    {
322        let root = self.merkle.to_batch();
323        UnmerkleizedBatch {
324            inner: root.new_batch(),
325            hasher: StandardHasher::new(self.hasher.root_bagging()),
326            items: Vec::new(),
327            parent: None,
328        }
329    }
330
331    /// Borrow the committed Mem through the read lock.
332    pub(crate) fn with_mem<R>(&self, f: impl FnOnce(&Mem<F, H::Digest>) -> R) -> R {
333        self.merkle.with_mem(f)
334    }
335
336    /// Create an owned [`MerkleizedBatch`] representing the current committed state.
337    ///
338    /// The batch has no items (the committed items are on disk, not in memory).
339    /// This is the starting point for building owned batch chains.
340    pub(crate) fn to_merkleized_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, C::Item, S>> {
341        Arc::new(MerkleizedBatch {
342            inner: self.merkle.to_batch(),
343            bagging: self.hasher.root_bagging(),
344            items: Arc::new(Vec::new()),
345            parent: None,
346            ancestor_items: Vec::new(),
347        })
348    }
349}
350
351impl<F, E, C, H, S> Journal<F, E, C, H, S>
352where
353    F: Family,
354    E: Context,
355    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
356    H: Hasher,
357    S: Strategy,
358{
359    /// Durably persist the journal. This is faster than `sync()` but does not persist the Merkle
360    /// structure, meaning recovery will be required on startup if we crash before `sync()`.
361    pub async fn commit(&self) -> Result<(), Error<F>> {
362        self.journal.commit().await.map_err(Error::Journal)
363    }
364}
365
366impl<F, E, C, H, S> Journal<F, E, C, H, S>
367where
368    F: Family,
369    E: Context,
370    C: Mutable<Item: EncodeShared>,
371    H: Hasher,
372    S: Strategy,
373{
374    /// Create a new [Journal] from the given components after aligning the Merkle structure with
375    /// the journal.
376    pub async fn from_components(
377        mut merkle: Merkle<F, E, H::Digest, S>,
378        journal: C,
379        hasher: StandardHasher<H>,
380        apply_batch_size: u64,
381    ) -> Result<Self, Error<F>> {
382        Self::align(&mut merkle, &journal, &hasher, apply_batch_size).await?;
383
384        // Sync the Merkle structure to disk to avoid having to repeat any recovery that may have
385        // been performed on next startup.
386        merkle.sync().await?;
387
388        Ok(Self {
389            merkle,
390            journal,
391            hasher,
392        })
393    }
394
395    /// Align the Merkle structure to be consistent with the journal. Any items in the structure
396    /// that are not in the journal are popped, and any items in the journal that are not in the
397    /// structure are added. Items are added in batches of size `apply_batch_size` to avoid memory
398    /// bloat.
399    async fn align(
400        merkle: &mut Merkle<F, E, H::Digest, S>,
401        journal: &C,
402        hasher: &StandardHasher<H>,
403        apply_batch_size: u64,
404    ) -> Result<(), Error<F>> {
405        // Rewind Merkle structure elements that are ahead of the journal.
406        let journal_size = journal.size().await;
407        let mut merkle_leaves = merkle.leaves();
408        if merkle_leaves > journal_size {
409            let rewind_count = merkle_leaves - journal_size;
410            warn!(
411                journal_size,
412                ?rewind_count,
413                "rewinding Merkle structure to match journal"
414            );
415            merkle.rewind(*rewind_count as usize).await?;
416            merkle_leaves = Location::new(journal_size);
417        }
418
419        // If the Merkle structure is behind, replay journal items to catch up.
420        if merkle_leaves < journal_size {
421            let replay_count = journal_size - *merkle_leaves;
422            warn!(
423                ?journal_size,
424                replay_count, "Merkle structure lags behind journal, replaying journal to catch up"
425            );
426
427            let reader = journal.reader().await;
428            while merkle_leaves < journal_size {
429                let batch = {
430                    let mut batch = merkle.new_batch();
431                    let mut count = 0u64;
432                    while count < apply_batch_size && merkle_leaves < journal_size {
433                        let op = reader.read(*merkle_leaves).await?;
434                        batch = batch.add(hasher, &op.encode());
435                        merkle_leaves += 1;
436                        count += 1;
437                    }
438                    batch
439                };
440                let batch = merkle.with_mem(|mem| batch.merkleize(mem, hasher));
441                merkle.apply_batch(&batch)?;
442            }
443            return Ok(());
444        }
445
446        // At this point the Merkle structure and journal should be consistent.
447        assert_eq!(journal.size().await, *merkle.leaves());
448
449        Ok(())
450    }
451
452    /// Append an item to the journal and update the Merkle structure.
453    pub async fn append(&mut self, item: &C::Item) -> Result<Location<F>, Error<F>> {
454        let encoded_item = item.encode();
455
456        // Append item to the journal, then update the Merkle structure state.
457        let loc = self.journal.append(item).await?;
458        let unmerkleized_batch = self.merkle.new_batch().add(&self.hasher, &encoded_item);
459        let batch = self
460            .merkle
461            .with_mem(|mem| unmerkleized_batch.merkleize(mem, &self.hasher));
462        self.merkle.apply_batch(&batch)?;
463
464        Ok(Location::new(loc))
465    }
466
467    /// Apply a batch to the journal.
468    ///
469    /// A batch is valid if the journal has not been modified since the batch
470    /// chain was created, or if only ancestors of this batch have been applied.
471    /// Already-committed ancestors are skipped automatically.
472    /// Applying a batch from a different fork returns an error.
473    pub async fn apply_batch(
474        &mut self,
475        batch: &MerkleizedBatch<F, H::Digest, C::Item, S>,
476    ) -> Result<(), Error<F>> {
477        let merkle_size = self.merkle.size();
478        let base_size = batch.inner.base_size();
479
480        // Determine whether ancestors have already been committed.
481        // `base_size` is the merkle size when the batch chain was forked.
482        // If the merkle has advanced past the fork point, ancestors are
483        // already on disk; check that the current size is reachable from
484        // the batch chain before skipping them.
485        let skip_ancestors = if merkle_size == base_size {
486            false
487        } else if merkle_size > base_size && merkle_size < batch.inner.size() {
488            true
489        } else {
490            // Merkle is at an incompatible position (a sibling or unrelated
491            // fork was committed). Eagerly reject to avoid mutating the journal.
492            return Err(merkle::Error::StaleBatch {
493                expected: base_size,
494                actual: merkle_size,
495            }
496            .into());
497        };
498
499        // Apply ancestor item batches in root-to-tip order. Already-committed
500        // batches are skipped by tracking cumulative leaf count.
501        // Batches are collected into a single append_many call to acquire the
502        // journal's write lock once instead of per-batch.
503        let committed_leaves = self.journal.size().await;
504        let base_leaves = *Location::<F>::try_from(base_size)?;
505        let mut batch_leaf_end = base_leaves;
506        let mut batches: Vec<&[C::Item]> = Vec::with_capacity(batch.ancestor_items.len() + 1);
507        for ancestor in &batch.ancestor_items {
508            batch_leaf_end += ancestor.len() as u64;
509            if skip_ancestors && batch_leaf_end <= committed_leaves {
510                continue;
511            }
512            batches.push(ancestor);
513        }
514        if !batch.items.is_empty() {
515            batches.push(&batch.items);
516        }
517        if !batches.is_empty() {
518            self.journal.append_many(Many::Nested(&batches)).await?;
519        }
520
521        self.merkle.apply_batch(&batch.inner)?;
522        assert_eq!(*self.merkle.leaves(), self.journal.size().await);
523        Ok(())
524    }
525
526    /// Rewind the journal and Merkle structure.
527    pub async fn rewind(&mut self, size: u64) -> Result<(), Error<F>> {
528        self.journal.rewind(size).await?;
529
530        let leaves = *self.merkle.leaves();
531        if leaves > size {
532            self.merkle.rewind((leaves - size) as usize).await?;
533        }
534
535        Ok(())
536    }
537
538    /// Prune both the Merkle structure and journal to the given location.
539    ///
540    /// # Returns
541    /// The new pruning boundary, which may be less than the requested `prune_loc`.
542    pub async fn prune(&mut self, prune_loc: Location<F>) -> Result<Location<F>, Error<F>> {
543        self.prune_inner(prune_loc)
544            .await
545            .map(|(boundary, _)| boundary)
546    }
547
548    async fn prune_inner(
549        &mut self,
550        prune_loc: Location<F>,
551    ) -> Result<(Location<F>, bool), Error<F>> {
552        if self.merkle.size() == 0 {
553            // DB is empty, nothing to prune.
554            return Ok((Location::new(self.reader().await.bounds().start), false));
555        }
556
557        // Sync the Merkle structure before pruning the journal, otherwise its last element could
558        // end up behind the journal's first element after a crash, and there would be no way to
559        // replay the items between the structure's last element and the journal's first element.
560        self.merkle.sync().await?;
561
562        let journal_pruned = self.journal.prune(*prune_loc).await?;
563        let bounds = self.reader().await.bounds();
564        let boundary = Location::new(bounds.start);
565        let merkle_boundary = self.merkle.bounds().start;
566
567        if boundary > merkle_boundary {
568            debug!(size = ?bounds.end, ?prune_loc, boundary = ?bounds.start, "pruned inactive ops");
569            self.merkle.prune(boundary).await?;
570        }
571
572        Ok((boundary, journal_pruned || boundary > merkle_boundary))
573    }
574}
575
576impl<F, E, C, H, S> Journal<F, E, C, H, S>
577where
578    F: Family,
579    E: Context,
580    C: Contiguous<Item: EncodeShared>,
581    H: Hasher,
582    S: Strategy,
583{
584    /// Generate a proof of inclusion for items starting at `start_loc`.
585    ///
586    /// Returns a proof and the items corresponding to the leaves in the range `start_loc..end_loc`,
587    /// where `end_loc` is the minimum of the current item count and `start_loc + max_ops`.
588    ///
589    /// # Errors
590    ///
591    /// - Returns [Error::Merkle] with [merkle::Error::LocationOverflow] if `start_loc` >
592    ///   [Family::MAX_LEAVES].
593    /// - Returns [Error::Merkle] with [merkle::Error::RangeOutOfBounds] if `start_loc` >= current
594    ///   item count.
595    /// - Returns [Error::Journal] with [crate::journal::Error::ItemPruned] if `start_loc` has been
596    ///   pruned.
597    pub async fn proof(
598        &self,
599        start_loc: Location<F>,
600        max_ops: NonZeroU64,
601        inactive_peaks: usize,
602    ) -> Result<(Proof<F, H::Digest>, Vec<C::Item>), Error<F>> {
603        self.historical_proof(self.size().await, start_loc, max_ops, inactive_peaks)
604            .await
605    }
606
607    /// Generate a historical proof with respect to the state of the Merkle structure when it had
608    /// `historical_leaves` leaves.
609    ///
610    /// Returns a proof and the items corresponding to the leaves in the range `start_loc..end_loc`,
611    /// where `end_loc` is the minimum of `historical_leaves` and `start_loc + max_ops`.
612    ///
613    /// # Errors
614    ///
615    /// - Returns [Error::Merkle] with [merkle::Error::RangeOutOfBounds] if `start_loc` >=
616    ///   `historical_leaves` or `historical_leaves` > number of items in the journal.
617    /// - Returns [Error::Journal] with [crate::journal::Error::ItemPruned] if `start_loc` has been
618    ///   pruned.
619    pub async fn historical_proof(
620        &self,
621        historical_leaves: Location<F>,
622        start_loc: Location<F>,
623        max_ops: NonZeroU64,
624        inactive_peaks: usize,
625    ) -> Result<(Proof<F, H::Digest>, Vec<C::Item>), Error<F>> {
626        let reader = self.journal.reader().await;
627        let bounds = reader.bounds();
628
629        if *historical_leaves > bounds.end {
630            return Err(merkle::Error::RangeOutOfBounds(Location::new(bounds.end)).into());
631        }
632        if start_loc >= historical_leaves {
633            return Err(merkle::Error::RangeOutOfBounds(start_loc).into());
634        }
635
636        let end_loc = std::cmp::min(historical_leaves, start_loc.saturating_add(max_ops.get()));
637
638        let hasher = self.hasher.clone();
639        let proof = self
640            .merkle
641            .historical_range_proof(
642                &hasher,
643                historical_leaves,
644                start_loc..end_loc,
645                inactive_peaks,
646            )
647            .await?;
648
649        let positions: Vec<u64> = (*start_loc..*end_loc).collect();
650        let ops = reader.read_many(&positions).await?;
651
652        Ok((proof, ops))
653    }
654}
655
656impl<F, E, C, H, S> Journal<F, E, C, H, S>
657where
658    F: Family,
659    E: Context,
660    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
661    H: Hasher,
662    S: Strategy,
663{
664    /// Destroy the authenticated journal, removing all data from disk.
665    pub async fn destroy(self) -> Result<(), Error<F>> {
666        try_join!(
667            self.journal.destroy().map_err(Error::Journal),
668            self.merkle.destroy().map_err(Error::Merkle),
669        )?;
670
671        Ok(())
672    }
673
674    /// Durably persist the journal, ensuring no recovery is required on startup.
675    pub async fn sync(&self) -> Result<(), Error<F>> {
676        try_join!(
677            self.journal.sync().map_err(Error::Journal),
678            self.merkle.sync().map_err(Error::Merkle)
679        )?;
680
681        Ok(())
682    }
683}
684
/// The number of items to apply to the Merkle structure in a single batch.
///
/// Equal to 65,536. Passed to `align` by the generated `new()` constructors when replaying
/// journal items into a lagging Merkle structure.
const APPLY_BATCH_SIZE: u64 = 1 << 16;
687
/// Generate a `new()` constructor for an authenticated journal backed by a specific contiguous
/// journal type.
///
/// Arguments: the module that exposes the backing `Journal` type, the config type that
/// journal is initialized with, and the codec bound required of its items.
macro_rules! impl_journal_new {
    ($journal_mod:ident, $cfg_ty:ty, $codec_bound:path) => {
        impl<F, E, O, H, S> Journal<F, E, $journal_mod::Journal<E, O>, H, S>
        where
            F: Family,
            E: Context,
            O: $codec_bound,
            H: Hasher,
            S: Strategy,
        {
            /// Create a new authenticated [Journal].
            ///
            /// The inner journal will be rewound to the last item matching `rewind_predicate`,
            /// and the merkle structure will be aligned to match. Both are synced before
            /// returning.
            pub async fn new(
                context: E,
                merkle_cfg: merkle::full::Config<S>,
                journal_cfg: $cfg_ty,
                rewind_predicate: fn(&O) -> bool,
                bagging: merkle::Bagging,
            ) -> Result<Self, Error<F>> {
                // Open the backing journal and trim it according to the predicate.
                let journal_context = context.child("journal");
                let mut journal =
                    $journal_mod::Journal::init(journal_context, journal_cfg).await?;
                journal.rewind_to(rewind_predicate).await?;

                // Open the Merkle structure and bring it in line with the journal.
                let hasher = StandardHasher::<H>::new(bagging);
                let merkle_context = context.child("merkle");
                let mut merkle = Merkle::init(merkle_context, &hasher, merkle_cfg).await?;
                Self::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE).await?;

                // Persist the journal first, then the Merkle structure.
                journal.sync().await?;
                merkle.sync().await?;

                let assembled = Self {
                    merkle,
                    journal,
                    hasher,
                };
                Ok(assembled)
            }
        }
    };
}
731
// Generate `new()` for journals backed by `fixed::Journal` (items bound by `CodecFixedShared`)
// and by `variable::Journal` (items bound by `CodecShared`, config parameterized by `O::Cfg`).
impl_journal_new!(fixed, fixed::Config, CodecFixedShared);
impl_journal_new!(variable, variable::Config<O::Cfg>, CodecShared);
734
// Read-only contiguous-journal view of the authenticated journal. Both methods forward to the
// inner journal; the Merkle structure is not consulted.
impl<F, E, C, H, S> Contiguous for Journal<F, E, C, H, S>
where
    F: Family,
    E: Context,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
    S: Strategy,
{
    type Item = C::Item;

    // Reader over the inner journal's items.
    async fn reader(&self) -> impl Reader<Item = C::Item> + '_ {
        self.journal.reader().await
    }

    // Size of the inner journal as a raw `u64` (the inherent `size()` returns a `Location`).
    async fn size(&self) -> u64 {
        self.journal.size().await
    }
}
753
754impl<F, E, C, H, S> Mutable for Journal<F, E, C, H, S>
755where
756    F: Family,
757    E: Context,
758    C: Mutable<Item: EncodeShared>,
759    H: Hasher,
760    S: Strategy,
761{
762    async fn append(&mut self, item: &Self::Item) -> Result<u64, JournalError> {
763        let res = self.append(item).await.map_err(Self::map_error)?;
764
765        Ok(*res)
766    }
767
768    async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
769        let prune_to = {
770            let reader = self.journal.reader().await;
771            let bounds = reader.bounds();
772            min_position.min(bounds.end)
773        };
774
775        let (_, pruned) = self
776            .prune_inner(Location::new(prune_to))
777            .await
778            .map_err(Self::map_error)?;
779        Ok(pruned)
780    }
781
782    async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
783        self.rewind(size).await.map_err(Self::map_error)
784    }
785}
786
/// A [Mutable] journal that can serve as the inner journal of an authenticated [Journal].
///
/// Implementors must also be [Persistable] with [JournalError] as their error type.
pub trait Inner<E: Context>: Mutable + Persistable<Error = JournalError> {
    /// The configuration needed to initialize this journal.
    type Config: Clone + Send;

    /// Initialize an authenticated [Journal] backed by this journal type.
    ///
    /// The parameters mirror those of the generated `new()` constructors: a runtime context,
    /// the Merkle and journal configurations, a rewind predicate over items, and the bagging
    /// policy. The returned future is required to be `Send`.
    fn init<F: Family, H: Hasher, S: Strategy>(
        context: E,
        merkle_cfg: merkle::full::Config<S>,
        journal_cfg: Self::Config,
        rewind_predicate: fn(&Self::Item) -> bool,
        bagging: merkle::Bagging,
    ) -> impl core::future::Future<Output = Result<Journal<F, E, Self, H, S>, Error<F>>> + Send
    where
        Self: Sized,
        Self::Item: EncodeShared;
}
804
// Persistence operations exposed through the `Persistable` trait. Each method calls the
// inherent method of the same name and converts its error with `map_error`.
impl<F, E, C, H, S> Persistable for Journal<F, E, C, H, S>
where
    F: Family,
    E: Context,
    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
    H: Hasher,
    S: Strategy,
{
    type Error = JournalError;

    // Persists the journal only; the inherent `commit` does not persist the Merkle structure.
    async fn commit(&self) -> Result<(), JournalError> {
        self.commit().await.map_err(Self::map_error)
    }

    // Persists both the journal and the Merkle structure.
    async fn sync(&self) -> Result<(), JournalError> {
        self.sync().await.map_err(Self::map_error)
    }

    // Consumes the journal and removes its data.
    async fn destroy(self) -> Result<(), JournalError> {
        self.destroy().await.map_err(Self::map_error)
    }
}
827
#[cfg(test)]
impl<F, E, C, H, S> Journal<F, E, C, H, S>
where
    F: Family,
    E: Context,
    C: Contiguous<Item: EncodeShared>,
    S: Strategy,
    H: Hasher,
{
    /// Test helper: fetch the item stored at `loc` from the inner journal.
    pub(crate) async fn read(&self, loc: Location<F>) -> Result<C::Item, Error<F>> {
        let reader = self.journal.reader().await;
        let item = reader.read(*loc).await;
        item.map_err(Error::Journal)
    }
}
847
848#[cfg(test)]
849mod tests {
850    use super::*;
851    use crate::{
852        journal::contiguous::fixed::{Config as JConfig, Journal as ContiguousJournal},
853        merkle::{
854            full::{Config as MerkleConfig, Merkle},
855            mmb, mmr,
856            Bagging::{BackwardFold, ForwardFold},
857        },
858        qmdb::{
859            any::{
860                operation::{update::Unordered as Update, Unordered as Op},
861                value::FixedEncoding,
862            },
863            operation::Committable,
864        },
865    };
866    use commonware_codec::Encode;
867    use commonware_cryptography::{sha256::Digest, Sha256};
868    use commonware_macros::test_traced;
869    use commonware_parallel::Sequential;
870    use commonware_runtime::{
871        buffer::paged::CacheRef,
872        deterministic::{self, Context},
873        BufferPooler, Runner as _, Supervisor as _,
874    };
875    use commonware_utils::{NZUsize, NZU16, NZU64};
876    use futures::StreamExt as _;
877    use std::num::{NonZeroU16, NonZeroUsize};
878
879    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
880    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
881
882    /// Generic operation type for testing, parameterized by Merkle family.
883    type TestOp<F> = Op<F, Digest, FixedEncoding<Digest>>;
884
885    /// Generic authenticated journal type for testing, parameterized by Merkle family.
886    type TestJournal<F> = Journal<
887        F,
888        deterministic::Context,
889        ContiguousJournal<deterministic::Context, TestOp<F>>,
890        Sha256,
891        Sequential,
892    >;
893
894    fn journal_root<F: Family>(journal: &TestJournal<F>) -> Digest {
895        journal.root(0).unwrap()
896    }
897
898    fn batch_root<F: Family>(
899        journal: &TestJournal<F>,
900        batch: &MerkleizedBatch<F, Digest, TestOp<F>, Sequential>,
901    ) -> Digest {
902        journal
903            .merkle
904            .with_mem(|mem| batch.root(mem, &journal.hasher, 0))
905            .unwrap()
906    }
907
908    /// Create Merkle configuration for tests.
909    fn merkle_config(suffix: &str, pooler: &impl BufferPooler) -> MerkleConfig<Sequential> {
910        MerkleConfig {
911            journal_partition: format!("mmr-journal-{suffix}"),
912            metadata_partition: format!("mmr-metadata-{suffix}"),
913            items_per_blob: NZU64!(11),
914            write_buffer: NZUsize!(1024),
915            strategy: Sequential,
916            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
917        }
918    }
919
920    /// Create journal configuration for tests.
921    fn journal_config(suffix: &str, pooler: &impl BufferPooler) -> JConfig {
922        JConfig {
923            partition: format!("journal-{suffix}"),
924            items_per_blob: NZU64!(7),
925            write_buffer: NZUsize!(1024),
926            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
927        }
928    }
929
930    /// Create a new empty authenticated journal.
931    async fn create_empty_journal<F: Family + PartialEq>(
932        context: Context,
933        suffix: &str,
934    ) -> TestJournal<F> {
935        let merkle_cfg = merkle_config(suffix, &context);
936        let journal_cfg = journal_config(suffix, &context);
937        TestJournal::<F>::new(
938            context,
939            merkle_cfg,
940            journal_cfg,
941            |op: &TestOp<F>| op.is_commit(),
942            ForwardFold,
943        )
944        .await
945        .unwrap()
946    }
947
948    #[test]
949    fn test_batches_inherit_journal_bagging() {
950        deterministic::Runner::default().start(|context| async move {
951            let merkle_cfg = merkle_config("batch-bagging", &context);
952            let journal_cfg = journal_config("batch-bagging", &context);
953            let journal = TestJournal::<mmr::Family>::new(
954                context,
955                merkle_cfg,
956                journal_cfg,
957                |op: &TestOp<mmr::Family>| op.is_commit(),
958                BackwardFold,
959            )
960            .await
961            .unwrap();
962
963            let batch = journal.new_batch();
964            assert_eq!(batch.hasher.root_bagging(), BackwardFold);
965
966            let merkleized = journal.merkle.with_mem(|mem| batch.merkleize(mem));
967            let child: UnmerkleizedBatch<mmr::Family, Sha256, TestOp<mmr::Family>, Sequential> =
968                merkleized.new_batch();
969            assert_eq!(child.hasher.root_bagging(), BackwardFold);
970        });
971    }
972
973    /// Create a test operation with predictable values based on index.
974    fn create_operation<F: Family + PartialEq>(index: u8) -> TestOp<F> {
975        TestOp::<F>::Update(Update(
976            Sha256::fill(index),
977            Sha256::fill(index.wrapping_add(1)),
978        ))
979    }
980
981    /// Create an authenticated journal with N committed operations.
982    ///
983    /// Operations are added and then synced to ensure they are committed.
984    async fn create_journal_with_ops<F: Family + PartialEq>(
985        context: Context,
986        suffix: &str,
987        count: usize,
988    ) -> TestJournal<F> {
989        let mut journal = create_empty_journal::<F>(context, suffix).await;
990
991        for i in 0..count {
992            let op = create_operation::<F>(i as u8);
993            let loc = journal.append(&op).await.unwrap();
994            assert_eq!(loc, Location::<F>::new(i as u64));
995        }
996
997        journal.sync().await.unwrap();
998        journal
999    }
1000
1001    /// Create separate Merkle and journal components for testing alignment.
1002    ///
1003    /// These components are created independently and can be manipulated separately to test
1004    /// scenarios where the Merkle structure and journal are out of sync (e.g., one ahead of the
1005    /// other).
1006    async fn create_components<F: Family + PartialEq>(
1007        context: Context,
1008        suffix: &str,
1009    ) -> (
1010        Merkle<F, deterministic::Context, Digest, Sequential>,
1011        ContiguousJournal<deterministic::Context, TestOp<F>>,
1012        StandardHasher<Sha256>,
1013    ) {
1014        let hasher = StandardHasher::new(ForwardFold);
1015        let merkle = Merkle::<F, _, Digest, Sequential>::init(
1016            context.child("mmr"),
1017            &hasher,
1018            merkle_config(suffix, &context),
1019        )
1020        .await
1021        .unwrap();
1022        let journal =
1023            ContiguousJournal::init(context.child("journal"), journal_config(suffix, &context))
1024                .await
1025                .unwrap();
1026        (merkle, journal, hasher)
1027    }
1028
1029    /// Verify that a proof correctly proves the given operations are included in the Merkle
1030    /// structure.
1031    fn verify_proof<F: Family + PartialEq>(
1032        proof: &Proof<F, <Sha256 as commonware_cryptography::Hasher>::Digest>,
1033        operations: &[TestOp<F>],
1034        start_loc: Location<F>,
1035        root: &<Sha256 as commonware_cryptography::Hasher>::Digest,
1036        hasher: &StandardHasher<Sha256>,
1037    ) -> bool {
1038        let encoded_ops: Vec<_> = operations.iter().map(|op| op.encode()).collect();
1039        proof.verify_range_inclusion(hasher, &encoded_ops, start_loc, root)
1040    }
1041
1042    /// Verify that new() creates an empty authenticated journal.
1043    async fn test_new_creates_empty_journal_inner<F: Family + PartialEq>(context: Context) {
1044        let journal = create_empty_journal::<F>(context, "new-empty").await;
1045
1046        let bounds = journal.reader().await.bounds();
1047        assert_eq!(bounds.end, 0);
1048        assert_eq!(bounds.start, 0);
1049        assert!(bounds.is_empty());
1050    }
1051
1052    #[test_traced("INFO")]
1053    fn test_new_creates_empty_journal_mmr() {
1054        let executor = deterministic::Runner::default();
1055        executor.start(test_new_creates_empty_journal_inner::<mmr::Family>);
1056    }
1057
1058    #[test_traced("INFO")]
1059    fn test_new_creates_empty_journal_mmb() {
1060        let executor = deterministic::Runner::default();
1061        executor.start(test_new_creates_empty_journal_inner::<mmb::Family>);
1062    }
1063
1064    /// Verify that align() correctly handles empty Merkle and journal components.
1065    async fn test_align_with_empty_mmr_and_journal_inner<F: Family + PartialEq>(context: Context) {
1066        let (mut merkle, journal, hasher) = create_components::<F>(context, "align-empty").await;
1067
1068        TestJournal::<F>::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE)
1069            .await
1070            .unwrap();
1071
1072        assert_eq!(merkle.leaves(), Location::<F>::new(0));
1073        assert_eq!(journal.size().await, 0);
1074    }
1075
1076    #[test_traced("INFO")]
1077    fn test_align_with_empty_mmr_and_journal_mmr() {
1078        let executor = deterministic::Runner::default();
1079        executor.start(test_align_with_empty_mmr_and_journal_inner::<mmr::Family>);
1080    }
1081
1082    #[test_traced("INFO")]
1083    fn test_align_with_empty_mmr_and_journal_mmb() {
1084        let executor = deterministic::Runner::default();
1085        executor.start(test_align_with_empty_mmr_and_journal_inner::<mmb::Family>);
1086    }
1087
1088    /// Verify that align() pops Merkle elements when Merkle is ahead of the journal.
1089    async fn test_align_when_mmr_ahead_inner<F: Family + PartialEq>(context: Context) {
1090        let (mut merkle, journal, hasher) = create_components::<F>(context, "mmr-ahead").await;
1091
1092        // Add 20 operations to both Merkle and journal
1093        {
1094            let batch = {
1095                let mut batch = merkle.new_batch();
1096                for i in 0..20 {
1097                    let op = create_operation::<F>(i as u8);
1098                    let encoded = op.encode();
1099                    batch = batch.add(&hasher, &encoded);
1100                    journal.append(&op).await.unwrap();
1101                }
1102                batch
1103            };
1104            let batch = merkle.with_mem(|mem| batch.merkleize(mem, &hasher));
1105            merkle.apply_batch(&batch).unwrap();
1106        }
1107
1108        // Add commit operation to journal only (making journal ahead)
1109        let commit_op = TestOp::<F>::CommitFloor(None, Location::<F>::new(0));
1110        journal.append(&commit_op).await.unwrap();
1111        journal.sync().await.unwrap();
1112
1113        // Merkle has 20 leaves, journal has 21 operations (20 ops + 1 commit)
1114        TestJournal::<F>::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE)
1115            .await
1116            .unwrap();
1117
1118        // Merkle should have been aligned to match journal
1119        assert_eq!(merkle.leaves(), Location::<F>::new(21));
1120        assert_eq!(journal.size().await, 21);
1121    }
1122
1123    #[test_traced("WARN")]
1124    fn test_align_when_mmr_ahead_mmr() {
1125        let executor = deterministic::Runner::default();
1126        executor.start(test_align_when_mmr_ahead_inner::<mmr::Family>);
1127    }
1128
1129    #[test_traced("WARN")]
1130    fn test_align_when_mmr_ahead_mmb() {
1131        let executor = deterministic::Runner::default();
1132        executor.start(test_align_when_mmr_ahead_inner::<mmb::Family>);
1133    }
1134
1135    /// Verify that align() replays journal operations when journal is ahead of Merkle.
1136    async fn test_align_when_journal_ahead_inner<F: Family + PartialEq>(context: Context) {
1137        let (mut merkle, journal, hasher) = create_components::<F>(context, "journal-ahead").await;
1138
1139        // Add 20 operations to journal only
1140        for i in 0..20 {
1141            let op = create_operation::<F>(i as u8);
1142            journal.append(&op).await.unwrap();
1143        }
1144
1145        // Add commit
1146        let commit_op = TestOp::<F>::CommitFloor(None, Location::<F>::new(0));
1147        journal.append(&commit_op).await.unwrap();
1148        journal.sync().await.unwrap();
1149
1150        // Journal has 21 operations, Merkle has 0 leaves
1151        TestJournal::<F>::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE)
1152            .await
1153            .unwrap();
1154
1155        // Merkle should have been replayed to match journal
1156        assert_eq!(merkle.leaves(), Location::<F>::new(21));
1157        assert_eq!(journal.size().await, 21);
1158    }
1159
1160    #[test_traced("WARN")]
1161    fn test_align_when_journal_ahead_mmr() {
1162        let executor = deterministic::Runner::default();
1163        executor.start(test_align_when_journal_ahead_inner::<mmr::Family>);
1164    }
1165
1166    #[test_traced("WARN")]
1167    fn test_align_when_journal_ahead_mmb() {
1168        let executor = deterministic::Runner::default();
1169        executor.start(test_align_when_journal_ahead_inner::<mmb::Family>);
1170    }
1171
1172    /// Verify that align() discards uncommitted operations.
1173    async fn test_align_with_mismatched_committed_ops_inner<F: Family + PartialEq>(
1174        context: Context,
1175    ) {
1176        let mut journal = create_empty_journal::<F>(context.child("first"), "mismatched").await;
1177
1178        // Add 20 uncommitted operations
1179        for i in 0..20 {
1180            let loc = journal
1181                .append(&create_operation::<F>(i as u8))
1182                .await
1183                .unwrap();
1184            assert_eq!(loc, Location::<F>::new(i as u64));
1185        }
1186
1187        // Don't sync - these are uncommitted
1188        // After alignment, they should be discarded
1189        let size_before = journal.size().await;
1190        assert_eq!(size_before, 20);
1191
1192        // Drop and recreate to simulate restart (which calls align internally)
1193        journal.sync().await.unwrap();
1194        drop(journal);
1195        let journal = create_empty_journal::<F>(context.child("second"), "mismatched").await;
1196
1197        // Uncommitted operations should be gone
1198        assert_eq!(journal.size().await, 0);
1199    }
1200
1201    #[test_traced("INFO")]
1202    fn test_align_with_mismatched_committed_ops_mmr() {
1203        let executor = deterministic::Runner::default();
1204        executor.start(|context| {
1205            test_align_with_mismatched_committed_ops_inner::<mmr::Family>(context)
1206        });
1207    }
1208
1209    #[test_traced("INFO")]
1210    fn test_align_with_mismatched_committed_ops_mmb() {
1211        let executor = deterministic::Runner::default();
1212        executor.start(|context| {
1213            test_align_with_mismatched_committed_ops_inner::<mmb::Family>(context)
1214        });
1215    }
1216
1217    async fn test_rewind_inner<F: Family + PartialEq>(context: Context) {
1218        // Test 1: Matching operation is kept
1219        {
1220            let mut journal = ContiguousJournal::init(
1221                context.child("rewind_match"),
1222                journal_config("rewind-match", &context),
1223            )
1224            .await
1225            .unwrap();
1226
1227            // Add operations where operation 3 is a commit
1228            for i in 0..3 {
1229                journal.append(&create_operation::<F>(i)).await.unwrap();
1230            }
1231            journal
1232                .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1233                .await
1234                .unwrap();
1235            for i in 4..7 {
1236                journal.append(&create_operation::<F>(i)).await.unwrap();
1237            }
1238
1239            // Rewind to last commit
1240            let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1241            assert_eq!(final_size, 4);
1242            assert_eq!(journal.size().await, 4);
1243
1244            // Verify the commit operation is still there
1245            let op = journal.read(3).await.unwrap();
1246            assert!(op.is_commit());
1247        }
1248
1249        // Test 2: Last matching operation is chosen when multiple match
1250        {
1251            let mut journal = ContiguousJournal::init(
1252                context.child("rewind_multiple"),
1253                journal_config("rewind-multiple", &context),
1254            )
1255            .await
1256            .unwrap();
1257
1258            // Add multiple commits
1259            journal.append(&create_operation::<F>(0)).await.unwrap();
1260            journal
1261                .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1262                .await
1263                .unwrap(); // pos 1
1264            journal.append(&create_operation::<F>(2)).await.unwrap();
1265            journal
1266                .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(1)))
1267                .await
1268                .unwrap(); // pos 3
1269            journal.append(&create_operation::<F>(4)).await.unwrap();
1270
1271            // Should rewind to last commit (pos 3)
1272            let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1273            assert_eq!(final_size, 4);
1274
1275            // Verify the last commit is still there
1276            let op = journal.read(3).await.unwrap();
1277            assert!(op.is_commit());
1278
1279            // Verify we can't read pos 4
1280            assert!(journal.read(4).await.is_err());
1281        }
1282
1283        // Test 3: Rewind to pruning boundary when no match
1284        {
1285            let mut journal = ContiguousJournal::init(
1286                context.child("rewind_no_match"),
1287                journal_config("rewind-no-match", &context),
1288            )
1289            .await
1290            .unwrap();
1291
1292            // Add operations with no commits
1293            for i in 0..10 {
1294                journal.append(&create_operation::<F>(i)).await.unwrap();
1295            }
1296
1297            // Rewind should go to pruning boundary (0 for unpruned)
1298            let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1299            assert_eq!(final_size, 0, "Should rewind to pruning boundary (0)");
1300            assert_eq!(journal.size().await, 0);
1301        }
1302
1303        // Test 4: Rewind with existing pruning boundary
1304        {
1305            let mut journal = ContiguousJournal::init(
1306                context.child("rewind_with_pruning"),
1307                journal_config("rewind-with-pruning", &context),
1308            )
1309            .await
1310            .unwrap();
1311
1312            // Add operations and a commit at position 10 (past first section boundary of 7)
1313            for i in 0..10 {
1314                journal.append(&create_operation::<F>(i)).await.unwrap();
1315            }
1316            journal
1317                .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1318                .await
1319                .unwrap(); // pos 10
1320            for i in 11..15 {
1321                journal.append(&create_operation::<F>(i)).await.unwrap();
1322            }
1323            journal.sync().await.unwrap();
1324
1325            // Prune up to position 8 (this will prune section 0, items 0-6, keeping 7+)
1326            journal.prune(8).await.unwrap();
1327            assert_eq!(journal.reader().await.bounds().start, 7);
1328
1329            // Add more uncommitted operations
1330            for i in 15..20 {
1331                journal.append(&create_operation::<F>(i)).await.unwrap();
1332            }
1333
1334            // Rewind should keep the commit at position 10
1335            let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1336            assert_eq!(final_size, 11);
1337
1338            // Verify commit is still there
1339            let op = journal.read(10).await.unwrap();
1340            assert!(op.is_commit());
1341        }
1342
1343        // Test 5: Rewind with no matches after pruning boundary
1344        {
1345            let mut journal = ContiguousJournal::init(
1346                context.child("rewind_no_match_pruned"),
1347                journal_config("rewind-no-match-pruned", &context),
1348            )
1349            .await
1350            .unwrap();
1351
1352            // Add operations with a commit at position 5 (in section 0: 0-6)
1353            for i in 0..5 {
1354                journal.append(&create_operation::<F>(i)).await.unwrap();
1355            }
1356            journal
1357                .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1358                .await
1359                .unwrap(); // pos 5
1360            for i in 6..10 {
1361                journal.append(&create_operation::<F>(i)).await.unwrap();
1362            }
1363            journal.sync().await.unwrap();
1364
1365            // Prune up to position 8 (this prunes section 0, including the commit at pos 5)
1366            // Pruning boundary will be at position 7 (start of section 1)
1367            journal.prune(8).await.unwrap();
1368            assert_eq!(journal.reader().await.bounds().start, 7);
1369
1370            // Add uncommitted operations with no commits (in section 1: 7-13)
1371            for i in 10..14 {
1372                journal.append(&create_operation::<F>(i)).await.unwrap();
1373            }
1374
1375            // Rewind with no matching commits after the pruning boundary
1376            // Should rewind to the pruning boundary at position 7
1377            let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1378            assert_eq!(final_size, 7);
1379        }
1380
1381        // Test 6: Empty journal
1382        {
1383            let mut journal = ContiguousJournal::init(
1384                context.child("rewind_empty"),
1385                journal_config("rewind-empty", &context),
1386            )
1387            .await
1388            .unwrap();
1389
1390            // Rewind empty journal should be no-op
1391            let final_size = journal
1392                .rewind_to(|op: &TestOp<F>| op.is_commit())
1393                .await
1394                .unwrap();
1395            assert_eq!(final_size, 0);
1396            assert_eq!(journal.size().await, 0);
1397        }
1398
1399        // Test 7: Position based authenticated journal rewind.
1400        {
1401            let merkle_cfg = merkle_config("rewind", &context);
1402            let journal_cfg = journal_config("rewind", &context);
1403            let mut journal = TestJournal::<F>::new(
1404                context,
1405                merkle_cfg,
1406                journal_cfg,
1407                |op| op.is_commit(),
1408                ForwardFold,
1409            )
1410            .await
1411            .unwrap();
1412
1413            // Add operations with a commit at position 5 (in section 0: 0-6)
1414            for i in 0..5 {
1415                journal.append(&create_operation::<F>(i)).await.unwrap();
1416            }
1417            journal
1418                .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1419                .await
1420                .unwrap(); // pos 5
1421            for i in 6..10 {
1422                journal.append(&create_operation::<F>(i)).await.unwrap();
1423            }
1424            assert_eq!(journal.size().await, 10);
1425
1426            journal.rewind(2).await.unwrap();
1427            assert_eq!(journal.size().await, 2);
1428            assert_eq!(journal.merkle.leaves(), 2);
1429            assert_eq!(journal.merkle.size(), 3);
1430            let bounds = journal.reader().await.bounds();
1431            assert_eq!(bounds.start, 0);
1432            assert!(!bounds.is_empty());
1433
1434            assert!(matches!(
1435                journal.rewind(3).await,
1436                Err(Error::Journal(JournalError::InvalidRewind(_)))
1437            ));
1438
1439            journal.rewind(0).await.unwrap();
1440            assert_eq!(journal.size().await, 0);
1441            assert_eq!(journal.merkle.leaves(), 0);
1442            assert_eq!(journal.merkle.size(), 0);
1443            let bounds = journal.reader().await.bounds();
1444            assert_eq!(bounds.start, 0);
1445            assert!(bounds.is_empty());
1446
1447            // Test rewinding after pruning.
1448            for i in 0..255 {
1449                journal.append(&create_operation::<F>(i)).await.unwrap();
1450            }
1451            journal.prune(Location::<F>::new(100)).await.unwrap();
1452            assert_eq!(journal.reader().await.bounds().start, 98);
1453            let res = journal.rewind(97).await;
1454            assert!(matches!(
1455                res,
1456                Err(Error::Journal(JournalError::InvalidRewind(97)))
1457            ));
1458            journal.rewind(98).await.unwrap();
1459            let bounds = journal.reader().await.bounds();
1460            assert_eq!(bounds.end, 98);
1461            assert_eq!(journal.merkle.leaves(), 98);
1462            assert_eq!(bounds.start, 98);
1463            assert!(bounds.is_empty());
1464        }
1465    }
1466
1467    #[test_traced("INFO")]
1468    fn test_rewind_mmr() {
1469        let executor = deterministic::Runner::default();
1470        executor.start(test_rewind_inner::<mmr::Family>);
1471    }
1472
1473    #[test_traced("INFO")]
1474    fn test_rewind_mmb() {
1475        let executor = deterministic::Runner::default();
1476        executor.start(test_rewind_inner::<mmb::Family>);
1477    }
1478
1479    /// Verify that append() increments the operation count, returns correct locations, and
1480    /// operations can be read back correctly.
1481    async fn test_apply_op_and_read_operations_inner<F: Family + PartialEq>(context: Context) {
1482        let mut journal = create_empty_journal::<F>(context, "apply_op").await;
1483
1484        assert_eq!(journal.size().await, 0);
1485
1486        // Add 50 operations
1487        let expected_ops: Vec<_> = (0..50).map(|i| create_operation::<F>(i as u8)).collect();
1488        for (i, op) in expected_ops.iter().enumerate() {
1489            let loc = journal.append(op).await.unwrap();
1490            assert_eq!(loc, Location::<F>::new(i as u64));
1491            assert_eq!(journal.size().await, (i + 1) as u64);
1492        }
1493
1494        assert_eq!(journal.size().await, 50);
1495
1496        // Verify all operations can be read back correctly
1497        journal.sync().await.unwrap();
1498        for (i, expected_op) in expected_ops.iter().enumerate() {
1499            let read_op = journal.read(Location::<F>::new(i as u64)).await.unwrap();
1500            assert_eq!(read_op, *expected_op);
1501        }
1502    }
1503
1504    #[test_traced("INFO")]
1505    fn test_apply_op_and_read_operations_mmr() {
1506        let executor = deterministic::Runner::default();
1507        executor.start(test_apply_op_and_read_operations_inner::<mmr::Family>);
1508    }
1509
1510    #[test_traced("INFO")]
1511    fn test_apply_op_and_read_operations_mmb() {
1512        let executor = deterministic::Runner::default();
1513        executor.start(test_apply_op_and_read_operations_inner::<mmb::Family>);
1514    }
1515
1516    /// Verify that read() returns correct operations at various positions.
1517    async fn test_read_operations_at_various_positions_inner<F: Family + PartialEq>(
1518        context: Context,
1519    ) {
1520        let journal = create_journal_with_ops::<F>(context, "read", 50).await;
1521
1522        // Verify reading first operation
1523        let first_op = journal.read(Location::<F>::new(0)).await.unwrap();
1524        assert_eq!(first_op, create_operation::<F>(0));
1525
1526        // Verify reading middle operation
1527        let middle_op = journal.read(Location::<F>::new(25)).await.unwrap();
1528        assert_eq!(middle_op, create_operation::<F>(25));
1529
1530        // Verify reading last operation
1531        let last_op = journal.read(Location::<F>::new(49)).await.unwrap();
1532        assert_eq!(last_op, create_operation::<F>(49));
1533
1534        // Verify all operations match expected values
1535        for i in 0..50 {
1536            let op = journal.read(Location::<F>::new(i)).await.unwrap();
1537            assert_eq!(op, create_operation::<F>(i as u8));
1538        }
1539    }
1540
1541    #[test_traced("INFO")]
1542    fn test_read_operations_at_various_positions_mmr() {
1543        let executor = deterministic::Runner::default();
1544        executor.start(|context| {
1545            test_read_operations_at_various_positions_inner::<mmr::Family>(context)
1546        });
1547    }
1548
1549    #[test_traced("INFO")]
1550    fn test_read_operations_at_various_positions_mmb() {
1551        let executor = deterministic::Runner::default();
1552        executor.start(|context| {
1553            test_read_operations_at_various_positions_inner::<mmb::Family>(context)
1554        });
1555    }
1556
1557    /// Verify that read() returns an error for pruned operations.
1558    async fn test_read_pruned_operation_returns_error_inner<F: Family + PartialEq>(
1559        context: Context,
1560    ) {
1561        let mut journal = create_journal_with_ops::<F>(context, "read_pruned", 100).await;
1562
1563        // Add commit and prune
1564        journal
1565            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1566            .await
1567            .unwrap();
1568        journal.sync().await.unwrap();
1569        let pruned_boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1570
1571        // Try to read an operation before the pruned boundary
1572        let read_loc = Location::<F>::new(0);
1573        if read_loc < pruned_boundary {
1574            let result = journal.read(read_loc).await;
1575            assert!(matches!(
1576                result,
1577                Err(Error::Journal(crate::journal::Error::ItemPruned(_)))
1578            ));
1579        }
1580    }
1581
1582    #[test_traced("INFO")]
1583    fn test_read_pruned_operation_returns_error_mmr() {
1584        let executor = deterministic::Runner::default();
1585        executor.start(|context| {
1586            test_read_pruned_operation_returns_error_inner::<mmr::Family>(context)
1587        });
1588    }
1589
1590    #[test_traced("INFO")]
1591    fn test_read_pruned_operation_returns_error_mmb() {
1592        let executor = deterministic::Runner::default();
1593        executor.start(|context| {
1594            test_read_pruned_operation_returns_error_inner::<mmb::Family>(context)
1595        });
1596    }
1597
1598    /// Verify that read() returns an error for out-of-range locations.
1599    async fn test_read_out_of_range_returns_error_inner<F: Family + PartialEq>(context: Context) {
1600        let journal = create_journal_with_ops::<F>(context, "read_oob", 3).await;
1601
1602        // Try to read beyond the end
1603        let result = journal.read(Location::<F>::new(10)).await;
1604        assert!(matches!(
1605            result,
1606            Err(Error::Journal(crate::journal::Error::ItemOutOfRange(_)))
1607        ));
1608    }
1609
1610    #[test_traced("INFO")]
1611    fn test_read_out_of_range_returns_error_mmr() {
1612        let executor = deterministic::Runner::default();
1613        executor.start(test_read_out_of_range_returns_error_inner::<mmr::Family>);
1614    }
1615
1616    #[test_traced("INFO")]
1617    fn test_read_out_of_range_returns_error_mmb() {
1618        let executor = deterministic::Runner::default();
1619        executor.start(test_read_out_of_range_returns_error_inner::<mmb::Family>);
1620    }
1621
1622    /// Verify that we can read all operations back correctly.
1623    async fn test_read_all_operations_back_correctly_inner<F: Family + PartialEq>(
1624        context: Context,
1625    ) {
1626        let journal = create_journal_with_ops::<F>(context, "read_all", 50).await;
1627
1628        assert_eq!(journal.size().await, 50);
1629
1630        // Verify all operations can be read back and match expected values
1631        for i in 0..50 {
1632            let op = journal.read(Location::<F>::new(i)).await.unwrap();
1633            assert_eq!(op, create_operation::<F>(i as u8));
1634        }
1635    }
1636
1637    #[test_traced("INFO")]
1638    fn test_read_all_operations_back_correctly_mmr() {
1639        let executor = deterministic::Runner::default();
1640        executor.start(test_read_all_operations_back_correctly_inner::<mmr::Family>);
1641    }
1642
1643    #[test_traced("INFO")]
1644    fn test_read_all_operations_back_correctly_mmb() {
1645        let executor = deterministic::Runner::default();
1646        executor.start(test_read_all_operations_back_correctly_inner::<mmb::Family>);
1647    }
1648
1649    /// Verify that sync() persists operations.
1650    async fn test_sync_inner<F: Family + PartialEq>(context: Context) {
1651        let mut journal = create_empty_journal::<F>(context.child("first"), "close_pending").await;
1652
1653        // Add 20 operations
1654        let expected_ops: Vec<_> = (0..20).map(|i| create_operation::<F>(i as u8)).collect();
1655        for (i, op) in expected_ops.iter().enumerate() {
1656            let loc = journal.append(op).await.unwrap();
1657            assert_eq!(loc, Location::<F>::new(i as u64),);
1658        }
1659
1660        // Add commit operation to commit the operations
1661        let commit_loc = journal
1662            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1663            .await
1664            .unwrap();
1665        assert_eq!(
1666            commit_loc,
1667            Location::<F>::new(20),
1668            "commit should be at location 20"
1669        );
1670        journal.sync().await.unwrap();
1671
1672        // Reopen and verify the operations persisted
1673        drop(journal);
1674        let journal = create_empty_journal::<F>(context.child("second"), "close_pending").await;
1675        assert_eq!(journal.size().await, 21);
1676
1677        // Verify all operations can be read back
1678        for (i, expected_op) in expected_ops.iter().enumerate() {
1679            let read_op = journal.read(Location::<F>::new(i as u64)).await.unwrap();
1680            assert_eq!(read_op, *expected_op);
1681        }
1682    }
1683
1684    #[test_traced("INFO")]
1685    fn test_sync_mmr() {
1686        let executor = deterministic::Runner::default();
1687        executor.start(test_sync_inner::<mmr::Family>);
1688    }
1689
1690    #[test_traced("INFO")]
1691    fn test_sync_mmb() {
1692        let executor = deterministic::Runner::default();
1693        executor.start(test_sync_inner::<mmb::Family>);
1694    }
1695
1696    /// Verify that pruning an empty journal returns the boundary.
1697    async fn test_prune_empty_journal_inner<F: Family + PartialEq>(context: Context) {
1698        let mut journal = create_empty_journal::<F>(context, "prune_empty").await;
1699
1700        let boundary = journal.prune(Location::<F>::new(0)).await.unwrap();
1701
1702        assert_eq!(boundary, Location::<F>::new(0));
1703    }
1704
1705    #[test_traced("INFO")]
1706    fn test_prune_empty_journal_mmr() {
1707        let executor = deterministic::Runner::default();
1708        executor.start(test_prune_empty_journal_inner::<mmr::Family>);
1709    }
1710
1711    #[test_traced("INFO")]
1712    fn test_prune_empty_journal_mmb() {
1713        let executor = deterministic::Runner::default();
1714        executor.start(test_prune_empty_journal_inner::<mmb::Family>);
1715    }
1716
1717    /// Verify that pruning to a specific location works correctly.
1718    async fn test_prune_to_location_inner<F: Family + PartialEq>(context: Context) {
1719        let mut journal = create_journal_with_ops::<F>(context, "prune_to", 100).await;
1720
1721        // Add commit at position 50
1722        journal
1723            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1724            .await
1725            .unwrap();
1726        journal.sync().await.unwrap();
1727
1728        let boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1729
1730        // Boundary should be <= requested location (may align to section boundary)
1731        assert!(boundary <= Location::<F>::new(50));
1732    }
1733
1734    #[test_traced("INFO")]
1735    fn test_prune_to_location_mmr() {
1736        let executor = deterministic::Runner::default();
1737        executor.start(test_prune_to_location_inner::<mmr::Family>);
1738    }
1739
1740    #[test_traced("INFO")]
1741    fn test_prune_to_location_mmb() {
1742        let executor = deterministic::Runner::default();
1743        executor.start(test_prune_to_location_inner::<mmb::Family>);
1744    }
1745
1746    /// Verify that prune() returns the actual boundary (which may differ from requested).
1747    async fn test_prune_returns_actual_boundary_inner<F: Family + PartialEq>(context: Context) {
1748        let mut journal = create_journal_with_ops::<F>(context, "prune_boundary", 100).await;
1749
1750        journal
1751            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1752            .await
1753            .unwrap();
1754        journal.sync().await.unwrap();
1755
1756        let requested = Location::<F>::new(50);
1757        let actual = journal.prune(requested).await.unwrap();
1758
1759        // Actual boundary should match bounds.start
1760        let bounds = journal.reader().await.bounds();
1761        assert!(!bounds.is_empty());
1762        assert_eq!(actual, bounds.start);
1763
1764        // Actual may be <= requested due to section alignment
1765        assert!(actual <= requested);
1766    }
1767
1768    #[test_traced("INFO")]
1769    fn test_prune_returns_actual_boundary_mmr() {
1770        let executor = deterministic::Runner::default();
1771        executor.start(test_prune_returns_actual_boundary_inner::<mmr::Family>);
1772    }
1773
1774    #[test_traced("INFO")]
1775    fn test_prune_returns_actual_boundary_mmb() {
1776        let executor = deterministic::Runner::default();
1777        executor.start(test_prune_returns_actual_boundary_inner::<mmb::Family>);
1778    }
1779
1780    /// Verify that pruning through the Mutable trait also prunes authenticated Merkle state.
1781    async fn test_mutable_prune_updates_merkle_boundary_inner<F: Family + PartialEq>(
1782        context: Context,
1783    ) {
1784        let mut journal = create_journal_with_ops::<F>(context, "trait_prune", 100).await;
1785
1786        journal
1787            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1788            .await
1789            .unwrap();
1790        journal.sync().await.unwrap();
1791
1792        let pruned = <TestJournal<F> as Mutable>::prune(&mut journal, 50)
1793            .await
1794            .unwrap();
1795        assert!(pruned);
1796
1797        let item_boundary = journal.reader().await.bounds().start;
1798        let merkle_boundary = journal.merkle.bounds().start;
1799        assert_eq!(Location::<F>::new(item_boundary), merkle_boundary);
1800        assert!(merkle_boundary > Location::<F>::new(0));
1801
1802        let pruned = <TestJournal<F> as Mutable>::prune(&mut journal, 50)
1803            .await
1804            .unwrap();
1805        assert!(!pruned);
1806        assert_eq!(journal.reader().await.bounds().start, item_boundary);
1807        assert_eq!(journal.merkle.bounds().start, merkle_boundary);
1808    }
1809
1810    #[test_traced("INFO")]
1811    fn test_mutable_prune_updates_merkle_boundary_mmr() {
1812        let executor = deterministic::Runner::default();
1813        executor.start(test_mutable_prune_updates_merkle_boundary_inner::<mmr::Family>);
1814    }
1815
1816    #[test_traced("INFO")]
1817    fn test_mutable_prune_updates_merkle_boundary_mmb() {
1818        let executor = deterministic::Runner::default();
1819        executor.start(test_mutable_prune_updates_merkle_boundary_inner::<mmb::Family>);
1820    }
1821
1822    /// Verify that pruning doesn't change the operation count.
1823    async fn test_prune_preserves_operation_count_inner<F: Family + PartialEq>(context: Context) {
1824        let mut journal = create_journal_with_ops::<F>(context, "prune_count", 100).await;
1825
1826        journal
1827            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1828            .await
1829            .unwrap();
1830        journal.sync().await.unwrap();
1831
1832        let count_before = journal.size().await;
1833        journal.prune(Location::<F>::new(50)).await.unwrap();
1834        let count_after = journal.size().await;
1835
1836        assert_eq!(count_before, count_after);
1837    }
1838
1839    #[test_traced("INFO")]
1840    fn test_prune_preserves_operation_count_mmr() {
1841        let executor = deterministic::Runner::default();
1842        executor.start(test_prune_preserves_operation_count_inner::<mmr::Family>);
1843    }
1844
1845    #[test_traced("INFO")]
1846    fn test_prune_preserves_operation_count_mmb() {
1847        let executor = deterministic::Runner::default();
1848        executor.start(test_prune_preserves_operation_count_inner::<mmb::Family>);
1849    }
1850
1851    /// Verify bounds() for empty journal, no pruning, and after pruning.
1852    async fn test_bounds_empty_and_pruned_inner<F: Family + PartialEq>(context: Context) {
1853        // Test empty journal
1854        let journal = create_empty_journal::<F>(context.child("empty"), "oldest").await;
1855        assert!(journal.reader().await.bounds().is_empty());
1856        journal.destroy().await.unwrap();
1857
1858        // Test no pruning
1859        let journal = create_journal_with_ops::<F>(context.child("no_prune"), "oldest", 100).await;
1860        let bounds = journal.reader().await.bounds();
1861        assert!(!bounds.is_empty());
1862        assert_eq!(bounds.start, 0);
1863        journal.destroy().await.unwrap();
1864
1865        // Test after pruning
1866        let mut journal =
1867            create_journal_with_ops::<F>(context.child("pruned"), "oldest", 100).await;
1868        journal
1869            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1870            .await
1871            .unwrap();
1872        journal.sync().await.unwrap();
1873
1874        let pruned_boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1875
1876        // Should match the pruned boundary (may be <= 50 due to section alignment)
1877        let bounds = journal.reader().await.bounds();
1878        assert!(!bounds.is_empty());
1879        assert_eq!(bounds.start, pruned_boundary);
1880        // Should be <= requested location (50)
1881        assert!(pruned_boundary <= 50);
1882        journal.destroy().await.unwrap();
1883    }
1884
1885    #[test_traced("INFO")]
1886    fn test_bounds_empty_and_pruned_mmr() {
1887        let executor = deterministic::Runner::default();
1888        executor.start(test_bounds_empty_and_pruned_inner::<mmr::Family>);
1889    }
1890
1891    #[test_traced("INFO")]
1892    fn test_bounds_empty_and_pruned_mmb() {
1893        let executor = deterministic::Runner::default();
1894        executor.start(test_bounds_empty_and_pruned_inner::<mmb::Family>);
1895    }
1896
1897    /// Verify bounds().start for empty journal, no pruning, and after pruning.
1898    async fn test_bounds_start_after_prune_inner<F: Family + PartialEq>(context: Context) {
1899        // Test empty journal
1900        let journal = create_empty_journal::<F>(context.child("empty"), "boundary").await;
1901        assert_eq!(journal.reader().await.bounds().start, 0);
1902
1903        // Test no pruning
1904        let journal =
1905            create_journal_with_ops::<F>(context.child("no_prune"), "boundary", 100).await;
1906        assert_eq!(journal.reader().await.bounds().start, 0);
1907
1908        // Test after pruning
1909        let mut journal =
1910            create_journal_with_ops::<F>(context.child("pruned"), "boundary", 100).await;
1911        journal
1912            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1913            .await
1914            .unwrap();
1915        journal.sync().await.unwrap();
1916
1917        let pruned_boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1918
1919        assert_eq!(journal.reader().await.bounds().start, pruned_boundary);
1920    }
1921
1922    #[test_traced("INFO")]
1923    fn test_bounds_start_after_prune_mmr() {
1924        let executor = deterministic::Runner::default();
1925        executor.start(test_bounds_start_after_prune_inner::<mmr::Family>);
1926    }
1927
1928    #[test_traced("INFO")]
1929    fn test_bounds_start_after_prune_mmb() {
1930        let executor = deterministic::Runner::default();
1931        executor.start(test_bounds_start_after_prune_inner::<mmb::Family>);
1932    }
1933
1934    /// Verify that Merkle prunes to the journal's actual boundary, not the requested location.
1935    async fn test_mmr_prunes_to_journal_boundary_inner<F: Family + PartialEq>(context: Context) {
1936        let mut journal = create_journal_with_ops::<F>(context, "mmr_boundary", 50).await;
1937
1938        journal
1939            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(25)))
1940            .await
1941            .unwrap();
1942        journal.sync().await.unwrap();
1943
1944        let pruned_boundary = journal.prune(Location::<F>::new(25)).await.unwrap();
1945
1946        // Verify Merkle and journal remain in sync
1947        let bounds = journal.reader().await.bounds();
1948        assert!(!bounds.is_empty());
1949        assert_eq!(pruned_boundary, bounds.start);
1950
1951        // Verify boundary is at or before requested (due to section alignment)
1952        assert!(pruned_boundary <= Location::<F>::new(25));
1953
1954        // Verify operation count is unchanged
1955        assert_eq!(journal.size().await, 51);
1956    }
1957
1958    #[test_traced("INFO")]
1959    fn test_mmr_prunes_to_journal_boundary_mmr() {
1960        let executor = deterministic::Runner::default();
1961        executor.start(test_mmr_prunes_to_journal_boundary_inner::<mmr::Family>);
1962    }
1963
1964    #[test_traced("INFO")]
1965    fn test_mmr_prunes_to_journal_boundary_mmb() {
1966        let executor = deterministic::Runner::default();
1967        executor.start(test_mmr_prunes_to_journal_boundary_inner::<mmb::Family>);
1968    }
1969
1970    /// Verify proof() for multiple operations.
1971    async fn test_proof_multiple_operations_inner<F: Family + PartialEq>(context: Context) {
1972        let journal = create_journal_with_ops::<F>(context, "proof_multi", 50).await;
1973
1974        let (proof, ops) = journal
1975            .proof(Location::<F>::new(0), NZU64!(50), 0)
1976            .await
1977            .unwrap();
1978
1979        assert_eq!(ops.len(), 50);
1980        for (i, op) in ops.iter().enumerate() {
1981            assert_eq!(*op, create_operation::<F>(i as u8));
1982        }
1983
1984        // Verify the proof is valid
1985        let hasher = StandardHasher::new(ForwardFold);
1986        let root = journal_root(&journal);
1987        assert!(verify_proof(
1988            &proof,
1989            &ops,
1990            Location::<F>::new(0),
1991            &root,
1992            &hasher
1993        ));
1994    }
1995
1996    #[test_traced("INFO")]
1997    fn test_proof_multiple_operations_mmr() {
1998        let executor = deterministic::Runner::default();
1999        executor.start(test_proof_multiple_operations_inner::<mmr::Family>);
2000    }
2001
2002    #[test_traced("INFO")]
2003    fn test_proof_multiple_operations_mmb() {
2004        let executor = deterministic::Runner::default();
2005        executor.start(test_proof_multiple_operations_inner::<mmb::Family>);
2006    }
2007
2008    /// Verify that historical_proof() respects the max_ops limit.
2009    async fn test_historical_proof_limited_by_max_ops_inner<F: Family + PartialEq>(
2010        context: Context,
2011    ) {
2012        let journal = create_journal_with_ops::<F>(context, "proof_limit", 50).await;
2013
2014        let size = journal.size().await;
2015        let (proof, ops) = journal
2016            .historical_proof(size, Location::<F>::new(0), NZU64!(20), 0)
2017            .await
2018            .unwrap();
2019
2020        // Should return only 20 operations despite 50 being available
2021        assert_eq!(ops.len(), 20);
2022        for (i, op) in ops.iter().enumerate() {
2023            assert_eq!(*op, create_operation::<F>(i as u8));
2024        }
2025
2026        // Verify the proof is valid
2027        let hasher = StandardHasher::new(ForwardFold);
2028        let root = journal_root(&journal);
2029        assert!(verify_proof(
2030            &proof,
2031            &ops,
2032            Location::<F>::new(0),
2033            &root,
2034            &hasher
2035        ));
2036    }
2037
2038    #[test_traced("INFO")]
2039    fn test_historical_proof_limited_by_max_ops_mmr() {
2040        let executor = deterministic::Runner::default();
2041        executor.start(|context| {
2042            test_historical_proof_limited_by_max_ops_inner::<mmr::Family>(context)
2043        });
2044    }
2045
2046    #[test_traced("INFO")]
2047    fn test_historical_proof_limited_by_max_ops_mmb() {
2048        let executor = deterministic::Runner::default();
2049        executor.start(|context| {
2050            test_historical_proof_limited_by_max_ops_inner::<mmb::Family>(context)
2051        });
2052    }
2053
2054    /// Verify historical_proof() at the end of the journal.
2055    async fn test_historical_proof_at_end_of_journal_inner<F: Family + PartialEq>(
2056        context: Context,
2057    ) {
2058        let journal = create_journal_with_ops::<F>(context, "proof_end", 50).await;
2059
2060        let size = journal.size().await;
2061        // Request proof starting near the end
2062        let (proof, ops) = journal
2063            .historical_proof(size, Location::<F>::new(40), NZU64!(20), 0)
2064            .await
2065            .unwrap();
2066
2067        // Should return only 10 operations (positions 40-49)
2068        assert_eq!(ops.len(), 10);
2069        for (i, op) in ops.iter().enumerate() {
2070            assert_eq!(*op, create_operation::<F>((40 + i) as u8));
2071        }
2072
2073        // Verify the proof is valid
2074        let hasher = StandardHasher::new(ForwardFold);
2075        let root = journal_root(&journal);
2076        assert!(verify_proof(
2077            &proof,
2078            &ops,
2079            Location::<F>::new(40),
2080            &root,
2081            &hasher
2082        ));
2083    }
2084
2085    #[test_traced("INFO")]
2086    fn test_historical_proof_at_end_of_journal_mmr() {
2087        let executor = deterministic::Runner::default();
2088        executor.start(test_historical_proof_at_end_of_journal_inner::<mmr::Family>);
2089    }
2090
2091    #[test_traced("INFO")]
2092    fn test_historical_proof_at_end_of_journal_mmb() {
2093        let executor = deterministic::Runner::default();
2094        executor.start(test_historical_proof_at_end_of_journal_inner::<mmb::Family>);
2095    }
2096
2097    /// Verify that historical_proof() returns an error for invalid size.
2098    async fn test_historical_proof_out_of_range_returns_error_inner<F: Family + PartialEq>(
2099        context: Context,
2100    ) {
2101        let journal = create_journal_with_ops::<F>(context, "proof_oob", 5).await;
2102
2103        // Request proof with size > actual journal size
2104        let result = journal
2105            .historical_proof(Location::<F>::new(10), Location::<F>::new(0), NZU64!(1), 0)
2106            .await;
2107
2108        assert!(matches!(
2109            result,
2110            Err(Error::Merkle(merkle::Error::RangeOutOfBounds(_)))
2111        ));
2112    }
2113
2114    #[test_traced("INFO")]
2115    fn test_historical_proof_out_of_range_returns_error_mmr() {
2116        let executor = deterministic::Runner::default();
2117        executor.start(|context| {
2118            test_historical_proof_out_of_range_returns_error_inner::<mmr::Family>(context)
2119        });
2120    }
2121
2122    #[test_traced("INFO")]
2123    fn test_historical_proof_out_of_range_returns_error_mmb() {
2124        let executor = deterministic::Runner::default();
2125        executor.start(|context| {
2126            test_historical_proof_out_of_range_returns_error_inner::<mmb::Family>(context)
2127        });
2128    }
2129
2130    /// Verify that historical_proof() returns an error when start_loc >= size.
2131    async fn test_historical_proof_start_too_large_returns_error_inner<F: Family + PartialEq>(
2132        context: Context,
2133    ) {
2134        let journal = create_journal_with_ops::<F>(context, "proof_start_oob", 5).await;
2135
2136        let size = journal.size().await;
2137        // Request proof starting at size (should fail)
2138        let result = journal.historical_proof(size, size, NZU64!(1), 0).await;
2139
2140        assert!(matches!(
2141            result,
2142            Err(Error::Merkle(merkle::Error::RangeOutOfBounds(_)))
2143        ));
2144    }
2145
2146    #[test_traced("INFO")]
2147    fn test_historical_proof_start_too_large_returns_error_mmr() {
2148        let executor = deterministic::Runner::default();
2149        executor.start(|context| {
2150            test_historical_proof_start_too_large_returns_error_inner::<mmr::Family>(context)
2151        });
2152    }
2153
2154    #[test_traced("INFO")]
2155    fn test_historical_proof_start_too_large_returns_error_mmb() {
2156        let executor = deterministic::Runner::default();
2157        executor.start(|context| {
2158            test_historical_proof_start_too_large_returns_error_inner::<mmb::Family>(context)
2159        });
2160    }
2161
2162    /// Verify historical_proof() for a truly historical state (before more operations added).
2163    async fn test_historical_proof_truly_historical_inner<F: Family + PartialEq>(context: Context) {
2164        // Create journal with initial operations
2165        let mut journal = create_journal_with_ops::<F>(context, "proof_historical", 50).await;
2166
2167        // Capture root at historical state
2168        let hasher = StandardHasher::new(ForwardFold);
2169        let historical_root = journal_root(&journal);
2170        let historical_size = journal.size().await;
2171
2172        // Add more operations after the historical state
2173        for i in 50..100 {
2174            journal
2175                .append(&create_operation::<F>(i as u8))
2176                .await
2177                .unwrap();
2178        }
2179        journal.sync().await.unwrap();
2180
2181        // Generate proof for the historical state
2182        let (proof, ops) = journal
2183            .historical_proof(historical_size, Location::<F>::new(0), NZU64!(50), 0)
2184            .await
2185            .unwrap();
2186
2187        // Verify operations match expected historical operations
2188        assert_eq!(ops.len(), 50);
2189        for (i, op) in ops.iter().enumerate() {
2190            assert_eq!(*op, create_operation::<F>(i as u8));
2191        }
2192
2193        // Verify the proof is valid against the historical root
2194        assert!(verify_proof(
2195            &proof,
2196            &ops,
2197            Location::<F>::new(0),
2198            &historical_root,
2199            &hasher
2200        ));
2201    }
2202
2203    #[test_traced("INFO")]
2204    fn test_historical_proof_truly_historical_mmr() {
2205        let executor = deterministic::Runner::default();
2206        executor.start(test_historical_proof_truly_historical_inner::<mmr::Family>);
2207    }
2208
2209    #[test_traced("INFO")]
2210    fn test_historical_proof_truly_historical_mmb() {
2211        let executor = deterministic::Runner::default();
2212        executor.start(test_historical_proof_truly_historical_inner::<mmb::Family>);
2213    }
2214
2215    /// Verify that historical_proof() returns an error when start_loc is pruned.
2216    async fn test_historical_proof_pruned_location_returns_error_inner<F: Family + PartialEq>(
2217        context: Context,
2218    ) {
2219        let mut journal = create_journal_with_ops::<F>(context, "proof_pruned", 50).await;
2220
2221        journal
2222            .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(25)))
2223            .await
2224            .unwrap();
2225        journal.sync().await.unwrap();
2226        let pruned_boundary = journal.prune(Location::<F>::new(25)).await.unwrap();
2227
2228        // Try to get proof starting at a location before the pruned boundary
2229        let size = journal.size().await;
2230        let start_loc = Location::<F>::new(0);
2231        if start_loc < pruned_boundary {
2232            let result = journal
2233                .historical_proof(size, start_loc, NZU64!(1), 0)
2234                .await;
2235
2236            // Should fail when trying to read pruned operations
2237            assert!(result.is_err());
2238        }
2239    }
2240
2241    #[test_traced("INFO")]
2242    fn test_historical_proof_pruned_location_returns_error_mmr() {
2243        let executor = deterministic::Runner::default();
2244        executor.start(|context| {
2245            test_historical_proof_pruned_location_returns_error_inner::<mmr::Family>(context)
2246        });
2247    }
2248
2249    #[test_traced("INFO")]
2250    fn test_historical_proof_pruned_location_returns_error_mmb() {
2251        let executor = deterministic::Runner::default();
2252        executor.start(|context| {
2253            test_historical_proof_pruned_location_returns_error_inner::<mmb::Family>(context)
2254        });
2255    }
2256
2257    /// Verify replay() with empty journal and multiple operations.
2258    async fn test_replay_operations_inner<F: Family + PartialEq>(context: Context) {
2259        // Test empty journal
2260        let journal = create_empty_journal::<F>(context.child("empty"), "replay").await;
2261        let reader = journal.reader().await;
2262        let stream = reader.replay(NZUsize!(10), 0).await.unwrap();
2263        futures::pin_mut!(stream);
2264        assert!(stream.next().await.is_none());
2265
2266        // Test replaying all operations
2267        let journal = create_journal_with_ops::<F>(context.child("with_ops"), "replay", 50).await;
2268        let reader = journal.reader().await;
2269        let stream = reader.replay(NZUsize!(100), 0).await.unwrap();
2270        futures::pin_mut!(stream);
2271
2272        for i in 0..50 {
2273            let (pos, op) = stream.next().await.unwrap().unwrap();
2274            assert_eq!(pos, i);
2275            assert_eq!(op, create_operation::<F>(i as u8));
2276        }
2277
2278        assert!(stream.next().await.is_none());
2279    }
2280
2281    #[test_traced("INFO")]
2282    fn test_replay_operations_mmr() {
2283        let executor = deterministic::Runner::default();
2284        executor.start(test_replay_operations_inner::<mmr::Family>);
2285    }
2286
2287    #[test_traced("INFO")]
2288    fn test_replay_operations_mmb() {
2289        let executor = deterministic::Runner::default();
2290        executor.start(test_replay_operations_inner::<mmb::Family>);
2291    }
2292
2293    /// Verify replay() starting from a middle location.
2294    async fn test_replay_from_middle_inner<F: Family + PartialEq>(context: Context) {
2295        let journal = create_journal_with_ops::<F>(context, "replay_middle", 50).await;
2296        let reader = journal.reader().await;
2297        let stream = reader.replay(NZUsize!(100), 25).await.unwrap();
2298        futures::pin_mut!(stream);
2299
2300        let mut count = 0;
2301        while let Some(result) = stream.next().await {
2302            let (pos, op) = result.unwrap();
2303            assert_eq!(pos, 25 + count);
2304            assert_eq!(op, create_operation::<F>((25 + count) as u8));
2305            count += 1;
2306        }
2307
2308        // Should have replayed positions 25-49 (25 operations)
2309        assert_eq!(count, 25);
2310    }
2311
2312    #[test_traced("INFO")]
2313    fn test_replay_from_middle_mmr() {
2314        let executor = deterministic::Runner::default();
2315        executor.start(test_replay_from_middle_inner::<mmr::Family>);
2316    }
2317
2318    #[test_traced("INFO")]
2319    fn test_replay_from_middle_mmb() {
2320        let executor = deterministic::Runner::default();
2321        executor.start(test_replay_from_middle_inner::<mmb::Family>);
2322    }
2323
2324    /// Verify the speculative batch API: fork two batches, verify independent roots, apply one.
2325    async fn test_speculative_batch_inner<F: Family + PartialEq>(context: Context) {
2326        let mut journal = create_journal_with_ops::<F>(context, "speculative_batch", 10).await;
2327        let original_root = journal_root(&journal);
2328
2329        // Fork two independent speculative batches.
2330        let b1 = journal.new_batch();
2331        let b2 = journal.new_batch();
2332
2333        // Add different items to each batch.
2334        let op_a = create_operation::<F>(100);
2335        let op_b = create_operation::<F>(200);
2336        let b1 = b1.add(op_a.clone());
2337        let b2 = b2.add(op_b);
2338
2339        // Merkleize and verify independent roots.
2340        let m1 = journal.merkle.with_mem(|mem| b1.merkleize(mem));
2341        let m2 = journal.merkle.with_mem(|mem| b2.merkleize(mem));
2342        assert_ne!(batch_root(&journal, &m1), batch_root(&journal, &m2));
2343        assert_ne!(batch_root(&journal, &m1), original_root);
2344        assert_ne!(batch_root(&journal, &m2), original_root);
2345
2346        // Journal root should be unchanged (batches are speculative).
2347        assert_eq!(journal_root(&journal), original_root);
2348
2349        // Apply batch 1.
2350        let expected_root = batch_root(&journal, &m1);
2351        journal.apply_batch(&m1).await.unwrap();
2352
2353        // Journal should now match the applied batch's root.
2354        assert_eq!(journal_root(&journal), expected_root);
2355        assert_eq!(*journal.size().await, 11);
2356    }
2357
2358    #[test_traced("INFO")]
2359    fn test_speculative_batch_mmr() {
2360        let executor = deterministic::Runner::default();
2361        executor.start(test_speculative_batch_inner::<mmr::Family>);
2362    }
2363
2364    #[test_traced("INFO")]
2365    fn test_speculative_batch_mmb() {
2366        let executor = deterministic::Runner::default();
2367        executor.start(test_speculative_batch_inner::<mmb::Family>);
2368    }
2369
2370    /// Verify stacking: create batch A, merkleize, create batch B from merkleized A,
2371    /// merkleize, and apply. Verify root and items.
2372    async fn test_speculative_batch_stacking_inner<F: Family + PartialEq>(context: Context) {
2373        let mut journal = create_journal_with_ops::<F>(context, "batch_stacking", 10).await;
2374
2375        let op_a = create_operation::<F>(100);
2376        let op_b = create_operation::<F>(200);
2377
2378        let (merkleized_a, merkleized_b) = {
2379            let batch_a = journal.new_batch().add(op_a.clone());
2380            let merkleized_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2381
2382            let batch_b = merkleized_a.new_batch::<Sha256>().add(op_b.clone());
2383            let merkleized_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem));
2384            (merkleized_a, merkleized_b)
2385        };
2386
2387        let expected_root = batch_root(&journal, &merkleized_b);
2388        journal.apply_batch(&merkleized_b).await.unwrap();
2389        drop(merkleized_a);
2390
2391        assert_eq!(journal_root(&journal), expected_root);
2392        assert_eq!(*journal.size().await, 12);
2393
2394        // Verify both items were appended correctly.
2395        let read_a = journal.read(Location::<F>::new(10)).await.unwrap();
2396        assert_eq!(read_a, op_a);
2397        let read_b = journal.read(Location::<F>::new(11)).await.unwrap();
2398        assert_eq!(read_b, op_b);
2399    }
2400
2401    #[test_traced("INFO")]
2402    fn test_speculative_batch_stacking_mmr() {
2403        let executor = deterministic::Runner::default();
2404        executor.start(test_speculative_batch_stacking_inner::<mmr::Family>);
2405    }
2406
2407    #[test_traced("INFO")]
2408    fn test_speculative_batch_stacking_mmb() {
2409        let executor = deterministic::Runner::default();
2410        executor.start(test_speculative_batch_stacking_inner::<mmb::Family>);
2411    }
2412
2413    /// Verify sequential batch application: apply batch A, then build and apply batch B
2414    /// from the committed state. Verify root and items.
2415    async fn test_speculative_batch_sequential_inner<F: Family + PartialEq>(context: Context) {
2416        let mut journal = create_journal_with_ops::<F>(context, "batch_sequential", 10).await;
2417
2418        let op_a = create_operation::<F>(100);
2419        let op_b = create_operation::<F>(200);
2420
2421        // Apply batch A.
2422        let batch_a = journal.new_batch().add(op_a.clone());
2423        let merkleized_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2424        journal.apply_batch(&merkleized_a).await.unwrap();
2425        assert_eq!(*journal.size().await, 11);
2426
2427        // Apply batch B (built on top of the committed A).
2428        let batch_b = journal.new_batch().add(op_b.clone());
2429        let merkleized_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem));
2430        let expected_root = batch_root(&journal, &merkleized_b);
2431        journal.apply_batch(&merkleized_b).await.unwrap();
2432
2433        assert_eq!(journal_root(&journal), expected_root);
2434        assert_eq!(*journal.size().await, 12);
2435
2436        // Verify both items were appended correctly.
2437        let read_a = journal.read(Location::<F>::new(10)).await.unwrap();
2438        assert_eq!(read_a, op_a);
2439        let read_b = journal.read(Location::<F>::new(11)).await.unwrap();
2440        assert_eq!(read_b, op_b);
2441    }
2442
2443    #[test_traced("INFO")]
2444    fn test_speculative_batch_sequential_mmr() {
2445        let executor = deterministic::Runner::default();
2446        executor.start(test_speculative_batch_sequential_inner::<mmr::Family>);
2447    }
2448
2449    #[test_traced("INFO")]
2450    fn test_speculative_batch_sequential_mmb() {
2451        let executor = deterministic::Runner::default();
2452        executor.start(test_speculative_batch_sequential_inner::<mmb::Family>);
2453    }
2454
2455    async fn test_stale_batch_sibling_inner<F: Family + PartialEq>(context: Context) {
2456        let mut journal = create_empty_journal::<F>(context, "stale-sibling").await;
2457        let op_a = create_operation::<F>(1);
2458        let op_b = create_operation::<F>(2);
2459
2460        // Create two batches from the same base.
2461        let batch_a = journal.new_batch().add(op_a.clone());
2462        let merkleized_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2463        let batch_b = journal.new_batch().add(op_b);
2464        let merkleized_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem));
2465
2466        // Apply A -- should succeed.
2467        journal.apply_batch(&merkleized_a).await.unwrap();
2468        let expected_root = journal_root(&journal);
2469        let expected_size = journal.size().await;
2470
2471        // Apply B -- should fail (stale).
2472        let result = journal.apply_batch(&merkleized_b).await;
2473        assert!(
2474            matches!(
2475                result,
2476                Err(super::Error::Merkle(merkle::Error::StaleBatch { .. }))
2477            ),
2478            "expected StaleBatch, got {result:?}"
2479        );
2480
2481        // The stale batch must not mutate the journal or desync it from the Merkle.
2482        assert_eq!(journal_root(&journal), expected_root);
2483        assert_eq!(journal.size().await, expected_size);
2484        let (_, ops) = journal
2485            .proof(Location::<F>::new(0), NZU64!(1), 0)
2486            .await
2487            .unwrap();
2488        assert_eq!(ops, vec![op_a]);
2489    }
2490
2491    #[test_traced("INFO")]
2492    fn test_stale_batch_sibling_mmr() {
2493        let executor = deterministic::Runner::default();
2494        executor.start(test_stale_batch_sibling_inner::<mmr::Family>);
2495    }
2496
2497    #[test_traced("INFO")]
2498    fn test_stale_batch_sibling_mmb() {
2499        let executor = deterministic::Runner::default();
2500        executor.start(test_stale_batch_sibling_inner::<mmb::Family>);
2501    }
2502
2503    async fn test_stale_batch_chained_inner<F: Family + PartialEq>(context: Context) {
2504        let mut journal = create_journal_with_ops::<F>(context, "stale-chained", 5).await;
2505
2506        // Parent batch, then fork two children.
2507        let parent_batch = journal.new_batch().add(create_operation::<F>(10));
2508        let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2509        let batch_a = parent.new_batch::<Sha256>().add(create_operation::<F>(20));
2510        let child_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2511        let batch_b = parent.new_batch::<Sha256>().add(create_operation::<F>(30));
2512        let child_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem));
2513
2514        // Apply child_a, then child_b should be stale.
2515        journal.apply_batch(&child_a).await.unwrap();
2516        let result = journal.apply_batch(&child_b).await;
2517        drop(parent);
2518        assert!(
2519            matches!(
2520                result,
2521                Err(super::Error::Merkle(merkle::Error::StaleBatch { .. }))
2522            ),
2523            "expected StaleBatch for sibling, got {result:?}"
2524        );
2525    }
2526
2527    #[test_traced("INFO")]
2528    fn test_stale_batch_chained_mmr() {
2529        let executor = deterministic::Runner::default();
2530        executor.start(test_stale_batch_chained_inner::<mmr::Family>);
2531    }
2532
2533    #[test_traced("INFO")]
2534    fn test_stale_batch_chained_mmb() {
2535        let executor = deterministic::Runner::default();
2536        executor.start(test_stale_batch_chained_inner::<mmb::Family>);
2537    }
2538
2539    async fn test_stale_batch_parent_before_child_inner<F: Family + PartialEq>(context: Context) {
2540        let mut journal = create_empty_journal::<F>(context, "stale-parent-first").await;
2541
2542        // Create parent, then child.
2543        let parent_batch = journal.new_batch().add(create_operation::<F>(1));
2544        let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2545        let child_batch = parent.new_batch::<Sha256>().add(create_operation::<F>(2));
2546        let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2547
2548        let expected_root = batch_root(&journal, &child);
2549
2550        // Apply parent, then child (sequential commit).
2551        journal.apply_batch(&parent).await.unwrap();
2552        journal.apply_batch(&child).await.unwrap();
2553
2554        assert_eq!(journal_root(&journal), expected_root);
2555        assert_eq!(*journal.size().await, 2);
2556    }
2557
2558    #[test_traced("INFO")]
2559    fn test_stale_batch_parent_before_child_mmr() {
2560        let executor = deterministic::Runner::default();
2561        executor.start(test_stale_batch_parent_before_child_inner::<mmr::Family>);
2562    }
2563
2564    #[test_traced("INFO")]
2565    fn test_stale_batch_parent_before_child_mmb() {
2566        let executor = deterministic::Runner::default();
2567        executor.start(test_stale_batch_parent_before_child_inner::<mmb::Family>);
2568    }
2569
2570    async fn test_stale_batch_child_before_parent_inner<F: Family + PartialEq>(context: Context) {
2571        let mut journal = create_empty_journal::<F>(context, "stale-child-first").await;
2572
2573        // Create parent, then child.
2574        let parent_batch = journal.new_batch().add(create_operation::<F>(1));
2575        let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2576        let child_batch = parent.new_batch::<Sha256>().add(create_operation::<F>(2));
2577        let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2578
2579        // Apply child first (full chain) -- parent should now be stale.
2580        journal.apply_batch(&child).await.unwrap();
2581        let result = journal.apply_batch(&parent).await;
2582        assert!(
2583            matches!(
2584                result,
2585                Err(super::Error::Merkle(merkle::Error::StaleBatch { .. }))
2586            ),
2587            "expected StaleBatch for parent after child applied, got {result:?}"
2588        );
2589    }
2590
2591    #[test_traced("INFO")]
2592    fn test_stale_batch_child_before_parent_mmr() {
2593        let executor = deterministic::Runner::default();
2594        executor.start(test_stale_batch_child_before_parent_inner::<mmr::Family>);
2595    }
2596
2597    #[test_traced("INFO")]
2598    fn test_stale_batch_child_before_parent_mmb() {
2599        let executor = deterministic::Runner::default();
2600        executor.start(test_stale_batch_child_before_parent_inner::<mmb::Family>);
2601    }
2602
2603    /// Apply parent then child: child skips already-committed ancestor items.
2604    async fn test_apply_batch_skip_ancestor_items_inner<F: Family + PartialEq>(context: Context) {
2605        let mut journal = create_journal_with_ops::<F>(context, "rp-skip", 3).await;
2606
2607        // Parent: 2 items.
2608        let parent_batch = journal
2609            .new_batch()
2610            .add(create_operation::<F>(10))
2611            .add(create_operation::<F>(11));
2612        let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2613
2614        // Child: 3 more items.
2615        let child_batch = parent
2616            .new_batch::<Sha256>()
2617            .add(create_operation::<F>(20))
2618            .add(create_operation::<F>(21))
2619            .add(create_operation::<F>(22));
2620        let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2621
2622        // Apply parent.
2623        journal.apply_batch(&parent).await.unwrap();
2624
2625        // Apply child (ancestor items already committed, skipped automatically).
2626        journal.apply_batch(&child).await.unwrap();
2627
2628        // Verify all items are present.
2629        let (_, ops) = journal
2630            .proof(Location::<F>::new(3), NZU64!(5), 0)
2631            .await
2632            .unwrap();
2633        assert_eq!(ops.len(), 5);
2634    }
2635
2636    #[test_traced("INFO")]
2637    fn test_apply_batch_skip_ancestor_items_mmr() {
2638        let executor = deterministic::Runner::default();
2639        executor.start(test_apply_batch_skip_ancestor_items_inner::<mmr::Family>);
2640    }
2641
2642    #[test_traced("INFO")]
2643    fn test_apply_batch_skip_ancestor_items_mmb() {
2644        let executor = deterministic::Runner::default();
2645        executor.start(test_apply_batch_skip_ancestor_items_inner::<mmb::Family>);
2646    }
2647
2648    /// `apply_batch` works correctly across a 3-level chain.
2649    async fn test_apply_batch_cross_batch_inner<F: Family + PartialEq>(context: Context) {
2650        let mut journal = create_journal_with_ops::<F>(context, "rp-cross", 2).await;
2651
2652        // Grandparent: 3 items.
2653        let grandparent_batch = journal
2654            .new_batch()
2655            .add(create_operation::<F>(3))
2656            .add(create_operation::<F>(4))
2657            .add(create_operation::<F>(5));
2658        let grandparent = journal
2659            .merkle
2660            .with_mem(|mem| grandparent_batch.merkleize(mem));
2661
2662        // Parent: 2 items.
2663        let parent_batch = grandparent
2664            .new_batch::<Sha256>()
2665            .add(create_operation::<F>(6))
2666            .add(create_operation::<F>(7));
2667        let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2668
2669        // Child: 1 item.
2670        let child_batch = parent.new_batch::<Sha256>().add(create_operation::<F>(8));
2671        let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2672
2673        // Apply grandparent, then parent, then child sequentially.
2674        journal.apply_batch(&grandparent).await.unwrap();
2675
2676        // Apply parent (ancestor items already committed, skipped automatically).
2677        journal.apply_batch(&parent).await.unwrap();
2678
2679        // Apply child (ancestor items already committed, skipped automatically).
2680        journal.apply_batch(&child).await.unwrap();
2681
2682        // All 8 items (2 base + 3 + 2 + 1) should be present.
2683        assert_eq!(*journal.size().await, 8);
2684
2685        // Verify the actual items at each location.
2686        let (_, ops) = journal
2687            .proof(Location::<F>::new(2), NZU64!(6), 0)
2688            .await
2689            .unwrap();
2690        for (i, op) in ops.iter().enumerate() {
2691            assert_eq!(*op, create_operation::<F>((i + 3) as u8));
2692        }
2693    }
2694
2695    #[test_traced("INFO")]
2696    fn test_apply_batch_cross_batch_mmr() {
2697        let executor = deterministic::Runner::default();
2698        executor.start(test_apply_batch_cross_batch_inner::<mmr::Family>);
2699    }
2700
2701    #[test_traced("INFO")]
2702    fn test_apply_batch_cross_batch_mmb() {
2703        let executor = deterministic::Runner::default();
2704        executor.start(test_apply_batch_cross_batch_inner::<mmb::Family>);
2705    }
2706
2707    /// merkleize_with produces the same root as add + merkleize.
2708    async fn test_merkleize_with_matches_add_inner<F: Family + PartialEq>(context: Context) {
2709        let journal = create_journal_with_ops::<F>(context, "mw-matches", 5).await;
2710
2711        let ops = vec![
2712            create_operation::<F>(10),
2713            create_operation::<F>(11),
2714            create_operation::<F>(12),
2715        ];
2716
2717        // add + merkleize
2718        let mut batch = journal.new_batch();
2719        for op in &ops {
2720            batch = batch.add(op.clone());
2721        }
2722        let expected = journal.merkle.with_mem(|mem| batch.merkleize(mem));
2723
2724        // merkleize_with
2725        let batch = journal.new_batch();
2726        let actual = journal
2727            .merkle
2728            .with_mem(|mem| batch.merkleize_with(mem, Arc::new(ops)));
2729
2730        assert_eq!(
2731            batch_root(&journal, &actual),
2732            batch_root(&journal, &expected)
2733        );
2734    }
2735
2736    #[test_traced("INFO")]
2737    fn test_merkleize_with_matches_add_mmr() {
2738        let executor = deterministic::Runner::default();
2739        executor.start(test_merkleize_with_matches_add_inner::<mmr::Family>);
2740    }
2741
2742    #[test_traced("INFO")]
2743    fn test_merkleize_with_matches_add_mmb() {
2744        let executor = deterministic::Runner::default();
2745        executor.start(test_merkleize_with_matches_add_inner::<mmb::Family>);
2746    }
2747
2748    /// merkleize_with items are readable after apply.
2749    async fn test_merkleize_with_apply_inner<F: Family + PartialEq>(context: Context) {
2750        let mut journal = create_journal_with_ops::<F>(context, "mw-apply", 5).await;
2751
2752        let ops = vec![create_operation::<F>(10), create_operation::<F>(11)];
2753        let batch = journal.new_batch();
2754        let merkleized = journal
2755            .merkle
2756            .with_mem(|mem| batch.merkleize_with(mem, Arc::new(ops.clone())));
2757
2758        let expected_root = batch_root(&journal, &merkleized);
2759        journal.apply_batch(&merkleized).await.unwrap();
2760
2761        assert_eq!(journal_root(&journal), expected_root);
2762        assert_eq!(*journal.size().await, 7);
2763
2764        let reader = journal.reader().await;
2765        assert_eq!(reader.read(5).await.unwrap(), ops[0]);
2766        assert_eq!(reader.read(6).await.unwrap(), ops[1]);
2767    }
2768
2769    #[test_traced("INFO")]
2770    fn test_merkleize_with_apply_mmr() {
2771        let executor = deterministic::Runner::default();
2772        executor.start(test_merkleize_with_apply_inner::<mmr::Family>);
2773    }
2774
2775    #[test_traced("INFO")]
2776    fn test_merkleize_with_apply_mmb() {
2777        let executor = deterministic::Runner::default();
2778        executor.start(test_merkleize_with_apply_inner::<mmb::Family>);
2779    }
2780
2781    /// merkleize_with stores the caller's Arc directly (no deep copy).
2782    async fn test_merkleize_with_shares_arc_inner<F: Family + PartialEq>(context: Context) {
2783        let journal = create_journal_with_ops::<F>(context, "mw-arc", 3).await;
2784
2785        let ops = Arc::new(vec![create_operation::<F>(20), create_operation::<F>(21)]);
2786        let ops_clone = Arc::clone(&ops);
2787        let batch = journal.new_batch();
2788        let merkleized = journal
2789            .merkle
2790            .with_mem(|mem| batch.merkleize_with(mem, ops_clone));
2791
2792        // The batch should hold the same Arc allocation, not a copy.
2793        assert!(Arc::ptr_eq(&merkleized.items, &ops));
2794    }
2795
2796    #[test_traced("INFO")]
2797    fn test_merkleize_with_shares_arc_mmr() {
2798        let executor = deterministic::Runner::default();
2799        executor.start(test_merkleize_with_shares_arc_inner::<mmr::Family>);
2800    }
2801
2802    #[test_traced("INFO")]
2803    fn test_merkleize_with_shares_arc_mmb() {
2804        let executor = deterministic::Runner::default();
2805        executor.start(test_merkleize_with_shares_arc_inner::<mmb::Family>);
2806    }
2807
2808    /// Apply C (grandchild of A) after only A is committed. B's journal items
2809    /// must still be applied -- skip only A's items.
2810    async fn test_apply_batch_skips_only_committed_ancestor_items_inner<F: Family + PartialEq>(
2811        context: Context,
2812    ) {
2813        let mut journal = create_empty_journal::<F>(context.child("storage"), "skip-partial").await;
2814
2815        // Build chain: A -> B -> C
2816        let a_batch = journal.new_batch().add(create_operation::<F>(1));
2817        let a = journal.merkle.with_mem(|mem| a_batch.merkleize(mem));
2818        let b_batch = a.new_batch::<Sha256>().add(create_operation::<F>(2));
2819        let b = journal.merkle.with_mem(|mem| b_batch.merkleize(mem));
2820        let c_batch = b.new_batch::<Sha256>().add(create_operation::<F>(3));
2821        let c = journal.merkle.with_mem(|mem| c_batch.merkleize(mem));
2822
2823        // Apply A, then apply C directly (skipping B's apply_batch).
2824        journal.apply_batch(&a).await.unwrap();
2825        journal.apply_batch(&c).await.unwrap();
2826
2827        // All 3 items should be in the journal.
2828        assert_eq!(*journal.size().await, 3);
2829
2830        // Build a reference that applies all three sequentially.
2831        let mut reference =
2832            create_empty_journal::<F>(context.child("ref"), "skip-partial-ref").await;
2833        for i in 1..=3u8 {
2834            reference.append(&create_operation::<F>(i)).await.unwrap();
2835        }
2836        assert_eq!(journal_root(&journal), journal_root(&reference));
2837    }
2838
2839    #[test_traced("INFO")]
2840    fn test_apply_batch_skips_only_committed_ancestor_items_mmr() {
2841        let executor = deterministic::Runner::default();
2842        executor.start(test_apply_batch_skips_only_committed_ancestor_items_inner::<mmr::Family>);
2843    }
2844
2845    #[test_traced("INFO")]
2846    fn test_apply_batch_skips_only_committed_ancestor_items_mmb() {
2847        let executor = deterministic::Runner::default();
2848        executor.start(test_apply_batch_skips_only_committed_ancestor_items_inner::<mmb::Family>);
2849    }
2850}