1use crate::{
9 journal::{
10 contiguous::{fixed, variable, Contiguous, MutableContiguous},
11 Error as JournalError,
12 },
13 mmr::{
14 journaled::{CleanMmr, Mmr},
15 mem::{Clean, Dirty, State},
16 Location, Position, Proof, StandardHasher,
17 },
18 Persistable,
19};
20use commonware_codec::{CodecFixedShared, CodecShared, Encode, EncodeShared};
21use commonware_cryptography::{DigestOf, Hasher};
22use commonware_runtime::{Clock, Metrics, Storage};
23use core::num::{NonZeroU64, NonZeroUsize};
24use futures::{future::try_join_all, try_join, TryFutureExt as _};
25use thiserror::Error;
26use tracing::{debug, warn};
27
28#[derive(Error, Debug)]
30pub enum Error {
31 #[error("mmr error: {0}")]
32 Mmr(#[from] crate::mmr::Error),
33
34 #[error("journal error: {0}")]
35 Journal(#[from] super::Error),
36}
37pub struct Journal<E, C, H, S: State<H::Digest> + Send + Sync = Dirty>
42where
43 E: Storage + Clock + Metrics,
44 C: Contiguous<Item: EncodeShared>,
45 H: Hasher,
46{
47 pub(crate) mmr: Mmr<E, H::Digest, S>,
50
51 pub(crate) journal: C,
54
55 pub(crate) hasher: StandardHasher<H>,
56}
57
58impl<E, C, H, S> Journal<E, C, H, S>
59where
60 E: Storage + Clock + Metrics,
61 C: Contiguous<Item: EncodeShared>,
62 H: Hasher,
63 S: State<DigestOf<H>> + Send + Sync,
64{
65 pub fn size(&self) -> Location {
67 Location::new_unchecked(self.journal.size())
68 }
69
70 pub fn oldest_retained_loc(&self) -> Option<Location> {
72 self.journal
73 .oldest_retained_pos()
74 .map(Location::new_unchecked)
75 }
76
77 pub fn pruning_boundary(&self) -> Location {
79 self.journal.pruning_boundary().into()
80 }
81
82 pub async fn read(&self, loc: Location) -> Result<C::Item, Error> {
84 self.journal.read(*loc).await.map_err(Error::Journal)
85 }
86}
87
88impl<E, C, H, S> Journal<E, C, H, S>
89where
90 E: Storage + Clock + Metrics,
91 C: MutableContiguous<Item: EncodeShared>,
92 H: Hasher,
93 S: State<DigestOf<H>> + Send + Sync,
94{
95 pub async fn append(&mut self, item: C::Item) -> Result<Location, Error> {
96 let encoded_item = item.encode();
97
98 let (_, loc) = try_join!(
100 self.mmr
101 .add(&mut self.hasher, &encoded_item)
102 .map_err(Error::Mmr),
103 self.journal.append(item).map_err(Into::into)
104 )?;
105
106 Ok(Location::new_unchecked(loc))
107 }
108}
109
110impl<E, C, H, S> Journal<E, C, H, S>
111where
112 E: Storage + Clock + Metrics,
113 C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
114 H: Hasher,
115 S: State<DigestOf<H>> + Send + Sync,
116{
117 pub async fn commit(&mut self) -> Result<(), Error> {
120 self.journal.commit().await.map_err(Error::Journal)
121 }
122}
123
124impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
125where
126 E: Storage + Clock + Metrics,
127 C: MutableContiguous<Item: EncodeShared>,
128 H: Hasher,
129{
130 pub async fn from_components(
132 mmr: CleanMmr<E, H::Digest>,
133 journal: C,
134 mut hasher: StandardHasher<H>,
135 apply_batch_size: u64,
136 ) -> Result<Self, Error> {
137 let mut mmr = Self::align(mmr, &journal, &mut hasher, apply_batch_size).await?;
138
139 mmr.sync().await?;
142
143 Ok(Self {
144 mmr,
145 journal,
146 hasher,
147 })
148 }
149
150 async fn align(
154 mut mmr: CleanMmr<E, H::Digest>,
155 journal: &C,
156 hasher: &mut StandardHasher<H>,
157 apply_batch_size: u64,
158 ) -> Result<CleanMmr<E, H::Digest>, Error> {
159 let journal_size = journal.size();
162 let mut mmr_size = mmr.leaves();
163 if mmr_size > journal_size {
164 let pop_count = mmr_size - journal_size;
165 warn!(journal_size, ?pop_count, "popping MMR items");
166 mmr.pop(hasher, *pop_count as usize).await?;
167 mmr_size = Location::new_unchecked(journal_size);
168 }
169
170 if mmr_size < journal_size {
172 let replay_count = journal_size - *mmr_size;
173 warn!(
174 journal_size,
175 replay_count, "MMR lags behind journal, replaying journal to catch up"
176 );
177
178 let mut mmr = mmr.into_dirty();
179 let mut batch_size = 0;
180 while mmr_size < journal_size {
181 let op = journal.read(*mmr_size).await?;
182 mmr.add(hasher, &op.encode()).await?;
183 mmr_size += 1;
184 batch_size += 1;
185 if batch_size >= apply_batch_size {
186 mmr = mmr.merkleize(hasher).into_dirty();
187 batch_size = 0;
188 }
189 }
190 return Ok(mmr.merkleize(hasher));
191 }
192
193 assert_eq!(journal.size(), mmr.leaves());
195
196 Ok(mmr)
197 }
198
199 pub async fn prune(&mut self, prune_loc: Location) -> Result<Location, Error> {
204 if self.mmr.size() == 0 {
205 return Ok(self.pruning_boundary());
207 }
208
209 self.mmr.sync().await?;
213
214 if !self.journal.prune(*prune_loc).await? {
216 return Ok(self.pruning_boundary());
217 }
218
219 let pruning_boundary = self.pruning_boundary();
220 let size = self.size();
221 debug!(?size, ?prune_loc, ?pruning_boundary, "pruned inactive ops");
222
223 self.mmr
225 .prune_to_pos(Position::try_from(pruning_boundary)?)
226 .await?;
227
228 Ok(pruning_boundary)
229 }
230}
231
232impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
233where
234 E: Storage + Clock + Metrics,
235 C: Contiguous<Item: EncodeShared>,
236 H: Hasher,
237{
238 pub async fn proof(
252 &self,
253 start_loc: Location,
254 max_ops: NonZeroU64,
255 ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
256 self.historical_proof(self.size(), start_loc, max_ops).await
257 }
258
259 pub async fn historical_proof(
274 &self,
275 historical_size: Location,
276 start_loc: Location,
277 max_ops: NonZeroU64,
278 ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
279 let size = self.size();
280 if historical_size > size {
281 return Err(crate::mmr::Error::RangeOutOfBounds(size).into());
282 }
283 if start_loc >= historical_size {
284 return Err(crate::mmr::Error::RangeOutOfBounds(start_loc).into());
285 }
286 let end_loc = std::cmp::min(historical_size, start_loc.saturating_add(max_ops.get()));
287
288 let mmr_size = Position::try_from(historical_size)?;
289 let proof = self
290 .mmr
291 .historical_range_proof(mmr_size, start_loc..end_loc)
292 .await?;
293
294 let mut ops = Vec::with_capacity((*end_loc - *start_loc) as usize);
295 let futures = (*start_loc..*end_loc)
296 .map(|i| self.journal.read(i))
297 .collect::<Vec<_>>();
298 try_join_all(futures)
299 .await?
300 .into_iter()
301 .for_each(|op| ops.push(op));
302
303 Ok((proof, ops))
304 }
305
306 pub const fn root(&self) -> H::Digest {
308 self.mmr.root()
309 }
310
311 pub fn into_dirty(self) -> Journal<E, C, H, Dirty> {
313 Journal {
314 mmr: self.mmr.into_dirty(),
315 journal: self.journal,
316 hasher: self.hasher,
317 }
318 }
319}
320
321impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
322where
323 E: Storage + Clock + Metrics,
324 C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
325 H: Hasher,
326{
327 pub async fn destroy(self) -> Result<(), Error> {
329 try_join!(
330 self.journal.destroy().map_err(Error::Journal),
331 self.mmr.destroy().map_err(Error::Mmr),
332 )?;
333 Ok(())
334 }
335
336 pub async fn sync(&mut self) -> Result<(), Error> {
338 try_join!(
339 self.journal.sync().map_err(Error::Journal),
340 self.mmr.sync().map_err(Into::into)
341 )?;
342
343 Ok(())
344 }
345}
346
347impl<E, C, H> Journal<E, C, H, Dirty>
348where
349 E: Storage + Clock + Metrics,
350 C: Contiguous<Item: EncodeShared>,
351 H: Hasher,
352{
353 pub fn merkleize(self) -> Journal<E, C, H, Clean<H::Digest>> {
355 let Self {
356 mmr,
357 journal,
358 mut hasher,
359 } = self;
360 Journal {
361 mmr: mmr.merkleize(&mut hasher),
362 journal,
363 hasher,
364 }
365 }
366}
367
368impl<E, C, H> Journal<E, C, H, Dirty>
369where
370 E: Storage + Clock + Metrics,
371 C: MutableContiguous<Item: EncodeShared>,
372 H: Hasher,
373{
374 pub async fn from_components(
376 mmr: CleanMmr<E, H::Digest>,
377 journal: C,
378 hasher: StandardHasher<H>,
379 apply_batch_size: u64,
380 ) -> Result<Self, Error> {
381 let clean = Journal::<E, C, H, Clean<H::Digest>>::from_components(
382 mmr,
383 journal,
384 hasher,
385 apply_batch_size,
386 )
387 .await?;
388 Ok(clean.into_dirty())
389 }
390}
391
392const APPLY_BATCH_SIZE: u64 = 1 << 16;
394
395impl<E, O, H> Journal<E, fixed::Journal<E, O>, H, Clean<H::Digest>>
396where
397 E: Storage + Clock + Metrics,
398 O: CodecFixedShared,
399 H: Hasher,
400{
401 pub async fn new(
406 context: E,
407 mmr_cfg: crate::mmr::journaled::Config,
408 journal_cfg: fixed::Config,
409 rewind_predicate: fn(&O) -> bool,
410 ) -> Result<Self, Error> {
411 let mut journal = fixed::Journal::init(context.with_label("journal"), journal_cfg).await?;
412
413 journal.rewind_to(rewind_predicate).await?;
415
416 let mut hasher = StandardHasher::<H>::new();
418 let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
419 let mut mmr = Self::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE).await?;
420
421 journal.sync().await?;
424 mmr.sync().await?;
425
426 Ok(Self {
427 mmr,
428 journal,
429 hasher,
430 })
431 }
432}
433
434impl<E, O, H> Journal<E, variable::Journal<E, O>, H, Clean<H::Digest>>
435where
436 E: Storage + Clock + Metrics,
437 O: CodecShared,
438 H: Hasher,
439{
440 pub async fn new(
445 context: E,
446 mmr_cfg: crate::mmr::journaled::Config,
447 journal_cfg: variable::Config<O::Cfg>,
448 rewind_predicate: fn(&O) -> bool,
449 ) -> Result<Self, Error> {
450 let mut hasher = StandardHasher::<H>::new();
451 let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
452 let mut journal =
453 variable::Journal::init(context.with_label("journal"), journal_cfg).await?;
454
455 journal.rewind_to(rewind_predicate).await?;
457
458 let mut mmr = Self::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE).await?;
460
461 journal.sync().await?;
464 mmr.sync().await?;
465
466 Ok(Self {
467 mmr,
468 journal,
469 hasher,
470 })
471 }
472}
473
474impl<E, C, H, S> Contiguous for Journal<E, C, H, S>
475where
476 E: Storage + Clock + Metrics,
477 C: MutableContiguous<Item: EncodeShared>,
478 H: Hasher,
479 S: State<DigestOf<H>> + Send + Sync,
480{
481 type Item = C::Item;
482
483 fn size(&self) -> u64 {
484 self.journal.size()
485 }
486
487 fn oldest_retained_pos(&self) -> Option<u64> {
488 self.journal.oldest_retained_pos()
489 }
490
491 fn pruning_boundary(&self) -> u64 {
492 self.journal.pruning_boundary()
493 }
494
495 async fn replay(
496 &self,
497 start_pos: u64,
498 buffer: NonZeroUsize,
499 ) -> Result<
500 impl futures::Stream<Item = Result<(u64, Self::Item), JournalError>> + '_,
501 JournalError,
502 > {
503 self.journal.replay(start_pos, buffer).await
504 }
505
506 async fn read(&self, position: u64) -> Result<Self::Item, JournalError> {
507 self.journal.read(position).await
508 }
509}
510
511impl<E, C, H> MutableContiguous for Journal<E, C, H, Dirty>
512where
513 E: Storage + Clock + Metrics,
514 C: MutableContiguous<Item: EncodeShared>,
515 H: Hasher,
516{
517 async fn append(&mut self, item: Self::Item) -> Result<u64, JournalError> {
518 let res = self.append(item).await.map_err(|e| match e {
519 Error::Journal(inner) => inner,
520 Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
521 })?;
522
523 Ok(*res)
524 }
525
526 async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
527 self.journal.prune(min_position).await
528 }
529
530 async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
531 self.journal.rewind(size).await?;
532
533 let leaves = *self.mmr.leaves();
534 if leaves > size {
535 self.mmr
536 .pop((leaves - size) as usize)
537 .await
538 .map_err(|error| JournalError::Mmr(anyhow::Error::from(error)))?;
539 }
540
541 Ok(())
542 }
543}
544
545impl<E, C, H> MutableContiguous for Journal<E, C, H, Clean<H::Digest>>
546where
547 E: Storage + Clock + Metrics,
548 C: MutableContiguous<Item: EncodeShared>,
549 H: Hasher,
550{
551 async fn append(&mut self, item: Self::Item) -> Result<u64, JournalError> {
552 let loc = self.append(item).await.map_err(|e| match e {
553 Error::Journal(inner) => inner,
554 Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
555 })?;
556
557 Ok(*loc)
558 }
559
560 async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
561 let old_pruning_boundary = self.pruning_boundary();
562 let pruning_boundary = self
563 .prune(Location::new_unchecked(min_position))
564 .await
565 .map_err(|e| match e {
566 Error::Journal(inner) => inner,
567 Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
568 })?;
569
570 Ok(old_pruning_boundary != pruning_boundary)
571 }
572
573 async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
574 self.journal.rewind(size).await?;
575
576 let leaves = *self.mmr.leaves();
577 if leaves > size {
578 self.mmr
579 .pop(&mut self.hasher, (leaves - size) as usize)
580 .await
581 .map_err(|error| JournalError::Mmr(anyhow::Error::from(error)))?;
582 }
583
584 Ok(())
585 }
586}
587
588impl<E, C, H> Persistable for Journal<E, C, H, Clean<H::Digest>>
589where
590 E: Storage + Clock + Metrics,
591 C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
592 H: Hasher,
593{
594 type Error = JournalError;
595
596 async fn commit(&mut self) -> Result<(), JournalError> {
597 self.commit().await.map_err(|e| match e {
598 Error::Journal(inner) => inner,
599 Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
600 })
601 }
602
603 async fn sync(&mut self) -> Result<(), JournalError> {
604 self.sync().await.map_err(|e| match e {
605 Error::Journal(inner) => inner,
606 Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
607 })
608 }
609
610 async fn destroy(self) -> Result<(), JournalError> {
611 self.destroy().await.map_err(|e| match e {
612 Error::Journal(inner) => inner,
613 Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
614 })
615 }
616}
617
618#[cfg(test)]
619mod tests {
620 use super::*;
621 use crate::{
622 journal::contiguous::fixed::{Config as JConfig, Journal as ContiguousJournal},
623 mmr::{
624 journaled::{Config as MmrConfig, Mmr},
625 Location,
626 },
627 qmdb::{
628 any::unordered::{fixed::Operation, Update},
629 operation::Committable,
630 },
631 };
632 use commonware_codec::Encode;
633 use commonware_cryptography::{sha256, sha256::Digest, Sha256};
634 use commonware_macros::test_traced;
635 use commonware_runtime::{
636 buffer::PoolRef,
637 deterministic::{self, Context},
638 Runner as _,
639 };
640 use commonware_utils::{NZUsize, NZU16, NZU64};
641 use futures::StreamExt as _;
642 use std::num::NonZeroU16;
643
644 const PAGE_SIZE: NonZeroU16 = NZU16!(101);
645 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
646
647 fn mmr_config(suffix: &str) -> MmrConfig {
649 MmrConfig {
650 journal_partition: format!("mmr_journal_{suffix}"),
651 metadata_partition: format!("mmr_metadata_{suffix}"),
652 items_per_blob: NZU64!(11),
653 write_buffer: NZUsize!(1024),
654 thread_pool: None,
655 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
656 }
657 }
658
659 fn journal_config(suffix: &str) -> JConfig {
661 JConfig {
662 partition: format!("journal_{suffix}"),
663 items_per_blob: NZU64!(7),
664 write_buffer: NZUsize!(1024),
665 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
666 }
667 }
668
669 type AuthenticatedJournal = Journal<
670 deterministic::Context,
671 ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
672 Sha256,
673 Clean<sha256::Digest>,
674 >;
675
676 async fn create_empty_journal(context: Context, suffix: &str) -> AuthenticatedJournal {
678 AuthenticatedJournal::new(
679 context,
680 mmr_config(suffix),
681 journal_config(suffix),
682 |op: &Operation<Digest, Digest>| op.is_commit(),
683 )
684 .await
685 .unwrap()
686 }
687
688 fn create_operation(index: u8) -> Operation<Digest, Digest> {
690 Operation::Update(Update(
691 Sha256::fill(index),
692 Sha256::fill(index.wrapping_add(1)),
693 ))
694 }
695
696 async fn create_journal_with_ops(
700 context: Context,
701 suffix: &str,
702 count: usize,
703 ) -> AuthenticatedJournal {
704 let mut journal = create_empty_journal(context, suffix).await;
705
706 for i in 0..count {
707 let op = create_operation(i as u8);
708 let loc = journal.append(op).await.unwrap();
709 assert_eq!(loc, Location::new_unchecked(i as u64));
710 }
711
712 journal.sync().await.unwrap();
713 journal
714 }
715
716 async fn create_components(
721 context: Context,
722 suffix: &str,
723 ) -> (
724 CleanMmr<deterministic::Context, sha256::Digest>,
725 ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
726 StandardHasher<Sha256>,
727 ) {
728 let mut hasher = StandardHasher::new();
729 let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_config(suffix))
730 .await
731 .unwrap();
732 let journal =
733 ContiguousJournal::init(context.with_label("journal"), journal_config(suffix))
734 .await
735 .unwrap();
736 (mmr, journal, hasher)
737 }
738
739 fn verify_proof(
741 proof: &crate::mmr::Proof<<Sha256 as commonware_cryptography::Hasher>::Digest>,
742 operations: &[Operation<Digest, Digest>],
743 start_loc: Location,
744 root: &<Sha256 as commonware_cryptography::Hasher>::Digest,
745 hasher: &mut StandardHasher<Sha256>,
746 ) -> bool {
747 let encoded_ops: Vec<_> = operations.iter().map(|op| op.encode()).collect();
748 proof.verify_range_inclusion(hasher, &encoded_ops, start_loc, root)
749 }
750
751 #[test_traced("INFO")]
753 fn test_new_creates_empty_journal() {
754 let executor = deterministic::Runner::default();
755 executor.start(|context| async move {
756 let journal = create_empty_journal(context, "new_empty").await;
757
758 assert_eq!(journal.size(), 0);
759 assert_eq!(journal.pruning_boundary(), 0);
760 assert_eq!(journal.oldest_retained_pos(), None);
761 });
762 }
763
764 #[test_traced("INFO")]
766 fn test_align_with_empty_mmr_and_journal() {
767 let executor = deterministic::Runner::default();
768 executor.start(|context| async move {
769 let (mmr, journal, mut hasher) = create_components(context, "align_empty").await;
770
771 let mmr = Journal::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
772 .await
773 .unwrap();
774
775 assert_eq!(mmr.leaves(), Location::new_unchecked(0));
776 assert_eq!(journal.size(), Location::new_unchecked(0));
777 });
778 }
779
780 #[test_traced("WARN")]
782 fn test_align_when_mmr_ahead() {
783 let executor = deterministic::Runner::default();
784 executor.start(|context| async move {
785 let (mut mmr, mut journal, mut hasher) = create_components(context, "mmr_ahead").await;
786
787 for i in 0..20 {
789 let op = create_operation(i as u8);
790 let encoded = op.encode();
791 mmr.add(&mut hasher, &encoded).await.unwrap();
792 journal.append(op).await.unwrap();
793 }
794
795 let commit_op = Operation::CommitFloor(None, Location::new_unchecked(0));
797 journal.append(commit_op).await.unwrap();
798 journal.sync().await.unwrap();
799
800 let mmr = Journal::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
802 .await
803 .unwrap();
804
805 assert_eq!(mmr.leaves(), Location::new_unchecked(21));
807 assert_eq!(journal.size(), Location::new_unchecked(21));
808 });
809 }
810
811 #[test_traced("WARN")]
813 fn test_align_when_journal_ahead() {
814 let executor = deterministic::Runner::default();
815 executor.start(|context| async move {
816 let (mut mmr, mut journal, mut hasher) =
817 create_components(context, "journal_ahead").await;
818
819 for i in 0..20 {
821 let op = create_operation(i as u8);
822 journal.append(op).await.unwrap();
823 }
824
825 let commit_op = Operation::CommitFloor(None, Location::new_unchecked(0));
827 journal.append(commit_op).await.unwrap();
828 journal.sync().await.unwrap();
829
830 mmr = Journal::align(mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
832 .await
833 .unwrap();
834
835 assert_eq!(mmr.leaves(), Location::new_unchecked(21));
837 assert_eq!(journal.size(), Location::new_unchecked(21));
838 });
839 }
840
841 #[test_traced("INFO")]
843 fn test_align_with_mismatched_committed_ops() {
844 let executor = deterministic::Runner::default();
845 executor.start(|context| async move {
846 let mut journal = create_empty_journal(context.clone(), "mismatched").await;
847
848 for i in 0..20 {
850 let loc = journal.append(create_operation(i as u8)).await.unwrap();
851 assert_eq!(loc, Location::new_unchecked(i as u64));
852 }
853
854 let size_before = journal.size();
857 assert_eq!(size_before, 20);
858
859 journal.sync().await.unwrap();
861 drop(journal);
862 let journal = create_empty_journal(context, "mismatched").await;
863
864 assert_eq!(journal.size(), 0);
866 });
867 }
868
869 #[test_traced("INFO")]
870 fn test_rewind() {
871 let executor = deterministic::Runner::default();
872 executor.start(|context| async move {
873 {
875 let mut journal = ContiguousJournal::init(
876 context.with_label("rewind_match"),
877 journal_config("rewind_match"),
878 )
879 .await
880 .unwrap();
881
882 for i in 0..3 {
884 journal.append(create_operation(i)).await.unwrap();
885 }
886 journal
887 .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
888 .await
889 .unwrap();
890 for i in 4..7 {
891 journal.append(create_operation(i)).await.unwrap();
892 }
893
894 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
896 assert_eq!(final_size, 4);
897 assert_eq!(journal.size(), 4);
898
899 let op = journal.read(3).await.unwrap();
901 assert!(op.is_commit());
902 }
903
904 {
906 let mut journal = ContiguousJournal::init(
907 context.with_label("rewind_multiple"),
908 journal_config("rewind_multiple"),
909 )
910 .await
911 .unwrap();
912
913 journal.append(create_operation(0)).await.unwrap();
915 journal
916 .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
917 .await
918 .unwrap(); journal.append(create_operation(2)).await.unwrap();
920 journal
921 .append(Operation::CommitFloor(None, Location::new_unchecked(1)))
922 .await
923 .unwrap(); journal.append(create_operation(4)).await.unwrap();
925
926 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
928 assert_eq!(final_size, 4);
929
930 let op = journal.read(3).await.unwrap();
932 assert!(op.is_commit());
933
934 assert!(journal.read(4).await.is_err());
936 }
937
938 {
940 let mut journal = ContiguousJournal::init(
941 context.with_label("rewind_no_match"),
942 journal_config("rewind_no_match"),
943 )
944 .await
945 .unwrap();
946
947 for i in 0..10 {
949 journal.append(create_operation(i)).await.unwrap();
950 }
951
952 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
954 assert_eq!(final_size, 0, "Should rewind to pruning boundary (0)");
955 assert_eq!(journal.size(), 0);
956 }
957
958 {
960 let mut journal = ContiguousJournal::init(
961 context.with_label("rewind_with_pruning"),
962 journal_config("rewind_with_pruning"),
963 )
964 .await
965 .unwrap();
966
967 for i in 0..10 {
969 journal.append(create_operation(i)).await.unwrap();
970 }
971 journal
972 .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
973 .await
974 .unwrap(); for i in 11..15 {
976 journal.append(create_operation(i)).await.unwrap();
977 }
978 journal.sync().await.unwrap();
979
980 journal.prune(8).await.unwrap();
982 let oldest = journal.oldest_retained_pos();
983 assert_eq!(oldest, Some(7));
984
985 for i in 15..20 {
987 journal.append(create_operation(i)).await.unwrap();
988 }
989
990 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
992 assert_eq!(final_size, 11);
993
994 let op = journal.read(10).await.unwrap();
996 assert!(op.is_commit());
997 }
998
999 {
1001 let mut journal = ContiguousJournal::init(
1002 context.with_label("rewind_no_match_pruned"),
1003 journal_config("rewind_no_match_pruned"),
1004 )
1005 .await
1006 .unwrap();
1007
1008 for i in 0..5 {
1010 journal.append(create_operation(i)).await.unwrap();
1011 }
1012 journal
1013 .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
1014 .await
1015 .unwrap(); for i in 6..10 {
1017 journal.append(create_operation(i)).await.unwrap();
1018 }
1019 journal.sync().await.unwrap();
1020
1021 journal.prune(8).await.unwrap();
1024 let oldest = journal.oldest_retained_pos();
1025 assert_eq!(oldest, Some(7));
1026
1027 for i in 10..14 {
1029 journal.append(create_operation(i)).await.unwrap();
1030 }
1031
1032 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1035 assert_eq!(final_size, 7);
1036 }
1037
1038 {
1040 let mut journal = ContiguousJournal::init(
1041 context.with_label("rewind_empty"),
1042 journal_config("rewind_empty"),
1043 )
1044 .await
1045 .unwrap();
1046
1047 let final_size = journal
1049 .rewind_to(|op: &Operation<Digest, Digest>| op.is_commit())
1050 .await
1051 .unwrap();
1052 assert_eq!(final_size, 0);
1053 assert_eq!(journal.size(), 0);
1054 }
1055
1056 {
1058 let mut journal = AuthenticatedJournal::new(
1059 context,
1060 mmr_config("rewind"),
1061 journal_config("rewind"),
1062 |op| op.is_commit(),
1063 )
1064 .await
1065 .unwrap();
1066
1067 for i in 0..5 {
1069 journal.append(create_operation(i)).await.unwrap();
1070 }
1071 journal
1072 .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
1073 .await
1074 .unwrap(); for i in 6..10 {
1076 journal.append(create_operation(i)).await.unwrap();
1077 }
1078 assert_eq!(journal.size(), 10);
1079
1080 journal.rewind(2).await.unwrap();
1081 assert_eq!(journal.size(), 2);
1082 assert_eq!(journal.mmr.leaves(), 2);
1083 assert_eq!(journal.mmr.size(), 3);
1084 assert_eq!(journal.pruning_boundary(), 0);
1085 assert_eq!(journal.oldest_retained_pos(), Some(0));
1086
1087 assert!(matches!(
1088 journal.rewind(3).await,
1089 Err(JournalError::InvalidRewind(_))
1090 ));
1091
1092 journal.rewind(0).await.unwrap();
1093 assert_eq!(journal.size(), 0);
1094 assert_eq!(journal.mmr.leaves(), 0);
1095 assert_eq!(journal.mmr.size(), 0);
1096 assert_eq!(journal.pruning_boundary(), 0);
1097 assert_eq!(journal.oldest_retained_pos(), None);
1098
1099 for i in 0..255 {
1101 journal.append(create_operation(i)).await.unwrap();
1102 }
1103 MutableContiguous::prune(&mut journal, 100).await.unwrap();
1104 assert_eq!(journal.pruning_boundary(), 98);
1105 let res = journal.rewind(97).await;
1106 assert!(matches!(res, Err(JournalError::InvalidRewind(97))));
1107 journal.rewind(98).await.unwrap();
1108 assert_eq!(journal.size(), 98);
1109 assert_eq!(journal.mmr.leaves(), 98);
1110 assert_eq!(journal.pruning_boundary(), 98);
1111 assert_eq!(journal.oldest_retained_pos(), None);
1112 }
1113 });
1114 }
1115
1116 #[test_traced("INFO")]
1119 fn test_apply_op_and_read_operations() {
1120 let executor = deterministic::Runner::default();
1121 executor.start(|context| async move {
1122 let mut journal = create_empty_journal(context, "apply_op").await;
1123
1124 assert_eq!(journal.size(), 0);
1125
1126 let expected_ops: Vec<_> = (0..50).map(|i| create_operation(i as u8)).collect();
1128 for (i, op) in expected_ops.iter().enumerate() {
1129 let loc = journal.append(op.clone()).await.unwrap();
1130 assert_eq!(loc, Location::new_unchecked(i as u64));
1131 assert_eq!(journal.size(), (i + 1) as u64);
1132 }
1133
1134 assert_eq!(journal.size(), 50);
1135
1136 journal.sync().await.unwrap();
1138 for (i, expected_op) in expected_ops.iter().enumerate() {
1139 let read_op = journal
1140 .read(Location::new_unchecked(i as u64))
1141 .await
1142 .unwrap();
1143 assert_eq!(read_op, *expected_op);
1144 }
1145 });
1146 }
1147
1148 #[test_traced("INFO")]
1150 fn test_read_operations_at_various_positions() {
1151 let executor = deterministic::Runner::default();
1152 executor.start(|context| async move {
1153 let journal = create_journal_with_ops(context, "read", 50).await;
1154
1155 let first_op = journal.read(Location::new_unchecked(0)).await.unwrap();
1157 assert_eq!(first_op, create_operation(0));
1158
1159 let middle_op = journal.read(Location::new_unchecked(25)).await.unwrap();
1161 assert_eq!(middle_op, create_operation(25));
1162
1163 let last_op = journal.read(Location::new_unchecked(49)).await.unwrap();
1165 assert_eq!(last_op, create_operation(49));
1166
1167 for i in 0..50 {
1169 let op = journal.read(Location::new_unchecked(i)).await.unwrap();
1170 assert_eq!(op, create_operation(i as u8));
1171 }
1172 });
1173 }
1174
1175 #[test_traced("INFO")]
1177 fn test_read_pruned_operation_returns_error() {
1178 let executor = deterministic::Runner::default();
1179 executor.start(|context| async move {
1180 let mut journal = create_journal_with_ops(context, "read_pruned", 100).await;
1181
1182 journal
1184 .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1185 .await
1186 .unwrap();
1187 journal.sync().await.unwrap();
1188 let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1189
1190 let read_loc = Location::new_unchecked(0);
1192 if read_loc < pruned_boundary {
1193 let result = journal.read(read_loc).await;
1194 assert!(matches!(
1195 result,
1196 Err(Error::Journal(crate::journal::Error::ItemPruned(_)))
1197 ));
1198 }
1199 });
1200 }
1201
1202 #[test_traced("INFO")]
1204 fn test_read_out_of_range_returns_error() {
1205 let executor = deterministic::Runner::default();
1206 executor.start(|context| async move {
1207 let journal = create_journal_with_ops(context, "read_oob", 3).await;
1208
1209 let result = journal.read(Location::new_unchecked(10)).await;
1211 assert!(matches!(
1212 result,
1213 Err(Error::Journal(crate::journal::Error::ItemOutOfRange(_)))
1214 ));
1215 });
1216 }
1217
1218 #[test_traced("INFO")]
1220 fn test_read_all_operations_back_correctly() {
1221 let executor = deterministic::Runner::default();
1222 executor.start(|context| async move {
1223 let journal = create_journal_with_ops(context, "read_all", 50).await;
1224
1225 assert_eq!(journal.size(), 50);
1226
1227 for i in 0..50 {
1229 let op = journal.read(Location::new_unchecked(i)).await.unwrap();
1230 assert_eq!(op, create_operation(i as u8));
1231 }
1232 });
1233 }
1234
1235 #[test_traced("INFO")]
1237 fn test_sync() {
1238 let executor = deterministic::Runner::default();
1239 executor.start(|context| async move {
1240 let mut journal = create_empty_journal(context.clone(), "close_pending").await;
1241
1242 let expected_ops: Vec<_> = (0..20).map(|i| create_operation(i as u8)).collect();
1244 for (i, op) in expected_ops.iter().enumerate() {
1245 let loc = journal.append(op.clone()).await.unwrap();
1246 assert_eq!(loc, Location::new_unchecked(i as u64),);
1247 }
1248
1249 let commit_loc = journal
1251 .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
1252 .await
1253 .unwrap();
1254 assert_eq!(
1255 commit_loc,
1256 Location::new_unchecked(20),
1257 "commit should be at location 20"
1258 );
1259 journal.sync().await.unwrap();
1260
1261 drop(journal);
1263 let journal = create_empty_journal(context, "close_pending").await;
1264 assert_eq!(journal.size(), 21);
1265
1266 for (i, expected_op) in expected_ops.iter().enumerate() {
1268 let read_op = journal
1269 .read(Location::new_unchecked(i as u64))
1270 .await
1271 .unwrap();
1272 assert_eq!(read_op, *expected_op);
1273 }
1274 });
1275 }
1276
1277 #[test_traced("INFO")]
1279 fn test_prune_empty_journal() {
1280 let executor = deterministic::Runner::default();
1281 executor.start(|context| async move {
1282 let mut journal = create_empty_journal(context, "prune_empty").await;
1283
1284 let boundary = journal.prune(Location::new_unchecked(0)).await.unwrap();
1285
1286 assert_eq!(boundary, Location::new_unchecked(0));
1287 });
1288 }
1289
1290 #[test_traced("INFO")]
1292 fn test_prune_to_location() {
1293 let executor = deterministic::Runner::default();
1294 executor.start(|context| async move {
1295 let mut journal = create_journal_with_ops(context, "prune_to", 100).await;
1296
1297 journal
1299 .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1300 .await
1301 .unwrap();
1302 journal.sync().await.unwrap();
1303
1304 let boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1305
1306 assert!(boundary <= Location::new_unchecked(50));
1308 });
1309 }
1310
1311 #[test_traced("INFO")]
1313 fn test_prune_returns_actual_boundary() {
1314 let executor = deterministic::Runner::default();
1315 executor.start(|context| async move {
1316 let mut journal = create_journal_with_ops(context, "prune_boundary", 100).await;
1317
1318 journal
1319 .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1320 .await
1321 .unwrap();
1322 journal.sync().await.unwrap();
1323
1324 let requested = Location::new_unchecked(50);
1325 let actual = journal.prune(requested).await.unwrap();
1326
1327 let oldest = journal.oldest_retained_loc().unwrap();
1329 assert_eq!(actual, oldest);
1330
1331 assert!(actual <= requested);
1333 });
1334 }
1335
1336 #[test_traced("INFO")]
1338 fn test_prune_preserves_operation_count() {
1339 let executor = deterministic::Runner::default();
1340 executor.start(|context| async move {
1341 let mut journal = create_journal_with_ops(context, "prune_count", 100).await;
1342
1343 journal
1344 .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1345 .await
1346 .unwrap();
1347 journal.sync().await.unwrap();
1348
1349 let count_before = journal.size();
1350 journal.prune(Location::new_unchecked(50)).await.unwrap();
1351 let count_after = journal.size();
1352
1353 assert_eq!(count_before, count_after);
1354 });
1355 }
1356
1357 #[test_traced("INFO")]
1359 fn test_oldest_retained_loc() {
1360 let executor = deterministic::Runner::default();
1361 executor.start(|context| async move {
1362 let journal = create_empty_journal(context.clone(), "oldest").await;
1364 let oldest = journal.oldest_retained_loc();
1365 assert_eq!(oldest, None);
1366
1367 let journal = create_journal_with_ops(context.clone(), "oldest", 100).await;
1369 let oldest = journal.oldest_retained_loc();
1370 assert_eq!(oldest, Some(Location::new_unchecked(0)));
1371
1372 let mut journal = create_journal_with_ops(context, "oldest", 100).await;
1374 journal
1375 .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1376 .await
1377 .unwrap();
1378 journal.sync().await.unwrap();
1379
1380 let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1381
1382 let oldest_loc = journal.oldest_retained_loc().unwrap();
1383 assert_eq!(oldest_loc, pruned_boundary);
1385 assert!(oldest_loc <= Location::new_unchecked(50));
1387 });
1388 }
1389
1390 #[test_traced("INFO")]
1392 fn test_pruning_boundary() {
1393 let executor = deterministic::Runner::default();
1394 executor.start(|context| async move {
1395 let journal = create_empty_journal(context.clone(), "boundary").await;
1397 let boundary = journal.pruning_boundary();
1398 assert_eq!(boundary, Location::new_unchecked(0));
1399
1400 let journal = create_journal_with_ops(context.clone(), "boundary", 100).await;
1402 let boundary = journal.pruning_boundary();
1403 assert_eq!(boundary, Location::new_unchecked(0));
1404
1405 let mut journal = create_journal_with_ops(context, "boundary", 100).await;
1407 journal
1408 .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
1409 .await
1410 .unwrap();
1411 journal.sync().await.unwrap();
1412
1413 let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();
1414
1415 let boundary = journal.pruning_boundary();
1416 assert_eq!(boundary, pruned_boundary);
1417 });
1418 }
1419
1420 #[test_traced("INFO")]
1422 fn test_mmr_prunes_to_journal_boundary() {
1423 let executor = deterministic::Runner::default();
1424 executor.start(|context| async move {
1425 let mut journal = create_journal_with_ops(context, "mmr_boundary", 50).await;
1426
1427 journal
1428 .append(Operation::CommitFloor(None, Location::new_unchecked(25)))
1429 .await
1430 .unwrap();
1431 journal.sync().await.unwrap();
1432
1433 let pruned_boundary = journal.prune(Location::new_unchecked(25)).await.unwrap();
1434
1435 let oldest_retained = journal.oldest_retained_loc();
1437 assert_eq!(Some(pruned_boundary), oldest_retained);
1438
1439 assert!(pruned_boundary <= Location::new_unchecked(25));
1441
1442 assert_eq!(journal.size(), 51);
1444 });
1445 }
1446
1447 #[test_traced("INFO")]
1449 fn test_proof_multiple_operations() {
1450 let executor = deterministic::Runner::default();
1451 executor.start(|context| async move {
1452 let journal = create_journal_with_ops(context, "proof_multi", 50).await;
1453
1454 let (proof, ops) = journal
1455 .proof(Location::new_unchecked(0), NZU64!(50))
1456 .await
1457 .unwrap();
1458
1459 assert_eq!(ops.len(), 50);
1460 for (i, op) in ops.iter().enumerate() {
1461 assert_eq!(*op, create_operation(i as u8));
1462 }
1463
1464 let mut hasher = StandardHasher::new();
1466 let root = journal.root();
1467 assert!(verify_proof(
1468 &proof,
1469 &ops,
1470 Location::new_unchecked(0),
1471 &root,
1472 &mut hasher
1473 ));
1474 });
1475 }
1476
1477 #[test_traced("INFO")]
1479 fn test_historical_proof_limited_by_max_ops() {
1480 let executor = deterministic::Runner::default();
1481 executor.start(|context| async move {
1482 let journal = create_journal_with_ops(context, "proof_limit", 50).await;
1483
1484 let size = journal.size();
1485 let (proof, ops) = journal
1486 .historical_proof(size, Location::new_unchecked(0), NZU64!(20))
1487 .await
1488 .unwrap();
1489
1490 assert_eq!(ops.len(), 20);
1492 for (i, op) in ops.iter().enumerate() {
1493 assert_eq!(*op, create_operation(i as u8));
1494 }
1495
1496 let mut hasher = StandardHasher::new();
1498 let root = journal.root();
1499 assert!(verify_proof(
1500 &proof,
1501 &ops,
1502 Location::new_unchecked(0),
1503 &root,
1504 &mut hasher
1505 ));
1506 });
1507 }
1508
1509 #[test_traced("INFO")]
1511 fn test_historical_proof_at_end_of_journal() {
1512 let executor = deterministic::Runner::default();
1513 executor.start(|context| async move {
1514 let journal = create_journal_with_ops(context, "proof_end", 50).await;
1515
1516 let size = journal.size();
1517 let (proof, ops) = journal
1519 .historical_proof(size, Location::new_unchecked(40), NZU64!(20))
1520 .await
1521 .unwrap();
1522
1523 assert_eq!(ops.len(), 10);
1525 for (i, op) in ops.iter().enumerate() {
1526 assert_eq!(*op, create_operation((40 + i) as u8));
1527 }
1528
1529 let mut hasher = StandardHasher::new();
1531 let root = journal.root();
1532 assert!(verify_proof(
1533 &proof,
1534 &ops,
1535 Location::new_unchecked(40),
1536 &root,
1537 &mut hasher
1538 ));
1539 });
1540 }
1541
1542 #[test_traced("INFO")]
1544 fn test_historical_proof_out_of_range_returns_error() {
1545 let executor = deterministic::Runner::default();
1546 executor.start(|context| async move {
1547 let journal = create_journal_with_ops(context, "proof_oob", 5).await;
1548
1549 let result = journal
1551 .historical_proof(
1552 Location::new_unchecked(10),
1553 Location::new_unchecked(0),
1554 NZU64!(1),
1555 )
1556 .await;
1557
1558 assert!(matches!(
1559 result,
1560 Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
1561 ));
1562 });
1563 }
1564
1565 #[test_traced("INFO")]
1567 fn test_historical_proof_start_too_large_returns_error() {
1568 let executor = deterministic::Runner::default();
1569 executor.start(|context| async move {
1570 let journal = create_journal_with_ops(context, "proof_start_oob", 5).await;
1571
1572 let size = journal.size();
1573 let result = journal.historical_proof(size, size, NZU64!(1)).await;
1575
1576 assert!(matches!(
1577 result,
1578 Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
1579 ));
1580 });
1581 }
1582
1583 #[test_traced("INFO")]
1585 fn test_historical_proof_truly_historical() {
1586 let executor = deterministic::Runner::default();
1587 executor.start(|context| async move {
1588 let mut journal = create_journal_with_ops(context, "proof_historical", 50).await;
1590
1591 let mut hasher = StandardHasher::new();
1593 let historical_root = journal.root();
1594 let historical_size = journal.size();
1595
1596 for i in 50..100 {
1598 journal.append(create_operation(i as u8)).await.unwrap();
1599 }
1600 journal.sync().await.unwrap();
1601
1602 let (proof, ops) = journal
1604 .historical_proof(historical_size, Location::new_unchecked(0), NZU64!(50))
1605 .await
1606 .unwrap();
1607
1608 assert_eq!(ops.len(), 50);
1610 for (i, op) in ops.iter().enumerate() {
1611 assert_eq!(*op, create_operation(i as u8));
1612 }
1613
1614 assert!(verify_proof(
1616 &proof,
1617 &ops,
1618 Location::new_unchecked(0),
1619 &historical_root,
1620 &mut hasher
1621 ));
1622 });
1623 }
1624
1625 #[test_traced("INFO")]
1627 fn test_historical_proof_pruned_location_returns_error() {
1628 let executor = deterministic::Runner::default();
1629 executor.start(|context| async move {
1630 let mut journal = create_journal_with_ops(context, "proof_pruned", 50).await;
1631
1632 journal
1633 .append(Operation::CommitFloor(None, Location::new_unchecked(25)))
1634 .await
1635 .unwrap();
1636 journal.sync().await.unwrap();
1637 let pruned_boundary = journal.prune(Location::new_unchecked(25)).await.unwrap();
1638
1639 let size = journal.size();
1641 let start_loc = Location::new_unchecked(0);
1642 if start_loc < pruned_boundary {
1643 let result = journal.historical_proof(size, start_loc, NZU64!(1)).await;
1644
1645 assert!(result.is_err());
1647 }
1648 });
1649 }
1650
1651 #[test_traced("INFO")]
1653 fn test_replay_operations() {
1654 let executor = deterministic::Runner::default();
1655 executor.start(|context| async move {
1656 let journal = create_empty_journal(context.clone(), "replay").await;
1658 let stream = journal.replay(0, NZUsize!(10)).await.unwrap();
1659 futures::pin_mut!(stream);
1660 assert!(stream.next().await.is_none());
1661
1662 let journal = create_journal_with_ops(context, "replay", 50).await;
1664 let stream = journal.replay(0, NZUsize!(100)).await.unwrap();
1665 futures::pin_mut!(stream);
1666
1667 for i in 0..50 {
1668 let (pos, op) = stream.next().await.unwrap().unwrap();
1669 assert_eq!(pos, i);
1670 assert_eq!(op, create_operation(i as u8));
1671 }
1672
1673 assert!(stream.next().await.is_none());
1674 });
1675 }
1676
1677 #[test_traced("INFO")]
1679 fn test_replay_from_middle() {
1680 let executor = deterministic::Runner::default();
1681 executor.start(|context| async move {
1682 let journal = create_journal_with_ops(context, "replay_middle", 50).await;
1683 let stream = journal.replay(25, NZUsize!(100)).await.unwrap();
1684 futures::pin_mut!(stream);
1685
1686 let mut count = 0;
1687 while let Some(result) = stream.next().await {
1688 let (pos, op) = result.unwrap();
1689 assert_eq!(pos, 25 + count);
1690 assert_eq!(op, create_operation((25 + count) as u8));
1691 count += 1;
1692 }
1693
1694 assert_eq!(count, 25);
1696 });
1697 }
1698}