Skip to main content

commonware_storage/journal/
authenticated.rs

1//! Authenticated journal implementation.
2//!
3//! An authenticated journal maintains a contiguous journal of items alongside a Merkle Mountain
4//! Range (MMR). The item at index i in the journal corresponds to the leaf at Location i in the
5//! MMR. This structure enables efficient proofs that an item is included in the journal at a
6//! specific location.
7
8use crate::{
9    journal::{
10        contiguous::{fixed, variable, Contiguous, Mutable, Reader},
11        Error as JournalError,
12    },
13    mmr::{
14        batch,
15        journaled::Mmr,
16        read::{BatchChainInfo, Readable},
17        Error as MmrError, Location, Position, Proof, StandardHasher,
18    },
19    Persistable,
20};
21use alloc::{collections::BTreeMap, sync::Arc, vec::Vec};
22use commonware_codec::{CodecFixedShared, CodecShared, Encode, EncodeShared};
23use commonware_cryptography::{Digest, Hasher};
24use commonware_runtime::{Clock, Metrics, Storage};
25use core::num::NonZeroU64;
26use futures::{future::try_join_all, try_join, TryFutureExt as _};
27use thiserror::Error;
28use tracing::{debug, warn};
29
/// Errors that can occur when interacting with an authenticated journal.
///
/// Wraps the error types of the two underlying components (the MMR and the contiguous
/// journal) so callers see a single error type from this module.
#[derive(Error, Debug)]
pub enum Error {
    /// An error from the underlying Merkle Mountain Range.
    #[error("mmr error: {0}")]
    Mmr(#[from] crate::mmr::Error),

    /// An error from the underlying contiguous journal.
    #[error("journal error: {0}")]
    Journal(#[from] super::Error),
}
39
/// A chain of batches whose items can be collected in append order.
pub trait BatchChain<Item> {
    /// Collect the items from the deepest ancestor batch up to and including the current batch
    /// in append order.
    ///
    /// Implementations recurse to their parent first so ancestor items precede this
    /// batch's items in `into`.
    fn collect(&self, into: &mut Vec<Arc<Vec<Item>>>);
}
46
impl<E: Storage + Clock + Metrics, D: Digest, Item> BatchChain<Item> for Mmr<E, D> {
    // Recursion base case: the MMR itself holds no pending batch items, so there is
    // nothing to collect.
    fn collect(&self, _into: &mut Vec<Arc<Vec<Item>>>) {}
}
51
/// A speculative batch whose root digest has not yet been computed,
/// in contrast to [MerkleizedBatch].
pub struct UnmerkleizedBatch<'a, H: Hasher, P: Readable<H::Digest>, Item> {
    // The inner batch of MMR leaf digests.
    inner: batch::UnmerkleizedBatch<'a, H::Digest, P>,
    // The hasher used to digest each item's encoding before it is added as an MMR leaf.
    hasher: StandardHasher<H>,
    // The items added so far, in append order; `add` keeps this vector in lockstep with
    // the leaves added to `inner`.
    items: Vec<Item>,
}
62
63impl<'a, H: Hasher, P: Readable<H::Digest>, Item: Encode> UnmerkleizedBatch<'a, H, P, Item> {
64    /// Add an item to the batch.
65    pub fn add(&mut self, item: Item) {
66        let encoded = item.encode();
67        self.inner.add(&mut self.hasher, &encoded);
68        self.items.push(item);
69    }
70
71    /// Merkleize the batch, computing the root digest.
72    pub fn merkleize(mut self) -> MerkleizedBatch<'a, H, P, Item> {
73        MerkleizedBatch {
74            inner: self.inner.merkleize(&mut self.hasher),
75            items: Arc::new(self.items),
76        }
77    }
78}
79
/// A speculative batch whose root digest has been computed,
/// in contrast to [UnmerkleizedBatch].
pub struct MerkleizedBatch<'a, H: Hasher, P: Readable<H::Digest>, Item> {
    // The inner, merkleized batch of MMR leaf digests.
    inner: batch::MerkleizedBatch<'a, H::Digest, P>,
    // The items to append, shared via `Arc` so batch chains can collect them without
    // copying the underlying vector.
    items: Arc<Vec<Item>>,
}
88
impl<'a, H: Hasher, P: Readable<H::Digest>, Item> MerkleizedBatch<'a, H, P, Item> {
    /// Return the root digest of the authenticated journal after this batch is applied.
    ///
    /// Delegates to the inner MMR batch, whose root was computed during merkleization.
    pub fn root(&self) -> H::Digest {
        self.inner.root()
    }
}
95
// [Readable] is implemented by pure delegation to the inner MMR batch, allowing a
// merkleized batch to serve as the parent view for further speculative batches.
impl<'a, H: Hasher, P: Readable<H::Digest>, Item: Send + Sync> Readable<H::Digest>
    for MerkleizedBatch<'a, H, P, Item>
{
    fn size(&self) -> Position {
        self.inner.size()
    }
    fn get_node(&self, pos: Position) -> Option<H::Digest> {
        self.inner.get_node(pos)
    }
    fn root(&self) -> H::Digest {
        self.inner.root()
    }
    fn pruned_to_pos(&self) -> Position {
        self.inner.pruned_to_pos()
    }
}
112
// [BatchChainInfo] is forwarded to the inner MMR batch unchanged.
impl<'a, H: Hasher, P: Readable<H::Digest> + BatchChainInfo<H::Digest>, Item: Send + Sync>
    BatchChainInfo<H::Digest> for MerkleizedBatch<'a, H, P, Item>
{
    fn base_size(&self) -> Position {
        self.inner.base_size()
    }
    fn collect_overwrites(&self, into: &mut BTreeMap<Position, H::Digest>) {
        self.inner.collect_overwrites(into);
    }
}
123
124impl<'a, H: Hasher, P: Readable<H::Digest> + BatchChain<Item>, Item: Send + Sync> BatchChain<Item>
125    for MerkleizedBatch<'a, H, P, Item>
126{
127    fn collect(&self, into: &mut Vec<Arc<Vec<Item>>>) {
128        self.inner.parent().collect(into); // recurse to parent first
129        into.push(self.items.clone()); // Arc clone, not data clone
130    }
131}
132
impl<'a, H: Hasher, P: Readable<H::Digest>, Item: Send + Sync + Encode>
    MerkleizedBatch<'a, H, P, Item>
{
    /// Create a new speculative batch of operations with this batch as its parent.
    pub fn new_batch(&self) -> UnmerkleizedBatch<'_, H, Self, Item> {
        let inner = batch::UnmerkleizedBatch::new(self);
        // NOTE(review): the shadowed rebinding propagates the parent's pool to the child
        // batch when `std` is enabled — presumably for parallel merkleization; confirm
        // against `batch::UnmerkleizedBatch::with_pool` docs.
        #[cfg(feature = "std")]
        let inner = inner.with_pool(self.inner.pool());
        UnmerkleizedBatch {
            inner,
            hasher: StandardHasher::new(),
            items: Vec::new(),
        }
    }
}
148
149impl<'a, H: Hasher, P, Item: Send + Sync> MerkleizedBatch<'a, H, P, Item>
150where
151    P: Readable<H::Digest> + BatchChainInfo<H::Digest> + BatchChain<Item>,
152{
153    /// Consume this batch, collecting the changes from its ancestors and itself into a
154    /// [Changeset] which can be applied to the journal.
155    pub fn finalize(self) -> Changeset<H::Digest, Item> {
156        let mut items = Vec::new();
157        self.collect(&mut items);
158        Changeset {
159            changeset: self.inner.finalize(),
160            items,
161        }
162    }
163}
164
/// An owned changeset that can be applied to the journal.
pub struct Changeset<D: Digest, Item> {
    // The inner MMR changeset; its `base_size` records the MMR size the changeset was
    // built against (checked on apply to reject stale changesets).
    changeset: batch::Changeset<D>,
    // The items to append: one `Arc`'d chunk per batch in the chain, in append order.
    items: Vec<Arc<Vec<Item>>>,
}
172
/// An append-only data structure that maintains a sequential journal of items alongside a Merkle
/// Mountain Range (MMR). The item at index i in the journal corresponds to the leaf at Location i
/// in the MMR. This structure enables efficient proofs that an item is included in the journal at a
/// specific location.
pub struct Journal<E, C, H>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
    /// MMR where each leaf is an item digest.
    /// Invariant: leaf i corresponds to item i in the journal.
    pub(crate) mmr: Mmr<E, H::Digest>,

    /// Journal of items.
    /// Invariant: item i corresponds to leaf i in the MMR.
    pub(crate) journal: C,

    /// Hasher used to digest encoded items into MMR leaves.
    pub(crate) hasher: StandardHasher<H>,
}
193
194impl<E, C, H> Journal<E, C, H>
195where
196    E: Storage + Clock + Metrics,
197    C: Contiguous<Item: EncodeShared>,
198    H: Hasher,
199{
200    /// Returns the Location of the next item appended to the journal.
201    pub async fn size(&self) -> Location {
202        Location::new(self.journal.size().await)
203    }
204
205    /// Return the root of the MMR.
206    pub fn root(&self) -> H::Digest {
207        self.mmr.root()
208    }
209
210    /// Create a speculative batch atop this journal.
211    pub fn new_batch(&self) -> UnmerkleizedBatch<'_, H, Mmr<E, H::Digest>, C::Item> {
212        UnmerkleizedBatch {
213            inner: self.mmr.new_batch(),
214            hasher: StandardHasher::new(),
215            items: Vec::new(),
216        }
217    }
218}
219
220impl<E, C, H> Journal<E, C, H>
221where
222    E: Storage + Clock + Metrics,
223    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
224    H: Hasher,
225{
226    /// Durably persist the journal. This is faster than `sync()` but does not persist the MMR,
227    /// meaning recovery will be required on startup if we crash before `sync()`.
228    pub async fn commit(&self) -> Result<(), Error> {
229        self.journal.commit().await.map_err(Error::Journal)
230    }
231}
232
233impl<E, C, H> Journal<E, C, H>
234where
235    E: Storage + Clock + Metrics,
236    C: Mutable<Item: EncodeShared>,
237    H: Hasher,
238{
239    /// Create a new [Journal] from the given components after aligning the MMR with the journal.
240    pub async fn from_components(
241        mut mmr: Mmr<E, H::Digest>,
242        journal: C,
243        mut hasher: StandardHasher<H>,
244        apply_batch_size: u64,
245    ) -> Result<Self, Error> {
246        Self::align(&mut mmr, &journal, &mut hasher, apply_batch_size).await?;
247
248        // Sync the MMR to disk to avoid having to repeat any recovery that may have been performed
249        // on next startup.
250        mmr.sync().await?;
251
252        Ok(Self {
253            mmr,
254            journal,
255            hasher,
256        })
257    }
258
259    /// Align `mmr` to be consistent with `journal`. Any items in `mmr` that aren't in `journal` are
260    /// popped, and any items in `journal` that aren't in `mmr` are added to `mmr`. Items are added
261    /// to `mmr` in batches of size `apply_batch_size` to avoid memory bloat.
262    async fn align(
263        mmr: &mut Mmr<E, H::Digest>,
264        journal: &C,
265        hasher: &mut StandardHasher<H>,
266        apply_batch_size: u64,
267    ) -> Result<(), Error> {
268        // Rewind MMR elements that are ahead of the journal.
269        // Note mmr_size is the size of the MMR in leaves, not positions.
270        let journal_size = journal.size().await;
271        let mut mmr_size = mmr.leaves();
272        if mmr_size > journal_size {
273            let rewind_count = mmr_size - journal_size;
274            warn!(
275                journal_size,
276                ?rewind_count,
277                "rewinding MMR to match journal"
278            );
279            mmr.rewind(*rewind_count as usize, hasher).await?;
280            mmr_size = Location::new(journal_size);
281        }
282
283        // If the MMR is behind, replay journal items to catch up.
284        if mmr_size < journal_size {
285            let replay_count = journal_size - *mmr_size;
286            warn!(
287                ?journal_size,
288                replay_count, "MMR lags behind journal, replaying journal to catch up"
289            );
290
291            let reader = journal.reader().await;
292            while mmr_size < journal_size {
293                let changeset = {
294                    let mut batch = mmr.new_batch();
295                    let mut count = 0u64;
296                    while count < apply_batch_size && mmr_size < journal_size {
297                        let op = reader.read(*mmr_size).await?;
298                        batch.add(hasher, &op.encode());
299                        mmr_size += 1;
300                        count += 1;
301                    }
302                    batch.merkleize(hasher).finalize()
303                };
304                mmr.apply(changeset)?;
305            }
306            return Ok(());
307        }
308
309        // At this point the MMR and journal should be consistent.
310        assert_eq!(journal.size().await, *mmr.leaves());
311
312        Ok(())
313    }
314
315    /// Append an item to the journal and update the MMR.
316    pub async fn append(&mut self, item: &C::Item) -> Result<Location, Error> {
317        let encoded_item = item.encode();
318
319        // Append item to the journal, then update the MMR state.
320        let loc = self.journal.append(item).await?;
321        let changeset = {
322            let mut batch = self.mmr.new_batch();
323            batch.add(&mut self.hasher, &encoded_item);
324            batch.merkleize(&mut self.hasher).finalize()
325        };
326        self.mmr.apply(changeset)?;
327
328        Ok(Location::new(loc))
329    }
330
331    /// Apply a changeset to the journal.
332    ///
333    /// A changeset is only valid if the journal has not been modified since the
334    /// batch that produced it was created. Multiple batches can be forked from the
335    /// same parent for speculative execution, but only one may be applied. Applying
336    /// a stale changeset returns an error.
337    pub async fn apply_batch(&mut self, batch: Changeset<H::Digest, C::Item>) -> Result<(), Error> {
338        let actual = self.mmr.size();
339        if batch.changeset.base_size != actual {
340            return Err(MmrError::StaleChangeset {
341                expected: batch.changeset.base_size,
342                actual,
343            }
344            .into());
345        }
346
347        for items in &batch.items {
348            for item in items.iter() {
349                self.journal.append(item).await?;
350            }
351        }
352        self.mmr.apply(batch.changeset)?;
353        debug_assert_eq!(*self.mmr.leaves(), self.journal.size().await);
354        Ok(())
355    }
356
    /// Prune both the MMR and journal to the given location.
    ///
    /// # Returns
    /// The new pruning boundary, which may be less than the requested `prune_loc`.
    pub async fn prune(&mut self, prune_loc: Location) -> Result<Location, Error> {
        if self.mmr.size() == 0 {
            // DB is empty, nothing to prune.
            return Ok(Location::new(self.reader().await.bounds().start));
        }

        // Sync the MMR before pruning the journal, otherwise the MMR's last element could end up
        // behind the journal's first element after a crash, and there would be no way to replay
        // the items between the MMR's last element and the journal's first element.
        self.mmr.sync().await?;

        // Prune the journal and check if anything was actually pruned
        if !self.journal.prune(*prune_loc).await? {
            // Nothing changed; report the current (unchanged) boundary.
            return Ok(Location::new(self.reader().await.bounds().start));
        }

        let bounds = self.reader().await.bounds();
        debug!(size = ?bounds.end, ?prune_loc, boundary = ?bounds.start, "pruned inactive ops");

        // Prune MMR to match the journal's actual boundary
        self.mmr.prune(Location::from(bounds.start)).await?;

        Ok(Location::new(bounds.start))
    }
385}
386
387impl<E, C, H> Journal<E, C, H>
388where
389    E: Storage + Clock + Metrics,
390    C: Contiguous<Item: EncodeShared>,
391    H: Hasher,
392{
393    /// Generate a proof of inclusion for items starting at `start_loc`.
394    ///
395    /// Returns a proof and the items corresponding to the leaves in the range `start_loc..end_loc`,
396    /// where `end_loc` is the minimum of the current item count and `start_loc + max_ops`.
397    ///
398    /// # Errors
399    ///
400    /// - Returns [Error::Mmr] with [MmrError::LocationOverflow] if `start_loc` >
401    ///   [crate::mmr::MAX_LOCATION].
402    /// - Returns [Error::Mmr] with [MmrError::RangeOutOfBounds] if `start_loc` >= current
403    ///   item count.
404    /// - Returns [Error::Journal] with [crate::journal::Error::ItemPruned] if `start_loc` has been
405    ///   pruned.
406    pub async fn proof(
407        &self,
408        start_loc: Location,
409        max_ops: NonZeroU64,
410    ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
411        self.historical_proof(self.size().await, start_loc, max_ops)
412            .await
413    }
414
415    /// Generate a historical proof with respect to the state of the MMR when it had
416    /// `historical_leaves` leaves.
417    ///
418    /// Returns a proof and the items corresponding to the leaves in the range `start_loc..end_loc`,
419    /// where `end_loc` is the minimum of `historical_leaves` and `start_loc + max_ops`.
420    ///
421    /// # Errors
422    ///
423    /// - Returns [Error::Mmr] with [MmrError::RangeOutOfBounds] if `start_loc` >=
424    ///   `historical_leaves` or `historical_leaves` > number of items in the journal.
425    /// - Returns [Error::Journal] with [crate::journal::Error::ItemPruned] if `start_loc` has been
426    ///   pruned.
427    pub async fn historical_proof(
428        &self,
429        historical_leaves: Location,
430        start_loc: Location,
431        max_ops: NonZeroU64,
432    ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
433        // Acquire a reader guard to prevent pruning from advancing while we read.
434        let reader = self.journal.reader().await;
435        let bounds = reader.bounds();
436
437        if *historical_leaves > bounds.end {
438            return Err(MmrError::RangeOutOfBounds(Location::new(bounds.end)).into());
439        }
440        if start_loc >= historical_leaves {
441            return Err(MmrError::RangeOutOfBounds(start_loc).into());
442        }
443
444        let end_loc = std::cmp::min(historical_leaves, start_loc.saturating_add(max_ops.get()));
445
446        let proof = self
447            .mmr
448            .historical_range_proof(historical_leaves, start_loc..end_loc)
449            .await?;
450
451        let mut ops = Vec::with_capacity((*end_loc - *start_loc) as usize);
452        let futures = (*start_loc..*end_loc)
453            .map(|i| reader.read(i))
454            .collect::<Vec<_>>();
455        try_join_all(futures)
456            .await?
457            .into_iter()
458            .for_each(|op| ops.push(op));
459
460        Ok((proof, ops))
461    }
462}
463
464impl<E, C, H> Journal<E, C, H>
465where
466    E: Storage + Clock + Metrics,
467    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
468    H: Hasher,
469{
470    /// Destroy the authenticated journal, removing all data from disk.
471    pub async fn destroy(self) -> Result<(), Error> {
472        try_join!(
473            self.journal.destroy().map_err(Error::Journal),
474            self.mmr.destroy().map_err(Error::Mmr),
475        )?;
476
477        Ok(())
478    }
479
480    /// Durably persist the journal, ensuring no recovery is required on startup.
481    pub async fn sync(&self) -> Result<(), Error> {
482        try_join!(
483            self.journal.sync().map_err(Error::Journal),
484            self.mmr.sync().map_err(Error::Mmr)
485        )?;
486
487        Ok(())
488    }
489}
490
/// The number of items to apply to the MMR in a single batch while aligning it with the
/// journal, bounding peak memory usage during long replays.
const APPLY_BATCH_SIZE: u64 = 1 << 16;
493
impl<E, O, H> Journal<E, fixed::Journal<E, O>, H>
where
    E: Storage + Clock + Metrics,
    O: CodecFixedShared,
    H: Hasher,
{
    /// Create a new [Journal] for fixed-length items.
    ///
    /// The journal will be rewound to the last item that matches the `rewind_predicate` on
    /// initialization.
    pub async fn new(
        context: E,
        mmr_cfg: crate::mmr::journaled::Config,
        journal_cfg: fixed::Config,
        rewind_predicate: fn(&O) -> bool,
    ) -> Result<Self, Error> {
        let mut journal = fixed::Journal::init(context.with_label("journal"), journal_cfg).await?;

        // Rewind journal to last matching item. This happens before alignment so the MMR
        // is aligned against the journal's post-rewind size.
        journal.rewind_to(rewind_predicate).await?;

        // Align the MMR and journal.
        let mut hasher = StandardHasher::<H>::new();
        let mut mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
        Self::align(&mut mmr, &journal, &mut hasher, APPLY_BATCH_SIZE).await?;

        // Sync the journal and MMR to disk to avoid having to repeat any recovery that may have
        // been performed on next startup.
        journal.sync().await?;
        mmr.sync().await?;

        Ok(Self {
            mmr,
            journal,
            hasher,
        })
    }
}
532
impl<E, O, H> Journal<E, variable::Journal<E, O>, H>
where
    E: Storage + Clock + Metrics,
    O: CodecShared,
    H: Hasher,
{
    /// Create a new [Journal] for variable-length items.
    ///
    /// The journal will be rewound to the last item that matches the `rewind_predicate` on
    /// initialization.
    pub async fn new(
        context: E,
        mmr_cfg: crate::mmr::journaled::Config,
        journal_cfg: variable::Config<O::Cfg>,
        rewind_predicate: fn(&O) -> bool,
    ) -> Result<Self, Error> {
        let mut hasher = StandardHasher::<H>::new();
        let mut mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
        let mut journal =
            variable::Journal::init(context.with_label("journal"), journal_cfg).await?;

        // Rewind to last matching item before alignment so the MMR is aligned against the
        // journal's post-rewind size.
        journal.rewind_to(rewind_predicate).await?;

        // Align the MMR and journal.
        Self::align(&mut mmr, &journal, &mut hasher, APPLY_BATCH_SIZE).await?;

        // Sync the journal and MMR to disk to avoid having to repeat any recovery that may have
        // been performed on next startup.
        journal.sync().await?;
        mmr.sync().await?;

        Ok(Self {
            mmr,
            journal,
            hasher,
        })
    }
}
572
// An authenticated [Journal] is itself a [Contiguous] journal: reads and size queries
// delegate straight to the wrapped journal.
impl<E, C, H> Contiguous for Journal<E, C, H>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
    type Item = C::Item;

    async fn reader(&self) -> impl Reader<Item = C::Item> + '_ {
        self.journal.reader().await
    }

    async fn size(&self) -> u64 {
        self.journal.size().await
    }
}
589
impl<E, C, H> Mutable for Journal<E, C, H>
where
    E: Storage + Clock + Metrics,
    C: Mutable<Item: EncodeShared>,
    H: Hasher,
{
    // Delegates to the inherent `append`, translating this module's [Error] back into the
    // [JournalError] required by the [Mutable] trait.
    async fn append(&mut self, item: &Self::Item) -> Result<u64, JournalError> {
        let res = self.append(item).await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })?;

        Ok(*res)
    }

    // NOTE(review): only the journal is pruned here; the MMR is untouched, unlike the
    // inherent `prune` which keeps both in sync. Confirm trait callers expect this.
    async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
        self.journal.prune(min_position).await
    }

    // Rewind the journal first, then pop MMR leaves only if the MMR is ahead of `size`.
    async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
        self.journal.rewind(size).await?;

        let leaves = *self.mmr.leaves();
        if leaves > size {
            self.mmr
                .rewind((leaves - size) as usize, &mut self.hasher)
                .await
                .map_err(|error| JournalError::Mmr(anyhow::Error::from(error)))?;
        }

        Ok(())
    }
}
623
624impl<E, C, H> Persistable for Journal<E, C, H>
625where
626    E: Storage + Clock + Metrics,
627    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
628    H: Hasher,
629{
630    type Error = JournalError;
631
632    async fn commit(&self) -> Result<(), JournalError> {
633        self.commit().await.map_err(|e| match e {
634            Error::Journal(inner) => inner,
635            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
636        })
637    }
638
639    async fn sync(&self) -> Result<(), JournalError> {
640        self.sync().await.map_err(|e| match e {
641            Error::Journal(inner) => inner,
642            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
643        })
644    }
645
646    async fn destroy(self) -> Result<(), JournalError> {
647        self.destroy().await.map_err(|e| match e {
648            Error::Journal(inner) => inner,
649            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
650        })
651    }
652}
653
#[cfg(test)]
impl<E, C, H> Journal<E, C, H>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
    /// Test helper: Read the item at the given location.
    pub(crate) async fn read(&self, loc: Location) -> Result<C::Item, Error> {
        let reader = self.journal.reader().await;
        // `?` converts the journal error into [Error] via `#[from]`.
        let item = reader.read(*loc).await?;
        Ok(item)
    }
}
671
672#[cfg(test)]
673mod tests {
674    use super::*;
675    use crate::{
676        journal::contiguous::fixed::{Config as JConfig, Journal as ContiguousJournal},
677        mmr::{
678            journaled::{Config as MmrConfig, Mmr},
679            Location,
680        },
681        qmdb::{
682            any::unordered::{fixed::Operation, Update},
683            operation::Committable,
684        },
685    };
686    use commonware_codec::Encode;
687    use commonware_cryptography::{sha256, sha256::Digest, Sha256};
688    use commonware_macros::test_traced;
689    use commonware_runtime::{
690        buffer::paged::CacheRef,
691        deterministic::{self, Context},
692        BufferPooler, Metrics, Runner as _,
693    };
694    use commonware_utils::{NZUsize, NZU16, NZU64};
695    use futures::StreamExt as _;
696    use std::num::{NonZeroU16, NonZeroUsize};
697
698    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
699    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
700
701    /// Create MMR configuration for tests.
702    fn mmr_config(suffix: &str, pooler: &impl BufferPooler) -> MmrConfig {
703        MmrConfig {
704            journal_partition: format!("mmr-journal-{suffix}"),
705            metadata_partition: format!("mmr-metadata-{suffix}"),
706            items_per_blob: NZU64!(11),
707            write_buffer: NZUsize!(1024),
708            thread_pool: None,
709            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
710        }
711    }
712
713    /// Create journal configuration for tests.
714    fn journal_config(suffix: &str, pooler: &impl BufferPooler) -> JConfig {
715        JConfig {
716            partition: format!("journal-{suffix}"),
717            items_per_blob: NZU64!(7),
718            write_buffer: NZUsize!(1024),
719            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
720        }
721    }
722
    /// The concrete authenticated journal type used throughout these tests:
    /// fixed-size operations over the deterministic runtime, hashed with SHA-256.
    type AuthenticatedJournal = Journal<
        deterministic::Context,
        ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
        Sha256,
    >;
728
729    /// Create a new empty authenticated journal.
730    async fn create_empty_journal(context: Context, suffix: &str) -> AuthenticatedJournal {
731        let mmr_cfg = mmr_config(suffix, &context);
732        let journal_cfg = journal_config(suffix, &context);
733        AuthenticatedJournal::new(
734            context,
735            mmr_cfg,
736            journal_cfg,
737            |op: &Operation<Digest, Digest>| op.is_commit(),
738        )
739        .await
740        .unwrap()
741    }
742
743    /// Create a test operation with predictable values based on index.
744    fn create_operation(index: u8) -> Operation<Digest, Digest> {
745        Operation::Update(Update(
746            Sha256::fill(index),
747            Sha256::fill(index.wrapping_add(1)),
748        ))
749    }
750
751    /// Create an authenticated journal with N committed operations.
752    ///
753    /// Operations are added and then synced to ensure they are committed.
754    async fn create_journal_with_ops(
755        context: Context,
756        suffix: &str,
757        count: usize,
758    ) -> AuthenticatedJournal {
759        let mut journal = create_empty_journal(context, suffix).await;
760
761        for i in 0..count {
762            let op = create_operation(i as u8);
763            let loc = journal.append(&op).await.unwrap();
764            assert_eq!(loc, Location::new(i as u64));
765        }
766
767        journal.sync().await.unwrap();
768        journal
769    }
770
771    /// Create separate MMR and journal components for testing alignment.
772    ///
773    /// These components are created independently and can be manipulated separately to test
774    /// scenarios where the MMR and journal are out of sync (e.g., one ahead of the other).
775    async fn create_components(
776        context: Context,
777        suffix: &str,
778    ) -> (
779        Mmr<deterministic::Context, sha256::Digest>,
780        ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
781        StandardHasher<Sha256>,
782    ) {
783        let mut hasher = StandardHasher::new();
784        let mmr = Mmr::init(
785            context.with_label("mmr"),
786            &mut hasher,
787            mmr_config(suffix, &context),
788        )
789        .await
790        .unwrap();
791        let journal = ContiguousJournal::init(
792            context.with_label("journal"),
793            journal_config(suffix, &context),
794        )
795        .await
796        .unwrap();
797        (mmr, journal, hasher)
798    }
799
800    /// Verify that a proof correctly proves the given operations are included in the MMR.
801    fn verify_proof(
802        proof: &crate::mmr::Proof<<Sha256 as commonware_cryptography::Hasher>::Digest>,
803        operations: &[Operation<Digest, Digest>],
804        start_loc: Location,
805        root: &<Sha256 as commonware_cryptography::Hasher>::Digest,
806        hasher: &mut StandardHasher<Sha256>,
807    ) -> bool {
808        let encoded_ops: Vec<_> = operations.iter().map(|op| op.encode()).collect();
809        proof.verify_range_inclusion(hasher, &encoded_ops, start_loc, root)
810    }
811
812    /// Verify that new() creates an empty authenticated journal.
813    #[test_traced("INFO")]
814    fn test_new_creates_empty_journal() {
815        let executor = deterministic::Runner::default();
816        executor.start(|context| async move {
817            let journal = create_empty_journal(context, "new-empty").await;
818
819            let bounds = journal.reader().await.bounds();
820            assert_eq!(bounds.end, 0);
821            assert_eq!(bounds.start, 0);
822            assert!(bounds.is_empty());
823        });
824    }
825
826    /// Verify that align() correctly handles empty MMR and journal components.
827    #[test_traced("INFO")]
828    fn test_align_with_empty_mmr_and_journal() {
829        let executor = deterministic::Runner::default();
830        executor.start(|context| async move {
831            let (mut mmr, journal, mut hasher) = create_components(context, "align-empty").await;
832
833            AuthenticatedJournal::align(&mut mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
834                .await
835                .unwrap();
836
837            assert_eq!(mmr.leaves(), Location::new(0));
838            assert_eq!(journal.size().await, 0);
839        });
840    }
841
    /// Verify that align() catches the MMR up when the journal holds one extra item.
    ///
    /// NOTE(review): despite the test name, the scenario constructed below leaves the
    /// *journal* ahead (21 items vs 20 MMR leaves), so this exercises the replay path,
    /// not the MMR-rewind path — consider renaming or reworking the setup.
    #[test_traced("WARN")]
    fn test_align_when_mmr_ahead() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (mut mmr, journal, mut hasher) = create_components(context, "mmr-ahead").await;

            // Add 20 operations to both MMR and journal
            {
                let changeset = {
                    let mut batch = mmr.new_batch();
                    for i in 0..20 {
                        let op = create_operation(i as u8);
                        let encoded = op.encode();
                        batch.add(&mut hasher, &encoded);
                        journal.append(&op).await.unwrap();
                    }
                    batch.merkleize(&mut hasher).finalize()
                };
                mmr.apply(changeset).unwrap();
            }

            // Add commit operation to journal only (making journal ahead)
            let commit_op = Operation::CommitFloor(None, Location::new(0));
            journal.append(&commit_op).await.unwrap();
            journal.sync().await.unwrap();

            // MMR has 20 leaves, journal has 21 operations (20 ops + 1 commit)
            AuthenticatedJournal::align(&mut mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
                .await
                .unwrap();

            // MMR should have been aligned to match journal
            assert_eq!(mmr.leaves(), Location::new(21));
            assert_eq!(journal.size().await, 21);
        });
    }
