commonware_storage/journal/authenticated.rs

//! Authenticated journal implementation.
//!
//! An authenticated journal maintains a contiguous journal of items alongside a Merkle Mountain
//! Range (MMR). The item at index i in the journal corresponds to the leaf at Location i in the
//! MMR. This structure enables efficient proofs that an item is included in the journal at a
//! specific location.
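//!
//! # Example
//!
//! A minimal sketch of the intended flow (not a compilable doctest: `journal`, `item`, and
//! `hasher` are assumed to already be in scope, and the hypothetical names mirror this module's
//! tests):
//!
//! ```ignore
//! // Append an item, durably persist it, then prove its inclusion.
//! let loc = journal.append(item.clone()).await?;
//! journal.sync().await?;
//! let root = journal.root();
//! let (proof, items) = journal.proof(loc, NZU64!(1)).await?;
//! assert!(proof.verify_range_inclusion(&mut hasher, &[items[0].encode()], loc, &root));
//! ```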

use crate::{
    journal::{
        contiguous::{fixed, variable, Contiguous, MutableContiguous, PersistableContiguous},
        Error as JournalError,
    },
    mmr::{
        journaled::{CleanMmr, Mmr},
        mem::{Clean, Dirty, State},
        Location, Position, Proof, StandardHasher,
    },
};
use commonware_codec::{Codec, CodecFixed, Encode};
use commonware_cryptography::{DigestOf, Hasher};
use commonware_runtime::{Clock, Metrics, Storage};
use core::num::{NonZeroU64, NonZeroUsize};
use futures::{future::try_join_all, try_join, TryFutureExt as _};
use thiserror::Error;
use tracing::{debug, warn};

/// Errors that can occur when interacting with an authenticated journal.
#[derive(Error, Debug)]
pub enum Error {
    #[error("mmr error: {0}")]
    Mmr(#[from] crate::mmr::Error),

    #[error("journal error: {0}")]
    Journal(#[from] super::Error),
}

/// An append-only data structure that maintains a sequential journal of items alongside a Merkle
/// Mountain Range (MMR). The item at index i in the journal corresponds to the leaf at Location i
/// in the MMR. This structure enables efficient proofs that an item is included in the journal at a
/// specific location.
pub struct Journal<E, C, H, S: State<H::Digest> = Dirty>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: Encode>,
    H: Hasher,
{
    /// MMR where each leaf is an item digest.
    /// Invariant: leaf i corresponds to item i in the journal.
    pub(crate) mmr: Mmr<E, H::Digest, S>,

    /// Journal of items.
    /// Invariant: item i corresponds to leaf i in the MMR.
    pub(crate) journal: C,

    pub(crate) hasher: StandardHasher<H>,
}

impl<E, C, H, S> Journal<E, C, H, S>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: Encode>,
    H: Hasher,
    S: State<DigestOf<H>>,
{
    /// Returns the number of items in the journal.
    pub fn size(&self) -> Location {
        Location::new_unchecked(self.journal.size())
    }

    /// Returns the oldest retained location in the journal.
    pub fn oldest_retained_loc(&self) -> Option<Location> {
        self.journal
            .oldest_retained_pos()
            .map(Location::new_unchecked)
    }

    /// Returns the pruning boundary for the journal.
    pub fn pruning_boundary(&self) -> Location {
        self.journal.pruning_boundary().into()
    }

    /// Read an item from the journal at the given location.
    pub async fn read(&self, loc: Location) -> Result<C::Item, Error> {
        self.journal.read(*loc).await.map_err(Error::Journal)
    }
}

impl<E, C, H, S> Journal<E, C, H, S>
where
    E: Storage + Clock + Metrics,
    C: MutableContiguous<Item: Encode>,
    H: Hasher,
    S: State<DigestOf<H>>,
{
    /// Append an item to the journal, returning the location it was assigned.
    pub async fn append(&mut self, item: C::Item) -> Result<Location, Error> {
        let encoded_item = item.encode();

        // Append the item to the journal and update the MMR concurrently.
        let (_, loc) = try_join!(
            self.mmr
                .add(&mut self.hasher, &encoded_item)
                .map_err(Error::Mmr),
            self.journal.append(item).map_err(Into::into)
        )?;

        Ok(Location::new_unchecked(loc))
    }
}

impl<E, C, H, S> Journal<E, C, H, S>
where
    E: Storage + Clock + Metrics,
    C: PersistableContiguous<Item: Encode>,
    H: Hasher,
    S: State<DigestOf<H>>,
{
    /// Durably persist the journal. This is faster than `sync()` but does not persist the MMR,
    /// meaning recovery will be required on startup if we crash before `sync()` or `close()`.
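    ///
    /// # Example
    ///
    /// A sketch of the intended pattern (hypothetical loop; not a compilable doctest): commit
    /// cheaply per batch, then pay for a full `sync()` at a coarser checkpoint interval.
    ///
    /// ```ignore
    /// for batch in batches {
    ///     for item in batch {
    ///         journal.append(item).await?;
    ///     }
    ///     journal.commit().await?; // journal durable; MMR may need recovery after a crash
    /// }
    /// journal.sync().await?; // journal and MMR both durable; no recovery needed
    /// ```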
    pub async fn commit(&mut self) -> Result<(), Error> {
        self.journal.commit().await.map_err(Error::Journal)
    }
}

impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    C: MutableContiguous<Item: Encode>,
    H: Hasher,
{
    /// Create a new [Journal] from the given components after aligning the MMR with the journal.
    pub async fn from_components(
        mmr: CleanMmr<E, H::Digest>,
        journal: C,
        mut hasher: StandardHasher<H>,
        apply_batch_size: u64,
    ) -> Result<Self, Error> {
        let mut mmr = Self::align(mmr, &journal, &mut hasher, apply_batch_size).await?;

        // Sync the MMR to disk so that any recovery performed above doesn't need to be repeated
        // on the next startup.
        mmr.sync().await?;

        Ok(Self {
            mmr,
            journal,
            hasher,
        })
    }

    /// Align `mmr` to be consistent with `journal`. Any items in `mmr` that aren't in `journal` are
    /// popped, and any items in `journal` that aren't in `mmr` are added to `mmr`. Items are added
    /// to `mmr` in batches of size `apply_batch_size` to avoid memory bloat. For example, if the
    /// journal holds 10 items while the MMR has 12 leaves, the 2 extra leaves are popped; if the
    /// MMR has only 7 leaves, items 7 through 9 are replayed into it.
    async fn align(
        mut mmr: CleanMmr<E, H::Digest>,
        journal: &C,
        hasher: &mut StandardHasher<H>,
        apply_batch_size: u64,
    ) -> Result<CleanMmr<E, H::Digest>, Error> {
        // Pop any MMR elements that are ahead of the journal.
        // Note mmr_size is the size of the MMR in leaves, not positions.
        let journal_size = journal.size();
        let mut mmr_size = mmr.leaves();
        if mmr_size > journal_size {
            let pop_count = mmr_size - journal_size;
            warn!(journal_size, ?pop_count, "popping MMR items");
            mmr.pop(hasher, *pop_count as usize).await?;
            mmr_size = Location::new_unchecked(journal_size);
        }

        // If the MMR is behind, replay journal items to catch up.
        if mmr_size < journal_size {
            let replay_count = journal_size - *mmr_size;
            warn!(
                journal_size,
                replay_count, "MMR lags behind journal, replaying journal to catch up"
            );

            let mut mmr = mmr.into_dirty();
            let mut batch_size = 0;
            while mmr_size < journal_size {
                let op = journal.read(*mmr_size).await?;
                mmr.add(hasher, &op.encode()).await?;
                mmr_size += 1;
                batch_size += 1;
                if batch_size >= apply_batch_size {
                    mmr = mmr.merkleize(hasher).into_dirty();
                    batch_size = 0;
                }
            }
            return Ok(mmr.merkleize(hasher));
        }

        // At this point the MMR and journal should be consistent.
        assert_eq!(journal.size(), mmr.leaves());

        Ok(mmr)
    }

    /// Prune both the MMR and journal to the given location.
    ///
    /// # Returns
    /// The new pruning boundary, which may be less than the requested `prune_loc`.
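    ///
    /// # Example
    ///
    /// A sketch of the boundary behavior (the exact value depends on the journal's blob/section
    /// sizing, so the numbers below are illustrative only):
    ///
    /// ```ignore
    /// // With 7 items per blob, a prune at location 100 can only drop whole blobs, so the
    /// // returned boundary is the largest multiple of 7 <= 100:
    /// let boundary = journal.prune(Location::new_unchecked(100)).await?;
    /// assert_eq!(boundary, Location::new_unchecked(98));
    /// ```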
    pub async fn prune(&mut self, prune_loc: Location) -> Result<Location, Error> {
        if self.mmr.size() == 0 {
            // DB is empty, nothing to prune.
            return Ok(self.pruning_boundary());
        }

        // Sync the MMR before pruning the journal; otherwise the MMR tip could end up behind the
        // journal's pruning boundary on restart from an unclean shutdown, and there would be no way
        // to replay the items between the MMR tip and the journal pruning boundary.
        self.mmr.sync().await?;

        // Prune the journal and check whether anything was actually pruned.
        if !self.journal.prune(*prune_loc).await? {
            return Ok(self.pruning_boundary());
        }

        let pruning_boundary = self.pruning_boundary();
        let size = self.size();
        debug!(?size, ?prune_loc, ?pruning_boundary, "pruned inactive ops");

        // Prune the MMR to match the journal's actual boundary.
        self.mmr
            .prune_to_pos(Position::try_from(pruning_boundary)?)
            .await?;

        Ok(pruning_boundary)
    }
}

impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: Encode>,
    H: Hasher,
{
    /// Generate a proof of inclusion for items starting at `start_loc`.
    ///
    /// Returns a proof and the items corresponding to the leaves in the range `start_loc..end_loc`,
    /// where `end_loc` is the minimum of the current item count and `start_loc + max_ops`.
    ///
    /// # Errors
    ///
    /// - Returns [Error::Mmr] with [crate::mmr::Error::LocationOverflow] if `start_loc` >
    ///   [crate::mmr::MAX_LOCATION].
    /// - Returns [Error::Mmr] with [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >= current
    ///   item count.
    /// - Returns [Error::Journal] with [crate::journal::Error::ItemPruned] if `start_loc` has been
    ///   pruned.
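    ///
    /// # Example
    ///
    /// A verification sketch (mirrors the pattern used in this module's tests; not a compilable
    /// doctest since the item type is generic):
    ///
    /// ```ignore
    /// let (proof, items) = journal.proof(start_loc, NZU64!(10)).await?;
    /// let encoded: Vec<_> = items.iter().map(|item| item.encode()).collect();
    /// assert!(proof.verify_range_inclusion(&mut hasher, &encoded, start_loc, &journal.root()));
    /// ```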
    pub async fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
        self.historical_proof(self.size(), start_loc, max_ops).await
    }

    /// Generate a historical proof with respect to the state of the MMR when it had
    /// `historical_size` items.
    ///
    /// Returns a proof and the items corresponding to the leaves in the range `start_loc..end_loc`,
    /// where `end_loc` is the minimum of `historical_size` and `start_loc + max_ops`.
    ///
    /// # Errors
    ///
    /// - Returns [Error::Mmr] with [crate::mmr::Error::LocationOverflow] if `historical_size` or
    ///   `start_loc` > [crate::mmr::MAX_LOCATION].
    /// - Returns [Error::Mmr] with [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >=
    ///   `historical_size` or `historical_size` > number of items in the journal.
    /// - Returns [Error::Journal] with [crate::journal::Error::ItemPruned] if `start_loc` has been
    ///   pruned.
    pub async fn historical_proof(
        &self,
        historical_size: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
        let size = self.size();
        if historical_size > size {
            return Err(crate::mmr::Error::RangeOutOfBounds(size).into());
        }
        if start_loc >= historical_size {
            return Err(crate::mmr::Error::RangeOutOfBounds(start_loc).into());
        }
        let end_loc = std::cmp::min(historical_size, start_loc.saturating_add(max_ops.get()));

        let mmr_size = Position::try_from(historical_size)?;
        let proof = self
            .mmr
            .historical_range_proof(mmr_size, start_loc..end_loc)
            .await?;
        // Read all items in the proven range concurrently.
        let futures = (*start_loc..*end_loc)
            .map(|i| self.journal.read(i))
            .collect::<Vec<_>>();
        let ops = try_join_all(futures).await?;

        Ok((proof, ops))
    }

    /// Return the root of the MMR.
    pub const fn root(&self) -> H::Digest {
        self.mmr.root()
    }

    /// Convert this journal into its dirty counterpart for batched updates.
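    ///
    /// # Example
    ///
    /// A sketch of the intended batching pattern (hypothetical loop; not a compilable doctest):
    ///
    /// ```ignore
    /// let mut dirty = journal.into_dirty();
    /// for item in batch {
    ///     dirty.append(item).await?;
    /// }
    /// let journal = dirty.merkleize(); // returns to the clean state and computes the root
    /// ```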
    pub fn into_dirty(self) -> Journal<E, C, H, Dirty> {
        Journal {
            mmr: self.mmr.into_dirty(),
            journal: self.journal,
            hasher: self.hasher,
        }
    }
}

impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    C: PersistableContiguous<Item: Encode>,
    H: Hasher,
{
    /// Close the authenticated journal, syncing all pending writes.
    pub async fn close(self) -> Result<(), Error> {
        try_join!(
            self.journal.close().map_err(Error::Journal),
            self.mmr.close().map_err(Error::Mmr),
        )?;
        Ok(())
    }

    /// Destroy the authenticated journal, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        try_join!(
            self.journal.destroy().map_err(Error::Journal),
            self.mmr.destroy().map_err(Error::Mmr),
        )?;
        Ok(())
    }

    /// Durably persist the journal, ensuring no recovery is required on startup.
    pub async fn sync(&mut self) -> Result<(), Error> {
        try_join!(
            self.journal.sync().map_err(Error::Journal),
            self.mmr.sync().map_err(Into::into)
        )?;

        Ok(())
    }
}

impl<E, C, H> Journal<E, C, H, Dirty>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: Encode>,
    H: Hasher,
{
    /// Merkleize the journal and compute the root digest.
    pub fn merkleize(self) -> Journal<E, C, H, Clean<H::Digest>> {
        let Self {
            mmr,
            journal,
            mut hasher,
        } = self;
        Journal {
            mmr: mmr.merkleize(&mut hasher),
            journal,
            hasher,
        }
    }
}

impl<E, C, H> Journal<E, C, H, Dirty>
where
    E: Storage + Clock + Metrics,
    C: MutableContiguous<Item: Encode>,
    H: Hasher,
{
    /// Create a new dirty journal from aligned components.
    pub async fn from_components(
        mmr: CleanMmr<E, H::Digest>,
        journal: C,
        hasher: StandardHasher<H>,
        apply_batch_size: u64,
    ) -> Result<Self, Error> {
        let clean = Journal::<E, C, H, Clean<H::Digest>>::from_components(
            mmr,
            journal,
            hasher,
            apply_batch_size,
        )
        .await?;
        Ok(clean.into_dirty())
    }
}

/// The number of items to apply to the MMR in a single batch.
const APPLY_BATCH_SIZE: u64 = 1 << 16;

impl<E, O, H> Journal<E, fixed::Journal<E, O>, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    O: CodecFixed<Cfg = ()>,
    H: Hasher,
{
    /// Create a new [Journal] for fixed-length items.
    ///
    /// The journal will be rewound to the last item that matches the `rewind_predicate` on
    /// initialization.
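    ///
    /// # Example
    ///
    /// A construction sketch (the config values and the `is_commit` predicate are illustrative;
    /// see this module's tests for concrete `Config` fields):
    ///
    /// ```ignore
    /// let journal = Journal::<_, _, Sha256, _>::new(
    ///     context,
    ///     mmr_config,     // crate::mmr::journaled::Config
    ///     journal_config, // fixed::Config
    ///     |op: &Operation| op.is_commit(), // rewind to the last committed item
    /// )
    /// .await?;
    /// ```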
    pub async fn new(
        context: E,
        mmr_cfg: crate::mmr::journaled::Config,
        journal_cfg: fixed::Config,
        rewind_predicate: fn(&O) -> bool,
    ) -> Result<Self, Error> {
        let mut journal = fixed::Journal::init(context.with_label("journal"), journal_cfg).await?;

        // Rewind journal to last matching item.
        journal.rewind_to(rewind_predicate).await?;

        // Align the MMR and journal.
        let mut hasher = StandardHasher::<H>::new();
        let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
        let mut mmr = Self::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE).await?;

        // Sync the journal and MMR to disk so that any recovery performed above doesn't need to
        // be repeated on the next startup.
        journal.sync().await?;
        mmr.sync().await?;

        Ok(Self {
            mmr,
            journal,
            hasher,
        })
    }
}

impl<E, O, H> Journal<E, variable::Journal<E, O>, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    O: Codec + Encode,
    H: Hasher,
{
    /// Create a new [Journal] for variable-length items.
    ///
    /// The journal will be rewound to the last item that matches the `rewind_predicate` on
    /// initialization.
    pub async fn new(
        context: E,
        mmr_cfg: crate::mmr::journaled::Config,
        journal_cfg: variable::Config<O::Cfg>,
        rewind_predicate: fn(&O) -> bool,
    ) -> Result<Self, Error> {
        let mut hasher = StandardHasher::<H>::new();
        let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
        let mut journal =
            variable::Journal::init(context.with_label("journal"), journal_cfg).await?;

        // Rewind to last matching item.
        journal.rewind_to(rewind_predicate).await?;

        // Align the MMR and journal.
        let mut mmr = Self::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE).await?;

        // Sync the journal and MMR to disk so that any recovery performed above doesn't need to
        // be repeated on the next startup.
        journal.sync().await?;
        mmr.sync().await?;

        Ok(Self {
            mmr,
            journal,
            hasher,
        })
    }
}

impl<E, C, H, S> Contiguous for Journal<E, C, H, S>
where
    E: Storage + Clock + Metrics,
    C: MutableContiguous<Item: Encode>,
    H: Hasher,
    S: State<DigestOf<H>>,
{
    type Item = C::Item;

    fn size(&self) -> u64 {
        self.journal.size()
    }

    fn oldest_retained_pos(&self) -> Option<u64> {
        self.journal.oldest_retained_pos()
    }

    fn pruning_boundary(&self) -> u64 {
        self.journal.pruning_boundary()
    }

    async fn replay(
        &self,
        start_pos: u64,
        buffer: NonZeroUsize,
    ) -> Result<
        impl futures::Stream<Item = Result<(u64, Self::Item), JournalError>> + '_,
        JournalError,
    > {
        self.journal.replay(start_pos, buffer).await
    }

    async fn read(&self, position: u64) -> Result<Self::Item, JournalError> {
        self.journal.read(position).await
    }
}

impl<E, C, H> MutableContiguous for Journal<E, C, H, Dirty>
where
    E: Storage + Clock + Metrics,
    C: MutableContiguous<Item: Encode>,
    H: Hasher,
{
    async fn append(&mut self, item: Self::Item) -> Result<u64, JournalError> {
        let loc = self.append(item).await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })?;

        Ok(*loc)
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
        self.journal.prune(min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
        self.journal.rewind(size).await?;

        let leaves = *self.mmr.leaves();
        if leaves > size {
            self.mmr
                .pop((leaves - size) as usize)
                .await
                .map_err(|error| JournalError::Mmr(anyhow::Error::from(error)))?;
        }

        Ok(())
    }
}

impl<E, C, H> MutableContiguous for Journal<E, C, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    C: MutableContiguous<Item: Encode>,
    H: Hasher,
{
    async fn append(&mut self, item: Self::Item) -> Result<u64, JournalError> {
        let loc = self.append(item).await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })?;

        Ok(*loc)
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
        let old_pruning_boundary = self.pruning_boundary();
        let pruning_boundary = self
            .prune(Location::new_unchecked(min_position))
            .await
            .map_err(|e| match e {
                Error::Journal(inner) => inner,
                Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
            })?;

        Ok(old_pruning_boundary != pruning_boundary)
    }

    async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
        self.journal.rewind(size).await?;

        let leaves = *self.mmr.leaves();
        if leaves > size {
            self.mmr
                .pop(&mut self.hasher, (leaves - size) as usize)
                .await
                .map_err(|error| JournalError::Mmr(anyhow::Error::from(error)))?;
        }

        Ok(())
    }
}

impl<E, C, H> PersistableContiguous for Journal<E, C, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    C: PersistableContiguous<Item: Encode>,
    H: Hasher,
{
    async fn commit(&mut self) -> Result<(), JournalError> {
        self.commit().await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })
    }

    async fn sync(&mut self) -> Result<(), JournalError> {
        self.sync().await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })
    }

    async fn close(self) -> Result<(), JournalError> {
        self.close().await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })
    }

    async fn destroy(self) -> Result<(), JournalError> {
        self.destroy().await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        journal::contiguous::fixed::{Config as JConfig, Journal as ContiguousJournal},
        mmr::{
            journaled::{Config as MmrConfig, Mmr},
            Location,
        },
        qmdb::{
            any::unordered::{fixed::Operation, Update},
            operation::Committable,
        },
    };
    use commonware_codec::Encode;
    use commonware_cryptography::{sha256, sha256::Digest, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        buffer::PoolRef,
        deterministic::{self, Context},
        Runner as _,
    };
    use commonware_utils::{NZUsize, NZU64};
    use futures::StreamExt as _;

    const PAGE_SIZE: usize = 101;
    const PAGE_CACHE_SIZE: usize = 11;

    /// Create MMR configuration for tests.
    fn mmr_config(suffix: &str) -> MmrConfig {
        MmrConfig {
            journal_partition: format!("mmr_journal_{suffix}"),
            metadata_partition: format!("mmr_metadata_{suffix}"),
            items_per_blob: NZU64!(11),
            write_buffer: NZUsize!(1024),
            thread_pool: None,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        }
    }

    /// Create journal configuration for tests.
    fn journal_config(suffix: &str) -> JConfig {
        JConfig {
            partition: format!("journal_{suffix}"),
            items_per_blob: NZU64!(7),
            write_buffer: NZUsize!(1024),
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        }
    }

    type AuthenticatedJournal = Journal<
        deterministic::Context,
        ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
        Sha256,
        Clean<sha256::Digest>,
    >;

    /// Create a new empty authenticated journal.
    async fn create_empty_journal(context: Context, suffix: &str) -> AuthenticatedJournal {
        AuthenticatedJournal::new(
            context,
            mmr_config(suffix),
            journal_config(suffix),
            |op: &Operation<Digest, Digest>| op.is_commit(),
        )
        .await
        .unwrap()
    }

    /// Create a test operation with predictable values based on index.
    fn create_operation(index: u8) -> Operation<Digest, Digest> {
        Operation::Update(Update(
            Sha256::fill(index),
            Sha256::fill(index.wrapping_add(1)),
        ))
    }

    /// Create an authenticated journal with N committed operations.
    ///
    /// Operations are added and then synced to ensure they are committed.
    async fn create_journal_with_ops(
        context: Context,
        suffix: &str,
        count: usize,
    ) -> AuthenticatedJournal {
        let mut journal = create_empty_journal(context, suffix).await;

        for i in 0..count {
            let op = create_operation(i as u8);
            let loc = journal.append(op).await.unwrap();
            assert_eq!(loc, Location::new_unchecked(i as u64));
        }

        journal.sync().await.unwrap();
        journal
    }

    /// Create separate MMR and journal components for testing alignment.
    ///
    /// These components are created independently and can be manipulated separately to test
    /// scenarios where the MMR and journal are out of sync (e.g., one ahead of the other).
    async fn create_components(
        context: Context,
        suffix: &str,
    ) -> (
        CleanMmr<deterministic::Context, sha256::Digest>,
        ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
        StandardHasher<Sha256>,
    ) {
        let mut hasher = StandardHasher::new();
        let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_config(suffix))
            .await
            .unwrap();
        let journal =
            ContiguousJournal::init(context.with_label("journal"), journal_config(suffix))
                .await
                .unwrap();
        (mmr, journal, hasher)
    }

    /// Verify that a proof correctly proves the given operations are included in the MMR.
    fn verify_proof(
        proof: &crate::mmr::Proof<<Sha256 as commonware_cryptography::Hasher>::Digest>,
        operations: &[Operation<Digest, Digest>],
        start_loc: Location,
        root: &<Sha256 as commonware_cryptography::Hasher>::Digest,
        hasher: &mut StandardHasher<Sha256>,
    ) -> bool {
        let encoded_ops: Vec<_> = operations.iter().map(|op| op.encode()).collect();
        proof.verify_range_inclusion(hasher, &encoded_ops, start_loc, root)
    }

    /// Verify that new() creates an empty authenticated journal.
    #[test_traced("INFO")]
    fn test_new_creates_empty_journal() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_empty_journal(context, "new_empty").await;

            assert_eq!(journal.size(), 0);
            assert_eq!(journal.pruning_boundary(), 0);
            assert_eq!(journal.oldest_retained_pos(), None);
        });
    }

    /// Verify that align() correctly handles empty MMR and journal components.
    #[test_traced("INFO")]
    fn test_align_with_empty_mmr_and_journal() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (mmr, journal, mut hasher) = create_components(context, "align_empty").await;

            let mmr = Journal::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
                .await
                .unwrap();

            assert_eq!(mmr.leaves(), Location::new_unchecked(0));
            assert_eq!(journal.size(), Location::new_unchecked(0));
        });
    }

    /// Verify that align() pops MMR elements when the MMR is ahead of the journal.
    #[test_traced("WARN")]
    fn test_align_when_mmr_ahead() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (mut mmr, mut journal, mut hasher) = create_components(context, "mmr_ahead").await;

            // Add 20 operations to both MMR and journal
            for i in 0..20 {
                let op = create_operation(i as u8);
                let encoded = op.encode();
                mmr.add(&mut hasher, &encoded).await.unwrap();
                journal.append(op).await.unwrap();
            }

            // Add one more operation to the MMR only (making the MMR ahead)
            let encoded = create_operation(20).encode();
            mmr.add(&mut hasher, &encoded).await.unwrap();
            journal.sync().await.unwrap();

            // MMR has 21 leaves, journal has 20 operations
            let mmr = Journal::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
                .await
                .unwrap();

            // The extra MMR leaf should have been popped to match the journal
            assert_eq!(mmr.leaves(), Location::new_unchecked(20));
            assert_eq!(journal.size(), Location::new_unchecked(20));
        });
    }

    /// Verify that align() replays journal operations when journal is ahead of MMR.
    #[test_traced("WARN")]
    fn test_align_when_journal_ahead() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (mut mmr, mut journal, mut hasher) =
                create_components(context, "journal_ahead").await;

            // Add 20 operations to journal only
            for i in 0..20 {
                let op = create_operation(i as u8);
                journal.append(op).await.unwrap();
            }

            // Add commit
            let commit_op = Operation::CommitFloor(None, Location::new_unchecked(0));
            journal.append(commit_op).await.unwrap();
            journal.sync().await.unwrap();

            // Journal has 21 operations, MMR has 0 leaves
            mmr = Journal::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
                .await
                .unwrap();

            // MMR should have been replayed to match journal
            assert_eq!(mmr.leaves(), Location::new_unchecked(21));
            assert_eq!(journal.size(), Location::new_unchecked(21));
        });
    }

    /// Verify that uncommitted operations are discarded on restart (via the rewind predicate).
    #[test_traced("INFO")]
    fn test_align_with_mismatched_committed_ops() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_empty_journal(context.clone(), "mismatched").await;

            // Add 20 uncommitted operations
            for i in 0..20 {
                let loc = journal.append(create_operation(i as u8)).await.unwrap();
                assert_eq!(loc, Location::new_unchecked(i as u64));
            }

            // These operations are not followed by a commit, so a restart should discard them.
            let size_before = journal.size();
            assert_eq!(size_before, 20);

            // Close and recreate to simulate restart (which rewinds and aligns internally)
            journal.close().await.unwrap();
            let journal = create_empty_journal(context, "mismatched").await;

            // Uncommitted operations should be gone
            assert_eq!(journal.size(), 0);
        });
    }

    #[test_traced("INFO")]
    fn test_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test 1: Matching operation is kept
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_match"),
                    journal_config("rewind_match"),
                )
                .await
                .unwrap();

                // Add operations where operation 3 is a commit
                for i in 0..3 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                    .await
                    .unwrap();
                for i in 4..7 {
                    journal.append(create_operation(i)).await.unwrap();
                }

                // Rewind to last commit
                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 4);
                assert_eq!(journal.size(), 4);

                // Verify the commit operation is still there
                let op = journal.read(3).await.unwrap();
                assert!(op.is_commit());
            }

            // Test 2: Last matching operation is chosen when multiple match
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_multiple"),
                    journal_config("rewind_multiple"),
                )
                .await
                .unwrap();

                // Add multiple commits
                journal.append(create_operation(0)).await.unwrap();
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                    .await
                    .unwrap(); // pos 1
                journal.append(create_operation(2)).await.unwrap();
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(1)))
                    .await
                    .unwrap(); // pos 3
                journal.append(create_operation(4)).await.unwrap();

                // Should rewind to last commit (pos 3)
                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 4);

                // Verify the last commit is still there
                let op = journal.read(3).await.unwrap();
                assert!(op.is_commit());

                // Verify we can't read pos 4
                assert!(journal.read(4).await.is_err());
            }

            // Test 3: Rewind to pruning boundary when no match
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_no_match"),
                    journal_config("rewind_no_match"),
                )
                .await
                .unwrap();

                // Add operations with no commits
                for i in 0..10 {
                    journal.append(create_operation(i)).await.unwrap();
                }

                // Rewind should go to pruning boundary (0 for unpruned)
                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 0, "Should rewind to pruning boundary (0)");
                assert_eq!(journal.size(), 0);
            }

            // Test 4: Rewind with existing pruning boundary
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_with_pruning"),
                    journal_config("rewind_with_pruning"),
                )
                .await
                .unwrap();

                // Add operations and a commit at position 10 (past first section boundary of 7)
                for i in 0..10 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                    .await
                    .unwrap(); // pos 10
                for i in 11..15 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal.sync().await.unwrap();

                // Prune up to position 8 (this will prune section 0, items 0-6, keeping 7+)
                journal.prune(8).await.unwrap();
                let oldest = journal.oldest_retained_pos();
                assert_eq!(oldest, Some(7));

                // Add more uncommitted operations
                for i in 15..20 {
                    journal.append(create_operation(i)).await.unwrap();
                }

                // Rewind should keep the commit at position 10
                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 11);

                // Verify commit is still there
                let op = journal.read(10).await.unwrap();
                assert!(op.is_commit());
            }

            // Test 5: Rewind with no matches after pruning boundary
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_no_match_pruned"),
                    journal_config("rewind_no_match_pruned"),
                )
                .await
                .unwrap();

                // Add operations with a commit at position 5 (in section 0: 0-6)
                for i in 0..5 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                    .await
                    .unwrap(); // pos 5
                for i in 6..10 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal.sync().await.unwrap();

                // Prune up to position 8 (this prunes section 0, including the commit at pos 5)
                // Pruning boundary will be at position 7 (start of section 1)
                journal.prune(8).await.unwrap();
                let oldest = journal.oldest_retained_pos();
                assert_eq!(oldest, Some(7));

                // Add uncommitted operations with no commits (in section 1: 7-13)
                for i in 10..14 {
                    journal.append(create_operation(i)).await.unwrap();
                }

                // Rewind with no matching commits after the pruning boundary
                // Should rewind to the pruning boundary at position 7
                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 7);
            }

            // Test 6: Empty journal
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_empty"),
                    journal_config("rewind_empty"),
                )
                .await
                .unwrap();

                // Rewind empty journal should be no-op
                let final_size = journal
                    .rewind_to(|op: &Operation<Digest, Digest>| op.is_commit())
                    .await
                    .unwrap();
                assert_eq!(final_size, 0);
                assert_eq!(journal.size(), 0);
            }

            // Test 7: Position based authenticated journal rewind.
            {
                let mut journal = AuthenticatedJournal::new(
                    context,
                    mmr_config("rewind"),
                    journal_config("rewind"),
                    |op| op.is_commit(),
                )
                .await
                .unwrap();

                // Add operations with a commit at position 5 (in section 0: 0-6)
                for i in 0..5 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                    .await
                    .unwrap(); // pos 5
                for i in 6..10 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                assert_eq!(journal.size(), 10);

                journal.rewind(2).await.unwrap();
                assert_eq!(journal.size(), 2);
                assert_eq!(journal.mmr.leaves(), 2);
                assert_eq!(journal.mmr.size(), 3);
                assert_eq!(journal.pruning_boundary(), 0);
                assert_eq!(journal.oldest_retained_pos(), Some(0));

                assert!(matches!(
                    journal.rewind(3).await,
                    Err(JournalError::InvalidRewind(_))
                ));

                journal.rewind(0).await.unwrap();
                assert_eq!(journal.size(), 0);
                assert_eq!(journal.mmr.leaves(), 0);
                assert_eq!(journal.mmr.size(), 0);
                assert_eq!(journal.pruning_boundary(), 0);
                assert_eq!(journal.oldest_retained_pos(), None);

                // Test rewinding after pruning.
                for i in 0..255 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                MutableContiguous::prune(&mut journal, 100).await.unwrap();
                assert_eq!(journal.pruning_boundary(), 98);
                let res = journal.rewind(97).await;
                assert!(matches!(res, Err(JournalError::InvalidRewind(97))));
                journal.rewind(98).await.unwrap();
                assert_eq!(journal.size(), 98);
                assert_eq!(journal.mmr.leaves(), 98);
                assert_eq!(journal.pruning_boundary(), 98);
                assert_eq!(journal.oldest_retained_pos(), None);
            }
        });
    }

    /// Verify that append() increments the operation count, returns correct locations, and
    /// operations can be read back correctly.
    #[test_traced("INFO")]
    fn test_apply_op_and_read_operations() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_empty_journal(context, "apply_op").await;

            assert_eq!(journal.size(), 0);

            // Add 50 operations
            let expected_ops: Vec<_> = (0..50).map(|i| create_operation(i as u8)).collect();
            for (i, op) in expected_ops.iter().enumerate() {
                let loc = journal.append(op.clone()).await.unwrap();
                assert_eq!(loc, Location::new_unchecked(i as u64));
                assert_eq!(journal.size(), (i + 1) as u64);
            }

            assert_eq!(journal.size(), 50);

            // Verify all operations can be read back correctly
            journal.sync().await.unwrap();
            for (i, expected_op) in expected_ops.iter().enumerate() {
                let read_op = journal
                    .read(Location::new_unchecked(i as u64))
                    .await
                    .unwrap();
                assert_eq!(read_op, *expected_op);
            }
        });
    }

    /// Verify that read() returns correct operations at various positions.
    #[test_traced("INFO")]
    fn test_read_operations_at_various_positions() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "read", 50).await;

            // Verify reading first operation
            let first_op = journal.read(Location::new_unchecked(0)).await.unwrap();
            assert_eq!(first_op, create_operation(0));

            // Verify reading middle operation
            let middle_op = journal.read(Location::new_unchecked(25)).await.unwrap();
            assert_eq!(middle_op, create_operation(25));

            // Verify reading last operation
            let last_op = journal.read(Location::new_unchecked(49)).await.unwrap();
            assert_eq!(last_op, create_operation(49));

            // Verify all operations match expected values
            for i in 0..50 {
                let op = journal.read(Location::new_unchecked(i)).await.unwrap();
                assert_eq!(op, create_operation(i as u8));
            }
        });
    }

    /// Verify that read() returns an error for pruned operations.
    #[test_traced("INFO")]
    fn test_read_pruned_operation_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "read_pruned", 100).await;

            // Add commit and prune
            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
                .await
                .unwrap();
            journal.sync().await.unwrap();
            let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();

            // Try to read an operation before the pruned boundary
            let read_loc = Location::new_unchecked(0);
            if read_loc < pruned_boundary {
                let result = journal.read(read_loc).await;
                assert!(matches!(
                    result,
                    Err(Error::Journal(crate::journal::Error::ItemPruned(_)))
                ));
            }
        });
    }

    /// Verify that read() returns an error for out-of-range locations.
    #[test_traced("INFO")]
    fn test_read_out_of_range_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "read_oob", 3).await;

            // Try to read beyond the end
            let result = journal.read(Location::new_unchecked(10)).await;
            assert!(matches!(
                result,
                Err(Error::Journal(crate::journal::Error::ItemOutOfRange(_)))
            ));
        });
    }

    /// Verify that we can read all operations back correctly.
    #[test_traced("INFO")]
    fn test_read_all_operations_back_correctly() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "read_all", 50).await;

            assert_eq!(journal.size(), 50);

            // Verify all operations can be read back and match expected values
            for i in 0..50 {
                let op = journal.read(Location::new_unchecked(i)).await.unwrap();
                assert_eq!(op, create_operation(i as u8));
            }
        });
    }

    /// Verify that close() syncs pending operations.
    #[test_traced("INFO")]
    fn test_close_with_pending_operations() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_empty_journal(context.clone(), "close_pending").await;

            // Add 20 operations
            let expected_ops: Vec<_> = (0..20).map(|i| create_operation(i as u8)).collect();
            for (i, op) in expected_ops.iter().enumerate() {
                let loc = journal.append(op.clone()).await.unwrap();
                assert_eq!(loc, Location::new_unchecked(i as u64));
            }

            // Add commit operation to commit the operations
            let commit_loc = journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                .await
                .unwrap();
            assert_eq!(
                commit_loc,
                Location::new_unchecked(20),
                "commit should be at location 20"
            );
            journal.close().await.unwrap();

            // Reopen and verify the operations persisted
            let journal = create_empty_journal(context, "close_pending").await;
            assert_eq!(journal.size(), 21);

            // Verify all operations can be read back
            for (i, expected_op) in expected_ops.iter().enumerate() {
                let read_op = journal
                    .read(Location::new_unchecked(i as u64))
                    .await
                    .unwrap();
                assert_eq!(read_op, *expected_op);
            }
        });
    }
1286
1287    /// Verify that pruning an empty journal returns the boundary.
1288    #[test_traced("INFO")]
1289    fn test_prune_empty_journal() {
1290        let executor = deterministic::Runner::default();
1291        executor.start(|context| async move {
1292            let mut journal = create_empty_journal(context, "prune_empty").await;
1293
1294            let boundary = journal.prune(Location::new_unchecked(0)).await.unwrap();
1295
1296            assert_eq!(boundary, Location::new_unchecked(0));
1297        });
1298    }
1299
1300    /// Verify that pruning to a specific location works correctly.
1301    #[test_traced("INFO")]
1302    fn test_prune_to_location() {
1303        let executor = deterministic::Runner::default();
1304        executor.start(|context| async move {
1305            let mut journal = create_journal_with_ops(context, "prune_to", 100).await;
1306
1307            // Add commit at position 50
1308            journal
1309                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1310                .await
1311                .unwrap();
1312            journal.sync().await.unwrap();
1313
1314            let boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1315
1316            // Boundary should be <= requested location (may align to section boundary)
1317            assert!(boundary <= Location::new_unchecked(50));
1318        });
1319    }
1320
1321    /// Verify that prune() returns the actual boundary (which may differ from requested).
1322    #[test_traced("INFO")]
1323    fn test_prune_returns_actual_boundary() {
1324        let executor = deterministic::Runner::default();
1325        executor.start(|context| async move {
1326            let mut journal = create_journal_with_ops(context, "prune_boundary", 100).await;
1327
1328            journal
1329                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1330                .await
1331                .unwrap();
1332            journal.sync().await.unwrap();
1333
1334            let requested = Location::new_unchecked(50);
1335            let actual = journal.prune(requested).await.unwrap();
1336
1337            // Actual boundary should match oldest_retained_loc
1338            let oldest = journal.oldest_retained_loc().unwrap();
1339            assert_eq!(actual, oldest);
1340
1341            // Actual may be <= requested due to section alignment
1342            assert!(actual <= requested);
1343        });
1344    }
1345
1346    /// Verify that pruning doesn't change the operation count.
1347    #[test_traced("INFO")]
1348    fn test_prune_preserves_operation_count() {
1349        let executor = deterministic::Runner::default();
1350        executor.start(|context| async move {
1351            let mut journal = create_journal_with_ops(context, "prune_count", 100).await;
1352
1353            journal
1354                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1355                .await
1356                .unwrap();
1357            journal.sync().await.unwrap();
1358
1359            let count_before = journal.size();
1360            journal.prune(Location::new_unchecked(50)).await.unwrap();
1361            let count_after = journal.size();
1362
1363            assert_eq!(count_before, count_after);
1364        });
1365    }
1366
1367    /// Verify oldest_retained_loc() for empty journal, no pruning, and after pruning.
1368    #[test_traced("INFO")]
1369    fn test_oldest_retained_loc() {
1370        let executor = deterministic::Runner::default();
1371        executor.start(|context| async move {
1372            // Test empty journal
1373            let journal = create_empty_journal(context.clone(), "oldest").await;
1374            let oldest = journal.oldest_retained_loc();
1375            assert_eq!(oldest, None);
1376
1377            // Test no pruning
1378            let journal = create_journal_with_ops(context.clone(), "oldest", 100).await;
1379            let oldest = journal.oldest_retained_loc();
1380            assert_eq!(oldest, Some(Location::new_unchecked(0)));
1381
1382            // Test after pruning
1383            let mut journal = create_journal_with_ops(context, "oldest", 100).await;
1384            journal
1385                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1386                .await
1387                .unwrap();
1388            journal.sync().await.unwrap();
1389
1390            let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1391
1392            let oldest_loc = journal.oldest_retained_loc().unwrap();
1393            // Should match the pruned boundary (may be <= 50 due to section alignment)
1394            assert_eq!(oldest_loc, pruned_boundary);
1395            // Should be <= requested location (50)
1396            assert!(oldest_loc <= Location::new_unchecked(50));
1397        });
1398    }
1399
1400    /// Verify pruning_boundary() for empty journal, no pruning, and after pruning.
1401    #[test_traced("INFO")]
1402    fn test_pruning_boundary() {
1403        let executor = deterministic::Runner::default();
1404        executor.start(|context| async move {
1405            // Test empty journal
1406            let journal = create_empty_journal(context.clone(), "boundary").await;
1407            let boundary = journal.pruning_boundary();
1408            assert_eq!(boundary, Location::new_unchecked(0));
1409
1410            // Test no pruning
1411            let journal = create_journal_with_ops(context.clone(), "boundary", 100).await;
1412            let boundary = journal.pruning_boundary();
1413            assert_eq!(boundary, Location::new_unchecked(0));
1414
1415            // Test after pruning
1416            let mut journal = create_journal_with_ops(context, "boundary", 100).await;
1417            journal
1418                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1419                .await
1420                .unwrap();
1421            journal.sync().await.unwrap();
1422
1423            let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1424
1425            let boundary = journal.pruning_boundary();
1426            assert_eq!(boundary, pruned_boundary);
1427        });
1428    }
1429
    /// Verify that the MMR prunes to the journal's actual boundary, not the requested location.
    #[test_traced("INFO")]
    fn test_mmr_prunes_to_journal_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "mmr_boundary", 50).await;

            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(25)))
                .await
                .unwrap();
            journal.sync().await.unwrap();

            let pruned_boundary = journal.prune(Location::new_unchecked(25)).await.unwrap();

            // Verify MMR and journal remain in sync
            let oldest_retained = journal.oldest_retained_loc();
            assert_eq!(Some(pruned_boundary), oldest_retained);

            // Verify boundary is at or before requested (due to section alignment)
            assert!(pruned_boundary <= Location::new_unchecked(25));

            // Verify operation count is unchanged
            assert_eq!(journal.size(), 51);
        });
    }

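    /// Companion sketch (not in the original suite): it assumes `prune()` is
    /// idempotent, i.e. pruning to the same location twice returns the same boundary
    /// and leaves the operation count untouched.
    #[test_traced("INFO")]
    fn test_prune_idempotent_sketch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "prune_idem", 50).await;
            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(25)))
                .await
                .unwrap();
            journal.sync().await.unwrap();

            let first = journal.prune(Location::new_unchecked(25)).await.unwrap();
            let second = journal.prune(Location::new_unchecked(25)).await.unwrap();

            // A repeated prune to the same location should be a no-op.
            assert_eq!(first, second);
            assert_eq!(journal.size(), 51);
        });
    }
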
    /// Verify proof() for multiple operations.
    #[test_traced("INFO")]
    fn test_proof_multiple_operations() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_multi", 50).await;

            let (proof, ops) = journal
                .proof(Location::new_unchecked(0), NZU64!(50))
                .await
                .unwrap();

            assert_eq!(ops.len(), 50);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation(i as u8));
            }

            // Verify the proof is valid
            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(0),
                &root,
                &mut hasher
            ));
        });
    }

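    /// Companion sketch (not in the original suite): it assumes `proof()` also
    /// supports ranges that start in the middle of the journal, mirroring the
    /// `historical_proof` range tests below.
    #[test_traced("INFO")]
    fn test_proof_mid_range_sketch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_mid", 50).await;

            let (proof, ops) = journal
                .proof(Location::new_unchecked(10), NZU64!(20))
                .await
                .unwrap();

            // Expect exactly the requested window of operations.
            assert_eq!(ops.len(), 20);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation((10 + i) as u8));
            }

            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(10),
                &root,
                &mut hasher
            ));
        });
    }
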
    /// Verify that historical_proof() respects the max_ops limit.
    #[test_traced("INFO")]
    fn test_historical_proof_limited_by_max_ops() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_limit", 50).await;

            let size = journal.size();
            let (proof, ops) = journal
                .historical_proof(size, Location::new_unchecked(0), NZU64!(20))
                .await
                .unwrap();

            // Should return only 20 operations despite 50 being available
            assert_eq!(ops.len(), 20);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation(i as u8));
            }

            // Verify the proof is valid
            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(0),
                &root,
                &mut hasher
            ));
        });
    }

    /// Verify historical_proof() at the end of the journal.
    #[test_traced("INFO")]
    fn test_historical_proof_at_end_of_journal() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_end", 50).await;

            let size = journal.size();
            // Request proof starting near the end
            let (proof, ops) = journal
                .historical_proof(size, Location::new_unchecked(40), NZU64!(20))
                .await
                .unwrap();

            // Should return only 10 operations (positions 40-49)
            assert_eq!(ops.len(), 10);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation((40 + i) as u8));
            }

            // Verify the proof is valid
            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(40),
                &root,
                &mut hasher
            ));
        });
    }

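    /// Companion sketch (not in the original suite): the degenerate case of the test
    /// above, assuming a proof over the single final operation verifies on its own.
    #[test_traced("INFO")]
    fn test_historical_proof_single_last_op_sketch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_last", 50).await;

            let size = journal.size();
            let (proof, ops) = journal
                .historical_proof(size, Location::new_unchecked(49), NZU64!(1))
                .await
                .unwrap();

            // Only the last operation (location 49) is available.
            assert_eq!(ops.len(), 1);
            assert_eq!(ops[0], create_operation(49));

            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(49),
                &root,
                &mut hasher
            ));
        });
    }
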
    /// Verify that historical_proof() returns an error when the requested size exceeds
    /// the journal's actual size.
    #[test_traced("INFO")]
    fn test_historical_proof_out_of_range_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_oob", 5).await;

            // Request proof with size > actual journal size
            let result = journal
                .historical_proof(
                    Location::new_unchecked(10),
                    Location::new_unchecked(0),
                    NZU64!(1),
                )
                .await;

            assert!(matches!(
                result,
                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
            ));
        });
    }

    /// Verify that historical_proof() returns an error when start_loc >= size.
    #[test_traced("INFO")]
    fn test_historical_proof_start_too_large_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_start_oob", 5).await;

            let size = journal.size();
            // Request proof starting at size (should fail)
            let result = journal.historical_proof(size, size, NZU64!(1)).await;

            assert!(matches!(
                result,
                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
            ));
        });
    }

    /// Verify historical_proof() for a truly historical state (before more operations added).
    #[test_traced("INFO")]
    fn test_historical_proof_truly_historical() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create journal with initial operations
            let mut journal = create_journal_with_ops(context, "proof_historical", 50).await;

            // Capture root at historical state
            let mut hasher = StandardHasher::new();
            let historical_root = journal.root();
            let historical_size = journal.size();

            // Add more operations after the historical state
            for i in 50..100 {
                journal.append(create_operation(i as u8)).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Generate proof for the historical state
            let (proof, ops) = journal
                .historical_proof(historical_size, Location::new_unchecked(0), NZU64!(50))
                .await
                .unwrap();

            // Verify operations match expected historical operations
            assert_eq!(ops.len(), 50);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation(i as u8));
            }

            // Verify the proof is valid against the historical root
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(0),
                &historical_root,
                &mut hasher
            ));
        });
    }

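    /// Companion sketch (not in the original suite): the flip side of the test above,
    /// assuming a proof generated against a historical size does NOT verify against
    /// the current root once more operations have been appended.
    #[test_traced("INFO")]
    fn test_historical_proof_rejects_current_root_sketch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "proof_stale", 50).await;
            let historical_size = journal.size();

            for i in 50..100 {
                journal.append(create_operation(i as u8)).await.unwrap();
            }
            journal.sync().await.unwrap();

            let (proof, ops) = journal
                .historical_proof(historical_size, Location::new_unchecked(0), NZU64!(50))
                .await
                .unwrap();

            // The current root covers 100 operations, so the 50-operation proof must fail.
            let mut hasher = StandardHasher::new();
            let current_root = journal.root();
            assert!(!verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(0),
                &current_root,
                &mut hasher
            ));
        });
    }
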
    /// Verify that historical_proof() returns an error when start_loc is pruned.
    #[test_traced("INFO")]
    fn test_historical_proof_pruned_location_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "proof_pruned", 50).await;

            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(25)))
                .await
                .unwrap();
            journal.sync().await.unwrap();
            let pruned_boundary = journal.prune(Location::new_unchecked(25)).await.unwrap();

            // Try to get a proof starting at a location before the pruned boundary. The
            // guard is necessary because section alignment may leave the boundary at 0,
            // in which case nothing was actually pruned and there is nothing to assert.
            let size = journal.size();
            let start_loc = Location::new_unchecked(0);
            if start_loc < pruned_boundary {
                let result = journal.historical_proof(size, start_loc, NZU64!(1)).await;

                // Should fail when trying to read pruned operations
                assert!(result.is_err());
            }
        });
    }

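    /// Companion sketch (not in the original suite): it assumes proofs over retained
    /// locations remain valid after pruning, since the MMR is only pruned to the
    /// journal's actual boundary.
    #[test_traced("INFO")]
    fn test_historical_proof_after_prune_sketch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "proof_retained", 50).await;
            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(25)))
                .await
                .unwrap();
            journal.sync().await.unwrap();
            let pruned_boundary = journal.prune(Location::new_unchecked(25)).await.unwrap();

            // Proving from the boundary itself should still succeed.
            let size = journal.size();
            let (proof, ops) = journal
                .historical_proof(size, pruned_boundary, NZU64!(1))
                .await
                .unwrap();
            assert_eq!(ops.len(), 1);

            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(&proof, &ops, pruned_boundary, &root, &mut hasher));
        });
    }
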
    /// Verify replay() with empty journal and multiple operations.
    #[test_traced("INFO")]
    fn test_replay_operations() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test empty journal
            let journal = create_empty_journal(context.clone(), "replay").await;
            let stream = journal.replay(0, NZUsize!(10)).await.unwrap();
            futures::pin_mut!(stream);
            assert!(stream.next().await.is_none());

            // Test replaying all operations
            let journal = create_journal_with_ops(context, "replay", 50).await;
            let stream = journal.replay(0, NZUsize!(100)).await.unwrap();
            futures::pin_mut!(stream);

            for i in 0..50 {
                let (pos, op) = stream.next().await.unwrap().unwrap();
                assert_eq!(pos, i);
                assert_eq!(op, create_operation(i as u8));
            }

            assert!(stream.next().await.is_none());
        });
    }

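    /// Companion sketch (not in the original suite): it assumes the second argument to
    /// `replay()` is only a read-ahead buffer size, so a minimal buffer must still
    /// yield every operation in order.
    #[test_traced("INFO")]
    fn test_replay_with_minimal_buffer_sketch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "replay_buf", 50).await;
            let stream = journal.replay(0, NZUsize!(1)).await.unwrap();
            futures::pin_mut!(stream);

            let mut count = 0;
            while let Some(result) = stream.next().await {
                let (pos, op) = result.unwrap();
                assert_eq!(pos, count);
                assert_eq!(op, create_operation(count as u8));
                count += 1;
            }
            assert_eq!(count, 50);
        });
    }
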
    /// Verify replay() starting from a middle location.
    #[test_traced("INFO")]
    fn test_replay_from_middle() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "replay_middle", 50).await;
            let stream = journal.replay(25, NZUsize!(100)).await.unwrap();
            futures::pin_mut!(stream);

            let mut count = 0;
            while let Some(result) = stream.next().await {
                let (pos, op) = result.unwrap();
                assert_eq!(pos, 25 + count);
                assert_eq!(op, create_operation((25 + count) as u8));
                count += 1;
            }

            // Should have replayed positions 25-49 (25 operations)
            assert_eq!(count, 25);
        });
    }
}