commonware_storage/journal/
authenticated.rs

1//! Authenticated journal implementation.
2//!
3//! An authenticated journal maintains a contiguous journal of items alongside a Merkle Mountain
4//! Range (MMR). The item at index i in the journal corresponds to the leaf at Location i in the
5//! MMR. This structure enables efficient proofs that an item is included in the journal at a
6//! specific location.
7
8use crate::{
9    journal::{
10        contiguous::{fixed, variable, Contiguous, MutableContiguous},
11        Error as JournalError,
12    },
13    mmr::{
14        journaled::{CleanMmr, Mmr},
15        mem::{Clean, Dirty, State},
16        Location, Position, Proof, StandardHasher,
17    },
18    Persistable,
19};
20use commonware_codec::{CodecFixedShared, CodecShared, Encode, EncodeShared};
21use commonware_cryptography::{DigestOf, Hasher};
22use commonware_runtime::{Clock, Metrics, Storage};
23use core::num::{NonZeroU64, NonZeroUsize};
24use futures::{future::try_join_all, try_join, TryFutureExt as _};
25use thiserror::Error;
26use tracing::{debug, warn};
27
28/// Errors that can occur when interacting with an authenticated journal.
29#[derive(Error, Debug)]
30pub enum Error {
31    #[error("mmr error: {0}")]
32    Mmr(#[from] crate::mmr::Error),
33
34    #[error("journal error: {0}")]
35    Journal(#[from] super::Error),
36}
37/// An append-only data structure that maintains a sequential journal of items alongside a Merkle
38/// Mountain Range (MMR). The item at index i in the journal corresponds to the leaf at Location i
39/// in the MMR. This structure enables efficient proofs that an item is included in the journal at a
40/// specific location.
41pub struct Journal<E, C, H, S: State<H::Digest> + Send + Sync = Dirty>
42where
43    E: Storage + Clock + Metrics,
44    C: Contiguous<Item: EncodeShared>,
45    H: Hasher,
46{
47    /// MMR where each leaf is an item digest.
48    /// Invariant: leaf i corresponds to item i in the journal.
49    pub(crate) mmr: Mmr<E, H::Digest, S>,
50
51    /// Journal of items.
52    /// Invariant: item i corresponds to leaf i in the MMR.
53    pub(crate) journal: C,
54
55    pub(crate) hasher: StandardHasher<H>,
56}
57
58impl<E, C, H, S> Journal<E, C, H, S>
59where
60    E: Storage + Clock + Metrics,
61    C: Contiguous<Item: EncodeShared>,
62    H: Hasher,
63    S: State<DigestOf<H>> + Send + Sync,
64{
65    /// Returns the number of items in the journal.
66    pub fn size(&self) -> Location {
67        Location::new_unchecked(self.journal.size())
68    }
69
70    /// Returns the oldest retained location in the journal.
71    pub fn oldest_retained_loc(&self) -> Option<Location> {
72        self.journal
73            .oldest_retained_pos()
74            .map(Location::new_unchecked)
75    }
76
77    /// Returns the pruning boundary for the journal.
78    pub fn pruning_boundary(&self) -> Location {
79        self.journal.pruning_boundary().into()
80    }
81
82    /// Read an item from the journal at the given location.
83    pub async fn read(&self, loc: Location) -> Result<C::Item, Error> {
84        self.journal.read(*loc).await.map_err(Error::Journal)
85    }
86}
87
88impl<E, C, H, S> Journal<E, C, H, S>
89where
90    E: Storage + Clock + Metrics,
91    C: MutableContiguous<Item: EncodeShared>,
92    H: Hasher,
93    S: State<DigestOf<H>> + Send + Sync,
94{
95    pub async fn append(&mut self, item: C::Item) -> Result<Location, Error> {
96        let encoded_item = item.encode();
97
98        // Append item to the journal and update the MMR in parallel.
99        let (_, loc) = try_join!(
100            self.mmr
101                .add(&mut self.hasher, &encoded_item)
102                .map_err(Error::Mmr),
103            self.journal.append(item).map_err(Into::into)
104        )?;
105
106        Ok(Location::new_unchecked(loc))
107    }
108}
109
110impl<E, C, H, S> Journal<E, C, H, S>
111where
112    E: Storage + Clock + Metrics,
113    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
114    H: Hasher,
115    S: State<DigestOf<H>> + Send + Sync,
116{
117    /// Durably persist the journal. This is faster than `sync()` but does not persist the MMR,
118    /// meaning recovery will be required on startup if we crash before `sync()`.
119    pub async fn commit(&mut self) -> Result<(), Error> {
120        self.journal.commit().await.map_err(Error::Journal)
121    }
122}
123
124impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
125where
126    E: Storage + Clock + Metrics,
127    C: MutableContiguous<Item: EncodeShared>,
128    H: Hasher,
129{
130    /// Create a new [Journal] from the given components after aligning the MMR with the journal.
131    pub async fn from_components(
132        mmr: CleanMmr<E, H::Digest>,
133        journal: C,
134        mut hasher: StandardHasher<H>,
135        apply_batch_size: u64,
136    ) -> Result<Self, Error> {
137        let mut mmr = Self::align(mmr, &journal, &mut hasher, apply_batch_size).await?;
138
139        // Sync the MMR to disk to avoid having to repeat any recovery that may have been performed
140        // on next startup.
141        mmr.sync().await?;
142
143        Ok(Self {
144            mmr,
145            journal,
146            hasher,
147        })
148    }
149
150    /// Align `mmr` to be consistent with `journal`. Any items in `mmr` that aren't in `journal` are
151    /// popped, and any items in `journal` that aren't in `mmr` are added to `mmr`. Items are added
152    /// to `mmr` in batches of size `apply_batch_size` to avoid memory bloat.
153    async fn align(
154        mut mmr: CleanMmr<E, H::Digest>,
155        journal: &C,
156        hasher: &mut StandardHasher<H>,
157        apply_batch_size: u64,
158    ) -> Result<CleanMmr<E, H::Digest>, Error> {
159        // Pop any MMR elements that are ahead of the journal.
160        // Note mmr_size is the size of the MMR in leaves, not positions.
161        let journal_size = journal.size();
162        let mut mmr_size = mmr.leaves();
163        if mmr_size > journal_size {
164            let pop_count = mmr_size - journal_size;
165            warn!(journal_size, ?pop_count, "popping MMR items");
166            mmr.pop(hasher, *pop_count as usize).await?;
167            mmr_size = Location::new_unchecked(journal_size);
168        }
169
170        // If the MMR is behind, replay journal items to catch up.
171        if mmr_size < journal_size {
172            let replay_count = journal_size - *mmr_size;
173            warn!(
174                journal_size,
175                replay_count, "MMR lags behind journal, replaying journal to catch up"
176            );
177
178            let mut mmr = mmr.into_dirty();
179            let mut batch_size = 0;
180            while mmr_size < journal_size {
181                let op = journal.read(*mmr_size).await?;
182                mmr.add(hasher, &op.encode()).await?;
183                mmr_size += 1;
184                batch_size += 1;
185                if batch_size >= apply_batch_size {
186                    mmr = mmr.merkleize(hasher).into_dirty();
187                    batch_size = 0;
188                }
189            }
190            return Ok(mmr.merkleize(hasher));
191        }
192
193        // At this point the MMR and journal should be consistent.
194        assert_eq!(journal.size(), mmr.leaves());
195
196        Ok(mmr)
197    }
198
199    /// Prune both the MMR and journal to the given location.
200    ///
201    /// # Returns
202    /// The new pruning boundary, which may be less than the requested `prune_loc`.
203    pub async fn prune(&mut self, prune_loc: Location) -> Result<Location, Error> {
204        if self.mmr.size() == 0 {
205            // DB is empty, nothing to prune.
206            return Ok(self.pruning_boundary());
207        }
208
209        // Sync the mmr before pruning the journal, otherwise the MMR tip could end up behind the
210        // journal's pruning boundary on restart from an unclean shutdown, and there would be no way
211        // to replay the items between the MMR tip and the journal pruning boundary.
212        self.mmr.sync().await?;
213
214        // Prune the journal and check if anything was actually pruned
215        if !self.journal.prune(*prune_loc).await? {
216            return Ok(self.pruning_boundary());
217        }
218
219        let pruning_boundary = self.pruning_boundary();
220        let size = self.size();
221        debug!(?size, ?prune_loc, ?pruning_boundary, "pruned inactive ops");
222
223        // Prune MMR to match the journal's actual boundary
224        self.mmr
225            .prune_to_pos(Position::try_from(pruning_boundary)?)
226            .await?;
227
228        Ok(pruning_boundary)
229    }
230}
231
232impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
233where
234    E: Storage + Clock + Metrics,
235    C: Contiguous<Item: EncodeShared>,
236    H: Hasher,
237{
238    /// Generate a proof of inclusion for items starting at `start_loc`.
239    ///
240    /// Returns a proof and the items corresponding to the leaves in the range `start_loc..end_loc`,
241    /// where `end_loc` is the minimum of the current item count and `start_loc + max_ops`.
242    ///
243    /// # Errors
244    ///
245    /// - Returns [Error::Mmr] with [crate::mmr::Error::LocationOverflow] if `start_loc` >
246    ///   [crate::mmr::MAX_LOCATION].
247    /// - Returns [Error::Mmr] with [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >= current
248    ///   item count.
249    /// - Returns [Error::Journal] with [crate::journal::Error::ItemPruned] if `start_loc` has been
250    ///   pruned.
251    pub async fn proof(
252        &self,
253        start_loc: Location,
254        max_ops: NonZeroU64,
255    ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
256        self.historical_proof(self.size(), start_loc, max_ops).await
257    }
258
259    /// Generate a historical proof with respect to the state of the MMR when it had
260    /// `historical_size` items.
261    ///
262    /// Returns a proof and the items corresponding to the leaves in the range `start_loc..end_loc`,
263    /// where `end_loc` is the minimum of `historical_size` and `start_loc + max_ops`.
264    ///
265    /// # Errors
266    ///
267    /// - Returns [Error::Mmr] with [crate::mmr::Error::LocationOverflow] if `historical_size` or
268    ///   `start_loc` > [crate::mmr::MAX_LOCATION].
269    /// - Returns [Error::Mmr] with [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >=
270    ///   `historical_size` or `historical_size` > number of items in the journal.
271    /// - Returns [Error::Journal] with [crate::journal::Error::ItemPruned] if `start_loc` has been
272    ///   pruned.
273    pub async fn historical_proof(
274        &self,
275        historical_size: Location,
276        start_loc: Location,
277        max_ops: NonZeroU64,
278    ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
279        let size = self.size();
280        if historical_size > size {
281            return Err(crate::mmr::Error::RangeOutOfBounds(size).into());
282        }
283        if start_loc >= historical_size {
284            return Err(crate::mmr::Error::RangeOutOfBounds(start_loc).into());
285        }
286        let end_loc = std::cmp::min(historical_size, start_loc.saturating_add(max_ops.get()));
287
288        let mmr_size = Position::try_from(historical_size)?;
289        let proof = self
290            .mmr
291            .historical_range_proof(mmr_size, start_loc..end_loc)
292            .await?;
293
294        let mut ops = Vec::with_capacity((*end_loc - *start_loc) as usize);
295        let futures = (*start_loc..*end_loc)
296            .map(|i| self.journal.read(i))
297            .collect::<Vec<_>>();
298        try_join_all(futures)
299            .await?
300            .into_iter()
301            .for_each(|op| ops.push(op));
302
303        Ok((proof, ops))
304    }
305
306    /// Return the root of the MMR.
307    pub const fn root(&self) -> H::Digest {
308        self.mmr.root()
309    }
310
311    /// Convert this journal into its dirty counterpart for batched updates.
312    pub fn into_dirty(self) -> Journal<E, C, H, Dirty> {
313        Journal {
314            mmr: self.mmr.into_dirty(),
315            journal: self.journal,
316            hasher: self.hasher,
317        }
318    }
319}
320
321impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
322where
323    E: Storage + Clock + Metrics,
324    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
325    H: Hasher,
326{
327    /// Destroy the authenticated journal, removing all data from disk.
328    pub async fn destroy(self) -> Result<(), Error> {
329        try_join!(
330            self.journal.destroy().map_err(Error::Journal),
331            self.mmr.destroy().map_err(Error::Mmr),
332        )?;
333        Ok(())
334    }
335
336    /// Durably persist the journal, ensuring no recovery is required on startup.
337    pub async fn sync(&mut self) -> Result<(), Error> {
338        try_join!(
339            self.journal.sync().map_err(Error::Journal),
340            self.mmr.sync().map_err(Into::into)
341        )?;
342
343        Ok(())
344    }
345}
346
347impl<E, C, H> Journal<E, C, H, Dirty>
348where
349    E: Storage + Clock + Metrics,
350    C: Contiguous<Item: EncodeShared>,
351    H: Hasher,
352{
353    /// Merkleize the journal and compute the root digest.
354    pub fn merkleize(self) -> Journal<E, C, H, Clean<H::Digest>> {
355        let Self {
356            mmr,
357            journal,
358            mut hasher,
359        } = self;
360        Journal {
361            mmr: mmr.merkleize(&mut hasher),
362            journal,
363            hasher,
364        }
365    }
366}
367
368impl<E, C, H> Journal<E, C, H, Dirty>
369where
370    E: Storage + Clock + Metrics,
371    C: MutableContiguous<Item: EncodeShared>,
372    H: Hasher,
373{
374    /// Create a new dirty journal from aligned components.
375    pub async fn from_components(
376        mmr: CleanMmr<E, H::Digest>,
377        journal: C,
378        hasher: StandardHasher<H>,
379        apply_batch_size: u64,
380    ) -> Result<Self, Error> {
381        let clean = Journal::<E, C, H, Clean<H::Digest>>::from_components(
382            mmr,
383            journal,
384            hasher,
385            apply_batch_size,
386        )
387        .await?;
388        Ok(clean.into_dirty())
389    }
390}
391
/// The number of items to apply to the MMR in a single batch (2^16).
const APPLY_BATCH_SIZE: u64 = 2u64.pow(16);
394
395impl<E, O, H> Journal<E, fixed::Journal<E, O>, H, Clean<H::Digest>>
396where
397    E: Storage + Clock + Metrics,
398    O: CodecFixedShared,
399    H: Hasher,
400{
401    /// Create a new [Journal] for fixed-length items.
402    ///
403    /// The journal will be rewound to the last item that matches the `rewind_predicate` on
404    /// initialization.
405    pub async fn new(
406        context: E,
407        mmr_cfg: crate::mmr::journaled::Config,
408        journal_cfg: fixed::Config,
409        rewind_predicate: fn(&O) -> bool,
410    ) -> Result<Self, Error> {
411        let mut journal = fixed::Journal::init(context.with_label("journal"), journal_cfg).await?;
412
413        // Rewind journal to last matching item.
414        journal.rewind_to(rewind_predicate).await?;
415
416        // Align the MMR and journal.
417        let mut hasher = StandardHasher::<H>::new();
418        let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
419        let mut mmr = Self::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE).await?;
420
421        // Sync the journal and MMR to disk to avoid having to repeat any recovery that may have
422        // been performed on next startup.
423        journal.sync().await?;
424        mmr.sync().await?;
425
426        Ok(Self {
427            mmr,
428            journal,
429            hasher,
430        })
431    }
432}
433
434impl<E, O, H> Journal<E, variable::Journal<E, O>, H, Clean<H::Digest>>
435where
436    E: Storage + Clock + Metrics,
437    O: CodecShared,
438    H: Hasher,
439{
440    /// Create a new [Journal] for variable-length items.
441    ///
442    /// The journal will be rewound to the last item that matches the `rewind_predicate` on
443    /// initialization.
444    pub async fn new(
445        context: E,
446        mmr_cfg: crate::mmr::journaled::Config,
447        journal_cfg: variable::Config<O::Cfg>,
448        rewind_predicate: fn(&O) -> bool,
449    ) -> Result<Self, Error> {
450        let mut hasher = StandardHasher::<H>::new();
451        let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
452        let mut journal =
453            variable::Journal::init(context.with_label("journal"), journal_cfg).await?;
454
455        // Rewind to last matching item.
456        journal.rewind_to(rewind_predicate).await?;
457
458        // Align the MMR and journal.
459        let mut mmr = Self::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE).await?;
460
461        // Sync the journal and MMR to disk to avoid having to repeat any recovery that may have
462        // been performed on next startup.
463        journal.sync().await?;
464        mmr.sync().await?;
465
466        Ok(Self {
467            mmr,
468            journal,
469            hasher,
470        })
471    }
472}
473
474impl<E, C, H, S> Contiguous for Journal<E, C, H, S>
475where
476    E: Storage + Clock + Metrics,
477    C: MutableContiguous<Item: EncodeShared>,
478    H: Hasher,
479    S: State<DigestOf<H>> + Send + Sync,
480{
481    type Item = C::Item;
482
483    fn size(&self) -> u64 {
484        self.journal.size()
485    }
486
487    fn oldest_retained_pos(&self) -> Option<u64> {
488        self.journal.oldest_retained_pos()
489    }
490
491    fn pruning_boundary(&self) -> u64 {
492        self.journal.pruning_boundary()
493    }
494
495    async fn replay(
496        &self,
497        start_pos: u64,
498        buffer: NonZeroUsize,
499    ) -> Result<
500        impl futures::Stream<Item = Result<(u64, Self::Item), JournalError>> + '_,
501        JournalError,
502    > {
503        self.journal.replay(start_pos, buffer).await
504    }
505
506    async fn read(&self, position: u64) -> Result<Self::Item, JournalError> {
507        self.journal.read(position).await
508    }
509}
510
511impl<E, C, H> MutableContiguous for Journal<E, C, H, Dirty>
512where
513    E: Storage + Clock + Metrics,
514    C: MutableContiguous<Item: EncodeShared>,
515    H: Hasher,
516{
517    async fn append(&mut self, item: Self::Item) -> Result<u64, JournalError> {
518        let res = self.append(item).await.map_err(|e| match e {
519            Error::Journal(inner) => inner,
520            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
521        })?;
522
523        Ok(*res)
524    }
525
526    async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
527        self.journal.prune(min_position).await
528    }
529
530    async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
531        self.journal.rewind(size).await?;
532
533        let leaves = *self.mmr.leaves();
534        if leaves > size {
535            self.mmr
536                .pop((leaves - size) as usize)
537                .await
538                .map_err(|error| JournalError::Mmr(anyhow::Error::from(error)))?;
539        }
540
541        Ok(())
542    }
543}
544
545impl<E, C, H> MutableContiguous for Journal<E, C, H, Clean<H::Digest>>
546where
547    E: Storage + Clock + Metrics,
548    C: MutableContiguous<Item: EncodeShared>,
549    H: Hasher,
550{
551    async fn append(&mut self, item: Self::Item) -> Result<u64, JournalError> {
552        let loc = self.append(item).await.map_err(|e| match e {
553            Error::Journal(inner) => inner,
554            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
555        })?;
556
557        Ok(*loc)
558    }
559
560    async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
561        let old_pruning_boundary = self.pruning_boundary();
562        let pruning_boundary = self
563            .prune(Location::new_unchecked(min_position))
564            .await
565            .map_err(|e| match e {
566                Error::Journal(inner) => inner,
567                Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
568            })?;
569
570        Ok(old_pruning_boundary != pruning_boundary)
571    }
572
573    async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
574        self.journal.rewind(size).await?;
575
576        let leaves = *self.mmr.leaves();
577        if leaves > size {
578            self.mmr
579                .pop(&mut self.hasher, (leaves - size) as usize)
580                .await
581                .map_err(|error| JournalError::Mmr(anyhow::Error::from(error)))?;
582        }
583
584        Ok(())
585    }
586}
587
588impl<E, C, H> Persistable for Journal<E, C, H, Clean<H::Digest>>
589where
590    E: Storage + Clock + Metrics,
591    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
592    H: Hasher,
593{
594    type Error = JournalError;
595
596    async fn commit(&mut self) -> Result<(), JournalError> {
597        self.commit().await.map_err(|e| match e {
598            Error::Journal(inner) => inner,
599            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
600        })
601    }
602
603    async fn sync(&mut self) -> Result<(), JournalError> {
604        self.sync().await.map_err(|e| match e {
605            Error::Journal(inner) => inner,
606            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
607        })
608    }
609
610    async fn destroy(self) -> Result<(), JournalError> {
611        self.destroy().await.map_err(|e| match e {
612            Error::Journal(inner) => inner,
613            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
614        })
615    }
616}
617
618#[cfg(test)]
619mod tests {
620    use super::*;
621    use crate::{
622        journal::contiguous::fixed::{Config as JConfig, Journal as ContiguousJournal},
623        mmr::{
624            journaled::{Config as MmrConfig, Mmr},
625            Location,
626        },
627        qmdb::{
628            any::unordered::{fixed::Operation, Update},
629            operation::Committable,
630        },
631    };
632    use commonware_codec::Encode;
633    use commonware_cryptography::{sha256, sha256::Digest, Sha256};
634    use commonware_macros::test_traced;
635    use commonware_runtime::{
636        buffer::PoolRef,
637        deterministic::{self, Context},
638        Runner as _,
639    };
640    use commonware_utils::{NZUsize, NZU16, NZU64};
641    use futures::StreamExt as _;
642    use std::num::NonZeroU16;
643
    // Sizing constants passed to `PoolRef::new` when the test configs below build their buffer
    // pools.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
646
647    /// Create MMR configuration for tests.
648    fn mmr_config(suffix: &str) -> MmrConfig {
649        MmrConfig {
650            journal_partition: format!("mmr_journal_{suffix}"),
651            metadata_partition: format!("mmr_metadata_{suffix}"),
652            items_per_blob: NZU64!(11),
653            write_buffer: NZUsize!(1024),
654            thread_pool: None,
655            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
656        }
657    }
658
659    /// Create journal configuration for tests.
660    fn journal_config(suffix: &str) -> JConfig {
661        JConfig {
662            partition: format!("journal_{suffix}"),
663            items_per_blob: NZU64!(7),
664            write_buffer: NZUsize!(1024),
665            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
666        }
667    }
668
669    type AuthenticatedJournal = Journal<
670        deterministic::Context,
671        ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
672        Sha256,
673        Clean<sha256::Digest>,
674    >;
675
676    /// Create a new empty authenticated journal.
677    async fn create_empty_journal(context: Context, suffix: &str) -> AuthenticatedJournal {
678        AuthenticatedJournal::new(
679            context,
680            mmr_config(suffix),
681            journal_config(suffix),
682            |op: &Operation<Digest, Digest>| op.is_commit(),
683        )
684        .await
685        .unwrap()
686    }
687
688    /// Create a test operation with predictable values based on index.
689    fn create_operation(index: u8) -> Operation<Digest, Digest> {
690        Operation::Update(Update(
691            Sha256::fill(index),
692            Sha256::fill(index.wrapping_add(1)),
693        ))
694    }
695
696    /// Create an authenticated journal with N committed operations.
697    ///
698    /// Operations are added and then synced to ensure they are committed.
699    async fn create_journal_with_ops(
700        context: Context,
701        suffix: &str,
702        count: usize,
703    ) -> AuthenticatedJournal {
704        let mut journal = create_empty_journal(context, suffix).await;
705
706        for i in 0..count {
707            let op = create_operation(i as u8);
708            let loc = journal.append(op).await.unwrap();
709            assert_eq!(loc, Location::new_unchecked(i as u64));
710        }
711
712        journal.sync().await.unwrap();
713        journal
714    }
715
716    /// Create separate MMR and journal components for testing alignment.
717    ///
718    /// These components are created independently and can be manipulated separately to test
719    /// scenarios where the MMR and journal are out of sync (e.g., one ahead of the other).
720    async fn create_components(
721        context: Context,
722        suffix: &str,
723    ) -> (
724        CleanMmr<deterministic::Context, sha256::Digest>,
725        ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
726        StandardHasher<Sha256>,
727    ) {
728        let mut hasher = StandardHasher::new();
729        let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_config(suffix))
730            .await
731            .unwrap();
732        let journal =
733            ContiguousJournal::init(context.with_label("journal"), journal_config(suffix))
734                .await
735                .unwrap();
736        (mmr, journal, hasher)
737    }
738
739    /// Verify that a proof correctly proves the given operations are included in the MMR.
740    fn verify_proof(
741        proof: &crate::mmr::Proof<<Sha256 as commonware_cryptography::Hasher>::Digest>,
742        operations: &[Operation<Digest, Digest>],
743        start_loc: Location,
744        root: &<Sha256 as commonware_cryptography::Hasher>::Digest,
745        hasher: &mut StandardHasher<Sha256>,
746    ) -> bool {
747        let encoded_ops: Vec<_> = operations.iter().map(|op| op.encode()).collect();
748        proof.verify_range_inclusion(hasher, &encoded_ops, start_loc, root)
749    }
750
751    /// Verify that new() creates an empty authenticated journal.
752    #[test_traced("INFO")]
753    fn test_new_creates_empty_journal() {
754        let executor = deterministic::Runner::default();
755        executor.start(|context| async move {
756            let journal = create_empty_journal(context, "new_empty").await;
757
758            assert_eq!(journal.size(), 0);
759            assert_eq!(journal.pruning_boundary(), 0);
760            assert_eq!(journal.oldest_retained_pos(), None);
761        });
762    }
763
764    /// Verify that align() correctly handles empty MMR and journal components.
765    #[test_traced("INFO")]
766    fn test_align_with_empty_mmr_and_journal() {
767        let executor = deterministic::Runner::default();
768        executor.start(|context| async move {
769            let (mmr, journal, mut hasher) = create_components(context, "align_empty").await;
770
771            let mmr = Journal::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
772                .await
773                .unwrap();
774
775            assert_eq!(mmr.leaves(), Location::new_unchecked(0));
776            assert_eq!(journal.size(), Location::new_unchecked(0));
777        });
778    }
779
780    /// Verify that align() pops MMR elements when MMR is ahead of the journal.
781    #[test_traced("WARN")]
782    fn test_align_when_mmr_ahead() {
783        let executor = deterministic::Runner::default();
784        executor.start(|context| async move {
785            let (mut mmr, mut journal, mut hasher) = create_components(context, "mmr_ahead").await;
786
787            // Add 20 operations to both MMR and journal
788            for i in 0..20 {
789                let op = create_operation(i as u8);
790                let encoded = op.encode();
791                mmr.add(&mut hasher, &encoded).await.unwrap();
792                journal.append(op).await.unwrap();
793            }
794
795            // Add commit operation to journal only (making journal ahead)
796            let commit_op = Operation::CommitFloor(None, Location::new_unchecked(0));
797            journal.append(commit_op).await.unwrap();
798            journal.sync().await.unwrap();
799
800            // MMR has 20 leaves, journal has 21 operations (20 ops + 1 commit)
801            let mmr = Journal::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
802                .await
803                .unwrap();
804
805            // MMR should have been popped to match journal
806            assert_eq!(mmr.leaves(), Location::new_unchecked(21));
807            assert_eq!(journal.size(), Location::new_unchecked(21));
808        });
809    }
810
811    /// Verify that align() replays journal operations when journal is ahead of MMR.
812    #[test_traced("WARN")]
813    fn test_align_when_journal_ahead() {
814        let executor = deterministic::Runner::default();
815        executor.start(|context| async move {
816            let (mut mmr, mut journal, mut hasher) =
817                create_components(context, "journal_ahead").await;
818
819            // Add 20 operations to journal only
820            for i in 0..20 {
821                let op = create_operation(i as u8);
822                journal.append(op).await.unwrap();
823            }
824
825            // Add commit
826            let commit_op = Operation::CommitFloor(None, Location::new_unchecked(0));
827            journal.append(commit_op).await.unwrap();
828            journal.sync().await.unwrap();
829
830            // Journal has 21 operations, MMR has 0 leaves
831            mmr = Journal::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
832                .await
833                .unwrap();
834
835            // MMR should have been replayed to match journal
836            assert_eq!(mmr.leaves(), Location::new_unchecked(21));
837            assert_eq!(journal.size(), Location::new_unchecked(21));
838        });
839    }
840
841    /// Verify that align() discards uncommitted operations.
842    #[test_traced("INFO")]
843    fn test_align_with_mismatched_committed_ops() {
844        let executor = deterministic::Runner::default();
845        executor.start(|context| async move {
846            let mut journal = create_empty_journal(context.clone(), "mismatched").await;
847
848            // Add 20 uncommitted operations
849            for i in 0..20 {
850                let loc = journal.append(create_operation(i as u8)).await.unwrap();
851                assert_eq!(loc, Location::new_unchecked(i as u64));
852            }
853
854            // Don't sync - these are uncommitted
855            // After alignment, they should be discarded
856            let size_before = journal.size();
857            assert_eq!(size_before, 20);
858
859            // Drop and recreate to simulate restart (which calls align internally)
860            journal.sync().await.unwrap();
861            drop(journal);
862            let journal = create_empty_journal(context, "mismatched").await;
863
864            // Uncommitted operations should be gone
865            assert_eq!(journal.size(), 0);
866        });
867    }
868
869    #[test_traced("INFO")]
870    fn test_rewind() {
871        let executor = deterministic::Runner::default();
872        executor.start(|context| async move {
873            // Test 1: Matching operation is kept
874            {
875                let mut journal = ContiguousJournal::init(
876                    context.with_label("rewind_match"),
877                    journal_config("rewind_match"),
878                )
879                .await
880                .unwrap();
881
882                // Add operations where operation 3 is a commit
883                for i in 0..3 {
884                    journal.append(create_operation(i)).await.unwrap();
885                }
886                journal
887                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
888                    .await
889                    .unwrap();
890                for i in 4..7 {
891                    journal.append(create_operation(i)).await.unwrap();
892                }
893
894                // Rewind to last commit
895                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
896                assert_eq!(final_size, 4);
897                assert_eq!(journal.size(), 4);
898
899                // Verify the commit operation is still there
900                let op = journal.read(3).await.unwrap();
901                assert!(op.is_commit());
902            }
903
904            // Test 2: Last matching operation is chosen when multiple match
905            {
906                let mut journal = ContiguousJournal::init(
907                    context.with_label("rewind_multiple"),
908                    journal_config("rewind_multiple"),
909                )
910                .await
911                .unwrap();
912
913                // Add multiple commits
914                journal.append(create_operation(0)).await.unwrap();
915                journal
916                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
917                    .await
918                    .unwrap(); // pos 1
919                journal.append(create_operation(2)).await.unwrap();
920                journal
921                    .append(Operation::CommitFloor(None, Location::new_unchecked(1)))
922                    .await
923                    .unwrap(); // pos 3
924                journal.append(create_operation(4)).await.unwrap();
925
926                // Should rewind to last commit (pos 3)
927                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
928                assert_eq!(final_size, 4);
929
930                // Verify the last commit is still there
931                let op = journal.read(3).await.unwrap();
932                assert!(op.is_commit());
933
934                // Verify we can't read pos 4
935                assert!(journal.read(4).await.is_err());
936            }
937
938            // Test 3: Rewind to pruning boundary when no match
939            {
940                let mut journal = ContiguousJournal::init(
941                    context.with_label("rewind_no_match"),
942                    journal_config("rewind_no_match"),
943                )
944                .await
945                .unwrap();
946
947                // Add operations with no commits
948                for i in 0..10 {
949                    journal.append(create_operation(i)).await.unwrap();
950                }
951
952                // Rewind should go to pruning boundary (0 for unpruned)
953                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
954                assert_eq!(final_size, 0, "Should rewind to pruning boundary (0)");
955                assert_eq!(journal.size(), 0);
956            }
957
958            // Test 4: Rewind with existing pruning boundary
959            {
960                let mut journal = ContiguousJournal::init(
961                    context.with_label("rewind_with_pruning"),
962                    journal_config("rewind_with_pruning"),
963                )
964                .await
965                .unwrap();
966
967                // Add operations and a commit at position 10 (past first section boundary of 7)
968                for i in 0..10 {
969                    journal.append(create_operation(i)).await.unwrap();
970                }
971                journal
972                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
973                    .await
974                    .unwrap(); // pos 10
975                for i in 11..15 {
976                    journal.append(create_operation(i)).await.unwrap();
977                }
978                journal.sync().await.unwrap();
979
980                // Prune up to position 8 (this will prune section 0, items 0-6, keeping 7+)
981                journal.prune(8).await.unwrap();
982                let oldest = journal.oldest_retained_pos();
983                assert_eq!(oldest, Some(7));
984
985                // Add more uncommitted operations
986                for i in 15..20 {
987                    journal.append(create_operation(i)).await.unwrap();
988                }
989
990                // Rewind should keep the commit at position 10
991                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
992                assert_eq!(final_size, 11);
993
994                // Verify commit is still there
995                let op = journal.read(10).await.unwrap();
996                assert!(op.is_commit());
997            }
998
999            // Test 5: Rewind with no matches after pruning boundary
1000            {
1001                let mut journal = ContiguousJournal::init(
1002                    context.with_label("rewind_no_match_pruned"),
1003                    journal_config("rewind_no_match_pruned"),
1004                )
1005                .await
1006                .unwrap();
1007
1008                // Add operations with a commit at position 5 (in section 0: 0-6)
1009                for i in 0..5 {
1010                    journal.append(create_operation(i)).await.unwrap();
1011                }
1012                journal
1013                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
1014                    .await
1015                    .unwrap(); // pos 5
1016                for i in 6..10 {
1017                    journal.append(create_operation(i)).await.unwrap();
1018                }
1019                journal.sync().await.unwrap();
1020
1021                // Prune up to position 8 (this prunes section 0, including the commit at pos 5)
1022                // Pruning boundary will be at position 7 (start of section 1)
1023                journal.prune(8).await.unwrap();
1024                let oldest = journal.oldest_retained_pos();
1025                assert_eq!(oldest, Some(7));
1026
1027                // Add uncommitted operations with no commits (in section 1: 7-13)
1028                for i in 10..14 {
1029                    journal.append(create_operation(i)).await.unwrap();
1030                }
1031
1032                // Rewind with no matching commits after the pruning boundary
1033                // Should rewind to the pruning boundary at position 7
1034                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1035                assert_eq!(final_size, 7);
1036            }
1037
1038            // Test 6: Empty journal
1039            {
1040                let mut journal = ContiguousJournal::init(
1041                    context.with_label("rewind_empty"),
1042                    journal_config("rewind_empty"),
1043                )
1044                .await
1045                .unwrap();
1046
1047                // Rewind empty journal should be no-op
1048                let final_size = journal
1049                    .rewind_to(|op: &Operation<Digest, Digest>| op.is_commit())
1050                    .await
1051                    .unwrap();
1052                assert_eq!(final_size, 0);
1053                assert_eq!(journal.size(), 0);
1054            }
1055
1056            // Test 7: Position based authenticated journal rewind.
1057            {
1058                let mut journal = AuthenticatedJournal::new(
1059                    context,
1060                    mmr_config("rewind"),
1061                    journal_config("rewind"),
1062                    |op| op.is_commit(),
1063                )
1064                .await
1065                .unwrap();
1066
1067                // Add operations with a commit at position 5 (in section 0: 0-6)
1068                for i in 0..5 {
1069                    journal.append(create_operation(i)).await.unwrap();
1070                }
1071                journal
1072                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
1073                    .await
1074                    .unwrap(); // pos 5
1075                for i in 6..10 {
1076                    journal.append(create_operation(i)).await.unwrap();
1077                }
1078                assert_eq!(journal.size(), 10);
1079
1080                journal.rewind(2).await.unwrap();
1081                assert_eq!(journal.size(), 2);
1082                assert_eq!(journal.mmr.leaves(), 2);
1083                assert_eq!(journal.mmr.size(), 3);
1084                assert_eq!(journal.pruning_boundary(), 0);
1085                assert_eq!(journal.oldest_retained_pos(), Some(0));
1086
1087                assert!(matches!(
1088                    journal.rewind(3).await,
1089                    Err(JournalError::InvalidRewind(_))
1090                ));
1091
1092                journal.rewind(0).await.unwrap();
1093                assert_eq!(journal.size(), 0);
1094                assert_eq!(journal.mmr.leaves(), 0);
1095                assert_eq!(journal.mmr.size(), 0);
1096                assert_eq!(journal.pruning_boundary(), 0);
1097                assert_eq!(journal.oldest_retained_pos(), None);
1098
1099                // Test rewinding after pruning.
1100                for i in 0..255 {
1101                    journal.append(create_operation(i)).await.unwrap();
1102                }
1103                MutableContiguous::prune(&mut journal, 100).await.unwrap();
1104                assert_eq!(journal.pruning_boundary(), 98);
1105                let res = journal.rewind(97).await;
1106                assert!(matches!(res, Err(JournalError::InvalidRewind(97))));
1107                journal.rewind(98).await.unwrap();
1108                assert_eq!(journal.size(), 98);
1109                assert_eq!(journal.mmr.leaves(), 98);
1110                assert_eq!(journal.pruning_boundary(), 98);
1111                assert_eq!(journal.oldest_retained_pos(), None);
1112            }
1113        });
1114    }
1115
1116    /// Verify that append() increments the operation count, returns correct locations, and
1117    /// operations can be read back correctly.
1118    #[test_traced("INFO")]
1119    fn test_apply_op_and_read_operations() {
1120        let executor = deterministic::Runner::default();
1121        executor.start(|context| async move {
1122            let mut journal = create_empty_journal(context, "apply_op").await;
1123
1124            assert_eq!(journal.size(), 0);
1125
1126            // Add 50 operations
1127            let expected_ops: Vec<_> = (0..50).map(|i| create_operation(i as u8)).collect();
1128            for (i, op) in expected_ops.iter().enumerate() {
1129                let loc = journal.append(op.clone()).await.unwrap();
1130                assert_eq!(loc, Location::new_unchecked(i as u64));
1131                assert_eq!(journal.size(), (i + 1) as u64);
1132            }
1133
1134            assert_eq!(journal.size(), 50);
1135
1136            // Verify all operations can be read back correctly
1137            journal.sync().await.unwrap();
1138            for (i, expected_op) in expected_ops.iter().enumerate() {
1139                let read_op = journal
1140                    .read(Location::new_unchecked(i as u64))
1141                    .await
1142                    .unwrap();
1143                assert_eq!(read_op, *expected_op);
1144            }
1145        });
1146    }
1147
1148    /// Verify that read() returns correct operations at various positions.
1149    #[test_traced("INFO")]
1150    fn test_read_operations_at_various_positions() {
1151        let executor = deterministic::Runner::default();
1152        executor.start(|context| async move {
1153            let journal = create_journal_with_ops(context, "read", 50).await;
1154
1155            // Verify reading first operation
1156            let first_op = journal.read(Location::new_unchecked(0)).await.unwrap();
1157            assert_eq!(first_op, create_operation(0));
1158
1159            // Verify reading middle operation
1160            let middle_op = journal.read(Location::new_unchecked(25)).await.unwrap();
1161            assert_eq!(middle_op, create_operation(25));
1162
1163            // Verify reading last operation
1164            let last_op = journal.read(Location::new_unchecked(49)).await.unwrap();
1165            assert_eq!(last_op, create_operation(49));
1166
1167            // Verify all operations match expected values
1168            for i in 0..50 {
1169                let op = journal.read(Location::new_unchecked(i)).await.unwrap();
1170                assert_eq!(op, create_operation(i as u8));
1171            }
1172        });
1173    }
1174
1175    /// Verify that read() returns an error for pruned operations.
1176    #[test_traced("INFO")]
1177    fn test_read_pruned_operation_returns_error() {
1178        let executor = deterministic::Runner::default();
1179        executor.start(|context| async move {
1180            let mut journal = create_journal_with_ops(context, "read_pruned", 100).await;
1181
1182            // Add commit and prune
1183            journal
1184                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1185                .await
1186                .unwrap();
1187            journal.sync().await.unwrap();
1188            let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1189
1190            // Try to read an operation before the pruned boundary
1191            let read_loc = Location::new_unchecked(0);
1192            if read_loc < pruned_boundary {
1193                let result = journal.read(read_loc).await;
1194                assert!(matches!(
1195                    result,
1196                    Err(Error::Journal(crate::journal::Error::ItemPruned(_)))
1197                ));
1198            }
1199        });
1200    }
1201
1202    /// Verify that read() returns an error for out-of-range locations.
1203    #[test_traced("INFO")]
1204    fn test_read_out_of_range_returns_error() {
1205        let executor = deterministic::Runner::default();
1206        executor.start(|context| async move {
1207            let journal = create_journal_with_ops(context, "read_oob", 3).await;
1208
1209            // Try to read beyond the end
1210            let result = journal.read(Location::new_unchecked(10)).await;
1211            assert!(matches!(
1212                result,
1213                Err(Error::Journal(crate::journal::Error::ItemOutOfRange(_)))
1214            ));
1215        });
1216    }
1217
1218    /// Verify that we can read all operations back correctly.
1219    #[test_traced("INFO")]
1220    fn test_read_all_operations_back_correctly() {
1221        let executor = deterministic::Runner::default();
1222        executor.start(|context| async move {
1223            let journal = create_journal_with_ops(context, "read_all", 50).await;
1224
1225            assert_eq!(journal.size(), 50);
1226
1227            // Verify all operations can be read back and match expected values
1228            for i in 0..50 {
1229                let op = journal.read(Location::new_unchecked(i)).await.unwrap();
1230                assert_eq!(op, create_operation(i as u8));
1231            }
1232        });
1233    }
1234
1235    /// Verify that sync() persists operations.
1236    #[test_traced("INFO")]
1237    fn test_sync() {
1238        let executor = deterministic::Runner::default();
1239        executor.start(|context| async move {
1240            let mut journal = create_empty_journal(context.clone(), "close_pending").await;
1241
1242            // Add 20 operations
1243            let expected_ops: Vec<_> = (0..20).map(|i| create_operation(i as u8)).collect();
1244            for (i, op) in expected_ops.iter().enumerate() {
1245                let loc = journal.append(op.clone()).await.unwrap();
1246                assert_eq!(loc, Location::new_unchecked(i as u64),);
1247            }
1248
1249            // Add commit operation to commit the operations
1250            let commit_loc = journal
1251                .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
1252                .await
1253                .unwrap();
1254            assert_eq!(
1255                commit_loc,
1256                Location::new_unchecked(20),
1257                "commit should be at location 20"
1258            );
1259            journal.sync().await.unwrap();
1260
1261            // Reopen and verify the operations persisted
1262            drop(journal);
1263            let journal = create_empty_journal(context, "close_pending").await;
1264            assert_eq!(journal.size(), 21);
1265
1266            // Verify all operations can be read back
1267            for (i, expected_op) in expected_ops.iter().enumerate() {
1268                let read_op = journal
1269                    .read(Location::new_unchecked(i as u64))
1270                    .await
1271                    .unwrap();
1272                assert_eq!(read_op, *expected_op);
1273            }
1274        });
1275    }
1276
1277    /// Verify that pruning an empty journal returns the boundary.
1278    #[test_traced("INFO")]
1279    fn test_prune_empty_journal() {
1280        let executor = deterministic::Runner::default();
1281        executor.start(|context| async move {
1282            let mut journal = create_empty_journal(context, "prune_empty").await;
1283
1284            let boundary = journal.prune(Location::new_unchecked(0)).await.unwrap();
1285
1286            assert_eq!(boundary, Location::new_unchecked(0));
1287        });
1288    }
1289
1290    /// Verify that pruning to a specific location works correctly.
1291    #[test_traced("INFO")]
1292    fn test_prune_to_location() {
1293        let executor = deterministic::Runner::default();
1294        executor.start(|context| async move {
1295            let mut journal = create_journal_with_ops(context, "prune_to", 100).await;
1296
1297            // Add commit at position 50
1298            journal
1299                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1300                .await
1301                .unwrap();
1302            journal.sync().await.unwrap();
1303
1304            let boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1305
1306            // Boundary should be <= requested location (may align to section boundary)
1307            assert!(boundary <= Location::new_unchecked(50));
1308        });
1309    }
1310
1311    /// Verify that prune() returns the actual boundary (which may differ from requested).
1312    #[test_traced("INFO")]
1313    fn test_prune_returns_actual_boundary() {
1314        let executor = deterministic::Runner::default();
1315        executor.start(|context| async move {
1316            let mut journal = create_journal_with_ops(context, "prune_boundary", 100).await;
1317
1318            journal
1319                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1320                .await
1321                .unwrap();
1322            journal.sync().await.unwrap();
1323
1324            let requested = Location::new_unchecked(50);
1325            let actual = journal.prune(requested).await.unwrap();
1326
1327            // Actual boundary should match oldest_retained_loc
1328            let oldest = journal.oldest_retained_loc().unwrap();
1329            assert_eq!(actual, oldest);
1330
1331            // Actual may be <= requested due to section alignment
1332            assert!(actual <= requested);
1333        });
1334    }
1335
1336    /// Verify that pruning doesn't change the operation count.
1337    #[test_traced("INFO")]
1338    fn test_prune_preserves_operation_count() {
1339        let executor = deterministic::Runner::default();
1340        executor.start(|context| async move {
1341            let mut journal = create_journal_with_ops(context, "prune_count", 100).await;
1342
1343            journal
1344                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1345                .await
1346                .unwrap();
1347            journal.sync().await.unwrap();
1348
1349            let count_before = journal.size();
1350            journal.prune(Location::new_unchecked(50)).await.unwrap();
1351            let count_after = journal.size();
1352
1353            assert_eq!(count_before, count_after);
1354        });
1355    }
1356
1357    /// Verify oldest_retained_loc() for empty journal, no pruning, and after pruning.
1358    #[test_traced("INFO")]
1359    fn test_oldest_retained_loc() {
1360        let executor = deterministic::Runner::default();
1361        executor.start(|context| async move {
1362            // Test empty journal
1363            let journal = create_empty_journal(context.clone(), "oldest").await;
1364            let oldest = journal.oldest_retained_loc();
1365            assert_eq!(oldest, None);
1366
1367            // Test no pruning
1368            let journal = create_journal_with_ops(context.clone(), "oldest", 100).await;
1369            let oldest = journal.oldest_retained_loc();
1370            assert_eq!(oldest, Some(Location::new_unchecked(0)));
1371
1372            // Test after pruning
1373            let mut journal = create_journal_with_ops(context, "oldest", 100).await;
1374            journal
1375                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1376                .await
1377                .unwrap();
1378            journal.sync().await.unwrap();
1379
1380            let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1381
1382            let oldest_loc = journal.oldest_retained_loc().unwrap();
1383            // Should match the pruned boundary (may be <= 50 due to section alignment)
1384            assert_eq!(oldest_loc, pruned_boundary);
1385            // Should be <= requested location (50)
1386            assert!(oldest_loc <= Location::new_unchecked(50));
1387        });
1388    }
1389
1390    /// Verify pruning_boundary() for empty journal, no pruning, and after pruning.
1391    #[test_traced("INFO")]
1392    fn test_pruning_boundary() {
1393        let executor = deterministic::Runner::default();
1394        executor.start(|context| async move {
1395            // Test empty journal
1396            let journal = create_empty_journal(context.clone(), "boundary").await;
1397            let boundary = journal.pruning_boundary();
1398            assert_eq!(boundary, Location::new_unchecked(0));
1399
1400            // Test no pruning
1401            let journal = create_journal_with_ops(context.clone(), "boundary", 100).await;
1402            let boundary = journal.pruning_boundary();
1403            assert_eq!(boundary, Location::new_unchecked(0));
1404
1405            // Test after pruning
1406            let mut journal = create_journal_with_ops(context, "boundary", 100).await;
1407            journal
1408                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1409                .await
1410                .unwrap();
1411            journal.sync().await.unwrap();
1412
1413            let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1414
1415            let boundary = journal.pruning_boundary();
1416            assert_eq!(boundary, pruned_boundary);
1417        });
1418    }
1419
1420    /// Verify that MMR prunes to the journal's actual boundary, not the requested location.
1421    #[test_traced("INFO")]
1422    fn test_mmr_prunes_to_journal_boundary() {
1423        let executor = deterministic::Runner::default();
1424        executor.start(|context| async move {
1425            let mut journal = create_journal_with_ops(context, "mmr_boundary", 50).await;
1426
1427            journal
1428                .append(Operation::CommitFloor(None, Location::new_unchecked(25)))
1429                .await
1430                .unwrap();
1431            journal.sync().await.unwrap();
1432
1433            let pruned_boundary = journal.prune(Location::new_unchecked(25)).await.unwrap();
1434
1435            // Verify MMR and journal remain in sync
1436            let oldest_retained = journal.oldest_retained_loc();
1437            assert_eq!(Some(pruned_boundary), oldest_retained);
1438
1439            // Verify boundary is at or before requested (due to section alignment)
1440            assert!(pruned_boundary <= Location::new_unchecked(25));
1441
1442            // Verify operation count is unchanged
1443            assert_eq!(journal.size(), 51);
1444        });
1445    }
1446
1447    /// Verify proof() for multiple operations.
1448    #[test_traced("INFO")]
1449    fn test_proof_multiple_operations() {
1450        let executor = deterministic::Runner::default();
1451        executor.start(|context| async move {
1452            let journal = create_journal_with_ops(context, "proof_multi", 50).await;
1453
1454            let (proof, ops) = journal
1455                .proof(Location::new_unchecked(0), NZU64!(50))
1456                .await
1457                .unwrap();
1458
1459            assert_eq!(ops.len(), 50);
1460            for (i, op) in ops.iter().enumerate() {
1461                assert_eq!(*op, create_operation(i as u8));
1462            }
1463
1464            // Verify the proof is valid
1465            let mut hasher = StandardHasher::new();
1466            let root = journal.root();
1467            assert!(verify_proof(
1468                &proof,
1469                &ops,
1470                Location::new_unchecked(0),
1471                &root,
1472                &mut hasher
1473            ));
1474        });
1475    }
1476
1477    /// Verify that historical_proof() respects the max_ops limit.
1478    #[test_traced("INFO")]
1479    fn test_historical_proof_limited_by_max_ops() {
1480        let executor = deterministic::Runner::default();
1481        executor.start(|context| async move {
1482            let journal = create_journal_with_ops(context, "proof_limit", 50).await;
1483
1484            let size = journal.size();
1485            let (proof, ops) = journal
1486                .historical_proof(size, Location::new_unchecked(0), NZU64!(20))
1487                .await
1488                .unwrap();
1489
1490            // Should return only 20 operations despite 50 being available
1491            assert_eq!(ops.len(), 20);
1492            for (i, op) in ops.iter().enumerate() {
1493                assert_eq!(*op, create_operation(i as u8));
1494            }
1495
1496            // Verify the proof is valid
1497            let mut hasher = StandardHasher::new();
1498            let root = journal.root();
1499            assert!(verify_proof(
1500                &proof,
1501                &ops,
1502                Location::new_unchecked(0),
1503                &root,
1504                &mut hasher
1505            ));
1506        });
1507    }
1508
1509    /// Verify historical_proof() at the end of the journal.
1510    #[test_traced("INFO")]
1511    fn test_historical_proof_at_end_of_journal() {
1512        let executor = deterministic::Runner::default();
1513        executor.start(|context| async move {
1514            let journal = create_journal_with_ops(context, "proof_end", 50).await;
1515
1516            let size = journal.size();
1517            // Request proof starting near the end
1518            let (proof, ops) = journal
1519                .historical_proof(size, Location::new_unchecked(40), NZU64!(20))
1520                .await
1521                .unwrap();
1522
1523            // Should return only 10 operations (positions 40-49)
1524            assert_eq!(ops.len(), 10);
1525            for (i, op) in ops.iter().enumerate() {
1526                assert_eq!(*op, create_operation((40 + i) as u8));
1527            }
1528
1529            // Verify the proof is valid
1530            let mut hasher = StandardHasher::new();
1531            let root = journal.root();
1532            assert!(verify_proof(
1533                &proof,
1534                &ops,
1535                Location::new_unchecked(40),
1536                &root,
1537                &mut hasher
1538            ));
1539        });
1540    }
1541
1542    /// Verify that historical_proof() returns an error for invalid size.
1543    #[test_traced("INFO")]
1544    fn test_historical_proof_out_of_range_returns_error() {
1545        let executor = deterministic::Runner::default();
1546        executor.start(|context| async move {
1547            let journal = create_journal_with_ops(context, "proof_oob", 5).await;
1548
1549            // Request proof with size > actual journal size
1550            let result = journal
1551                .historical_proof(
1552                    Location::new_unchecked(10),
1553                    Location::new_unchecked(0),
1554                    NZU64!(1),
1555                )
1556                .await;
1557
1558            assert!(matches!(
1559                result,
1560                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
1561            ));
1562        });
1563    }
1564
1565    /// Verify that historical_proof() returns an error when start_loc >= size.
1566    #[test_traced("INFO")]
1567    fn test_historical_proof_start_too_large_returns_error() {
1568        let executor = deterministic::Runner::default();
1569        executor.start(|context| async move {
1570            let journal = create_journal_with_ops(context, "proof_start_oob", 5).await;
1571
1572            let size = journal.size();
1573            // Request proof starting at size (should fail)
1574            let result = journal.historical_proof(size, size, NZU64!(1)).await;
1575
1576            assert!(matches!(
1577                result,
1578                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
1579            ));
1580        });
1581    }
1582
1583    /// Verify historical_proof() for a truly historical state (before more operations added).
1584    #[test_traced("INFO")]
1585    fn test_historical_proof_truly_historical() {
1586        let executor = deterministic::Runner::default();
1587        executor.start(|context| async move {
1588            // Create journal with initial operations
1589            let mut journal = create_journal_with_ops(context, "proof_historical", 50).await;
1590
1591            // Capture root at historical state
1592            let mut hasher = StandardHasher::new();
1593            let historical_root = journal.root();
1594            let historical_size = journal.size();
1595
1596            // Add more operations after the historical state
1597            for i in 50..100 {
1598                journal.append(create_operation(i as u8)).await.unwrap();
1599            }
1600            journal.sync().await.unwrap();
1601
1602            // Generate proof for the historical state
1603            let (proof, ops) = journal
1604                .historical_proof(historical_size, Location::new_unchecked(0), NZU64!(50))
1605                .await
1606                .unwrap();
1607
1608            // Verify operations match expected historical operations
1609            assert_eq!(ops.len(), 50);
1610            for (i, op) in ops.iter().enumerate() {
1611                assert_eq!(*op, create_operation(i as u8));
1612            }
1613
1614            // Verify the proof is valid against the historical root
1615            assert!(verify_proof(
1616                &proof,
1617                &ops,
1618                Location::new_unchecked(0),
1619                &historical_root,
1620                &mut hasher
1621            ));
1622        });
1623    }
1624
1625    /// Verify that historical_proof() returns an error when start_loc is pruned.
1626    #[test_traced("INFO")]
1627    fn test_historical_proof_pruned_location_returns_error() {
1628        let executor = deterministic::Runner::default();
1629        executor.start(|context| async move {
1630            let mut journal = create_journal_with_ops(context, "proof_pruned", 50).await;
1631
1632            journal
1633                .append(Operation::CommitFloor(None, Location::new_unchecked(25)))
1634                .await
1635                .unwrap();
1636            journal.sync().await.unwrap();
1637            let pruned_boundary = journal.prune(Location::new_unchecked(25)).await.unwrap();
1638
1639            // Try to get proof starting at a location before the pruned boundary
1640            let size = journal.size();
1641            let start_loc = Location::new_unchecked(0);
1642            if start_loc < pruned_boundary {
1643                let result = journal.historical_proof(size, start_loc, NZU64!(1)).await;
1644
1645                // Should fail when trying to read pruned operations
1646                assert!(result.is_err());
1647            }
1648        });
1649    }
1650
1651    /// Verify replay() with empty journal and multiple operations.
1652    #[test_traced("INFO")]
1653    fn test_replay_operations() {
1654        let executor = deterministic::Runner::default();
1655        executor.start(|context| async move {
1656            // Test empty journal
1657            let journal = create_empty_journal(context.clone(), "replay").await;
1658            let stream = journal.replay(0, NZUsize!(10)).await.unwrap();
1659            futures::pin_mut!(stream);
1660            assert!(stream.next().await.is_none());
1661
1662            // Test replaying all operations
1663            let journal = create_journal_with_ops(context, "replay", 50).await;
1664            let stream = journal.replay(0, NZUsize!(100)).await.unwrap();
1665            futures::pin_mut!(stream);
1666
1667            for i in 0..50 {
1668                let (pos, op) = stream.next().await.unwrap().unwrap();
1669                assert_eq!(pos, i);
1670                assert_eq!(op, create_operation(i as u8));
1671            }
1672
1673            assert!(stream.next().await.is_none());
1674        });
1675    }
1676
1677    /// Verify replay() starting from a middle location.
1678    #[test_traced("INFO")]
1679    fn test_replay_from_middle() {
1680        let executor = deterministic::Runner::default();
1681        executor.start(|context| async move {
1682            let journal = create_journal_with_ops(context, "replay_middle", 50).await;
1683            let stream = journal.replay(25, NZUsize!(100)).await.unwrap();
1684            futures::pin_mut!(stream);
1685
1686            let mut count = 0;
1687            while let Some(result) = stream.next().await {
1688                let (pos, op) = result.unwrap();
1689                assert_eq!(pos, 25 + count);
1690                assert_eq!(op, create_operation((25 + count) as u8));
1691                count += 1;
1692            }
1693
1694            // Should have replayed positions 25-49 (25 operations)
1695            assert_eq!(count, 25);
1696        });
1697    }
1698}