879
    /// Verify that align() replays journal operations when journal is ahead of MMR.
    #[test_traced("WARN")]
    fn test_align_when_journal_ahead() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (mut mmr, journal, mut hasher) = create_components(context, "journal-ahead").await;

            // Add 20 operations to the journal only; the MMR stays completely empty.
            for i in 0..20 {
                let op = create_operation(i as u8);
                journal.append(&op).await.unwrap();
            }

            // Add a commit marker and persist so all 21 entries survive alignment
            // (uncommitted tails are discarded — see
            // test_align_with_mismatched_committed_ops).
            let commit_op = Operation::CommitFloor(None, Location::new(0));
            journal.append(&commit_op).await.unwrap();
            journal.sync().await.unwrap();

            // Journal has 21 operations, MMR has 0 leaves
            AuthenticatedJournal::align(&mut mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
                .await
                .unwrap();

            // MMR should have been replayed to match journal
            assert_eq!(mmr.leaves(), Location::new(21));
            assert_eq!(journal.size().await, 21);
        });
    }
908
    /// Verify that align() discards uncommitted operations.
    #[test_traced("INFO")]
    fn test_align_with_mismatched_committed_ops() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_empty_journal(context.with_label("first"), "mismatched").await;

            // Add 20 operations with no commit marker after them.
            for i in 0..20 {
                let loc = journal.append(&create_operation(i as u8)).await.unwrap();
                assert_eq!(loc, Location::new(i as u64));
            }

            // No commit operation was ever appended, so all 20 entries are
            // uncommitted and should be discarded by alignment on restart.
            let size_before = journal.size().await;
            assert_eq!(size_before, 20);

            // Persist to disk, then drop and recreate to simulate a restart
            // (which calls align internally). Note that sync() makes the entries
            // durable but does NOT commit them — only a commit operation does.
            journal.sync().await.unwrap();
            drop(journal);
            let journal = create_empty_journal(context.with_label("second"), "mismatched").await;

            // Uncommitted operations should be gone
            assert_eq!(journal.size().await, 0);
        });
    }
