commonware_storage/journal/authenticated.rs

//! Authenticated journal implementation.
//!
//! An authenticated journal maintains a contiguous journal of items alongside a Merkle Mountain
//! Range (MMR). The item at index i in the journal corresponds to the leaf at Location i in the
//! MMR. This structure enables efficient proofs that an item is included in the journal at a
//! specific location.
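//!
//! # Example
//!
//! A minimal usage sketch (marked `ignore` because it assumes a configured runtime context and
//! MMR/journal configs like those in this module's tests):
//!
//! ```ignore
//! // Open (or recover) an authenticated journal, rewinding to the last item that satisfies
//! // the predicate.
//! let journal = Journal::new(context, mmr_cfg, journal_cfg, |op| op.is_commit()).await?;
//!
//! // Appends require the `Dirty` state; merkleize to return to `Clean`.
//! let mut journal = journal.into_dirty();
//! let loc = journal.append(item).await?;
//! let mut journal = journal.merkleize();
//! journal.sync().await?;
//!
//! // Prove that the item is included at `loc` against the current root.
//! let (proof, items) = journal.proof(loc, max_ops).await?;
//! let root = journal.root();
//! ```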

use crate::{
    journal::{
        contiguous::{fixed, variable, Contiguous, MutableContiguous},
        Error as JournalError,
    },
    mmr::{
        journaled::{CleanMmr, DirtyMmr, Mmr},
        mem::{Clean, Dirty, State},
        Location, Position, Proof, StandardHasher,
    },
    Persistable,
};
use commonware_codec::{CodecFixedShared, CodecShared, Encode, EncodeShared};
use commonware_cryptography::{DigestOf, Hasher};
use commonware_runtime::{Clock, Metrics, Storage};
use core::num::{NonZeroU64, NonZeroUsize};
use futures::{future::try_join_all, try_join, TryFutureExt as _};
use thiserror::Error;
use tracing::{debug, warn};

/// Errors that can occur when interacting with an authenticated journal.
#[derive(Error, Debug)]
pub enum Error {
    #[error("mmr error: {0}")]
    Mmr(#[from] crate::mmr::Error),

    #[error("journal error: {0}")]
    Journal(#[from] super::Error),
}

/// An append-only data structure that maintains a sequential journal of items alongside a Merkle
/// Mountain Range (MMR). The item at index i in the journal corresponds to the leaf at Location i
/// in the MMR. This structure enables efficient proofs that an item is included in the journal at a
/// specific location.
pub struct Journal<E, C, H, S: State<H::Digest> + Send + Sync = Dirty>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
    /// MMR where each leaf is an item digest.
    /// Invariant: leaf i corresponds to item i in the journal.
    pub(crate) mmr: Mmr<E, H::Digest, S>,

    /// Journal of items.
    /// Invariant: item i corresponds to leaf i in the MMR.
    pub(crate) journal: C,

    pub(crate) hasher: StandardHasher<H>,
}

impl<E, C, H, S> Journal<E, C, H, S>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
    S: State<DigestOf<H>> + Send + Sync,
{
    /// Returns [start, end) where `start` and `end - 1` are the Locations of the oldest and newest
    /// retained items, respectively.
    pub fn bounds(&self) -> std::ops::Range<Location> {
        let inner = self.journal.bounds();
        Location::new_unchecked(inner.start)..Location::new_unchecked(inner.end)
    }

    /// Returns the Location of the next item appended to the journal.
    pub fn size(&self) -> Location {
        Location::new_unchecked(self.journal.bounds().end)
    }

    /// Read an item from the journal at the given location.
    pub async fn read(&self, loc: Location) -> Result<C::Item, Error> {
        self.journal.read(*loc).await.map_err(Error::Journal)
    }
}

impl<E, C, H, S> Journal<E, C, H, S>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
    H: Hasher,
    S: State<DigestOf<H>> + Send + Sync,
{
    /// Durably persist the journal. This is faster than `sync()` because it does not persist the
    /// MMR, but recovery will be required on the next startup if we crash before `sync()`.
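    ///
    /// A sketch of the intended pattern (hedged; exact setup mirrors this module's tests):
    ///
    /// ```ignore
    /// journal.commit().await?; // fast: journal durable, but the MMR may lag on disk
    /// // ... later, when startup recovery cost matters:
    /// journal.sync().await?;   // journal and MMR both durable; no realignment on startup
    /// ```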
    pub async fn commit(&mut self) -> Result<(), Error> {
        self.journal.commit().await.map_err(Error::Journal)
    }
}

impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    C: MutableContiguous<Item: EncodeShared>,
    H: Hasher,
{
    /// Create a new [Journal] from the given components after aligning the MMR with the journal.
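    ///
    /// A construction sketch (mirroring the `create_components` test helper in this module):
    ///
    /// ```ignore
    /// let mut hasher = StandardHasher::<H>::new();
    /// let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
    /// let journal = fixed::Journal::init(context.with_label("journal"), journal_cfg).await?;
    /// let journal = Journal::from_components(mmr, journal, hasher, APPLY_BATCH_SIZE).await?;
    /// ```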
    pub async fn from_components(
        mmr: CleanMmr<E, H::Digest>,
        journal: C,
        mut hasher: StandardHasher<H>,
        apply_batch_size: u64,
    ) -> Result<Self, Error> {
        let mut mmr =
            Self::align(mmr.into_dirty(), &journal, &mut hasher, apply_batch_size).await?;

        // Sync the MMR to disk so that any recovery performed during alignment doesn't have to
        // be repeated on the next startup.
        mmr.sync().await?;

        Ok(Self {
            mmr,
            journal,
            hasher,
        })
    }

    /// Align `mmr` to be consistent with `journal`. Any items in `mmr` that aren't in `journal` are
    /// popped, and any items in `journal` that aren't in `mmr` are added to `mmr`. Items are added
    /// to `mmr` in batches of size `apply_batch_size` to avoid memory bloat.
    async fn align(
        mut mmr: DirtyMmr<E, H::Digest>,
        journal: &C,
        hasher: &mut StandardHasher<H>,
        apply_batch_size: u64,
    ) -> Result<CleanMmr<E, H::Digest>, Error> {
        // Pop any MMR elements that are ahead of the journal.
        // Note: mmr_size is the size of the MMR in leaves, not positions.
        let journal_size = journal.size();
        let mut mmr_size = mmr.leaves();
        if mmr_size > journal_size {
            let pop_count = mmr_size - journal_size;
            warn!(journal_size, ?pop_count, "popping MMR items");
            mmr.pop(*pop_count as usize).await?;
            mmr_size = Location::new_unchecked(journal_size);
        }

        // If the MMR is behind, replay journal items to catch up.
        if mmr_size < journal_size {
            let replay_count = journal_size - *mmr_size;
            warn!(
                journal_size,
                replay_count, "MMR lags behind journal, replaying journal to catch up"
            );

            let mut batch_size = 0;
            while mmr_size < journal_size {
                let op = journal.read(*mmr_size).await?;
                mmr.add(hasher, &op.encode()).await?;
                mmr_size += 1;
                batch_size += 1;
                if batch_size >= apply_batch_size {
                    mmr = mmr.merkleize(hasher).into_dirty();
                    batch_size = 0;
                }
            }
            return Ok(mmr.merkleize(hasher));
        }

        // At this point the MMR and journal should be consistent.
        assert_eq!(journal.size(), mmr.leaves());

        Ok(mmr.merkleize(hasher))
    }

    /// Prune both the MMR and journal to the given location.
    ///
    /// # Returns
    ///
    /// The new pruning boundary, which may be less than the requested `prune_loc`.
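    ///
    /// A sketch of the rounding behavior (mirroring this module's tests; the boundary may round
    /// down to a section boundary of the underlying journal):
    ///
    /// ```ignore
    /// let boundary = journal.prune(Location::new_unchecked(50)).await?;
    /// assert!(boundary <= Location::new_unchecked(50));
    /// assert_eq!(boundary, journal.bounds().start);
    /// ```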
    pub async fn prune(&mut self, prune_loc: Location) -> Result<Location, Error> {
        if self.mmr.size() == 0 {
            // The journal is empty, nothing to prune.
            return Ok(self.bounds().start);
        }

        // Sync the MMR before pruning the journal, otherwise the MMR's last element could end up
        // behind the journal's first element after a crash, and there would be no way to replay
        // the items between the MMR's last element and the journal's first element.
        self.mmr.sync().await?;

        // Prune the journal and check whether anything was actually pruned.
        if !self.journal.prune(*prune_loc).await? {
            return Ok(self.bounds().start);
        }

        let bounds = self.bounds();
        debug!(size = ?bounds.end, ?prune_loc, boundary = ?bounds.start, "pruned inactive ops");

        // Prune the MMR to match the journal's actual boundary.
        self.mmr
            .prune_to_pos(Position::try_from(bounds.start)?)
            .await?;

        Ok(bounds.start)
    }
}

impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
    /// Generate a proof of inclusion for items starting at `start_loc`.
    ///
    /// Returns a proof and the items corresponding to the leaves in the range `start_loc..end_loc`,
    /// where `end_loc` is the minimum of the current item count and `start_loc + max_ops`.
    ///
    /// # Errors
    ///
    /// - Returns [Error::Mmr] with [crate::mmr::Error::LocationOverflow] if `start_loc` >
    ///   [crate::mmr::MAX_LOCATION].
    /// - Returns [Error::Mmr] with [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >= current
    ///   item count.
    /// - Returns [Error::Journal] with [crate::journal::Error::ItemPruned] if `start_loc` has been
    ///   pruned.
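    ///
    /// A verification sketch (mirroring this module's tests; `hasher` is a [StandardHasher]):
    ///
    /// ```ignore
    /// let (proof, items) = journal.proof(start_loc, max_ops).await?;
    /// let encoded: Vec<_> = items.iter().map(|item| item.encode()).collect();
    /// assert!(proof.verify_range_inclusion(&mut hasher, &encoded, start_loc, &journal.root()));
    /// ```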
    pub async fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
        self.historical_proof(self.size(), start_loc, max_ops).await
    }

    /// Generate a historical proof with respect to the state of the MMR when it had
    /// `historical_leaves` leaves.
    ///
    /// Returns a proof and the items corresponding to the leaves in the range `start_loc..end_loc`,
    /// where `end_loc` is the minimum of `historical_leaves` and `start_loc + max_ops`.
    ///
    /// # Errors
    ///
    /// - Returns [Error::Mmr] with [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >=
    ///   `historical_leaves` or `historical_leaves` > number of items in the journal.
    /// - Returns [Error::Journal] with [crate::journal::Error::ItemPruned] if `start_loc` has been
    ///   pruned.
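    ///
    /// A sketch of proving against a past journal size (mirroring this module's tests):
    ///
    /// ```ignore
    /// let size = journal.size();
    /// let (proof, items) = journal
    ///     .historical_proof(size, Location::new_unchecked(0), max_ops)
    ///     .await?;
    /// ```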
    pub async fn historical_proof(
        &self,
        historical_leaves: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
        let leaves = self.size();
        if historical_leaves > leaves {
            return Err(crate::mmr::Error::RangeOutOfBounds(leaves).into());
        }
        if start_loc >= historical_leaves {
            return Err(crate::mmr::Error::RangeOutOfBounds(start_loc).into());
        }
        let end_loc = std::cmp::min(historical_leaves, start_loc.saturating_add(max_ops.get()));

        let proof = self
            .mmr
            .historical_range_proof(historical_leaves, start_loc..end_loc)
            .await?;

        let futures = (*start_loc..*end_loc)
            .map(|i| self.journal.read(i))
            .collect::<Vec<_>>();
        let ops = try_join_all(futures).await?;

        Ok((proof, ops))
    }

    /// Return the root of the MMR.
    pub const fn root(&self) -> H::Digest {
        self.mmr.root()
    }

    /// Convert this journal into its dirty counterpart for batched updates.
    pub fn into_dirty(self) -> Journal<E, C, H, Dirty> {
        Journal {
            mmr: self.mmr.into_dirty(),
            journal: self.journal,
            hasher: self.hasher,
        }
    }
}

impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
    H: Hasher,
{
    /// Destroy the authenticated journal, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        try_join!(
            self.journal.destroy().map_err(Error::Journal),
            self.mmr.destroy().map_err(Error::Mmr),
        )?;
        Ok(())
    }

    /// Durably persist the journal, ensuring no recovery is required on startup.
    pub async fn sync(&mut self) -> Result<(), Error> {
        try_join!(
            self.journal.sync().map_err(Error::Journal),
            self.mmr.sync().map_err(Into::into)
        )?;

        Ok(())
    }
}

impl<E, C, H> Journal<E, C, H, Dirty>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
    /// Merkleize the journal and compute the root digest.
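    ///
    /// A state-transition sketch (appends require the `Dirty` state, while roots and proofs
    /// require `Clean`):
    ///
    /// ```ignore
    /// let mut dirty = clean.into_dirty();
    /// // ... batched appends ...
    /// let clean = dirty.merkleize(); // computes all pending digests
    /// let root = clean.root();
    /// ```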
    pub fn merkleize(self) -> Journal<E, C, H, Clean<H::Digest>> {
        let Self {
            mmr,
            journal,
            mut hasher,
        } = self;
        Journal {
            mmr: mmr.merkleize(&mut hasher),
            journal,
            hasher,
        }
    }
}

impl<E, C, H> Journal<E, C, H, Dirty>
where
    E: Storage + Clock + Metrics,
    C: MutableContiguous<Item: EncodeShared>,
    H: Hasher,
{
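    /// Append an item to the journal, returning the [Location] at which it was stored. The
    /// item's digest is added to the MMR concurrently with the journal append.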
    pub async fn append(&mut self, item: C::Item) -> Result<Location, Error> {
        let encoded_item = item.encode();

        // Append the item to the journal and update the MMR concurrently.
        let (_, loc) = try_join!(
            self.mmr
                .add(&mut self.hasher, &encoded_item)
                .map_err(Error::Mmr),
            self.journal.append(item).map_err(Into::into)
        )?;

        Ok(Location::new_unchecked(loc))
    }

    /// Create a new dirty journal from aligned components.
    pub async fn from_components(
        mmr: CleanMmr<E, H::Digest>,
        journal: C,
        hasher: StandardHasher<H>,
        apply_batch_size: u64,
    ) -> Result<Self, Error> {
        let clean = Journal::<E, C, H, Clean<H::Digest>>::from_components(
            mmr,
            journal,
            hasher,
            apply_batch_size,
        )
        .await?;
        Ok(clean.into_dirty())
    }
}

/// The number of items to apply to the MMR in a single batch.
const APPLY_BATCH_SIZE: u64 = 1 << 16;

impl<E, O, H> Journal<E, fixed::Journal<E, O>, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    O: CodecFixedShared,
    H: Hasher,
{
    /// Create a new [Journal] for fixed-length items.
    ///
    /// The journal will be rewound to the last item that matches the `rewind_predicate` on
    /// initialization.
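    ///
    /// A construction sketch (mirroring this module's tests, which rewind to the last commit
    /// operation):
    ///
    /// ```ignore
    /// let journal = Journal::<_, _, Sha256>::new(
    ///     context,
    ///     mmr_config,
    ///     journal_config,
    ///     |op: &Operation<Digest, Digest>| op.is_commit(),
    /// )
    /// .await?;
    /// ```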
    pub async fn new(
        context: E,
        mmr_cfg: crate::mmr::journaled::Config,
        journal_cfg: fixed::Config,
        rewind_predicate: fn(&O) -> bool,
    ) -> Result<Self, Error> {
        let mut journal = fixed::Journal::init(context.with_label("journal"), journal_cfg).await?;

        // Rewind the journal to the last matching item.
        journal.rewind_to(rewind_predicate).await?;

        // Align the MMR and journal.
        let mut hasher = StandardHasher::<H>::new();
        let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
        let mut mmr =
            Self::align(mmr.into_dirty(), &journal, &mut hasher, APPLY_BATCH_SIZE).await?;

        // Sync the journal and MMR to disk so that any recovery performed above doesn't have to
        // be repeated on the next startup.
        journal.sync().await?;
        mmr.sync().await?;

        Ok(Self {
            mmr,
            journal,
            hasher,
        })
    }
}

impl<E, O, H> Journal<E, variable::Journal<E, O>, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    O: CodecShared,
    H: Hasher,
{
    /// Create a new [Journal] for variable-length items.
    ///
    /// The journal will be rewound to the last item that matches the `rewind_predicate` on
    /// initialization.
    pub async fn new(
        context: E,
        mmr_cfg: crate::mmr::journaled::Config,
        journal_cfg: variable::Config<O::Cfg>,
        rewind_predicate: fn(&O) -> bool,
    ) -> Result<Self, Error> {
        let mut hasher = StandardHasher::<H>::new();
        let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
        let mut journal =
            variable::Journal::init(context.with_label("journal"), journal_cfg).await?;

        // Rewind the journal to the last matching item.
        journal.rewind_to(rewind_predicate).await?;

        // Align the MMR and journal.
        let mut mmr =
            Self::align(mmr.into_dirty(), &journal, &mut hasher, APPLY_BATCH_SIZE).await?;

        // Sync the journal and MMR to disk so that any recovery performed above doesn't have to
        // be repeated on the next startup.
        journal.sync().await?;
        mmr.sync().await?;

        Ok(Self {
            mmr,
            journal,
            hasher,
        })
    }
}

impl<E, C, H, S> Contiguous for Journal<E, C, H, S>
where
    E: Storage + Clock + Metrics,
    C: MutableContiguous<Item: EncodeShared>,
    H: Hasher,
    S: State<DigestOf<H>> + Send + Sync,
{
    type Item = C::Item;

    fn bounds(&self) -> std::ops::Range<u64> {
        self.journal.bounds()
    }

    async fn replay(
        &self,
        start_pos: u64,
        buffer: NonZeroUsize,
    ) -> Result<
        impl futures::Stream<Item = Result<(u64, Self::Item), JournalError>> + '_,
        JournalError,
    > {
        self.journal.replay(start_pos, buffer).await
    }

    async fn read(&self, position: u64) -> Result<Self::Item, JournalError> {
        self.journal.read(position).await
    }
}

impl<E, C, H> MutableContiguous for Journal<E, C, H, Dirty>
where
    E: Storage + Clock + Metrics,
    C: MutableContiguous<Item: EncodeShared>,
    H: Hasher,
{
    async fn append(&mut self, item: Self::Item) -> Result<u64, JournalError> {
        let res = self.append(item).await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })?;

        Ok(*res)
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
        self.journal.prune(min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
        self.journal.rewind(size).await?;

        let leaves = *self.mmr.leaves();
        if leaves > size {
            self.mmr
                .pop((leaves - size) as usize)
                .await
                .map_err(|error| JournalError::Mmr(anyhow::Error::from(error)))?;
        }

        Ok(())
    }
}

impl<E, C, H> Persistable for Journal<E, C, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
    H: Hasher,
{
    type Error = JournalError;

    async fn commit(&mut self) -> Result<(), JournalError> {
        self.commit().await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })
    }

    async fn sync(&mut self) -> Result<(), JournalError> {
        self.sync().await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })
    }

    async fn destroy(self) -> Result<(), JournalError> {
        self.destroy().await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        journal::contiguous::fixed::{Config as JConfig, Journal as ContiguousJournal},
        mmr::{
            journaled::{Config as MmrConfig, Mmr},
            Location,
        },
        qmdb::{
            any::unordered::{fixed::Operation, Update},
            operation::Committable,
        },
    };
    use commonware_codec::Encode;
    use commonware_cryptography::{sha256, sha256::Digest, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        buffer::paged::CacheRef,
        deterministic::{self, Context},
        Metrics, Runner as _,
    };
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use futures::StreamExt as _;
    use std::num::NonZeroU16;

    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);

    /// Create MMR configuration for tests.
    fn mmr_config(suffix: &str) -> MmrConfig {
        MmrConfig {
            journal_partition: format!("mmr_journal_{suffix}"),
            metadata_partition: format!("mmr_metadata_{suffix}"),
            items_per_blob: NZU64!(11),
            write_buffer: NZUsize!(1024),
            thread_pool: None,
            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    /// Create journal configuration for tests.
    fn journal_config(suffix: &str) -> JConfig {
        JConfig {
            partition: format!("journal_{suffix}"),
            items_per_blob: NZU64!(7),
            write_buffer: NZUsize!(1024),
            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    type AuthenticatedJournal = Journal<
        deterministic::Context,
        ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
        Sha256,
        Clean<sha256::Digest>,
    >;

    /// Create a new empty authenticated journal.
    async fn create_empty_journal(context: Context, suffix: &str) -> AuthenticatedJournal {
        AuthenticatedJournal::new(
            context,
            mmr_config(suffix),
            journal_config(suffix),
            |op: &Operation<Digest, Digest>| op.is_commit(),
        )
        .await
        .unwrap()
    }

    /// Create a test operation with predictable values based on index.
    fn create_operation(index: u8) -> Operation<Digest, Digest> {
        Operation::Update(Update(
            Sha256::fill(index),
            Sha256::fill(index.wrapping_add(1)),
        ))
    }

    /// Create an authenticated journal with N committed operations.
    ///
    /// Operations are added and then synced to ensure they are committed.
    async fn create_journal_with_ops(
        context: Context,
        suffix: &str,
        count: usize,
    ) -> AuthenticatedJournal {
        let mut journal = create_empty_journal(context, suffix).await.into_dirty();

        for i in 0..count {
            let op = create_operation(i as u8);
            let loc = journal.append(op).await.unwrap();
            assert_eq!(loc, Location::new_unchecked(i as u64));
        }

        let mut journal = journal.merkleize();
        journal.sync().await.unwrap();
        journal
    }

    /// Create separate MMR and journal components for testing alignment.
    ///
    /// These components are created independently and can be manipulated separately to test
    /// scenarios where the MMR and journal are out of sync (e.g., one ahead of the other).
    async fn create_components(
        context: Context,
        suffix: &str,
    ) -> (
        CleanMmr<deterministic::Context, sha256::Digest>,
        ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
        StandardHasher<Sha256>,
    ) {
        let mut hasher = StandardHasher::new();
        let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_config(suffix))
            .await
            .unwrap();
        let journal =
            ContiguousJournal::init(context.with_label("journal"), journal_config(suffix))
                .await
                .unwrap();
        (mmr, journal, hasher)
    }

    /// Verify that a proof correctly proves the given operations are included in the MMR.
    fn verify_proof(
        proof: &crate::mmr::Proof<<Sha256 as commonware_cryptography::Hasher>::Digest>,
        operations: &[Operation<Digest, Digest>],
        start_loc: Location,
        root: &<Sha256 as commonware_cryptography::Hasher>::Digest,
        hasher: &mut StandardHasher<Sha256>,
    ) -> bool {
        let encoded_ops: Vec<_> = operations.iter().map(|op| op.encode()).collect();
        proof.verify_range_inclusion(hasher, &encoded_ops, start_loc, root)
    }

    /// Verify that new() creates an empty authenticated journal.
    #[test_traced("INFO")]
    fn test_new_creates_empty_journal() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_empty_journal(context, "new_empty").await;

            let bounds = journal.bounds();
            assert_eq!(bounds.end, Location::new_unchecked(0));
            assert_eq!(bounds.start, Location::new_unchecked(0));
            assert!(bounds.is_empty());
        });
    }

    /// Verify that align() correctly handles empty MMR and journal components.
    #[test_traced("INFO")]
    fn test_align_with_empty_mmr_and_journal() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (mmr, journal, mut hasher) = create_components(context, "align_empty").await;

            let mmr = Journal::align(mmr.into_dirty(), &journal, &mut hasher, APPLY_BATCH_SIZE)
                .await
                .unwrap();

            assert_eq!(mmr.leaves(), Location::new_unchecked(0));
            assert_eq!(journal.size(), Location::new_unchecked(0));
        });
    }

    /// Verify that align() pops MMR elements when MMR is ahead of the journal.
    #[test_traced("WARN")]
    fn test_align_when_mmr_ahead() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (mmr, mut journal, mut hasher) = create_components(context, "mmr_ahead").await;

            // Add 20 operations to both the MMR and journal, plus one extra digest to the MMR
            // only (making the MMR ahead of the journal)
            let mut mmr = mmr.into_dirty();
            for i in 0..20 {
                let op = create_operation(i as u8);
                let encoded = op.encode();
                mmr.add(&mut hasher, &encoded).await.unwrap();
                journal.append(op).await.unwrap();
            }
            mmr.add(&mut hasher, &create_operation(20).encode())
                .await
                .unwrap();
            let mmr = mmr.merkleize(&mut hasher);
            journal.sync().await.unwrap();

            // MMR has 21 leaves, journal has 20 operations
            let mmr = Journal::align(mmr.into_dirty(), &journal, &mut hasher, APPLY_BATCH_SIZE)
                .await
                .unwrap();

            // MMR should have been popped to match the journal
            assert_eq!(mmr.leaves(), Location::new_unchecked(20));
            assert_eq!(journal.size(), Location::new_unchecked(20));
746        });
747    }
748
749    /// Verify that align() replays journal operations when journal is ahead of MMR.
750    #[test_traced("WARN")]
751    fn test_align_when_journal_ahead() {
752        let executor = deterministic::Runner::default();
753        executor.start(|context| async move {
754            let (mmr, mut journal, mut hasher) = create_components(context, "journal_ahead").await;
755
756            // Add 20 operations to journal only
757            for i in 0..20 {
758                let op = create_operation(i as u8);
759                journal.append(op).await.unwrap();
760            }
761
762            // Add commit
763            let commit_op = Operation::CommitFloor(None, Location::new_unchecked(0));
764            journal.append(commit_op).await.unwrap();
765            journal.sync().await.unwrap();
766
767            // Journal has 21 operations, MMR has 0 leaves
768            let mmr = Journal::align(mmr.into_dirty(), &journal, &mut hasher, APPLY_BATCH_SIZE)
769                .await
770                .unwrap();
771
772            // MMR should have been replayed to match journal
773            assert_eq!(mmr.leaves(), Location::new_unchecked(21));
774            assert_eq!(journal.size(), Location::new_unchecked(21));
775        });
776    }
777
778    /// Verify that align() discards uncommitted operations.
779    #[test_traced("INFO")]
780    fn test_align_with_mismatched_committed_ops() {
781        let executor = deterministic::Runner::default();
782        executor.start(|context| async move {
783            let mut journal = create_empty_journal(context.with_label("first"), "mismatched")
784                .await
785                .into_dirty();
786
787            // Add 20 uncommitted operations
788            for i in 0..20 {
789                let loc = journal.append(create_operation(i as u8)).await.unwrap();
790                assert_eq!(loc, Location::new_unchecked(i as u64));
791            }
792            let mut journal = journal.merkleize();
793
            // These operations are persisted but not followed by a commit operation, so they
            // should be discarded on restart
            let size_before = journal.size();
            assert_eq!(size_before, 20);

            // Sync, drop, and recreate to simulate a restart (which rewinds and aligns internally)
            journal.sync().await.unwrap();
            drop(journal);
            let journal = create_empty_journal(context.with_label("second"), "mismatched").await;

            // Uncommitted operations should be gone
            assert_eq!(journal.size(), 0);
        });
    }

    #[test_traced("INFO")]
    fn test_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test 1: Matching operation is kept
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_match"),
                    journal_config("rewind_match"),
                )
                .await
                .unwrap();

                // Add operations where operation 3 is a commit
                for i in 0..3 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                    .await
                    .unwrap();
                for i in 4..7 {
                    journal.append(create_operation(i)).await.unwrap();
                }

                // Rewind to last commit
                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 4);
                assert_eq!(journal.size(), 4);

                // Verify the commit operation is still there
                let op = journal.read(3).await.unwrap();
                assert!(op.is_commit());
            }

            // Test 2: Last matching operation is chosen when multiple match
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_multiple"),
                    journal_config("rewind_multiple"),
                )
                .await
                .unwrap();

                // Add multiple commits
                journal.append(create_operation(0)).await.unwrap();
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                    .await
                    .unwrap(); // pos 1
                journal.append(create_operation(2)).await.unwrap();
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(1)))
                    .await
                    .unwrap(); // pos 3
                journal.append(create_operation(4)).await.unwrap();

                // Should rewind to last commit (pos 3)
                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 4);

                // Verify the last commit is still there
                let op = journal.read(3).await.unwrap();
                assert!(op.is_commit());

                // Verify we can't read pos 4
                assert!(journal.read(4).await.is_err());
            }

            // Test 3: Rewind to pruning boundary when no match
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_no_match"),
                    journal_config("rewind_no_match"),
                )
                .await
                .unwrap();

                // Add operations with no commits
                for i in 0..10 {
                    journal.append(create_operation(i)).await.unwrap();
                }

                // Rewind should go to pruning boundary (0 for unpruned)
                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 0, "Should rewind to pruning boundary (0)");
                assert_eq!(journal.size(), 0);
            }

            // Test 4: Rewind with existing pruning boundary
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_with_pruning"),
                    journal_config("rewind_with_pruning"),
                )
                .await
                .unwrap();

                // Add operations and a commit at position 10 (past first section boundary of 7)
                for i in 0..10 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                    .await
                    .unwrap(); // pos 10
                for i in 11..15 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal.sync().await.unwrap();

                // Prune up to position 8 (this will prune section 0, items 0-6, keeping 7+)
                journal.prune(8).await.unwrap();
                assert_eq!(journal.bounds().start, Location::new_unchecked(7));

                // Add more uncommitted operations
                for i in 15..20 {
                    journal.append(create_operation(i)).await.unwrap();
                }

                // Rewind should keep the commit at position 10
                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 11);

                // Verify commit is still there
                let op = journal.read(10).await.unwrap();
                assert!(op.is_commit());
            }

            // Test 5: Rewind with no matches after pruning boundary
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_no_match_pruned"),
                    journal_config("rewind_no_match_pruned"),
                )
                .await
                .unwrap();

                // Add operations with a commit at position 5 (in section 0: 0-6)
                for i in 0..5 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                    .await
                    .unwrap(); // pos 5
                for i in 6..10 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal.sync().await.unwrap();

                // Prune up to position 8 (this prunes section 0, including the commit at pos 5)
                // Pruning boundary will be at position 7 (start of section 1)
                journal.prune(8).await.unwrap();
                assert_eq!(journal.bounds().start, Location::new_unchecked(7));

                // Add uncommitted operations with no commits (in section 1: 7-13)
                for i in 10..14 {
                    journal.append(create_operation(i)).await.unwrap();
                }

                // Rewind with no matching commits after the pruning boundary
                // Should rewind to the pruning boundary at position 7
                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 7);
            }

            // Test 6: Empty journal
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_empty"),
                    journal_config("rewind_empty"),
                )
                .await
                .unwrap();

                // Rewind empty journal should be no-op
                let final_size = journal
                    .rewind_to(|op: &Operation<Digest, Digest>| op.is_commit())
                    .await
                    .unwrap();
                assert_eq!(final_size, 0);
                assert_eq!(journal.size(), 0);
            }

            // Test 7: Position-based authenticated journal rewind.
            {
                let mut journal = AuthenticatedJournal::new(
                    context,
                    mmr_config("rewind"),
                    journal_config("rewind"),
                    |op| op.is_commit(),
                )
                .await
                .unwrap()
                .into_dirty();

                // Add operations with a commit at position 5 (in section 0: 0-6)
                for i in 0..5 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                    .await
                    .unwrap(); // pos 5
                for i in 6..10 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                let journal = journal.merkleize();
                assert_eq!(journal.size(), 10);

                let mut journal = journal.into_dirty();
                journal.rewind(2).await.unwrap();
                assert_eq!(journal.size(), 2);
                assert_eq!(journal.mmr.leaves(), 2);
                assert_eq!(journal.mmr.size(), 3);
                let bounds = journal.bounds();
                assert_eq!(bounds.start, Location::new_unchecked(0));
                assert!(!bounds.is_empty());

                assert!(matches!(
                    journal.rewind(3).await,
                    Err(JournalError::InvalidRewind(_))
                ));

                journal.rewind(0).await.unwrap();
                let journal = journal.merkleize();
                assert_eq!(journal.size(), 0);
                assert_eq!(journal.mmr.leaves(), 0);
                assert_eq!(journal.mmr.size(), 0);
                let bounds = journal.bounds();
                assert_eq!(bounds.start, Location::new_unchecked(0));
                assert!(bounds.is_empty());

                // Test rewinding after pruning.
                let mut journal = journal.into_dirty();
                for i in 0..255 {
                    journal.append(create_operation(i)).await.unwrap();
                }

                let mut journal = journal.merkleize();
                journal.prune(Location::new_unchecked(100)).await.unwrap();
                assert_eq!(journal.bounds().start, Location::new_unchecked(98));
                let mut journal = journal.into_dirty();
                let res = journal.rewind(97).await;
                assert!(matches!(res, Err(JournalError::InvalidRewind(97))));
                journal.rewind(98).await.unwrap();
                let bounds = journal.bounds();
                assert_eq!(bounds.end, Location::new_unchecked(98));
                assert_eq!(journal.mmr.leaves(), 98);
                assert_eq!(bounds.start, Location::new_unchecked(98));
                assert!(bounds.is_empty());
            }
        });
    }

    /// Verify that append() increments the operation count, returns correct locations, and
    /// operations can be read back correctly.
    #[test_traced("INFO")]
    fn test_apply_op_and_read_operations() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_empty_journal(context, "apply_op").await.into_dirty();

            assert_eq!(journal.size(), 0);

            // Add 50 operations
            let expected_ops: Vec<_> = (0..50).map(|i| create_operation(i as u8)).collect();
            for (i, op) in expected_ops.iter().enumerate() {
                let loc = journal.append(op.clone()).await.unwrap();
                assert_eq!(loc, Location::new_unchecked(i as u64));
                assert_eq!(journal.size(), (i + 1) as u64);
            }
            let mut journal = journal.merkleize();

            assert_eq!(journal.size(), 50);

            // Verify all operations can be read back correctly
            journal.sync().await.unwrap();
            for (i, expected_op) in expected_ops.iter().enumerate() {
                let read_op = journal
                    .read(Location::new_unchecked(i as u64))
                    .await
                    .unwrap();
                assert_eq!(read_op, *expected_op);
            }
        });
    }

    /// Verify that read() returns correct operations at various positions.
    #[test_traced("INFO")]
    fn test_read_operations_at_various_positions() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "read", 50).await;

            // Verify reading first operation
            let first_op = journal.read(Location::new_unchecked(0)).await.unwrap();
            assert_eq!(first_op, create_operation(0));

            // Verify reading middle operation
            let middle_op = journal.read(Location::new_unchecked(25)).await.unwrap();
            assert_eq!(middle_op, create_operation(25));

            // Verify reading last operation
            let last_op = journal.read(Location::new_unchecked(49)).await.unwrap();
            assert_eq!(last_op, create_operation(49));

            // Verify all operations match expected values
            for i in 0..50 {
                let op = journal.read(Location::new_unchecked(i)).await.unwrap();
                assert_eq!(op, create_operation(i as u8));
            }
        });
    }

    /// Verify that read() returns an error for pruned operations.
    #[test_traced("INFO")]
    fn test_read_pruned_operation_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "read_pruned", 100)
                .await
                .into_dirty();

            // Add commit and prune
            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
                .await
                .unwrap();
            let mut journal = journal.merkleize();
            journal.sync().await.unwrap();
            let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();

            // Try to read an operation before the pruned boundary
            let read_loc = Location::new_unchecked(0);
            if read_loc < pruned_boundary {
                let result = journal.read(read_loc).await;
                assert!(matches!(
                    result,
                    Err(Error::Journal(crate::journal::Error::ItemPruned(_)))
                ));
            }
        });
    }

    /// Verify that read() returns an error for out-of-range locations.
    #[test_traced("INFO")]
    fn test_read_out_of_range_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "read_oob", 3).await;

            // Try to read beyond the end
            let result = journal.read(Location::new_unchecked(10)).await;
            assert!(matches!(
                result,
                Err(Error::Journal(crate::journal::Error::ItemOutOfRange(_)))
            ));
        });
    }

    /// Verify that we can read all operations back correctly.
    #[test_traced("INFO")]
    fn test_read_all_operations_back_correctly() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "read_all", 50).await;

            assert_eq!(journal.size(), 50);

            // Verify all operations can be read back and match expected values
            for i in 0..50 {
                let op = journal.read(Location::new_unchecked(i)).await.unwrap();
                assert_eq!(op, create_operation(i as u8));
            }
        });
    }

    /// Verify that sync() persists operations.
    #[test_traced("INFO")]
    fn test_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_empty_journal(context.with_label("first"), "close_pending")
                .await
                .into_dirty();

            // Add 20 operations
            let expected_ops: Vec<_> = (0..20).map(|i| create_operation(i as u8)).collect();
            for (i, op) in expected_ops.iter().enumerate() {
                let loc = journal.append(op.clone()).await.unwrap();
                assert_eq!(loc, Location::new_unchecked(i as u64));
1202            }
1203
1204            // Add commit operation to commit the operations
1205            let commit_loc = journal
1206                .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
1207                .await
1208                .unwrap();
1209            let mut journal = journal.merkleize();
1210            assert_eq!(
1211                commit_loc,
1212                Location::new_unchecked(20),
1213                "commit should be at location 20"
1214            );
1215            journal.sync().await.unwrap();
1216
1217            // Reopen and verify the operations persisted
1218            drop(journal);
1219            let journal = create_empty_journal(context.with_label("second"), "close_pending").await;
1220            assert_eq!(journal.size(), 21);
1221
1222            // Verify all operations can be read back
1223            for (i, expected_op) in expected_ops.iter().enumerate() {
1224                let read_op = journal
1225                    .read(Location::new_unchecked(i as u64))
1226                    .await
1227                    .unwrap();
1228                assert_eq!(read_op, *expected_op);
1229            }
1230        });
1231    }
1232
1233    /// Verify that pruning an empty journal returns the boundary.
1234    #[test_traced("INFO")]
1235    fn test_prune_empty_journal() {
1236        let executor = deterministic::Runner::default();
1237        executor.start(|context| async move {
1238            let mut journal = create_empty_journal(context, "prune_empty").await;
1239
1240            let boundary = journal.prune(Location::new_unchecked(0)).await.unwrap();
1241
1242            assert_eq!(boundary, Location::new_unchecked(0));
1243        });
1244    }
1245
1246    /// Verify that pruning to a specific location works correctly.
1247    #[test_traced("INFO")]
1248    fn test_prune_to_location() {
1249        let executor = deterministic::Runner::default();
1250        executor.start(|context| async move {
1251            let mut journal = create_journal_with_ops(context, "prune_to", 100)
1252                .await
1253                .into_dirty();
1254
1255            // Add commit at position 50
1256            journal
1257                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1258                .await
1259                .unwrap();
1260            let mut journal = journal.merkleize();
1261            journal.sync().await.unwrap();
1262
1263            let boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1264
1265            // Boundary should be <= requested location (may align to section boundary)
1266            assert!(boundary <= Location::new_unchecked(50));
1267        });
1268    }
1269
1270    /// Verify that prune() returns the actual boundary (which may differ from requested).
1271    #[test_traced("INFO")]
1272    fn test_prune_returns_actual_boundary() {
1273        let executor = deterministic::Runner::default();
1274        executor.start(|context| async move {
1275            let mut journal = create_journal_with_ops(context, "prune_boundary", 100)
1276                .await
1277                .into_dirty();
1278
1279            journal
1280                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1281                .await
1282                .unwrap();
1283            let mut journal = journal.merkleize();
1284            journal.sync().await.unwrap();
1285
1286            let requested = Location::new_unchecked(50);
1287            let actual = journal.prune(requested).await.unwrap();
1288
1289            // Actual boundary should match bounds.start
1290            let bounds = journal.bounds();
1291            assert!(!bounds.is_empty());
1292            assert_eq!(actual, bounds.start);
1293
1294            // Actual may be <= requested due to section alignment
1295            assert!(actual <= requested);
1296        });
1297    }
1298
1299    /// Verify that pruning doesn't change the operation count.
1300    #[test_traced("INFO")]
1301    fn test_prune_preserves_operation_count() {
1302        let executor = deterministic::Runner::default();
1303        executor.start(|context| async move {
1304            let mut journal = create_journal_with_ops(context, "prune_count", 100)
1305                .await
1306                .into_dirty();
1307
1308            journal
1309                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1310                .await
1311                .unwrap();
1312            let mut journal = journal.merkleize();
1313            journal.sync().await.unwrap();
1314
1315            let count_before = journal.size();
1316            journal.prune(Location::new_unchecked(50)).await.unwrap();
1317            let count_after = journal.size();
1318
1319            assert_eq!(count_before, count_after);
1320        });
1321    }
1322
1323    /// Verify bounds() for empty journal, no pruning, and after pruning.
1324    #[test_traced("INFO")]
1325    fn test_bounds_empty_and_pruned() {
1326        let executor = deterministic::Runner::default();
1327        executor.start(|context| async move {
1328            // Test empty journal
1329            let journal = create_empty_journal(context.with_label("empty"), "oldest").await;
1330            assert!(journal.bounds().is_empty());
1331            journal.destroy().await.unwrap();
1332
1333            // Test no pruning
1334            let journal =
1335                create_journal_with_ops(context.with_label("no_prune"), "oldest", 100).await;
1336            let bounds = journal.bounds();
1337            assert!(!bounds.is_empty());
1338            assert_eq!(bounds.start, Location::new_unchecked(0));
1339            journal.destroy().await.unwrap();
1340
1341            // Test after pruning
1342            let journal =
1343                create_journal_with_ops(context.with_label("pruned"), "oldest", 100).await;
1344            let mut journal = journal.into_dirty();
1345            journal
1346                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1347                .await
1348                .unwrap();
1349            let mut journal = journal.merkleize();
1350            journal.sync().await.unwrap();
1351
1352            let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1353
1354            // Should match the pruned boundary (may be <= 50 due to section alignment)
1355            let bounds = journal.bounds();
1356            assert!(!bounds.is_empty());
1357            assert_eq!(bounds.start, pruned_boundary);
1358            // Should be <= requested location (50)
1359            assert!(pruned_boundary <= Location::new_unchecked(50));
1360            journal.destroy().await.unwrap();
1361        });
1362    }
1363
1364    /// Verify bounds().start for empty journal, no pruning, and after pruning.
1365    #[test_traced("INFO")]
1366    fn test_bounds_start_after_prune() {
1367        let executor = deterministic::Runner::default();
1368        executor.start(|context| async move {
1369            // Test empty journal
1370            let journal = create_empty_journal(context.with_label("empty"), "boundary").await;
1371            assert_eq!(journal.bounds().start, Location::new_unchecked(0));
1372
1373            // Test no pruning
1374            let journal =
1375                create_journal_with_ops(context.with_label("no_prune"), "boundary", 100).await;
1376            assert_eq!(journal.bounds().start, Location::new_unchecked(0));
1377
1378            // Test after pruning
1379            let mut journal =
1380                create_journal_with_ops(context.with_label("pruned"), "boundary", 100)
1381                    .await
1382                    .into_dirty();
1383            journal
1384                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1385                .await
1386                .unwrap();
1387            let mut journal = journal.merkleize();
1388            journal.sync().await.unwrap();
1389
1390            let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1391
1392            assert_eq!(journal.bounds().start, pruned_boundary);
1393        });
1394    }

    /// Verify that the MMR prunes to the journal's actual boundary, not the requested location.
    #[test_traced("INFO")]
    fn test_mmr_prunes_to_journal_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "mmr_boundary", 50)
                .await
                .into_dirty();

            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(25)))
                .await
                .unwrap();
            let mut journal = journal.merkleize();
            journal.sync().await.unwrap();

            let pruned_boundary = journal.prune(Location::new_unchecked(25)).await.unwrap();

            // Verify MMR and journal remain in sync
            assert!(!journal.bounds().is_empty());
            assert_eq!(pruned_boundary, journal.bounds().start);

            // Verify boundary is at or before requested (due to section alignment)
            assert!(pruned_boundary <= Location::new_unchecked(25));

            // Pruning leaves the operation count unchanged (50 ops + 1 commit = 51)
            assert_eq!(journal.size(), 51);
        });
    }

    /// Verify proof() for multiple operations.
    #[test_traced("INFO")]
    fn test_proof_multiple_operations() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_multi", 50).await;

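            // proof() returns the range proof together with the operations it covers.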
            let (proof, ops) = journal
                .proof(Location::new_unchecked(0), NZU64!(50))
                .await
                .unwrap();

            assert_eq!(ops.len(), 50);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation(i as u8));
            }

            // Verify the proof is valid
            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(0),
                &root,
                &mut hasher
            ));
        });
    }
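
    /// Illustrative sketch (added for exposition, not in the original suite): a
    /// range proof taken from the middle of the journal should verify against the
    /// same root. Assumes only the helpers already used above.
    #[test_traced("INFO")]
    fn test_proof_middle_range() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_mid", 50).await;

            // Prove the sub-range of locations [10, 30).
            let (proof, ops) = journal
                .proof(Location::new_unchecked(10), NZU64!(20))
                .await
                .unwrap();
            assert_eq!(ops.len(), 20);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation((10 + i) as u8));
            }

            // The same root authenticates any in-bounds sub-range.
            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(10),
                &root,
                &mut hasher
            ));
        });
    }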

    /// Verify that historical_proof() respects the max_ops limit.
    #[test_traced("INFO")]
    fn test_historical_proof_limited_by_max_ops() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_limit", 50).await;

            let size = journal.size();
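            // historical_proof() proves against the MMR root at `size` and returns
            // at most max_ops operations starting from start_loc.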
            let (proof, ops) = journal
                .historical_proof(size, Location::new_unchecked(0), NZU64!(20))
                .await
                .unwrap();

            // Should return only 20 operations despite 50 being available
            assert_eq!(ops.len(), 20);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation(i as u8));
            }

            // Verify the proof is valid
            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(0),
                &root,
                &mut hasher
            ));
        });
    }

    /// Verify historical_proof() at the end of the journal.
    #[test_traced("INFO")]
    fn test_historical_proof_at_end_of_journal() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_end", 50).await;

            let size = journal.size();
            // Request proof starting near the end
            let (proof, ops) = journal
                .historical_proof(size, Location::new_unchecked(40), NZU64!(20))
                .await
                .unwrap();

            // Should return only 10 operations (locations 40-49)
            assert_eq!(ops.len(), 10);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation((40 + i) as u8));
            }

            // Verify the proof is valid
            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(40),
                &root,
                &mut hasher
            ));
        });
    }
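
    /// Illustrative sketch: verification should fail when the claimed start
    /// location does not match the range the proof was generated for. A minimal
    /// negative case, assuming verify_proof's semantics as exercised above.
    #[test_traced("INFO")]
    fn test_proof_wrong_start_location_fails() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_bad_start", 50).await;

            let (proof, ops) = journal
                .proof(Location::new_unchecked(10), NZU64!(20))
                .await
                .unwrap();

            let mut hasher = StandardHasher::new();
            let root = journal.root();
            // Claiming the range starts at 11 instead of 10 must not verify.
            assert!(!verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(11),
                &root,
                &mut hasher
            ));
        });
    }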

    /// Verify that historical_proof() returns an error when the requested size
    /// exceeds the journal's actual size.
    #[test_traced("INFO")]
    fn test_historical_proof_out_of_range_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_oob", 5).await;

            // Request proof with size > actual journal size
            let result = journal
                .historical_proof(
                    Location::new_unchecked(10),
                    Location::new_unchecked(0),
                    NZU64!(1),
                )
                .await;

            assert!(matches!(
                result,
                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
            ));
        });
    }

    /// Verify that historical_proof() returns an error when start_loc >= size.
    #[test_traced("INFO")]
    fn test_historical_proof_start_too_large_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_start_oob", 5).await;

            let size = journal.size();
            // Request proof starting at size (should fail)
            let result = journal.historical_proof(size, size, NZU64!(1)).await;

            assert!(matches!(
                result,
                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
            ));
        });
    }

    /// Verify historical_proof() for a truly historical state (captured before more
    /// operations are added).
    #[test_traced("INFO")]
    fn test_historical_proof_truly_historical() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create journal with initial operations
            let journal = create_journal_with_ops(context, "proof_historical", 50).await;

            // Capture root at historical state
            let mut hasher = StandardHasher::new();
            let historical_root = journal.root();
            let historical_size = journal.size();

            // Add more operations after the historical state
            let mut journal = journal.into_dirty();
            for i in 50..100 {
                journal.append(create_operation(i as u8)).await.unwrap();
            }
            let mut journal = journal.merkleize();
            journal.sync().await.unwrap();
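            // The journal now holds 100 operations, but we can still prove
            // inclusion against the root it had at size 50.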

            // Generate proof for the historical state
            let (proof, ops) = journal
                .historical_proof(historical_size, Location::new_unchecked(0), NZU64!(50))
                .await
                .unwrap();

            // Verify operations match expected historical operations
            assert_eq!(ops.len(), 50);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation(i as u8));
            }

            // Verify the proof is valid against the historical root
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(0),
                &historical_root,
                &mut hasher
            ));
        });
    }

    /// Verify that historical_proof() returns an error when start_loc is pruned.
    #[test_traced("INFO")]
    fn test_historical_proof_pruned_location_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_pruned", 50).await;

            let mut journal = journal.into_dirty();
            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(25)))
                .await
                .unwrap();
            let mut journal = journal.merkleize();
            journal.sync().await.unwrap();
            let pruned_boundary = journal.prune(Location::new_unchecked(25)).await.unwrap();

            // Try to get proof starting at a location before the pruned boundary
            let size = journal.size();
            let start_loc = Location::new_unchecked(0);
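            // Section alignment may leave the boundary at 0, in which case nothing
            // was actually pruned and there is nothing to assert.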
            if start_loc < pruned_boundary {
                let result = journal.historical_proof(size, start_loc, NZU64!(1)).await;

                // Should fail when trying to read pruned operations
                assert!(result.is_err());
            }
        });
    }

    /// Verify replay() with an empty journal and with multiple operations.
    #[test_traced("INFO")]
    fn test_replay_operations() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test empty journal
            let journal = create_empty_journal(context.with_label("empty"), "replay").await;
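            // An empty journal yields an empty replay stream.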
            let stream = journal.replay(0, NZUsize!(10)).await.unwrap();
            futures::pin_mut!(stream);
            assert!(stream.next().await.is_none());

            // Test replaying all operations
            let journal =
                create_journal_with_ops(context.with_label("with_ops"), "replay", 50).await;
            let stream = journal.replay(0, NZUsize!(100)).await.unwrap();
            futures::pin_mut!(stream);

            for i in 0..50 {
                let (pos, op) = stream.next().await.unwrap().unwrap();
                assert_eq!(pos, i);
                assert_eq!(op, create_operation(i as u8));
            }

            assert!(stream.next().await.is_none());
        });
    }

    /// Verify replay() starting from a middle location.
    #[test_traced("INFO")]
    fn test_replay_from_middle() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "replay_middle", 50).await;
            let stream = journal.replay(25, NZUsize!(100)).await.unwrap();
            futures::pin_mut!(stream);

            let mut count = 0;
            while let Some(result) = stream.next().await {
                let (pos, op) = result.unwrap();
                assert_eq!(pos, 25 + count);
                assert_eq!(op, create_operation((25 + count) as u8));
                count += 1;
            }

            // Should have replayed positions 25-49 (25 operations)
            assert_eq!(count, 25);
        });
    }
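
    /// Illustrative sketch: replaying from the final location should yield exactly
    /// one item. Assumes the replay semantics exercised above.
    #[test_traced("INFO")]
    fn test_replay_from_last_location() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "replay_last", 50).await;
            let stream = journal.replay(49, NZUsize!(10)).await.unwrap();
            futures::pin_mut!(stream);

            let (pos, op) = stream.next().await.unwrap().unwrap();
            assert_eq!(pos, 49);
            assert_eq!(op, create_operation(49));
            assert!(stream.next().await.is_none());
        });
    }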
}