1use crate::{
9 journal::{
10 contiguous::{fixed, variable, Contiguous, MutableContiguous, PersistableContiguous},
11 Error as JournalError,
12 },
13 mmr::{
14 journaled::{CleanMmr, Mmr},
15 mem::{Clean, Dirty, State},
16 Location, Position, Proof, StandardHasher,
17 },
18};
19use commonware_codec::{Codec, CodecFixed, Encode};
20use commonware_cryptography::{DigestOf, Hasher};
21use commonware_runtime::{Clock, Metrics, Storage};
22use core::num::{NonZeroU64, NonZeroUsize};
23use futures::{future::try_join_all, try_join, TryFutureExt as _};
24use thiserror::Error;
25use tracing::{debug, warn};
26
27#[derive(Error, Debug)]
29pub enum Error {
30 #[error("mmr error: {0}")]
31 Mmr(#[from] crate::mmr::Error),
32
33 #[error("journal error: {0}")]
34 Journal(#[from] super::Error),
35}
36pub struct Journal<E, C, H, S: State<H::Digest> = Dirty>
41where
42 E: Storage + Clock + Metrics,
43 C: Contiguous<Item: Encode>,
44 H: Hasher,
45{
46 pub(crate) mmr: Mmr<E, H::Digest, S>,
49
50 pub(crate) journal: C,
53
54 pub(crate) hasher: StandardHasher<H>,
55}
56
57impl<E, C, H, S> Journal<E, C, H, S>
58where
59 E: Storage + Clock + Metrics,
60 C: Contiguous<Item: Encode>,
61 H: Hasher,
62 S: State<DigestOf<H>>,
63{
64 pub fn size(&self) -> Location {
66 Location::new_unchecked(self.journal.size())
67 }
68
69 pub fn oldest_retained_loc(&self) -> Option<Location> {
71 self.journal
72 .oldest_retained_pos()
73 .map(Location::new_unchecked)
74 }
75
76 pub fn pruning_boundary(&self) -> Location {
78 self.journal.pruning_boundary().into()
79 }
80
81 pub async fn read(&self, loc: Location) -> Result<C::Item, Error> {
83 self.journal.read(*loc).await.map_err(Error::Journal)
84 }
85}
86
87impl<E, C, H, S> Journal<E, C, H, S>
88where
89 E: Storage + Clock + Metrics,
90 C: MutableContiguous<Item: Encode>,
91 H: Hasher,
92 S: State<DigestOf<H>>,
93{
94 pub async fn append(&mut self, item: C::Item) -> Result<Location, Error> {
95 let encoded_item = item.encode();
96
97 let (_, loc) = try_join!(
99 self.mmr
100 .add(&mut self.hasher, &encoded_item)
101 .map_err(Error::Mmr),
102 self.journal.append(item).map_err(Into::into)
103 )?;
104
105 Ok(Location::new_unchecked(loc))
106 }
107}
108
109impl<E, C, H, S> Journal<E, C, H, S>
110where
111 E: Storage + Clock + Metrics,
112 C: PersistableContiguous<Item: Encode>,
113 H: Hasher,
114 S: State<DigestOf<H>>,
115{
116 pub async fn commit(&mut self) -> Result<(), Error> {
119 self.journal.commit().await.map_err(Error::Journal)
120 }
121}
122
123impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
124where
125 E: Storage + Clock + Metrics,
126 C: MutableContiguous<Item: Encode>,
127 H: Hasher,
128{
129 pub async fn from_components(
131 mmr: CleanMmr<E, H::Digest>,
132 journal: C,
133 mut hasher: StandardHasher<H>,
134 apply_batch_size: u64,
135 ) -> Result<Self, Error> {
136 let mut mmr = Self::align(mmr, &journal, &mut hasher, apply_batch_size).await?;
137
138 mmr.sync().await?;
141
142 Ok(Self {
143 mmr,
144 journal,
145 hasher,
146 })
147 }
148
149 async fn align(
153 mut mmr: CleanMmr<E, H::Digest>,
154 journal: &C,
155 hasher: &mut StandardHasher<H>,
156 apply_batch_size: u64,
157 ) -> Result<CleanMmr<E, H::Digest>, Error> {
158 let journal_size = journal.size();
161 let mut mmr_size = mmr.leaves();
162 if mmr_size > journal_size {
163 let pop_count = mmr_size - journal_size;
164 warn!(journal_size, ?pop_count, "popping MMR items");
165 mmr.pop(hasher, *pop_count as usize).await?;
166 mmr_size = Location::new_unchecked(journal_size);
167 }
168
169 if mmr_size < journal_size {
171 let replay_count = journal_size - *mmr_size;
172 warn!(
173 journal_size,
174 replay_count, "MMR lags behind journal, replaying journal to catch up"
175 );
176
177 let mut mmr = mmr.into_dirty();
178 let mut batch_size = 0;
179 while mmr_size < journal_size {
180 let op = journal.read(*mmr_size).await?;
181 mmr.add(hasher, &op.encode()).await?;
182 mmr_size += 1;
183 batch_size += 1;
184 if batch_size >= apply_batch_size {
185 mmr = mmr.merkleize(hasher).into_dirty();
186 batch_size = 0;
187 }
188 }
189 return Ok(mmr.merkleize(hasher));
190 }
191
192 assert_eq!(journal.size(), mmr.leaves());
194
195 Ok(mmr)
196 }
197
198 pub async fn prune(&mut self, prune_loc: Location) -> Result<Location, Error> {
203 if self.mmr.size() == 0 {
204 return Ok(self.pruning_boundary());
206 }
207
208 self.mmr.sync().await?;
212
213 if !self.journal.prune(*prune_loc).await? {
215 return Ok(self.pruning_boundary());
216 }
217
218 let pruning_boundary = self.pruning_boundary();
219 let size = self.size();
220 debug!(?size, ?prune_loc, ?pruning_boundary, "pruned inactive ops");
221
222 self.mmr
224 .prune_to_pos(Position::try_from(pruning_boundary)?)
225 .await?;
226
227 Ok(pruning_boundary)
228 }
229}
230
231impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
232where
233 E: Storage + Clock + Metrics,
234 C: Contiguous<Item: Encode>,
235 H: Hasher,
236{
237 pub async fn proof(
251 &self,
252 start_loc: Location,
253 max_ops: NonZeroU64,
254 ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
255 self.historical_proof(self.size(), start_loc, max_ops).await
256 }
257
258 pub async fn historical_proof(
273 &self,
274 historical_size: Location,
275 start_loc: Location,
276 max_ops: NonZeroU64,
277 ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
278 let size = self.size();
279 if historical_size > size {
280 return Err(crate::mmr::Error::RangeOutOfBounds(size).into());
281 }
282 if start_loc >= historical_size {
283 return Err(crate::mmr::Error::RangeOutOfBounds(start_loc).into());
284 }
285 let end_loc = std::cmp::min(historical_size, start_loc.saturating_add(max_ops.get()));
286
287 let mmr_size = Position::try_from(historical_size)?;
288 let proof = self
289 .mmr
290 .historical_range_proof(mmr_size, start_loc..end_loc)
291 .await?;
292
293 let mut ops = Vec::with_capacity((*end_loc - *start_loc) as usize);
294 let futures = (*start_loc..*end_loc)
295 .map(|i| self.journal.read(i))
296 .collect::<Vec<_>>();
297 try_join_all(futures)
298 .await?
299 .into_iter()
300 .for_each(|op| ops.push(op));
301
302 Ok((proof, ops))
303 }
304
305 pub const fn root(&self) -> H::Digest {
307 self.mmr.root()
308 }
309
310 pub fn into_dirty(self) -> Journal<E, C, H, Dirty> {
312 Journal {
313 mmr: self.mmr.into_dirty(),
314 journal: self.journal,
315 hasher: self.hasher,
316 }
317 }
318}
319
320impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
321where
322 E: Storage + Clock + Metrics,
323 C: PersistableContiguous<Item: Encode>,
324 H: Hasher,
325{
326 pub async fn close(self) -> Result<(), Error> {
328 try_join!(
329 self.journal.close().map_err(Error::Journal),
330 self.mmr.close().map_err(Error::Mmr),
331 )?;
332 Ok(())
333 }
334
335 pub async fn destroy(self) -> Result<(), Error> {
337 try_join!(
338 self.journal.destroy().map_err(Error::Journal),
339 self.mmr.destroy().map_err(Error::Mmr),
340 )?;
341 Ok(())
342 }
343
344 pub async fn sync(&mut self) -> Result<(), Error> {
346 try_join!(
347 self.journal.sync().map_err(Error::Journal),
348 self.mmr.sync().map_err(Into::into)
349 )?;
350
351 Ok(())
352 }
353}
354
355impl<E, C, H> Journal<E, C, H, Dirty>
356where
357 E: Storage + Clock + Metrics,
358 C: Contiguous<Item: Encode>,
359 H: Hasher,
360{
361 pub fn merkleize(self) -> Journal<E, C, H, Clean<H::Digest>> {
363 let Self {
364 mmr,
365 journal,
366 mut hasher,
367 } = self;
368 Journal {
369 mmr: mmr.merkleize(&mut hasher),
370 journal,
371 hasher,
372 }
373 }
374}
375
376impl<E, C, H> Journal<E, C, H, Dirty>
377where
378 E: Storage + Clock + Metrics,
379 C: MutableContiguous<Item: Encode>,
380 H: Hasher,
381{
382 pub async fn from_components(
384 mmr: CleanMmr<E, H::Digest>,
385 journal: C,
386 hasher: StandardHasher<H>,
387 apply_batch_size: u64,
388 ) -> Result<Self, Error> {
389 let clean = Journal::<E, C, H, Clean<H::Digest>>::from_components(
390 mmr,
391 journal,
392 hasher,
393 apply_batch_size,
394 )
395 .await?;
396 Ok(clean.into_dirty())
397 }
398}
399
/// Batch size (2^16 items) passed to `align` by the `new` constructors in this file: the number
/// of replayed items after which the MMR is merkleized during recovery.
const APPLY_BATCH_SIZE: u64 = 65_536;
402
403impl<E, O, H> Journal<E, fixed::Journal<E, O>, H, Clean<H::Digest>>
404where
405 E: Storage + Clock + Metrics,
406 O: CodecFixed<Cfg = ()>,
407 H: Hasher,
408{
409 pub async fn new(
414 context: E,
415 mmr_cfg: crate::mmr::journaled::Config,
416 journal_cfg: fixed::Config,
417 rewind_predicate: fn(&O) -> bool,
418 ) -> Result<Self, Error> {
419 let mut journal = fixed::Journal::init(context.with_label("journal"), journal_cfg).await?;
420
421 journal.rewind_to(rewind_predicate).await?;
423
424 let mut hasher = StandardHasher::<H>::new();
426 let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
427 let mut mmr = Self::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE).await?;
428
429 journal.sync().await?;
432 mmr.sync().await?;
433
434 Ok(Self {
435 mmr,
436 journal,
437 hasher,
438 })
439 }
440}
441
442impl<E, O, H> Journal<E, variable::Journal<E, O>, H, Clean<H::Digest>>
443where
444 E: Storage + Clock + Metrics,
445 O: Codec + Encode,
446 H: Hasher,
447{
448 pub async fn new(
453 context: E,
454 mmr_cfg: crate::mmr::journaled::Config,
455 journal_cfg: variable::Config<O::Cfg>,
456 rewind_predicate: fn(&O) -> bool,
457 ) -> Result<Self, Error> {
458 let mut hasher = StandardHasher::<H>::new();
459 let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
460 let mut journal =
461 variable::Journal::init(context.with_label("journal"), journal_cfg).await?;
462
463 journal.rewind_to(rewind_predicate).await?;
465
466 let mut mmr = Self::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE).await?;
468
469 journal.sync().await?;
472 mmr.sync().await?;
473
474 Ok(Self {
475 mmr,
476 journal,
477 hasher,
478 })
479 }
480}
481
482impl<E, C, H, S> Contiguous for Journal<E, C, H, S>
483where
484 E: Storage + Clock + Metrics,
485 C: MutableContiguous<Item: Encode>,
486 H: Hasher,
487 S: State<DigestOf<H>>,
488{
489 type Item = C::Item;
490
491 fn size(&self) -> u64 {
492 self.journal.size()
493 }
494
495 fn oldest_retained_pos(&self) -> Option<u64> {
496 self.journal.oldest_retained_pos()
497 }
498
499 fn pruning_boundary(&self) -> u64 {
500 self.journal.pruning_boundary()
501 }
502
503 async fn replay(
504 &self,
505 start_pos: u64,
506 buffer: NonZeroUsize,
507 ) -> Result<
508 impl futures::Stream<Item = Result<(u64, Self::Item), JournalError>> + '_,
509 JournalError,
510 > {
511 self.journal.replay(start_pos, buffer).await
512 }
513
514 async fn read(&self, position: u64) -> Result<Self::Item, JournalError> {
515 self.journal.read(position).await
516 }
517}
518
519impl<E, C, H> MutableContiguous for Journal<E, C, H, Dirty>
520where
521 E: Storage + Clock + Metrics,
522 C: MutableContiguous<Item: Encode>,
523 H: Hasher,
524{
525 async fn append(&mut self, item: Self::Item) -> Result<u64, JournalError> {
526 let res = self.append(item).await.map_err(|e| match e {
527 Error::Journal(inner) => inner,
528 Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
529 })?;
530
531 Ok(*res)
532 }
533
534 async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
535 self.journal.prune(min_position).await
536 }
537
538 async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
539 self.journal.rewind(size).await?;
540
541 let leaves = *self.mmr.leaves();
542 if leaves > size {
543 self.mmr
544 .pop((leaves - size) as usize)
545 .await
546 .map_err(|error| JournalError::Mmr(anyhow::Error::from(error)))?;
547 }
548
549 Ok(())
550 }
551}
552
553impl<E, C, H> MutableContiguous for Journal<E, C, H, Clean<H::Digest>>
554where
555 E: Storage + Clock + Metrics,
556 C: MutableContiguous<Item: Encode>,
557 H: Hasher,
558{
559 async fn append(&mut self, item: Self::Item) -> Result<u64, JournalError> {
560 let loc = self.append(item).await.map_err(|e| match e {
561 Error::Journal(inner) => inner,
562 Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
563 })?;
564
565 Ok(*loc)
566 }
567
568 async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
569 let old_pruning_boundary = self.pruning_boundary();
570 let pruning_boundary = self
571 .prune(Location::new_unchecked(min_position))
572 .await
573 .map_err(|e| match e {
574 Error::Journal(inner) => inner,
575 Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
576 })?;
577
578 Ok(old_pruning_boundary != pruning_boundary)
579 }
580
581 async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
582 self.journal.rewind(size).await?;
583
584 let leaves = *self.mmr.leaves();
585 if leaves > size {
586 self.mmr
587 .pop(&mut self.hasher, (leaves - size) as usize)
588 .await
589 .map_err(|error| JournalError::Mmr(anyhow::Error::from(error)))?;
590 }
591
592 Ok(())
593 }
594}
595
596impl<E, C, H> PersistableContiguous for Journal<E, C, H, Clean<H::Digest>>
597where
598 E: Storage + Clock + Metrics,
599 C: PersistableContiguous<Item: Encode>,
600 H: Hasher,
601{
602 async fn commit(&mut self) -> Result<(), JournalError> {
603 self.commit().await.map_err(|e| match e {
604 Error::Journal(inner) => inner,
605 Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
606 })
607 }
608
609 async fn sync(&mut self) -> Result<(), JournalError> {
610 self.sync().await.map_err(|e| match e {
611 Error::Journal(inner) => inner,
612 Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
613 })
614 }
615
616 async fn close(self) -> Result<(), JournalError> {
617 self.close().await.map_err(|e| match e {
618 Error::Journal(inner) => inner,
619 Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
620 })
621 }
622
623 async fn destroy(self) -> Result<(), JournalError> {
624 self.destroy().await.map_err(|e| match e {
625 Error::Journal(inner) => inner,
626 Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
627 })
628 }
629}
630
631#[cfg(test)]
632mod tests {
633 use super::*;
634 use crate::{
635 journal::contiguous::fixed::{Config as JConfig, Journal as ContiguousJournal},
636 mmr::{
637 journaled::{Config as MmrConfig, Mmr},
638 Location,
639 },
640 qmdb::{
641 any::unordered::{fixed::Operation, Update},
642 operation::Committable,
643 },
644 };
645 use commonware_codec::Encode;
646 use commonware_cryptography::{sha256, sha256::Digest, Sha256};
647 use commonware_macros::test_traced;
648 use commonware_runtime::{
649 buffer::PoolRef,
650 deterministic::{self, Context},
651 Runner as _,
652 };
653 use commonware_utils::{NZUsize, NZU64};
654 use futures::StreamExt as _;
655
    /// First argument given to `PoolRef::new` by the test configurations in this module.
    const PAGE_SIZE: usize = 101;
    /// Second argument given to `PoolRef::new` by the test configurations in this module.
    const PAGE_CACHE_SIZE: usize = 11;
658
659 fn mmr_config(suffix: &str) -> MmrConfig {
661 MmrConfig {
662 journal_partition: format!("mmr_journal_{suffix}"),
663 metadata_partition: format!("mmr_metadata_{suffix}"),
664 items_per_blob: NZU64!(11),
665 write_buffer: NZUsize!(1024),
666 thread_pool: None,
667 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
668 }
669 }
670
671 fn journal_config(suffix: &str) -> JConfig {
673 JConfig {
674 partition: format!("journal_{suffix}"),
675 items_per_blob: NZU64!(7),
676 write_buffer: NZUsize!(1024),
677 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
678 }
679 }
680
681 type AuthenticatedJournal = Journal<
682 deterministic::Context,
683 ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
684 Sha256,
685 Clean<sha256::Digest>,
686 >;
687
688 async fn create_empty_journal(context: Context, suffix: &str) -> AuthenticatedJournal {
690 AuthenticatedJournal::new(
691 context,
692 mmr_config(suffix),
693 journal_config(suffix),
694 |op: &Operation<Digest, Digest>| op.is_commit(),
695 )
696 .await
697 .unwrap()
698 }
699
700 fn create_operation(index: u8) -> Operation<Digest, Digest> {
702 Operation::Update(Update(
703 Sha256::fill(index),
704 Sha256::fill(index.wrapping_add(1)),
705 ))
706 }
707
708 async fn create_journal_with_ops(
712 context: Context,
713 suffix: &str,
714 count: usize,
715 ) -> AuthenticatedJournal {
716 let mut journal = create_empty_journal(context, suffix).await;
717
718 for i in 0..count {
719 let op = create_operation(i as u8);
720 let loc = journal.append(op).await.unwrap();
721 assert_eq!(loc, Location::new_unchecked(i as u64));
722 }
723
724 journal.sync().await.unwrap();
725 journal
726 }
727
728 async fn create_components(
733 context: Context,
734 suffix: &str,
735 ) -> (
736 CleanMmr<deterministic::Context, sha256::Digest>,
737 ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
738 StandardHasher<Sha256>,
739 ) {
740 let mut hasher = StandardHasher::new();
741 let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_config(suffix))
742 .await
743 .unwrap();
744 let journal =
745 ContiguousJournal::init(context.with_label("journal"), journal_config(suffix))
746 .await
747 .unwrap();
748 (mmr, journal, hasher)
749 }
750
751 fn verify_proof(
753 proof: &crate::mmr::Proof<<Sha256 as commonware_cryptography::Hasher>::Digest>,
754 operations: &[Operation<Digest, Digest>],
755 start_loc: Location,
756 root: &<Sha256 as commonware_cryptography::Hasher>::Digest,
757 hasher: &mut StandardHasher<Sha256>,
758 ) -> bool {
759 let encoded_ops: Vec<_> = operations.iter().map(|op| op.encode()).collect();
760 proof.verify_range_inclusion(hasher, &encoded_ops, start_loc, root)
761 }
762
763 #[test_traced("INFO")]
765 fn test_new_creates_empty_journal() {
766 let executor = deterministic::Runner::default();
767 executor.start(|context| async move {
768 let journal = create_empty_journal(context, "new_empty").await;
769
770 assert_eq!(journal.size(), 0);
771 assert_eq!(journal.pruning_boundary(), 0);
772 assert_eq!(journal.oldest_retained_pos(), None);
773 });
774 }
775
776 #[test_traced("INFO")]
778 fn test_align_with_empty_mmr_and_journal() {
779 let executor = deterministic::Runner::default();
780 executor.start(|context| async move {
781 let (mmr, journal, mut hasher) = create_components(context, "align_empty").await;
782
783 let mmr = Journal::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
784 .await
785 .unwrap();
786
787 assert_eq!(mmr.leaves(), Location::new_unchecked(0));
788 assert_eq!(journal.size(), Location::new_unchecked(0));
789 });
790 }
791
792 #[test_traced("WARN")]
794 fn test_align_when_mmr_ahead() {
795 let executor = deterministic::Runner::default();
796 executor.start(|context| async move {
797 let (mut mmr, mut journal, mut hasher) = create_components(context, "mmr_ahead").await;
798
799 for i in 0..20 {
801 let op = create_operation(i as u8);
802 let encoded = op.encode();
803 mmr.add(&mut hasher, &encoded).await.unwrap();
804 journal.append(op).await.unwrap();
805 }
806
807 let commit_op = Operation::CommitFloor(None, Location::new_unchecked(0));
809 journal.append(commit_op).await.unwrap();
810 journal.sync().await.unwrap();
811
812 let mmr = Journal::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
814 .await
815 .unwrap();
816
817 assert_eq!(mmr.leaves(), Location::new_unchecked(21));
819 assert_eq!(journal.size(), Location::new_unchecked(21));
820 });
821 }
822
823 #[test_traced("WARN")]
825 fn test_align_when_journal_ahead() {
826 let executor = deterministic::Runner::default();
827 executor.start(|context| async move {
828 let (mut mmr, mut journal, mut hasher) =
829 create_components(context, "journal_ahead").await;
830
831 for i in 0..20 {
833 let op = create_operation(i as u8);
834 journal.append(op).await.unwrap();
835 }
836
837 let commit_op = Operation::CommitFloor(None, Location::new_unchecked(0));
839 journal.append(commit_op).await.unwrap();
840 journal.sync().await.unwrap();
841
842 mmr = Journal::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
844 .await
845 .unwrap();
846
847 assert_eq!(mmr.leaves(), Location::new_unchecked(21));
849 assert_eq!(journal.size(), Location::new_unchecked(21));
850 });
851 }
852
853 #[test_traced("INFO")]
855 fn test_align_with_mismatched_committed_ops() {
856 let executor = deterministic::Runner::default();
857 executor.start(|context| async move {
858 let mut journal = create_empty_journal(context.clone(), "mismatched").await;
859
860 for i in 0..20 {
862 let loc = journal.append(create_operation(i as u8)).await.unwrap();
863 assert_eq!(loc, Location::new_unchecked(i as u64));
864 }
865
866 let size_before = journal.size();
869 assert_eq!(size_before, 20);
870
871 journal.close().await.unwrap();
873 let journal = create_empty_journal(context, "mismatched").await;
874
875 assert_eq!(journal.size(), 0);
877 });
878 }
879
880 #[test_traced("INFO")]
881 fn test_rewind() {
882 let executor = deterministic::Runner::default();
883 executor.start(|context| async move {
884 {
886 let mut journal = ContiguousJournal::init(
887 context.with_label("rewind_match"),
888 journal_config("rewind_match"),
889 )
890 .await
891 .unwrap();
892
893 for i in 0..3 {
895 journal.append(create_operation(i)).await.unwrap();
896 }
897 journal
898 .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
899 .await
900 .unwrap();
901 for i in 4..7 {
902 journal.append(create_operation(i)).await.unwrap();
903 }
904
905 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
907 assert_eq!(final_size, 4);
908 assert_eq!(journal.size(), 4);
909
910 let op = journal.read(3).await.unwrap();
912 assert!(op.is_commit());
913 }
914
915 {
917 let mut journal = ContiguousJournal::init(
918 context.with_label("rewind_multiple"),
919 journal_config("rewind_multiple"),
920 )
921 .await
922 .unwrap();
923
924 journal.append(create_operation(0)).await.unwrap();
926 journal
927 .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
928 .await
929 .unwrap(); journal.append(create_operation(2)).await.unwrap();
931 journal
932 .append(Operation::CommitFloor(None, Location::new_unchecked(1)))
933 .await
934 .unwrap(); journal.append(create_operation(4)).await.unwrap();
936
937 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
939 assert_eq!(final_size, 4);
940
941 let op = journal.read(3).await.unwrap();
943 assert!(op.is_commit());
944
945 assert!(journal.read(4).await.is_err());
947 }
948
949 {
951 let mut journal = ContiguousJournal::init(
952 context.with_label("rewind_no_match"),
953 journal_config("rewind_no_match"),
954 )
955 .await
956 .unwrap();
957
958 for i in 0..10 {
960 journal.append(create_operation(i)).await.unwrap();
961 }
962
963 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
965 assert_eq!(final_size, 0, "Should rewind to pruning boundary (0)");
966 assert_eq!(journal.size(), 0);
967 }
968
969 {
971 let mut journal = ContiguousJournal::init(
972 context.with_label("rewind_with_pruning"),
973 journal_config("rewind_with_pruning"),
974 )
975 .await
976 .unwrap();
977
978 for i in 0..10 {
980 journal.append(create_operation(i)).await.unwrap();
981 }
982 journal
983 .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
984 .await
985 .unwrap(); for i in 11..15 {
987 journal.append(create_operation(i)).await.unwrap();
988 }
989 journal.sync().await.unwrap();
990
991 journal.prune(8).await.unwrap();
993 let oldest = journal.oldest_retained_pos();
994 assert_eq!(oldest, Some(7));
995
996 for i in 15..20 {
998 journal.append(create_operation(i)).await.unwrap();
999 }
1000
1001 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1003 assert_eq!(final_size, 11);
1004
1005 let op = journal.read(10).await.unwrap();
1007 assert!(op.is_commit());
1008 }
1009
1010 {
1012 let mut journal = ContiguousJournal::init(
1013 context.with_label("rewind_no_match_pruned"),
1014 journal_config("rewind_no_match_pruned"),
1015 )
1016 .await
1017 .unwrap();
1018
1019 for i in 0..5 {
1021 journal.append(create_operation(i)).await.unwrap();
1022 }
1023 journal
1024 .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
1025 .await
1026 .unwrap(); for i in 6..10 {
1028 journal.append(create_operation(i)).await.unwrap();
1029 }
1030 journal.sync().await.unwrap();
1031
1032 journal.prune(8).await.unwrap();
1035 let oldest = journal.oldest_retained_pos();
1036 assert_eq!(oldest, Some(7));
1037
1038 for i in 10..14 {
1040 journal.append(create_operation(i)).await.unwrap();
1041 }
1042
1043 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1046 assert_eq!(final_size, 7);
1047 }
1048
1049 {
1051 let mut journal = ContiguousJournal::init(
1052 context.with_label("rewind_empty"),
1053 journal_config("rewind_empty"),
1054 )
1055 .await
1056 .unwrap();
1057
1058 let final_size = journal
1060 .rewind_to(|op: &Operation<Digest, Digest>| op.is_commit())
1061 .await
1062 .unwrap();
1063 assert_eq!(final_size, 0);
1064 assert_eq!(journal.size(), 0);
1065 }
1066
1067 {
1069 let mut journal = AuthenticatedJournal::new(
1070 context,
1071 mmr_config("rewind"),
1072 journal_config("rewind"),
1073 |op| op.is_commit(),
1074 )
1075 .await
1076 .unwrap();
1077
1078 for i in 0..5 {
1080 journal.append(create_operation(i)).await.unwrap();
1081 }
1082 journal
1083 .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
1084 .await
1085 .unwrap(); for i in 6..10 {
1087 journal.append(create_operation(i)).await.unwrap();
1088 }
1089 assert_eq!(journal.size(), 10);
1090
1091 journal.rewind(2).await.unwrap();
1092 assert_eq!(journal.size(), 2);
1093 assert_eq!(journal.mmr.leaves(), 2);
1094 assert_eq!(journal.mmr.size(), 3);
1095 assert_eq!(journal.pruning_boundary(), 0);
1096 assert_eq!(journal.oldest_retained_pos(), Some(0));
1097
1098 assert!(matches!(
1099 journal.rewind(3).await,
1100 Err(JournalError::InvalidRewind(_))
1101 ));
1102
1103 journal.rewind(0).await.unwrap();
1104 assert_eq!(journal.size(), 0);
1105 assert_eq!(journal.mmr.leaves(), 0);
1106 assert_eq!(journal.mmr.size(), 0);
1107 assert_eq!(journal.pruning_boundary(), 0);
1108 assert_eq!(journal.oldest_retained_pos(), None);
1109
1110 for i in 0..255 {
1112 journal.append(create_operation(i)).await.unwrap();
1113 }
1114 MutableContiguous::prune(&mut journal, 100).await.unwrap();
1115 assert_eq!(journal.pruning_boundary(), 98);
1116 let res = journal.rewind(97).await;
1117 assert!(matches!(res, Err(JournalError::InvalidRewind(97))));
1118 journal.rewind(98).await.unwrap();
1119 assert_eq!(journal.size(), 98);
1120 assert_eq!(journal.mmr.leaves(), 98);
1121 assert_eq!(journal.pruning_boundary(), 98);
1122 assert_eq!(journal.oldest_retained_pos(), None);
1123 }
1124 });
1125 }
1126
1127 #[test_traced("INFO")]
1130 fn test_apply_op_and_read_operations() {
1131 let executor = deterministic::Runner::default();
1132 executor.start(|context| async move {
1133 let mut journal = create_empty_journal(context, "apply_op").await;
1134
1135 assert_eq!(journal.size(), 0);
1136
1137 let expected_ops: Vec<_> = (0..50).map(|i| create_operation(i as u8)).collect();
1139 for (i, op) in expected_ops.iter().enumerate() {
1140 let loc = journal.append(op.clone()).await.unwrap();
1141 assert_eq!(loc, Location::new_unchecked(i as u64));
1142 assert_eq!(journal.size(), (i + 1) as u64);
1143 }
1144
1145 assert_eq!(journal.size(), 50);
1146
1147 journal.sync().await.unwrap();
1149 for (i, expected_op) in expected_ops.iter().enumerate() {
1150 let read_op = journal
1151 .read(Location::new_unchecked(i as u64))
1152 .await
1153 .unwrap();
1154 assert_eq!(read_op, *expected_op);
1155 }
1156 });
1157 }
1158
1159 #[test_traced("INFO")]
1161 fn test_read_operations_at_various_positions() {
1162 let executor = deterministic::Runner::default();
1163 executor.start(|context| async move {
1164 let journal = create_journal_with_ops(context, "read", 50).await;
1165
1166 let first_op = journal.read(Location::new_unchecked(0)).await.unwrap();
1168 assert_eq!(first_op, create_operation(0));
1169
1170 let middle_op = journal.read(Location::new_unchecked(25)).await.unwrap();
1172 assert_eq!(middle_op, create_operation(25));
1173
1174 let last_op = journal.read(Location::new_unchecked(49)).await.unwrap();
1176 assert_eq!(last_op, create_operation(49));
1177
1178 for i in 0..50 {
1180 let op = journal.read(Location::new_unchecked(i)).await.unwrap();
1181 assert_eq!(op, create_operation(i as u8));
1182 }
1183 });
1184 }
1185
1186 #[test_traced("INFO")]
1188 fn test_read_pruned_operation_returns_error() {
1189 let executor = deterministic::Runner::default();
1190 executor.start(|context| async move {
1191 let mut journal = create_journal_with_ops(context, "read_pruned", 100).await;
1192
1193 journal
1195 .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1196 .await
1197 .unwrap();
1198 journal.sync().await.unwrap();
1199 let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1200
1201 let read_loc = Location::new_unchecked(0);
1203 if read_loc < pruned_boundary {
1204 let result = journal.read(read_loc).await;
1205 assert!(matches!(
1206 result,
1207 Err(Error::Journal(crate::journal::Error::ItemPruned(_)))
1208 ));
1209 }
1210 });
1211 }
1212
1213 #[test_traced("INFO")]
1215 fn test_read_out_of_range_returns_error() {
1216 let executor = deterministic::Runner::default();
1217 executor.start(|context| async move {
1218 let journal = create_journal_with_ops(context, "read_oob", 3).await;
1219
1220 let result = journal.read(Location::new_unchecked(10)).await;
1222 assert!(matches!(
1223 result,
1224 Err(Error::Journal(crate::journal::Error::ItemOutOfRange(_)))
1225 ));
1226 });
1227 }
1228
1229 #[test_traced("INFO")]
1231 fn test_read_all_operations_back_correctly() {
1232 let executor = deterministic::Runner::default();
1233 executor.start(|context| async move {
1234 let journal = create_journal_with_ops(context, "read_all", 50).await;
1235
1236 assert_eq!(journal.size(), 50);
1237
1238 for i in 0..50 {
1240 let op = journal.read(Location::new_unchecked(i)).await.unwrap();
1241 assert_eq!(op, create_operation(i as u8));
1242 }
1243 });
1244 }
1245
1246 #[test_traced("INFO")]
1248 fn test_close_with_pending_operations() {
1249 let executor = deterministic::Runner::default();
1250 executor.start(|context| async move {
1251 let mut journal = create_empty_journal(context.clone(), "close_pending").await;
1252
1253 let expected_ops: Vec<_> = (0..20).map(|i| create_operation(i as u8)).collect();
1255 for (i, op) in expected_ops.iter().enumerate() {
1256 let loc = journal.append(op.clone()).await.unwrap();
1257 assert_eq!(loc, Location::new_unchecked(i as u64),);
1258 }
1259
1260 let commit_loc = journal
1262 .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
1263 .await
1264 .unwrap();
1265 assert_eq!(
1266 commit_loc,
1267 Location::new_unchecked(20),
1268 "commit should be at location 20"
1269 );
1270 journal.close().await.unwrap();
1271
1272 let journal = create_empty_journal(context, "close_pending").await;
1274 assert_eq!(journal.size(), 21);
1275
1276 for (i, expected_op) in expected_ops.iter().enumerate() {
1278 let read_op = journal
1279 .read(Location::new_unchecked(i as u64))
1280 .await
1281 .unwrap();
1282 assert_eq!(read_op, *expected_op);
1283 }
1284 });
1285 }
1286
1287 #[test_traced("INFO")]
1289 fn test_prune_empty_journal() {
1290 let executor = deterministic::Runner::default();
1291 executor.start(|context| async move {
1292 let mut journal = create_empty_journal(context, "prune_empty").await;
1293
1294 let boundary = journal.prune(Location::new_unchecked(0)).await.unwrap();
1295
1296 assert_eq!(boundary, Location::new_unchecked(0));
1297 });
1298 }
1299
1300 #[test_traced("INFO")]
1302 fn test_prune_to_location() {
1303 let executor = deterministic::Runner::default();
1304 executor.start(|context| async move {
1305 let mut journal = create_journal_with_ops(context, "prune_to", 100).await;
1306
1307 journal
1309 .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1310 .await
1311 .unwrap();
1312 journal.sync().await.unwrap();
1313
1314 let boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1315
1316 assert!(boundary <= Location::new_unchecked(50));
1318 });
1319 }
1320
1321 #[test_traced("INFO")]
1323 fn test_prune_returns_actual_boundary() {
1324 let executor = deterministic::Runner::default();
1325 executor.start(|context| async move {
1326 let mut journal = create_journal_with_ops(context, "prune_boundary", 100).await;
1327
1328 journal
1329 .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1330 .await
1331 .unwrap();
1332 journal.sync().await.unwrap();
1333
1334 let requested = Location::new_unchecked(50);
1335 let actual = journal.prune(requested).await.unwrap();
1336
1337 let oldest = journal.oldest_retained_loc().unwrap();
1339 assert_eq!(actual, oldest);
1340
1341 assert!(actual <= requested);
1343 });
1344 }
1345
1346 #[test_traced("INFO")]
1348 fn test_prune_preserves_operation_count() {
1349 let executor = deterministic::Runner::default();
1350 executor.start(|context| async move {
1351 let mut journal = create_journal_with_ops(context, "prune_count", 100).await;
1352
1353 journal
1354 .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1355 .await
1356 .unwrap();
1357 journal.sync().await.unwrap();
1358
1359 let count_before = journal.size();
1360 journal.prune(Location::new_unchecked(50)).await.unwrap();
1361 let count_after = journal.size();
1362
1363 assert_eq!(count_before, count_after);
1364 });
1365 }
1366
1367 #[test_traced("INFO")]
1369 fn test_oldest_retained_loc() {
1370 let executor = deterministic::Runner::default();
1371 executor.start(|context| async move {
1372 let journal = create_empty_journal(context.clone(), "oldest").await;
1374 let oldest = journal.oldest_retained_loc();
1375 assert_eq!(oldest, None);
1376
1377 let journal = create_journal_with_ops(context.clone(), "oldest", 100).await;
1379 let oldest = journal.oldest_retained_loc();
1380 assert_eq!(oldest, Some(Location::new_unchecked(0)));
1381
1382 let mut journal = create_journal_with_ops(context, "oldest", 100).await;
1384 journal
1385 .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1386 .await
1387 .unwrap();
1388 journal.sync().await.unwrap();
1389
1390 let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1391
1392 let oldest_loc = journal.oldest_retained_loc().unwrap();
1393 assert_eq!(oldest_loc, pruned_boundary);
1395 assert!(oldest_loc <= Location::new_unchecked(50));
1397 });
1398 }
1399
1400 #[test_traced("INFO")]
1402 fn test_pruning_boundary() {
1403 let executor = deterministic::Runner::default();
1404 executor.start(|context| async move {
1405 let journal = create_empty_journal(context.clone(), "boundary").await;
1407 let boundary = journal.pruning_boundary();
1408 assert_eq!(boundary, Location::new_unchecked(0));
1409
1410 let journal = create_journal_with_ops(context.clone(), "boundary", 100).await;
1412 let boundary = journal.pruning_boundary();
1413 assert_eq!(boundary, Location::new_unchecked(0));
1414
1415 let mut journal = create_journal_with_ops(context, "boundary", 100).await;
1417 journal
1418 .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1419 .await
1420 .unwrap();
1421 journal.sync().await.unwrap();
1422
1423 let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1424
1425 let boundary = journal.pruning_boundary();
1426 assert_eq!(boundary, pruned_boundary);
1427 });
1428 }
1429
1430 #[test_traced("INFO")]
1432 fn test_mmr_prunes_to_journal_boundary() {
1433 let executor = deterministic::Runner::default();
1434 executor.start(|context| async move {
1435 let mut journal = create_journal_with_ops(context, "mmr_boundary", 50).await;
1436
1437 journal
1438 .append(Operation::CommitFloor(None, Location::new_unchecked(25)))
1439 .await
1440 .unwrap();
1441 journal.sync().await.unwrap();
1442
1443 let pruned_boundary = journal.prune(Location::new_unchecked(25)).await.unwrap();
1444
1445 let oldest_retained = journal.oldest_retained_loc();
1447 assert_eq!(Some(pruned_boundary), oldest_retained);
1448
1449 assert!(pruned_boundary <= Location::new_unchecked(25));
1451
1452 assert_eq!(journal.size(), 51);
1454 });
1455 }
1456
1457 #[test_traced("INFO")]
1459 fn test_proof_multiple_operations() {
1460 let executor = deterministic::Runner::default();
1461 executor.start(|context| async move {
1462 let journal = create_journal_with_ops(context, "proof_multi", 50).await;
1463
1464 let (proof, ops) = journal
1465 .proof(Location::new_unchecked(0), NZU64!(50))
1466 .await
1467 .unwrap();
1468
1469 assert_eq!(ops.len(), 50);
1470 for (i, op) in ops.iter().enumerate() {
1471 assert_eq!(*op, create_operation(i as u8));
1472 }
1473
1474 let mut hasher = StandardHasher::new();
1476 let root = journal.root();
1477 assert!(verify_proof(
1478 &proof,
1479 &ops,
1480 Location::new_unchecked(0),
1481 &root,
1482 &mut hasher
1483 ));
1484 });
1485 }
1486
1487 #[test_traced("INFO")]
1489 fn test_historical_proof_limited_by_max_ops() {
1490 let executor = deterministic::Runner::default();
1491 executor.start(|context| async move {
1492 let journal = create_journal_with_ops(context, "proof_limit", 50).await;
1493
1494 let size = journal.size();
1495 let (proof, ops) = journal
1496 .historical_proof(size, Location::new_unchecked(0), NZU64!(20))
1497 .await
1498 .unwrap();
1499
1500 assert_eq!(ops.len(), 20);
1502 for (i, op) in ops.iter().enumerate() {
1503 assert_eq!(*op, create_operation(i as u8));
1504 }
1505
1506 let mut hasher = StandardHasher::new();
1508 let root = journal.root();
1509 assert!(verify_proof(
1510 &proof,
1511 &ops,
1512 Location::new_unchecked(0),
1513 &root,
1514 &mut hasher
1515 ));
1516 });
1517 }
1518
1519 #[test_traced("INFO")]
1521 fn test_historical_proof_at_end_of_journal() {
1522 let executor = deterministic::Runner::default();
1523 executor.start(|context| async move {
1524 let journal = create_journal_with_ops(context, "proof_end", 50).await;
1525
1526 let size = journal.size();
1527 let (proof, ops) = journal
1529 .historical_proof(size, Location::new_unchecked(40), NZU64!(20))
1530 .await
1531 .unwrap();
1532
1533 assert_eq!(ops.len(), 10);
1535 for (i, op) in ops.iter().enumerate() {
1536 assert_eq!(*op, create_operation((40 + i) as u8));
1537 }
1538
1539 let mut hasher = StandardHasher::new();
1541 let root = journal.root();
1542 assert!(verify_proof(
1543 &proof,
1544 &ops,
1545 Location::new_unchecked(40),
1546 &root,
1547 &mut hasher
1548 ));
1549 });
1550 }
1551
1552 #[test_traced("INFO")]
1554 fn test_historical_proof_out_of_range_returns_error() {
1555 let executor = deterministic::Runner::default();
1556 executor.start(|context| async move {
1557 let journal = create_journal_with_ops(context, "proof_oob", 5).await;
1558
1559 let result = journal
1561 .historical_proof(
1562 Location::new_unchecked(10),
1563 Location::new_unchecked(0),
1564 NZU64!(1),
1565 )
1566 .await;
1567
1568 assert!(matches!(
1569 result,
1570 Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
1571 ));
1572 });
1573 }
1574
1575 #[test_traced("INFO")]
1577 fn test_historical_proof_start_too_large_returns_error() {
1578 let executor = deterministic::Runner::default();
1579 executor.start(|context| async move {
1580 let journal = create_journal_with_ops(context, "proof_start_oob", 5).await;
1581
1582 let size = journal.size();
1583 let result = journal.historical_proof(size, size, NZU64!(1)).await;
1585
1586 assert!(matches!(
1587 result,
1588 Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
1589 ));
1590 });
1591 }
1592
1593 #[test_traced("INFO")]
1595 fn test_historical_proof_truly_historical() {
1596 let executor = deterministic::Runner::default();
1597 executor.start(|context| async move {
1598 let mut journal = create_journal_with_ops(context, "proof_historical", 50).await;
1600
1601 let mut hasher = StandardHasher::new();
1603 let historical_root = journal.root();
1604 let historical_size = journal.size();
1605
1606 for i in 50..100 {
1608 journal.append(create_operation(i as u8)).await.unwrap();
1609 }
1610 journal.sync().await.unwrap();
1611
1612 let (proof, ops) = journal
1614 .historical_proof(historical_size, Location::new_unchecked(0), NZU64!(50))
1615 .await
1616 .unwrap();
1617
1618 assert_eq!(ops.len(), 50);
1620 for (i, op) in ops.iter().enumerate() {
1621 assert_eq!(*op, create_operation(i as u8));
1622 }
1623
1624 assert!(verify_proof(
1626 &proof,
1627 &ops,
1628 Location::new_unchecked(0),
1629 &historical_root,
1630 &mut hasher
1631 ));
1632 });
1633 }
1634
1635 #[test_traced("INFO")]
1637 fn test_historical_proof_pruned_location_returns_error() {
1638 let executor = deterministic::Runner::default();
1639 executor.start(|context| async move {
1640 let mut journal = create_journal_with_ops(context, "proof_pruned", 50).await;
1641
1642 journal
1643 .append(Operation::CommitFloor(None, Location::new_unchecked(25)))
1644 .await
1645 .unwrap();
1646 journal.sync().await.unwrap();
1647 let pruned_boundary = journal.prune(Location::new_unchecked(25)).await.unwrap();
1648
1649 let size = journal.size();
1651 let start_loc = Location::new_unchecked(0);
1652 if start_loc < pruned_boundary {
1653 let result = journal.historical_proof(size, start_loc, NZU64!(1)).await;
1654
1655 assert!(result.is_err());
1657 }
1658 });
1659 }
1660
1661 #[test_traced("INFO")]
1663 fn test_replay_operations() {
1664 let executor = deterministic::Runner::default();
1665 executor.start(|context| async move {
1666 let journal = create_empty_journal(context.clone(), "replay").await;
1668 let stream = journal.replay(0, NZUsize!(10)).await.unwrap();
1669 futures::pin_mut!(stream);
1670 assert!(stream.next().await.is_none());
1671
1672 let journal = create_journal_with_ops(context, "replay", 50).await;
1674 let stream = journal.replay(0, NZUsize!(100)).await.unwrap();
1675 futures::pin_mut!(stream);
1676
1677 for i in 0..50 {
1678 let (pos, op) = stream.next().await.unwrap().unwrap();
1679 assert_eq!(pos, i);
1680 assert_eq!(op, create_operation(i as u8));
1681 }
1682
1683 assert!(stream.next().await.is_none());
1684 });
1685 }
1686
1687 #[test_traced("INFO")]
1689 fn test_replay_from_middle() {
1690 let executor = deterministic::Runner::default();
1691 executor.start(|context| async move {
1692 let journal = create_journal_with_ops(context, "replay_middle", 50).await;
1693 let stream = journal.replay(25, NZUsize!(100)).await.unwrap();
1694 futures::pin_mut!(stream);
1695
1696 let mut count = 0;
1697 while let Some(result) = stream.next().await {
1698 let (pos, op) = result.unwrap();
1699 assert_eq!(pos, 25 + count);
1700 assert_eq!(op, create_operation((25 + count) as u8));
1701 count += 1;
1702 }
1703
1704 assert_eq!(count, 25);
1706 });
1707 }
1708}