936
    /// Exercise rewind_to() / rewind() across seven scenarios: a matching commit is
    /// kept; the last of several commits is chosen; no match rewinds to the pruning
    /// boundary; a pruned journal keeps a surviving commit; a pruned journal with no
    /// surviving commit rewinds to the boundary; an empty journal is a no-op; and
    /// position-based rewind on an AuthenticatedJournal, including after pruning.
    #[test_traced("INFO")]
    fn test_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test 1: Matching operation is kept
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_match"),
                    journal_config("rewind-match", &context),
                )
                .await
                .unwrap();

                // Add operations where operation 3 is a commit
                for i in 0..3 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                journal
                    .append(&Operation::CommitFloor(None, Location::new(0)))
                    .await
                    .unwrap();
                for i in 4..7 {
                    journal.append(&create_operation(i)).await.unwrap();
                }

                // Rewind to last commit; the returned size includes the matching
                // operation itself (commit at pos 3 -> size 4).
                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 4);
                assert_eq!(journal.size().await, 4);

                // Verify the commit operation is still there
                let op = journal.read(3).await.unwrap();
                assert!(op.is_commit());
            }

            // Test 2: Last matching operation is chosen when multiple match
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_multiple"),
                    journal_config("rewind-multiple", &context),
                )
                .await
                .unwrap();

                // Add multiple commits
                journal.append(&create_operation(0)).await.unwrap();
                journal
                    .append(&Operation::CommitFloor(None, Location::new(0)))
                    .await
                    .unwrap(); // pos 1
                journal.append(&create_operation(2)).await.unwrap();
                journal
                    .append(&Operation::CommitFloor(None, Location::new(1)))
                    .await
                    .unwrap(); // pos 3
                journal.append(&create_operation(4)).await.unwrap();

                // Should rewind to last commit (pos 3)
                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 4);

                // Verify the last commit is still there
                let op = journal.read(3).await.unwrap();
                assert!(op.is_commit());

                // Verify we can't read pos 4
                assert!(journal.read(4).await.is_err());
            }

            // Test 3: Rewind to pruning boundary when no match
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_no_match"),
                    journal_config("rewind-no-match", &context),
                )
                .await
                .unwrap();

                // Add operations with no commits
                for i in 0..10 {
                    journal.append(&create_operation(i)).await.unwrap();
                }

                // Rewind should go to pruning boundary (0 for unpruned)
                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 0, "Should rewind to pruning boundary (0)");
                assert_eq!(journal.size().await, 0);
            }

            // Test 4: Rewind with existing pruning boundary
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_with_pruning"),
                    journal_config("rewind-with-pruning", &context),
                )
                .await
                .unwrap();

                // Add operations and a commit at position 10 (past first section boundary of 7)
                for i in 0..10 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                journal
                    .append(&Operation::CommitFloor(None, Location::new(0)))
                    .await
                    .unwrap(); // pos 10
                for i in 11..15 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                journal.sync().await.unwrap();

                // Prune up to position 8 (this will prune section 0, items 0-6, keeping 7+)
                journal.prune(8).await.unwrap();
                assert_eq!(journal.reader().await.bounds().start, 7);

                // Add more uncommitted operations
                for i in 15..20 {
                    journal.append(&create_operation(i)).await.unwrap();
                }

                // Rewind should keep the commit at position 10 (it sits after the
                // pruning boundary, so it is the last surviving match).
                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 11);

                // Verify commit is still there
                let op = journal.read(10).await.unwrap();
                assert!(op.is_commit());
            }

            // Test 5: Rewind with no matches after pruning boundary
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_no_match_pruned"),
                    journal_config("rewind-no-match-pruned", &context),
                )
                .await
                .unwrap();

                // Add operations with a commit at position 5 (in section 0: 0-6)
                for i in 0..5 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                journal
                    .append(&Operation::CommitFloor(None, Location::new(0)))
                    .await
                    .unwrap(); // pos 5
                for i in 6..10 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                journal.sync().await.unwrap();

                // Prune up to position 8 (this prunes section 0, including the commit at pos 5)
                // Pruning boundary will be at position 7 (start of section 1)
                journal.prune(8).await.unwrap();
                assert_eq!(journal.reader().await.bounds().start, 7);

                // Add uncommitted operations with no commits (in section 1: 7-13)
                for i in 10..14 {
                    journal.append(&create_operation(i)).await.unwrap();
                }

                // Rewind with no matching commits after the pruning boundary
                // Should rewind to the pruning boundary at position 7
                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 7);
            }

            // Test 6: Empty journal
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_empty"),
                    journal_config("rewind-empty", &context),
                )
                .await
                .unwrap();

                // Rewind empty journal should be no-op. The closure's argument type
                // must be annotated here since there are no items to infer it from.
                let final_size = journal
                    .rewind_to(|op: &Operation<Digest, Digest>| op.is_commit())
                    .await
                    .unwrap();
                assert_eq!(final_size, 0);
                assert_eq!(journal.size().await, 0);
            }

            // Test 7: Position based authenticated journal rewind.
            {
                let mmr_cfg = mmr_config("rewind", &context);
                let journal_cfg = journal_config("rewind", &context);
                let mut journal =
                    AuthenticatedJournal::new(context, mmr_cfg, journal_cfg, |op| op.is_commit())
                        .await
                        .unwrap();

                // Add operations with a commit at position 5 (in section 0: 0-6)
                for i in 0..5 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                journal
                    .append(&Operation::CommitFloor(None, Location::new(0)))
                    .await
                    .unwrap(); // pos 5
                for i in 6..10 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                assert_eq!(journal.size().await, 10);

                // After rewinding to 2 leaves, the MMR holds 3 nodes
                // (two leaves plus their shared parent).
                journal.rewind(2).await.unwrap();
                assert_eq!(journal.size().await, 2);
                assert_eq!(journal.mmr.leaves(), 2);
                assert_eq!(journal.mmr.size(), 3);
                let bounds = journal.reader().await.bounds();
                assert_eq!(bounds.start, 0);
                assert!(!bounds.is_empty());

                // Rewinding to a size larger than the current size is invalid.
                assert!(matches!(
                    journal.rewind(3).await,
                    Err(JournalError::InvalidRewind(_))
                ));

                // Rewinding to 0 empties both the journal and the MMR.
                journal.rewind(0).await.unwrap();
                assert_eq!(journal.size().await, 0);
                assert_eq!(journal.mmr.leaves(), 0);
                assert_eq!(journal.mmr.size(), 0);
                let bounds = journal.reader().await.bounds();
                assert_eq!(bounds.start, 0);
                assert!(bounds.is_empty());

                // Test rewinding after pruning.
                for i in 0..255 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                // Pruning to 100 aligns back to a section boundary at 98; rewinding
                // past that boundary (to 97) must be rejected, while rewinding
                // exactly to it yields an empty retained range.
                journal.prune(Location::new(100)).await.unwrap();
                assert_eq!(journal.reader().await.bounds().start, 98);
                let res = journal.rewind(97).await;
                assert!(matches!(res, Err(JournalError::InvalidRewind(97))));
                journal.rewind(98).await.unwrap();
                let bounds = journal.reader().await.bounds();
                assert_eq!(bounds.end, 98);
                assert_eq!(journal.mmr.leaves(), 98);
                assert_eq!(bounds.start, 98);
                assert!(bounds.is_empty());
            }
        });
    }
1182
1183    /// Verify that append() increments the operation count, returns correct locations, and
1184    /// operations can be read back correctly.
1185    #[test_traced("INFO")]
1186    fn test_apply_op_and_read_operations() {
1187        let executor = deterministic::Runner::default();
1188        executor.start(|context| async move {
1189            let mut journal = create_empty_journal(context, "apply_op").await;
1190
1191            assert_eq!(journal.size().await, 0);
1192
1193            // Add 50 operations
1194            let expected_ops: Vec<_> = (0..50).map(|i| create_operation(i as u8)).collect();
1195            for (i, op) in expected_ops.iter().enumerate() {
1196                let loc = journal.append(op).await.unwrap();
1197                assert_eq!(loc, Location::new(i as u64));
1198                assert_eq!(journal.size().await, (i + 1) as u64);
1199            }
1200
1201            assert_eq!(journal.size().await, 50);
1202
1203            // Verify all operations can be read back correctly
1204            journal.sync().await.unwrap();
1205            for (i, expected_op) in expected_ops.iter().enumerate() {
1206                let read_op = journal.read(Location::new(i as u64)).await.unwrap();
1207                assert_eq!(read_op, *expected_op);
1208            }
1209        });
1210    }
1211
1212    /// Verify that read() returns correct operations at various positions.
1213    #[test_traced("INFO")]
1214    fn test_read_operations_at_various_positions() {
1215        let executor = deterministic::Runner::default();
1216        executor.start(|context| async move {
1217            let journal = create_journal_with_ops(context, "read", 50).await;
1218
1219            // Verify reading first operation
1220            let first_op = journal.read(Location::new(0)).await.unwrap();
1221            assert_eq!(first_op, create_operation(0));
1222
1223            // Verify reading middle operation
1224            let middle_op = journal.read(Location::new(25)).await.unwrap();
1225            assert_eq!(middle_op, create_operation(25));
1226
1227            // Verify reading last operation
1228            let last_op = journal.read(Location::new(49)).await.unwrap();
1229            assert_eq!(last_op, create_operation(49));
1230
1231            // Verify all operations match expected values
1232            for i in 0..50 {
1233                let op = journal.read(Location::new(i)).await.unwrap();
1234                assert_eq!(op, create_operation(i as u8));
1235            }
1236        });
1237    }
1238
    /// Verify that read() returns an error for pruned operations.
    #[test_traced("INFO")]
    fn test_read_pruned_operation_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "read_pruned", 100).await;

            // Commit through location 50, persist, then prune. prune() returns
            // the actual boundary, which may be < 50 due to section alignment.
            journal
                .append(&Operation::CommitFloor(None, Location::new(50)))
                .await
                .unwrap();
            journal.sync().await.unwrap();
            let pruned_boundary = journal.prune(Location::new(50)).await.unwrap();

            // Try to read an operation before the pruned boundary.
            // NOTE(review): if section alignment ever left the boundary at 0,
            // this test would assert nothing — consider asserting
            // pruned_boundary > 0 to make the guard observable.
            let read_loc = Location::new(0);
            if read_loc < pruned_boundary {
                let result = journal.read(read_loc).await;
                assert!(matches!(
                    result,
                    Err(Error::Journal(crate::journal::Error::ItemPruned(_)))
                ));
            }
        });
    }
1265
1266    /// Verify that read() returns an error for out-of-range locations.
1267    #[test_traced("INFO")]
1268    fn test_read_out_of_range_returns_error() {
1269        let executor = deterministic::Runner::default();
1270        executor.start(|context| async move {
1271            let journal = create_journal_with_ops(context, "read_oob", 3).await;
1272
1273            // Try to read beyond the end
1274            let result = journal.read(Location::new(10)).await;
1275            assert!(matches!(
1276                result,
1277                Err(Error::Journal(crate::journal::Error::ItemOutOfRange(_)))
1278            ));
1279        });
1280    }
1281
1282    /// Verify that we can read all operations back correctly.
1283    #[test_traced("INFO")]
1284    fn test_read_all_operations_back_correctly() {
1285        let executor = deterministic::Runner::default();
1286        executor.start(|context| async move {
1287            let journal = create_journal_with_ops(context, "read_all", 50).await;
1288
1289            assert_eq!(journal.size().await, 50);
1290
1291            // Verify all operations can be read back and match expected values
1292            for i in 0..50 {
1293                let op = journal.read(Location::new(i)).await.unwrap();
1294                assert_eq!(op, create_operation(i as u8));
1295            }
1296        });
1297    }
1298
    /// Verify that sync() persists operations across a drop/reopen cycle.
    #[test_traced("INFO")]
    fn test_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal =
                create_empty_journal(context.with_label("first"), "close_pending").await;

            // Add 20 operations
            let expected_ops: Vec<_> = (0..20).map(|i| create_operation(i as u8)).collect();
            for (i, op) in expected_ops.iter().enumerate() {
                let loc = journal.append(op).await.unwrap();
                assert_eq!(loc, Location::new(i as u64),);
            }

            // Add commit operation to commit the operations; without it the 20
            // entries above would be discarded on restart (see
            // test_align_with_mismatched_committed_ops).
            let commit_loc = journal
                .append(&Operation::CommitFloor(None, Location::new(0)))
                .await
                .unwrap();
            assert_eq!(
                commit_loc,
                Location::new(20),
                "commit should be at location 20"
            );
            journal.sync().await.unwrap();

            // Reopen and verify the operations persisted. Both instances use the
            // same "close_pending" partition, so the second init picks up the
            // first's synced state.
            drop(journal);
            let journal = create_empty_journal(context.with_label("second"), "close_pending").await;
            assert_eq!(journal.size().await, 21);

            // Verify all operations can be read back
            for (i, expected_op) in expected_ops.iter().enumerate() {
                let read_op = journal.read(Location::new(i as u64)).await.unwrap();
                assert_eq!(read_op, *expected_op);
            }
        });
    }
1338
1339    /// Verify that pruning an empty journal returns the boundary.
1340    #[test_traced("INFO")]
1341    fn test_prune_empty_journal() {
1342        let executor = deterministic::Runner::default();
1343        executor.start(|context| async move {
1344            let mut journal = create_empty_journal(context, "prune_empty").await;
1345
1346            let boundary = journal.prune(Location::new(0)).await.unwrap();
1347
1348            assert_eq!(boundary, Location::new(0));
1349        });
1350    }
1351
1352    /// Verify that pruning to a specific location works correctly.
1353    #[test_traced("INFO")]
1354    fn test_prune_to_location() {
1355        let executor = deterministic::Runner::default();
1356        executor.start(|context| async move {
1357            let mut journal = create_journal_with_ops(context, "prune_to", 100).await;
1358
1359            // Add commit at position 50
1360            journal
1361                .append(&Operation::CommitFloor(None, Location::new(50)))
1362                .await
1363                .unwrap();
1364            journal.sync().await.unwrap();
1365
1366            let boundary = journal.prune(Location::new(50)).await.unwrap();
1367
1368            // Boundary should be <= requested location (may align to section boundary)
1369            assert!(boundary <= Location::new(50));
1370        });
1371    }
1372
1373    /// Verify that prune() returns the actual boundary (which may differ from requested).
1374    #[test_traced("INFO")]
1375    fn test_prune_returns_actual_boundary() {
1376        let executor = deterministic::Runner::default();
1377        executor.start(|context| async move {
1378            let mut journal = create_journal_with_ops(context, "prune_boundary", 100).await;
1379
1380            journal
1381                .append(&Operation::CommitFloor(None, Location::new(50)))
1382                .await
1383                .unwrap();
1384            journal.sync().await.unwrap();
1385
1386            let requested = Location::new(50);
1387            let actual = journal.prune(requested).await.unwrap();
1388
1389            // Actual boundary should match bounds.start
1390            let bounds = journal.reader().await.bounds();
1391            assert!(!bounds.is_empty());
1392            assert_eq!(actual, bounds.start);
1393
1394            // Actual may be <= requested due to section alignment
1395            assert!(actual <= requested);
1396        });
1397    }
1398
1399    /// Verify that pruning doesn't change the operation count.
1400    #[test_traced("INFO")]
1401    fn test_prune_preserves_operation_count() {
1402        let executor = deterministic::Runner::default();
1403        executor.start(|context| async move {
1404            let mut journal = create_journal_with_ops(context, "prune_count", 100).await;
1405
1406            journal
1407                .append(&Operation::CommitFloor(None, Location::new(50)))
1408                .await
1409                .unwrap();
1410            journal.sync().await.unwrap();
1411
1412            let count_before = journal.size().await;
1413            journal.prune(Location::new(50)).await.unwrap();
1414            let count_after = journal.size().await;
1415
1416            assert_eq!(count_before, count_after);
1417        });
1418    }
1419
    /// Verify bounds() for empty journal, no pruning, and after pruning.
    #[test_traced("INFO")]
    fn test_bounds_empty_and_pruned() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test empty journal: bounds are empty with nothing appended.
            let journal = create_empty_journal(context.with_label("empty"), "oldest").await;
            assert!(journal.reader().await.bounds().is_empty());
            // destroy() between sub-cases — presumably so the shared "oldest"
            // partition starts clean each time; confirm against the helper.
            journal.destroy().await.unwrap();

            // Test no pruning: bounds start at location 0.
            let journal =
                create_journal_with_ops(context.with_label("no_prune"), "oldest", 100).await;
            let bounds = journal.reader().await.bounds();
            assert!(!bounds.is_empty());
            assert_eq!(bounds.start, 0);
            journal.destroy().await.unwrap();

            // Test after pruning: bounds.start tracks the actual prune boundary.
            let mut journal =
                create_journal_with_ops(context.with_label("pruned"), "oldest", 100).await;
            journal
                .append(&Operation::CommitFloor(None, Location::new(50)))
                .await
                .unwrap();
            journal.sync().await.unwrap();

            let pruned_boundary = journal.prune(Location::new(50)).await.unwrap();

            // Should match the pruned boundary (may be <= 50 due to section alignment)
            let bounds = journal.reader().await.bounds();
            assert!(!bounds.is_empty());
            assert_eq!(bounds.start, pruned_boundary);
            // Should be <= requested location (50)
            assert!(pruned_boundary <= 50);
            journal.destroy().await.unwrap();
        });
    }
1458
1459    /// Verify bounds().start for empty journal, no pruning, and after pruning.
1460    #[test_traced("INFO")]
1461    fn test_bounds_start_after_prune() {
1462        let executor = deterministic::Runner::default();
1463        executor.start(|context| async move {
1464            // Test empty journal
1465            let journal = create_empty_journal(context.with_label("empty"), "boundary").await;
1466            assert_eq!(journal.reader().await.bounds().start, 0);
1467
1468            // Test no pruning
1469            let journal =
1470                create_journal_with_ops(context.with_label("no_prune"), "boundary", 100).await;
1471            assert_eq!(journal.reader().await.bounds().start, 0);
1472
1473            // Test after pruning
1474            let mut journal =
1475                create_journal_with_ops(context.with_label("pruned"), "boundary", 100).await;
1476            journal
1477                .append(&Operation::CommitFloor(None, Location::new(50)))
1478                .await
1479                .unwrap();
1480            journal.sync().await.unwrap();
1481
1482            let pruned_boundary = journal.prune(Location::new(50)).await.unwrap();
1483
1484            assert_eq!(journal.reader().await.bounds().start, pruned_boundary);
1485        });
1486    }
1487
1488    /// Verify that MMR prunes to the journal's actual boundary, not the requested location.
1489    #[test_traced("INFO")]
1490    fn test_mmr_prunes_to_journal_boundary() {
1491        let executor = deterministic::Runner::default();
1492        executor.start(|context| async move {
1493            let mut journal = create_journal_with_ops(context, "mmr_boundary", 50).await;
1494
1495            journal
1496                .append(&Operation::CommitFloor(None, Location::new(25)))
1497                .await
1498                .unwrap();
1499            journal.sync().await.unwrap();
1500
1501            let pruned_boundary = journal.prune(Location::new(25)).await.unwrap();
1502
1503            // Verify MMR and journal remain in sync
1504            let bounds = journal.reader().await.bounds();
1505            assert!(!bounds.is_empty());
1506            assert_eq!(pruned_boundary, bounds.start);
1507
1508            // Verify boundary is at or before requested (due to section alignment)
1509            assert!(pruned_boundary <= Location::new(25));
1510
1511            // Verify operation count is unchanged
1512            assert_eq!(journal.size().await, 51);
1513        });
1514    }
1515
    /// Verify proof() for multiple operations.
    #[test_traced("INFO")]
    fn test_proof_multiple_operations() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_multi", 50).await;

            // Request a proof covering every operation (locations 0..50).
            let (proof, ops) = journal.proof(Location::new(0), NZU64!(50)).await.unwrap();

            // All 50 operations come back in append order.
            assert_eq!(ops.len(), 50);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation(i as u8));
            }

            // Verify the proof is valid against the journal's current root.
            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new(0),
                &root,
                &mut hasher
            ));
        });
    }
1542
    /// Verify that historical_proof() respects the max_ops limit.
    #[test_traced("INFO")]
    fn test_historical_proof_limited_by_max_ops() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_limit", 50).await;

            // Prove against the journal's current size but cap the number of
            // returned operations at 20.
            let size = journal.size().await;
            let (proof, ops) = journal
                .historical_proof(size, Location::new(0), NZU64!(20))
                .await
                .unwrap();

            // Should return only 20 operations despite 50 being available
            assert_eq!(ops.len(), 20);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation(i as u8));
            }

            // Verify the proof is valid. Since `size` is the current size, the
            // historical root equals the current root.
            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new(0),
                &root,
                &mut hasher
            ));
        });
    }
1574
    /// Verify historical_proof() at the end of the journal.
    #[test_traced("INFO")]
    fn test_historical_proof_at_end_of_journal() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_end", 50).await;

            let size = journal.size().await;
            // Request proof starting near the end; max_ops (20) exceeds the 10
            // operations that remain, so the range is truncated at the end.
            let (proof, ops) = journal
                .historical_proof(size, Location::new(40), NZU64!(20))
                .await
                .unwrap();

            // Should return only 10 operations (positions 40-49)
            assert_eq!(ops.len(), 10);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation((40 + i) as u8));
            }

            // Verify the proof is valid against the current root (the proof was
            // taken at the current size, so historical == current).
            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new(40),
                &root,
                &mut hasher
            ));
        });
    }
1607
1608    /// Verify that historical_proof() returns an error for invalid size.
1609    #[test_traced("INFO")]
1610    fn test_historical_proof_out_of_range_returns_error() {
1611        let executor = deterministic::Runner::default();
1612        executor.start(|context| async move {
1613            let journal = create_journal_with_ops(context, "proof_oob", 5).await;
1614
1615            // Request proof with size > actual journal size
1616            let result = journal
1617                .historical_proof(Location::new(10), Location::new(0), NZU64!(1))
1618                .await;
1619
1620            assert!(matches!(
1621                result,
1622                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
1623            ));
1624        });
1625    }
1626
1627    /// Verify that historical_proof() returns an error when start_loc >= size.
1628    #[test_traced("INFO")]
1629    fn test_historical_proof_start_too_large_returns_error() {
1630        let executor = deterministic::Runner::default();
1631        executor.start(|context| async move {
1632            let journal = create_journal_with_ops(context, "proof_start_oob", 5).await;
1633
1634            let size = journal.size().await;
1635            // Request proof starting at size (should fail)
1636            let result = journal.historical_proof(size, size, NZU64!(1)).await;
1637
1638            assert!(matches!(
1639                result,
1640                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
1641            ));
1642        });
1643    }
1644
1645    /// Verify historical_proof() for a truly historical state (before more operations added).
1646    #[test_traced("INFO")]
1647    fn test_historical_proof_truly_historical() {
1648        let executor = deterministic::Runner::default();
1649        executor.start(|context| async move {
1650            // Create journal with initial operations
1651            let mut journal = create_journal_with_ops(context, "proof_historical", 50).await;
1652
1653            // Capture root at historical state
1654            let mut hasher = StandardHasher::new();
1655            let historical_root = journal.root();
1656            let historical_size = journal.size().await;
1657
1658            // Add more operations after the historical state
1659            for i in 50..100 {
1660                journal.append(&create_operation(i as u8)).await.unwrap();
1661            }
1662            journal.sync().await.unwrap();
1663
1664            // Generate proof for the historical state
1665            let (proof, ops) = journal
1666                .historical_proof(historical_size, Location::new(0), NZU64!(50))
1667                .await
1668                .unwrap();
1669
1670            // Verify operations match expected historical operations
1671            assert_eq!(ops.len(), 50);
1672            for (i, op) in ops.iter().enumerate() {
1673                assert_eq!(*op, create_operation(i as u8));
1674            }
1675
1676            // Verify the proof is valid against the historical root
1677            assert!(verify_proof(
1678                &proof,
1679                &ops,
1680                Location::new(0),
1681                &historical_root,
1682                &mut hasher
1683            ));
1684        });
1685    }
1686
1687    /// Verify that historical_proof() returns an error when start_loc is pruned.
1688    #[test_traced("INFO")]
1689    fn test_historical_proof_pruned_location_returns_error() {
1690        let executor = deterministic::Runner::default();
1691        executor.start(|context| async move {
1692            let mut journal = create_journal_with_ops(context, "proof_pruned", 50).await;
1693
1694            journal
1695                .append(&Operation::CommitFloor(None, Location::new(25)))
1696                .await
1697                .unwrap();
1698            journal.sync().await.unwrap();
1699            let pruned_boundary = journal.prune(Location::new(25)).await.unwrap();
1700
1701            // Try to get proof starting at a location before the pruned boundary
1702            let size = journal.size().await;
1703            let start_loc = Location::new(0);
1704            if start_loc < pruned_boundary {
1705                let result = journal.historical_proof(size, start_loc, NZU64!(1)).await;
1706
1707                // Should fail when trying to read pruned operations
1708                assert!(result.is_err());
1709            }
1710        });
1711    }
1712
1713    /// Verify replay() with empty journal and multiple operations.
1714    #[test_traced("INFO")]
1715    fn test_replay_operations() {
1716        let executor = deterministic::Runner::default();
1717        executor.start(|context| async move {
1718            // Test empty journal
1719            let journal = create_empty_journal(context.with_label("empty"), "replay").await;
1720            let reader = journal.reader().await;
1721            let stream = reader.replay(NZUsize!(10), 0).await.unwrap();
1722            futures::pin_mut!(stream);
1723            assert!(stream.next().await.is_none());
1724
1725            // Test replaying all operations
1726            let journal =
1727                create_journal_with_ops(context.with_label("with_ops"), "replay", 50).await;
1728            let reader = journal.reader().await;
1729            let stream = reader.replay(NZUsize!(100), 0).await.unwrap();
1730            futures::pin_mut!(stream);
1731
1732            for i in 0..50 {
1733                let (pos, op) = stream.next().await.unwrap().unwrap();
1734                assert_eq!(pos, i);
1735                assert_eq!(op, create_operation(i as u8));
1736            }
1737
1738            assert!(stream.next().await.is_none());
1739        });
1740    }
1741
1742    /// Verify replay() starting from a middle location.
1743    #[test_traced("INFO")]
1744    fn test_replay_from_middle() {
1745        let executor = deterministic::Runner::default();
1746        executor.start(|context| async move {
1747            let journal = create_journal_with_ops(context, "replay_middle", 50).await;
1748            let reader = journal.reader().await;
1749            let stream = reader.replay(NZUsize!(100), 25).await.unwrap();
1750            futures::pin_mut!(stream);
1751
1752            let mut count = 0;
1753            while let Some(result) = stream.next().await {
1754                let (pos, op) = result.unwrap();
1755                assert_eq!(pos, 25 + count);
1756                assert_eq!(op, create_operation((25 + count) as u8));
1757                count += 1;
1758            }
1759
1760            // Should have replayed positions 25-49 (25 operations)
1761            assert_eq!(count, 25);
1762        });
1763    }
1764
1765    /// Verify the speculative batch API: fork two batches, verify independent roots, apply one.
1766    #[test_traced("INFO")]
1767    fn test_speculative_batch() {
1768        let executor = deterministic::Runner::default();
1769        executor.start(|context| async move {
1770            let mut journal = create_journal_with_ops(context, "speculative_batch", 10).await;
1771            let original_root = journal.root();
1772
1773            // Fork two independent speculative batches.
1774            let mut b1 = journal.new_batch();
1775            let mut b2 = journal.new_batch();
1776
1777            // Add different items to each batch.
1778            let op_a = create_operation(100);
1779            let op_b = create_operation(200);
1780            b1.add(op_a.clone());
1781            b2.add(op_b);
1782
1783            // Merkleize and verify independent roots.
1784            let m1 = b1.merkleize();
1785            let m2 = b2.merkleize();
1786            assert_ne!(m1.root(), m2.root());
1787            assert_ne!(m1.root(), original_root);
1788            assert_ne!(m2.root(), original_root);
1789
1790            // Journal root should be unchanged (batches are speculative).
1791            assert_eq!(journal.root(), original_root);
1792
1793            // Finalize batch 1 and apply.
1794            let expected_root = m1.root();
1795            let finalized = m1.finalize();
1796            drop(m2); // release borrow on &journal
1797            journal.apply_batch(finalized).await.unwrap();
1798
1799            // Journal should now match the applied batch's root.
1800            assert_eq!(journal.root(), expected_root);
1801            assert_eq!(*journal.size().await, 11);
1802        });
1803    }
1804
1805    /// Verify stacking: create batch A, merkleize, create batch B from merkleized A,
1806    /// merkleize, finalize, and apply. Verify root and items.
1807    #[test_traced("INFO")]
1808    fn test_speculative_batch_stacking() {
1809        let executor = deterministic::Runner::default();
1810        executor.start(|context| async move {
1811            let mut journal = create_journal_with_ops(context, "batch_stacking", 10).await;
1812
1813            let op_a = create_operation(100);
1814            let op_b = create_operation(200);
1815
1816            // Build stacked batches in a block so intermediate borrows drop.
1817            let (expected_root, finalized) = {
1818                let mut batch_a = journal.new_batch();
1819                batch_a.add(op_a.clone());
1820                let merkleized_a = batch_a.merkleize();
1821
1822                let mut batch_b = merkleized_a.new_batch();
1823                batch_b.add(op_b.clone());
1824                let merkleized_b = batch_b.merkleize();
1825
1826                let root = merkleized_b.root();
1827                (root, merkleized_b.finalize())
1828                // merkleized_a dropped here, releasing &journal.mmr
1829            };
1830
1831            journal.apply_batch(finalized).await.unwrap();
1832
1833            assert_eq!(journal.root(), expected_root);
1834            assert_eq!(*journal.size().await, 12);
1835
1836            // Verify both items were appended correctly.
1837            let read_a = journal.read(Location::new(10)).await.unwrap();
1838            assert_eq!(read_a, op_a);
1839            let read_b = journal.read(Location::new(11)).await.unwrap();
1840            assert_eq!(read_b, op_b);
1841        });
1842    }
1843
1844    #[test_traced("INFO")]
1845    fn test_stale_batch_sibling() {
1846        let executor = deterministic::Runner::default();
1847        executor.start(|context| async move {
1848            let mut journal = create_empty_journal(context, "stale-sibling").await;
1849            let op_a = create_operation(1);
1850            let op_b = create_operation(2);
1851
1852            // Create two batches from the same base.
1853            let finalized_a = {
1854                let mut batch = journal.new_batch();
1855                batch.add(op_a.clone());
1856                batch.merkleize().finalize()
1857            };
1858            let finalized_b = {
1859                let mut batch = journal.new_batch();
1860                batch.add(op_b);
1861                batch.merkleize().finalize()
1862            };
1863
1864            // Apply A -- should succeed.
1865            journal.apply_batch(finalized_a).await.unwrap();
1866            let expected_root = journal.root();
1867            let expected_size = journal.size().await;
1868
1869            // Apply B -- should fail (stale).
1870            let result = journal.apply_batch(finalized_b).await;
1871            assert!(
1872                matches!(
1873                    result,
1874                    Err(super::Error::Mmr(crate::mmr::Error::StaleChangeset { .. }))
1875                ),
1876                "expected StaleChangeset, got {result:?}"
1877            );
1878
1879            // The stale batch must not mutate the journal or desync it from the MMR.
1880            assert_eq!(journal.root(), expected_root);
1881            assert_eq!(journal.size().await, expected_size);
1882            let (_, ops) = journal.proof(Location::new(0), NZU64!(1)).await.unwrap();
1883            assert_eq!(ops, vec![op_a]);
1884        });
1885    }
1886
1887    #[test_traced("INFO")]
1888    fn test_stale_batch_chained() {
1889        let executor = deterministic::Runner::default();
1890        executor.start(|context| async move {
1891            let mut journal = create_journal_with_ops(context, "stale-chained", 5).await;
1892
1893            // Parent batch, then fork two children.
1894            let parent = {
1895                let mut batch = journal.new_batch();
1896                batch.add(create_operation(10));
1897                batch.merkleize()
1898            };
1899            let child_a = {
1900                let mut batch = parent.new_batch();
1901                batch.add(create_operation(20));
1902                batch.merkleize().finalize()
1903            };
1904            let child_b = {
1905                let mut batch = parent.new_batch();
1906                batch.add(create_operation(30));
1907                batch.merkleize().finalize()
1908            };
1909            drop(parent);
1910
1911            // Apply child_a, then child_b should be stale.
1912            journal.apply_batch(child_a).await.unwrap();
1913            let result = journal.apply_batch(child_b).await;
1914            assert!(
1915                matches!(
1916                    result,
1917                    Err(super::Error::Mmr(crate::mmr::Error::StaleChangeset { .. }))
1918                ),
1919                "expected StaleChangeset for sibling, got {result:?}"
1920            );
1921        });
1922    }
1923
1924    #[test_traced("INFO")]
1925    fn test_stale_batch_parent_before_child() {
1926        let executor = deterministic::Runner::default();
1927        executor.start(|context| async move {
1928            let mut journal = create_empty_journal(context, "stale-parent-first").await;
1929
1930            // Create parent, then child.
1931            let (parent_finalized, child_finalized) = {
1932                let parent = {
1933                    let mut batch = journal.new_batch();
1934                    batch.add(create_operation(1));
1935                    batch.merkleize()
1936                };
1937                let child = {
1938                    let mut batch = parent.new_batch();
1939                    batch.add(create_operation(2));
1940                    batch.merkleize().finalize()
1941                };
1942                (parent.finalize(), child)
1943            };
1944
1945            // Apply parent first -- child should now be stale.
1946            journal.apply_batch(parent_finalized).await.unwrap();
1947            let result = journal.apply_batch(child_finalized).await;
1948            assert!(
1949                matches!(
1950                    result,
1951                    Err(super::Error::Mmr(crate::mmr::Error::StaleChangeset { .. }))
1952                ),
1953                "expected StaleChangeset for child after parent applied, got {result:?}"
1954            );
1955        });
1956    }
1957
1958    #[test_traced("INFO")]
1959    fn test_stale_batch_child_before_parent() {
1960        let executor = deterministic::Runner::default();
1961        executor.start(|context| async move {
1962            let mut journal = create_empty_journal(context, "stale-child-first").await;
1963
1964            // Create parent, then child.
1965            let (parent_finalized, child_finalized) = {
1966                let parent = {
1967                    let mut batch = journal.new_batch();
1968                    batch.add(create_operation(1));
1969                    batch.merkleize()
1970                };
1971                let child = {
1972                    let mut batch = parent.new_batch();
1973                    batch.add(create_operation(2));
1974                    batch.merkleize().finalize()
1975                };
1976                (parent.finalize(), child)
1977            };
1978
1979            // Apply child first -- parent should now be stale.
1980            journal.apply_batch(child_finalized).await.unwrap();
1981            let result = journal.apply_batch(parent_finalized).await;
1982            assert!(
1983                matches!(
1984                    result,
1985                    Err(super::Error::Mmr(crate::mmr::Error::StaleChangeset { .. }))
1986                ),
1987                "expected StaleChangeset for parent after child applied, got {result:?}"
1988            );
1989        });
1990    }
1991}