1use crate::{
9 journal::{
10 contiguous::{fixed, variable, Contiguous, Mutable, Reader},
11 Error as JournalError,
12 },
13 mmr::{
14 batch,
15 journaled::Mmr,
16 read::{BatchChainInfo, Readable},
17 Error as MmrError, Location, Position, Proof, StandardHasher,
18 },
19 Persistable,
20};
21use alloc::{collections::BTreeMap, sync::Arc, vec::Vec};
22use commonware_codec::{CodecFixedShared, CodecShared, Encode, EncodeShared};
23use commonware_cryptography::{Digest, Hasher};
24use commonware_runtime::{Clock, Metrics, Storage};
25use core::num::NonZeroU64;
26use futures::{future::try_join_all, try_join, TryFutureExt as _};
27use thiserror::Error;
28use tracing::{debug, warn};
29
/// Errors that can occur when operating on an authenticated journal.
#[derive(Error, Debug)]
pub enum Error {
    /// The authenticating MMR failed.
    #[error("mmr error: {0}")]
    Mmr(#[from] crate::mmr::Error),

    /// The underlying item journal failed.
    #[error("journal error: {0}")]
    Journal(#[from] super::Error),
}
39
/// Implemented by stores that can sit at the base of a chain of batches and
/// may carry pending item batches of their own.
pub trait BatchChain<Item> {
    /// Push every pending item batch in this chain into `into`.
    fn collect(&self, into: &mut Vec<Arc<Vec<Item>>>);
}
46
/// The journaled MMR is the base of every batch chain and holds no pending
/// item batches, so there is nothing to collect.
impl<E: Storage + Clock + Metrics, D: Digest, Item> BatchChain<Item> for Mmr<E, D> {
    fn collect(&self, _into: &mut Vec<Arc<Vec<Item>>>) {}
}
51
/// A batch of items that have been added but not yet merkleized.
pub struct UnmerkleizedBatch<'a, H: Hasher, P: Readable<H::Digest>, Item> {
    // The digest-level batch layered over parent store `P`.
    inner: batch::UnmerkleizedBatch<'a, H::Digest, P>,
    // Hasher used to digest each item's encoding as it is added.
    hasher: StandardHasher<H>,
    // The raw items, retained so they can later be appended to the journal.
    items: Vec<Item>,
}
62
63impl<'a, H: Hasher, P: Readable<H::Digest>, Item: Encode> UnmerkleizedBatch<'a, H, P, Item> {
64 pub fn add(&mut self, item: Item) {
66 let encoded = item.encode();
67 self.inner.add(&mut self.hasher, &encoded);
68 self.items.push(item);
69 }
70
71 pub fn merkleize(mut self) -> MerkleizedBatch<'a, H, P, Item> {
73 MerkleizedBatch {
74 inner: self.inner.merkleize(&mut self.hasher),
75 items: Arc::new(self.items),
76 }
77 }
78}
79
/// A batch whose merkle structure has been computed, exposing a root and
/// usable as the parent store for further batches.
pub struct MerkleizedBatch<'a, H: Hasher, P: Readable<H::Digest>, Item> {
    // The merkleized digest-level batch over parent store `P`.
    inner: batch::MerkleizedBatch<'a, H::Digest, P>,
    // Items added by this batch, shared so chains can collect them cheaply.
    items: Arc<Vec<Item>>,
}
88
impl<'a, H: Hasher, P: Readable<H::Digest>, Item> MerkleizedBatch<'a, H, P, Item> {
    /// The root digest of the MMR as of this batch.
    pub fn root(&self) -> H::Digest {
        self.inner.root()
    }
}
95
/// Delegate MMR reads to the inner merkleized batch, so a merkleized batch
/// can itself serve as the parent of a subsequent batch.
impl<'a, H: Hasher, P: Readable<H::Digest>, Item: Send + Sync> Readable<H::Digest>
    for MerkleizedBatch<'a, H, P, Item>
{
    fn size(&self) -> Position {
        self.inner.size()
    }
    fn get_node(&self, pos: Position) -> Option<H::Digest> {
        self.inner.get_node(pos)
    }
    fn root(&self) -> H::Digest {
        self.inner.root()
    }
    fn pruned_to_pos(&self) -> Position {
        self.inner.pruned_to_pos()
    }
}
112
/// Delegate chain metadata queries to the inner merkleized batch.
impl<'a, H: Hasher, P: Readable<H::Digest> + BatchChainInfo<H::Digest>, Item: Send + Sync>
    BatchChainInfo<H::Digest> for MerkleizedBatch<'a, H, P, Item>
{
    fn base_size(&self) -> Position {
        self.inner.base_size()
    }
    fn collect_overwrites(&self, into: &mut BTreeMap<Position, H::Digest>) {
        self.inner.collect_overwrites(into);
    }
}
123
124impl<'a, H: Hasher, P: Readable<H::Digest> + BatchChain<Item>, Item: Send + Sync> BatchChain<Item>
125 for MerkleizedBatch<'a, H, P, Item>
126{
127 fn collect(&self, into: &mut Vec<Arc<Vec<Item>>>) {
128 self.inner.parent().collect(into); into.push(self.items.clone()); }
131}
132
impl<'a, H: Hasher, P: Readable<H::Digest>, Item: Send + Sync + Encode>
    MerkleizedBatch<'a, H, P, Item>
{
    /// Start a new (unmerkleized) batch layered on top of this one.
    pub fn new_batch(&self) -> UnmerkleizedBatch<'_, H, Self, Item> {
        let inner = batch::UnmerkleizedBatch::new(self);
        // Propagate this batch's thread pool to the child (std builds only).
        #[cfg(feature = "std")]
        let inner = inner.with_pool(self.inner.pool());
        UnmerkleizedBatch {
            inner,
            hasher: StandardHasher::new(),
            items: Vec::new(),
        }
    }
}
148
149impl<'a, H: Hasher, P, Item: Send + Sync> MerkleizedBatch<'a, H, P, Item>
150where
151 P: Readable<H::Digest> + BatchChainInfo<H::Digest> + BatchChain<Item>,
152{
153 pub fn finalize(self) -> Changeset<H::Digest, Item> {
156 let mut items = Vec::new();
157 self.collect(&mut items);
158 Changeset {
159 changeset: self.inner.finalize(),
160 items,
161 }
162 }
163}
164
/// The result of finalizing a batch chain: the digest-level changeset plus
/// the item batches that produced it.
pub struct Changeset<D: Digest, Item> {
    // MMR node changes to apply.
    changeset: batch::Changeset<D>,
    // Item batches in chain order, one `Arc` per batch in the chain.
    items: Vec<Arc<Vec<Item>>>,
}
172
/// A journal of items authenticated by an MMR over their encodings.
pub struct Journal<E, C, H>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
    // MMR whose i-th leaf is the digest of the encoding of journal item i.
    pub(crate) mmr: Mmr<E, H::Digest>,

    // The underlying item journal; kept aligned with the MMR leaf count.
    pub(crate) journal: C,

    // Reusable hasher for digesting appended items.
    pub(crate) hasher: StandardHasher<H>,
}
193
impl<E, C, H> Journal<E, C, H>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
    /// The number of items in the journal.
    pub async fn size(&self) -> Location {
        Location::new(self.journal.size().await)
    }

    /// The current root digest of the authenticating MMR.
    pub fn root(&self) -> H::Digest {
        self.mmr.root()
    }

    /// Start a new batch of items layered on the current MMR state.
    pub fn new_batch(&self) -> UnmerkleizedBatch<'_, H, Mmr<E, H::Digest>, C::Item> {
        UnmerkleizedBatch {
            inner: self.mmr.new_batch(),
            hasher: StandardHasher::new(),
            items: Vec::new(),
        }
    }
}
219
impl<E, C, H> Journal<E, C, H>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
    H: Hasher,
{
    /// Durably commit the item journal. Only the journal is committed here;
    /// the MMR can be realigned from the journal on startup (see `align`).
    pub async fn commit(&self) -> Result<(), Error> {
        self.journal.commit().await.map_err(Error::Journal)
    }
}
232
impl<E, C, H> Journal<E, C, H>
where
    E: Storage + Clock + Metrics,
    C: Mutable<Item: EncodeShared>,
    H: Hasher,
{
    /// Build a journal from an existing MMR and item journal, aligning the
    /// MMR to the journal before returning.
    pub async fn from_components(
        mut mmr: Mmr<E, H::Digest>,
        journal: C,
        mut hasher: StandardHasher<H>,
        apply_batch_size: u64,
    ) -> Result<Self, Error> {
        Self::align(&mut mmr, &journal, &mut hasher, apply_batch_size).await?;

        // Persist the (possibly updated) MMR state from alignment.
        mmr.sync().await?;

        Ok(Self {
            mmr,
            journal,
            hasher,
        })
    }

    /// Make the MMR's leaf count equal the journal's size:
    /// - if the MMR is ahead, rewind its extra leaves;
    /// - if the MMR is behind, replay journal items into it in changesets of
    ///   at most `apply_batch_size` items.
    async fn align(
        mmr: &mut Mmr<E, H::Digest>,
        journal: &C,
        hasher: &mut StandardHasher<H>,
        apply_batch_size: u64,
    ) -> Result<(), Error> {
        let journal_size = journal.size().await;
        let mut mmr_size = mmr.leaves();
        if mmr_size > journal_size {
            let rewind_count = mmr_size - journal_size;
            warn!(
                journal_size,
                ?rewind_count,
                "rewinding MMR to match journal"
            );
            mmr.rewind(*rewind_count as usize, hasher).await?;
            mmr_size = Location::new(journal_size);
        }

        if mmr_size < journal_size {
            let replay_count = journal_size - *mmr_size;
            warn!(
                ?journal_size,
                replay_count, "MMR lags behind journal, replaying journal to catch up"
            );

            let reader = journal.reader().await;
            while mmr_size < journal_size {
                // Batch at most `apply_batch_size` items per changeset to
                // bound the size of each apply during replay.
                let changeset = {
                    let mut batch = mmr.new_batch();
                    let mut count = 0u64;
                    while count < apply_batch_size && mmr_size < journal_size {
                        let op = reader.read(*mmr_size).await?;
                        batch.add(hasher, &op.encode());
                        mmr_size += 1;
                        count += 1;
                    }
                    batch.merkleize(hasher).finalize()
                };
                mmr.apply(changeset)?;
            }
            return Ok(());
        }

        // Only reached when no replay was needed (sizes already equal, or
        // made equal by rewinding above).
        assert_eq!(journal.size().await, *mmr.leaves());

        Ok(())
    }

    /// Append a single item to the journal and its digest to the MMR,
    /// returning the item's location.
    pub async fn append(&mut self, item: &C::Item) -> Result<Location, Error> {
        let encoded_item = item.encode();

        // Journal first; if this fails the MMR is untouched.
        let loc = self.journal.append(item).await?;
        let changeset = {
            let mut batch = self.mmr.new_batch();
            batch.add(&mut self.hasher, &encoded_item);
            batch.merkleize(&mut self.hasher).finalize()
        };
        self.mmr.apply(changeset)?;

        Ok(Location::new(loc))
    }

    /// Apply a finalized [`Changeset`]: append all collected items to the
    /// journal, then apply the node changes to the MMR.
    ///
    /// Returns `StaleChangeset` if the changeset was built against a
    /// different MMR size than the current one.
    pub async fn apply_batch(&mut self, batch: Changeset<H::Digest, C::Item>) -> Result<(), Error> {
        let actual = self.mmr.size();
        if batch.changeset.base_size != actual {
            return Err(MmrError::StaleChangeset {
                expected: batch.changeset.base_size,
                actual,
            }
            .into());
        }

        for items in &batch.items {
            for item in items.iter() {
                self.journal.append(item).await?;
            }
        }
        self.mmr.apply(batch.changeset)?;
        debug_assert_eq!(*self.mmr.leaves(), self.journal.size().await);
        Ok(())
    }

    /// Prune journal items (and corresponding MMR nodes) before `prune_loc`,
    /// returning the actual pruning boundary, which may be less than
    /// requested.
    pub async fn prune(&mut self, prune_loc: Location) -> Result<Location, Error> {
        if self.mmr.size() == 0 {
            return Ok(Location::new(self.reader().await.bounds().start));
        }

        // NOTE(review): MMR state is synced before pruning the journal —
        // presumably so recovery never finds the journal pruned beyond
        // durable MMR state; confirm against the recovery path.
        self.mmr.sync().await?;

        // `journal.prune` returning false means nothing was pruned.
        if !self.journal.prune(*prune_loc).await? {
            return Ok(Location::new(self.reader().await.bounds().start));
        }

        let bounds = self.reader().await.bounds();
        debug!(size = ?bounds.end, ?prune_loc, boundary = ?bounds.start, "pruned inactive ops");

        // Prune the MMR to the boundary the journal actually reached.
        self.mmr.prune(Location::from(bounds.start)).await?;

        Ok(Location::new(bounds.start))
    }
}
386
387impl<E, C, H> Journal<E, C, H>
388where
389 E: Storage + Clock + Metrics,
390 C: Contiguous<Item: EncodeShared>,
391 H: Hasher,
392{
393 pub async fn proof(
407 &self,
408 start_loc: Location,
409 max_ops: NonZeroU64,
410 ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
411 self.historical_proof(self.size().await, start_loc, max_ops)
412 .await
413 }
414
415 pub async fn historical_proof(
428 &self,
429 historical_leaves: Location,
430 start_loc: Location,
431 max_ops: NonZeroU64,
432 ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
433 let reader = self.journal.reader().await;
435 let bounds = reader.bounds();
436
437 if *historical_leaves > bounds.end {
438 return Err(MmrError::RangeOutOfBounds(Location::new(bounds.end)).into());
439 }
440 if start_loc >= historical_leaves {
441 return Err(MmrError::RangeOutOfBounds(start_loc).into());
442 }
443
444 let end_loc = std::cmp::min(historical_leaves, start_loc.saturating_add(max_ops.get()));
445
446 let proof = self
447 .mmr
448 .historical_range_proof(historical_leaves, start_loc..end_loc)
449 .await?;
450
451 let mut ops = Vec::with_capacity((*end_loc - *start_loc) as usize);
452 let futures = (*start_loc..*end_loc)
453 .map(|i| reader.read(i))
454 .collect::<Vec<_>>();
455 try_join_all(futures)
456 .await?
457 .into_iter()
458 .for_each(|op| ops.push(op));
459
460 Ok((proof, ops))
461 }
462}
463
impl<E, C, H> Journal<E, C, H>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
    H: Hasher,
{
    /// Destroy journal and MMR storage concurrently.
    pub async fn destroy(self) -> Result<(), Error> {
        try_join!(
            self.journal.destroy().map_err(Error::Journal),
            self.mmr.destroy().map_err(Error::Mmr),
        )?;

        Ok(())
    }

    /// Flush journal and MMR to durable storage concurrently.
    pub async fn sync(&self) -> Result<(), Error> {
        try_join!(
            self.journal.sync().map_err(Error::Journal),
            self.mmr.sync().map_err(Error::Mmr)
        )?;

        Ok(())
    }
}
490
// Number of items folded into each MMR changeset when replaying the journal
// during alignment (see `Journal::align`).
const APPLY_BATCH_SIZE: u64 = 1 << 16;
493
impl<E, O, H> Journal<E, fixed::Journal<E, O>, H>
where
    E: Storage + Clock + Metrics,
    O: CodecFixedShared,
    H: Hasher,
{
    /// Initialize a journal over fixed-size items: the item journal is
    /// rewound to the last item satisfying `rewind_predicate`, the MMR is
    /// aligned to it, and both are synced.
    pub async fn new(
        context: E,
        mmr_cfg: crate::mmr::journaled::Config,
        journal_cfg: fixed::Config,
        rewind_predicate: fn(&O) -> bool,
    ) -> Result<Self, Error> {
        let mut journal = fixed::Journal::init(context.with_label("journal"), journal_cfg).await?;

        // Drop any trailing items after the last accepted (e.g. committed) one.
        journal.rewind_to(rewind_predicate).await?;

        let mut hasher = StandardHasher::<H>::new();
        let mut mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
        Self::align(&mut mmr, &journal, &mut hasher, APPLY_BATCH_SIZE).await?;

        // Persist the rewound journal and aligned MMR.
        journal.sync().await?;
        mmr.sync().await?;

        Ok(Self {
            mmr,
            journal,
            hasher,
        })
    }
}
532
impl<E, O, H> Journal<E, variable::Journal<E, O>, H>
where
    E: Storage + Clock + Metrics,
    O: CodecShared,
    H: Hasher,
{
    /// Initialize a journal over variable-size items; same recovery flow as
    /// the fixed-size variant (rewind, align, sync).
    pub async fn new(
        context: E,
        mmr_cfg: crate::mmr::journaled::Config,
        journal_cfg: variable::Config<O::Cfg>,
        rewind_predicate: fn(&O) -> bool,
    ) -> Result<Self, Error> {
        let mut hasher = StandardHasher::<H>::new();
        let mut mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
        let mut journal =
            variable::Journal::init(context.with_label("journal"), journal_cfg).await?;

        // Drop any trailing items after the last accepted (e.g. committed) one.
        journal.rewind_to(rewind_predicate).await?;

        Self::align(&mut mmr, &journal, &mut hasher, APPLY_BATCH_SIZE).await?;

        // Persist the rewound journal and aligned MMR.
        journal.sync().await?;
        mmr.sync().await?;

        Ok(Self {
            mmr,
            journal,
            hasher,
        })
    }
}
572
/// Expose the authenticated journal itself as a [`Contiguous`] store by
/// delegating reads to the underlying item journal.
impl<E, C, H> Contiguous for Journal<E, C, H>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
    type Item = C::Item;

    async fn reader(&self) -> impl Reader<Item = C::Item> + '_ {
        self.journal.reader().await
    }

    async fn size(&self) -> u64 {
        self.journal.size().await
    }
}
589
impl<E, C, H> Mutable for Journal<E, C, H>
where
    E: Storage + Clock + Metrics,
    C: Mutable<Item: EncodeShared>,
    H: Hasher,
{
    /// Append via the inherent `append` (which also updates the MMR),
    /// translating `Error` into `JournalError`.
    async fn append(&mut self, item: &Self::Item) -> Result<u64, JournalError> {
        let res = self.append(item).await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })?;

        Ok(*res)
    }

    /// Prune only the underlying journal; the MMR is untouched here (the
    /// inherent `prune` handles pruning both).
    async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
        self.journal.prune(min_position).await
    }

    /// Rewind the journal to `size`, then rewind the MMR to match whenever
    /// it has more leaves than the journal has items.
    async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
        self.journal.rewind(size).await?;

        let leaves = *self.mmr.leaves();
        if leaves > size {
            self.mmr
                .rewind((leaves - size) as usize, &mut self.hasher)
                .await
                .map_err(|error| JournalError::Mmr(anyhow::Error::from(error)))?;
        }

        Ok(())
    }
}
623
impl<E, C, H> Persistable for Journal<E, C, H>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
    H: Hasher,
{
    type Error = JournalError;

    /// Delegate to the inherent `commit`, flattening [`Error`] into
    /// `JournalError` (MMR errors wrapped via `anyhow`).
    async fn commit(&self) -> Result<(), JournalError> {
        self.commit().await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })
    }

    /// Delegate to the inherent `sync`, flattening errors as in `commit`.
    async fn sync(&self) -> Result<(), JournalError> {
        self.sync().await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })
    }

    /// Delegate to the inherent `destroy`, flattening errors as in `commit`.
    async fn destroy(self) -> Result<(), JournalError> {
        self.destroy().await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })
    }
}
653
// Test-only helpers.
#[cfg(test)]
impl<E, C, H> Journal<E, C, H>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
    /// Read the item at `loc` from the underlying journal.
    pub(crate) async fn read(&self, loc: Location) -> Result<C::Item, Error> {
        self.journal
            .reader()
            .await
            .read(*loc)
            .await
            .map_err(Error::Journal)
    }
}
671
672#[cfg(test)]
673mod tests {
674 use super::*;
675 use crate::{
676 journal::contiguous::fixed::{Config as JConfig, Journal as ContiguousJournal},
677 mmr::{
678 journaled::{Config as MmrConfig, Mmr},
679 Location,
680 },
681 qmdb::{
682 any::unordered::{fixed::Operation, Update},
683 operation::Committable,
684 },
685 };
686 use commonware_codec::Encode;
687 use commonware_cryptography::{sha256, sha256::Digest, Sha256};
688 use commonware_macros::test_traced;
689 use commonware_runtime::{
690 buffer::paged::CacheRef,
691 deterministic::{self, Context},
692 BufferPooler, Metrics, Runner as _,
693 };
694 use commonware_utils::{NZUsize, NZU16, NZU64};
695 use futures::StreamExt as _;
696 use std::num::{NonZeroU16, NonZeroUsize};
697
    // Page size and cache capacity for the buffer pool used by all tests.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
700
    // MMR config with partitions namespaced by `suffix`.
    fn mmr_config(suffix: &str, pooler: &impl BufferPooler) -> MmrConfig {
        MmrConfig {
            journal_partition: format!("mmr-journal-{suffix}"),
            metadata_partition: format!("mmr-metadata-{suffix}"),
            items_per_blob: NZU64!(11),
            write_buffer: NZUsize!(1024),
            thread_pool: None,
            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }
712
    // Journal config with a partition namespaced by `suffix`.
    fn journal_config(suffix: &str, pooler: &impl BufferPooler) -> JConfig {
        JConfig {
            partition: format!("journal-{suffix}"),
            items_per_blob: NZU64!(7),
            write_buffer: NZUsize!(1024),
            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }
722
    // Journal-under-test: fixed-size operations authenticated with SHA-256.
    type AuthenticatedJournal = Journal<
        deterministic::Context,
        ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
        Sha256,
    >;
728
    // Create a fresh authenticated journal whose partitions carry `suffix`,
    // rewinding to the last commit operation on init.
    async fn create_empty_journal(context: Context, suffix: &str) -> AuthenticatedJournal {
        let mmr_cfg = mmr_config(suffix, &context);
        let journal_cfg = journal_config(suffix, &context);
        AuthenticatedJournal::new(
            context,
            mmr_cfg,
            journal_cfg,
            |op: &Operation<Digest, Digest>| op.is_commit(),
        )
        .await
        .unwrap()
    }
742
    // Deterministic update operation derived from `index`.
    fn create_operation(index: u8) -> Operation<Digest, Digest> {
        Operation::Update(Update(
            Sha256::fill(index),
            Sha256::fill(index.wrapping_add(1)),
        ))
    }
750
    // Create a journal pre-populated with `count` deterministic operations
    // and sync it.
    async fn create_journal_with_ops(
        context: Context,
        suffix: &str,
        count: usize,
    ) -> AuthenticatedJournal {
        let mut journal = create_empty_journal(context, suffix).await;

        for i in 0..count {
            let op = create_operation(i as u8);
            let loc = journal.append(&op).await.unwrap();
            assert_eq!(loc, Location::new(i as u64));
        }

        journal.sync().await.unwrap();
        journal
    }
770
    // Initialize a raw MMR, item journal, and hasher (the pieces `align` and
    // `from_components` operate on) without wrapping them in a Journal.
    async fn create_components(
        context: Context,
        suffix: &str,
    ) -> (
        Mmr<deterministic::Context, sha256::Digest>,
        ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
        StandardHasher<Sha256>,
    ) {
        let mut hasher = StandardHasher::new();
        let mmr = Mmr::init(
            context.with_label("mmr"),
            &mut hasher,
            mmr_config(suffix, &context),
        )
        .await
        .unwrap();
        let journal = ContiguousJournal::init(
            context.with_label("journal"),
            journal_config(suffix, &context),
        )
        .await
        .unwrap();
        (mmr, journal, hasher)
    }
799
    // Verify a range proof against `root` by re-encoding the operations.
    fn verify_proof(
        proof: &crate::mmr::Proof<<Sha256 as commonware_cryptography::Hasher>::Digest>,
        operations: &[Operation<Digest, Digest>],
        start_loc: Location,
        root: &<Sha256 as commonware_cryptography::Hasher>::Digest,
        hasher: &mut StandardHasher<Sha256>,
    ) -> bool {
        let encoded_ops: Vec<_> = operations.iter().map(|op| op.encode()).collect();
        proof.verify_range_inclusion(hasher, &encoded_ops, start_loc, root)
    }
811
    // A newly-created journal has an empty, zero-based readable range.
    #[test_traced("INFO")]
    fn test_new_creates_empty_journal() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_empty_journal(context, "new-empty").await;

            let bounds = journal.reader().await.bounds();
            assert_eq!(bounds.end, 0);
            assert_eq!(bounds.start, 0);
            assert!(bounds.is_empty());
        });
    }
825
    // Aligning an empty MMR against an empty journal is a no-op.
    #[test_traced("INFO")]
    fn test_align_with_empty_mmr_and_journal() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (mut mmr, journal, mut hasher) = create_components(context, "align-empty").await;

            AuthenticatedJournal::align(&mut mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
                .await
                .unwrap();

            assert_eq!(mmr.leaves(), Location::new(0));
            assert_eq!(journal.size().await, 0);
        });
    }
841
    // Align after populating both stores, then appending a commit only to the
    // journal. NOTE(review): as written, the journal (21 items) ends ahead of
    // the MMR (20 leaves), so align replays the commit op — the fn name may
    // refer to a different durable-state notion of "ahead"; confirm.
    #[test_traced("WARN")]
    fn test_align_when_mmr_ahead() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (mut mmr, journal, mut hasher) = create_components(context, "mmr-ahead").await;

            {
                let changeset = {
                    let mut batch = mmr.new_batch();
                    for i in 0..20 {
                        let op = create_operation(i as u8);
                        let encoded = op.encode();
                        batch.add(&mut hasher, &encoded);
                        journal.append(&op).await.unwrap();
                    }
                    batch.merkleize(&mut hasher).finalize()
                };
                mmr.apply(changeset).unwrap();
            }

            let commit_op = Operation::CommitFloor(None, Location::new(0));
            journal.append(&commit_op).await.unwrap();
            journal.sync().await.unwrap();

            AuthenticatedJournal::align(&mut mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
                .await
                .unwrap();

            assert_eq!(mmr.leaves(), Location::new(21));
            assert_eq!(journal.size().await, 21);
        });
    }
879
    // When the journal holds items the MMR never saw, align replays them all
    // into the MMR.
    #[test_traced("WARN")]
    fn test_align_when_journal_ahead() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (mut mmr, journal, mut hasher) = create_components(context, "journal-ahead").await;

            for i in 0..20 {
                let op = create_operation(i as u8);
                journal.append(&op).await.unwrap();
            }

            let commit_op = Operation::CommitFloor(None, Location::new(0));
            journal.append(&commit_op).await.unwrap();
            journal.sync().await.unwrap();

            AuthenticatedJournal::align(&mut mmr, &journal, &mut hasher, APPLY_BATCH_SIZE)
                .await
                .unwrap();

            assert_eq!(mmr.leaves(), Location::new(21));
            assert_eq!(journal.size().await, 21);
        });
    }
908
    // Uncommitted operations are discarded by the rewind predicate when the
    // journal is re-initialized.
    #[test_traced("INFO")]
    fn test_align_with_mismatched_committed_ops() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_empty_journal(context.with_label("first"), "mismatched").await;

            // 20 ops, none of them a commit.
            for i in 0..20 {
                let loc = journal.append(&create_operation(i as u8)).await.unwrap();
                assert_eq!(loc, Location::new(i as u64));
            }

            let size_before = journal.size().await;
            assert_eq!(size_before, 20);

            journal.sync().await.unwrap();
            drop(journal);
            // Re-init rewinds to the last commit — there is none, so size 0.
            let journal = create_empty_journal(context.with_label("second"), "mismatched").await;

            assert_eq!(journal.size().await, 0);
        });
    }
936
    // Exercises rewind_to on the raw journal (commit at tail, multiple
    // commits, no commit, with pruning, pruned-no-match, empty) and rewind on
    // the authenticated journal (MMR kept in lockstep, invalid targets).
    #[test_traced("INFO")]
    fn test_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // rewind_to lands on the last commit operation.
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_match"),
                    journal_config("rewind-match", &context),
                )
                .await
                .unwrap();

                for i in 0..3 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                journal
                    .append(&Operation::CommitFloor(None, Location::new(0)))
                    .await
                    .unwrap();
                for i in 4..7 {
                    journal.append(&create_operation(i)).await.unwrap();
                }

                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 4);
                assert_eq!(journal.size().await, 4);

                let op = journal.read(3).await.unwrap();
                assert!(op.is_commit());
            }

            // With multiple commits, rewind_to lands on the most recent one.
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_multiple"),
                    journal_config("rewind-multiple", &context),
                )
                .await
                .unwrap();

                journal.append(&create_operation(0)).await.unwrap();
                journal
                    .append(&Operation::CommitFloor(None, Location::new(0)))
                    .await
                    .unwrap();
                journal.append(&create_operation(2)).await.unwrap();
                journal
                    .append(&Operation::CommitFloor(None, Location::new(1)))
                    .await
                    .unwrap();
                journal.append(&create_operation(4)).await.unwrap();

                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 4);

                let op = journal.read(3).await.unwrap();
                assert!(op.is_commit());

                assert!(journal.read(4).await.is_err());
            }

            // No commit at all: rewind_to drops everything.
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_no_match"),
                    journal_config("rewind-no-match", &context),
                )
                .await
                .unwrap();

                for i in 0..10 {
                    journal.append(&create_operation(i)).await.unwrap();
                }

                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 0, "Should rewind to pruning boundary (0)");
                assert_eq!(journal.size().await, 0);
            }

            // rewind_to still finds the commit when earlier items are pruned.
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_with_pruning"),
                    journal_config("rewind-with-pruning", &context),
                )
                .await
                .unwrap();

                for i in 0..10 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                journal
                    .append(&Operation::CommitFloor(None, Location::new(0)))
                    .await
                    .unwrap();
                for i in 11..15 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                journal.sync().await.unwrap();

                journal.prune(8).await.unwrap();
                assert_eq!(journal.reader().await.bounds().start, 7);

                for i in 15..20 {
                    journal.append(&create_operation(i)).await.unwrap();
                }

                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 11);

                let op = journal.read(10).await.unwrap();
                assert!(op.is_commit());
            }

            // The commit lies below the pruning boundary: rewind stops at the
            // boundary instead.
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_no_match_pruned"),
                    journal_config("rewind-no-match-pruned", &context),
                )
                .await
                .unwrap();

                for i in 0..5 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                journal
                    .append(&Operation::CommitFloor(None, Location::new(0)))
                    .await
                    .unwrap();
                for i in 6..10 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                journal.sync().await.unwrap();

                journal.prune(8).await.unwrap();
                assert_eq!(journal.reader().await.bounds().start, 7);

                for i in 10..14 {
                    journal.append(&create_operation(i)).await.unwrap();
                }

                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 7);
            }

            // rewind_to on an empty journal is a no-op.
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_empty"),
                    journal_config("rewind-empty", &context),
                )
                .await
                .unwrap();

                let final_size = journal
                    .rewind_to(|op: &Operation<Digest, Digest>| op.is_commit())
                    .await
                    .unwrap();
                assert_eq!(final_size, 0);
                assert_eq!(journal.size().await, 0);
            }

            // Authenticated journal: rewind keeps the MMR leaf count in sync
            // and rejects rewinds past the pruning boundary.
            {
                let mmr_cfg = mmr_config("rewind", &context);
                let journal_cfg = journal_config("rewind", &context);
                let mut journal =
                    AuthenticatedJournal::new(context, mmr_cfg, journal_cfg, |op| op.is_commit())
                        .await
                        .unwrap();

                for i in 0..5 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                journal
                    .append(&Operation::CommitFloor(None, Location::new(0)))
                    .await
                    .unwrap();
                for i in 6..10 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                assert_eq!(journal.size().await, 10);

                journal.rewind(2).await.unwrap();
                assert_eq!(journal.size().await, 2);
                assert_eq!(journal.mmr.leaves(), 2);
                assert_eq!(journal.mmr.size(), 3);
                let bounds = journal.reader().await.bounds();
                assert_eq!(bounds.start, 0);
                assert!(!bounds.is_empty());

                // Rewinding "forward" is rejected.
                assert!(matches!(
                    journal.rewind(3).await,
                    Err(JournalError::InvalidRewind(_))
                ));

                journal.rewind(0).await.unwrap();
                assert_eq!(journal.size().await, 0);
                assert_eq!(journal.mmr.leaves(), 0);
                assert_eq!(journal.mmr.size(), 0);
                let bounds = journal.reader().await.bounds();
                assert_eq!(bounds.start, 0);
                assert!(bounds.is_empty());

                // Rewinding below the pruning boundary is rejected; rewinding
                // exactly to it leaves an empty readable range.
                for i in 0..255 {
                    journal.append(&create_operation(i)).await.unwrap();
                }
                journal.prune(Location::new(100)).await.unwrap();
                assert_eq!(journal.reader().await.bounds().start, 98);
                let res = journal.rewind(97).await;
                assert!(matches!(res, Err(JournalError::InvalidRewind(97))));
                journal.rewind(98).await.unwrap();
                let bounds = journal.reader().await.bounds();
                assert_eq!(bounds.end, 98);
                assert_eq!(journal.mmr.leaves(), 98);
                assert_eq!(bounds.start, 98);
                assert!(bounds.is_empty());
            }
        });
    }
1182
    // Appends report sequential locations and every op reads back intact.
    #[test_traced("INFO")]
    fn test_apply_op_and_read_operations() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_empty_journal(context, "apply_op").await;

            assert_eq!(journal.size().await, 0);

            let expected_ops: Vec<_> = (0..50).map(|i| create_operation(i as u8)).collect();
            for (i, op) in expected_ops.iter().enumerate() {
                let loc = journal.append(op).await.unwrap();
                assert_eq!(loc, Location::new(i as u64));
                assert_eq!(journal.size().await, (i + 1) as u64);
            }

            assert_eq!(journal.size().await, 50);

            journal.sync().await.unwrap();
            for (i, expected_op) in expected_ops.iter().enumerate() {
                let read_op = journal.read(Location::new(i as u64)).await.unwrap();
                assert_eq!(read_op, *expected_op);
            }
        });
    }
1211
    // Reads at the first, middle, last, and every position return the
    // expected operations.
    #[test_traced("INFO")]
    fn test_read_operations_at_various_positions() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "read", 50).await;

            let first_op = journal.read(Location::new(0)).await.unwrap();
            assert_eq!(first_op, create_operation(0));

            let middle_op = journal.read(Location::new(25)).await.unwrap();
            assert_eq!(middle_op, create_operation(25));

            let last_op = journal.read(Location::new(49)).await.unwrap();
            assert_eq!(last_op, create_operation(49));

            for i in 0..50 {
                let op = journal.read(Location::new(i)).await.unwrap();
                assert_eq!(op, create_operation(i as u8));
            }
        });
    }
1238
    // Reading below the pruning boundary yields ItemPruned.
    #[test_traced("INFO")]
    fn test_read_pruned_operation_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "read_pruned", 100).await;

            journal
                .append(&Operation::CommitFloor(None, Location::new(50)))
                .await
                .unwrap();
            journal.sync().await.unwrap();
            let pruned_boundary = journal.prune(Location::new(50)).await.unwrap();

            // The boundary may land below the request; only assert when
            // location 0 is actually behind it.
            let read_loc = Location::new(0);
            if read_loc < pruned_boundary {
                let result = journal.read(read_loc).await;
                assert!(matches!(
                    result,
                    Err(Error::Journal(crate::journal::Error::ItemPruned(_)))
                ));
            }
        });
    }
1265
    // Reading past the end of the journal yields ItemOutOfRange.
    #[test_traced("INFO")]
    fn test_read_out_of_range_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "read_oob", 3).await;

            let result = journal.read(Location::new(10)).await;
            assert!(matches!(
                result,
                Err(Error::Journal(crate::journal::Error::ItemOutOfRange(_)))
            ));
        });
    }
1281
    // Every appended operation reads back equal to what was written.
    #[test_traced("INFO")]
    fn test_read_all_operations_back_correctly() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "read_all", 50).await;

            assert_eq!(journal.size().await, 50);

            for i in 0..50 {
                let op = journal.read(Location::new(i)).await.unwrap();
                assert_eq!(op, create_operation(i as u8));
            }
        });
    }
1298
    // Synced data survives drop + re-init: the commit and all prior ops are
    // still present after reopening with the same partitions.
    #[test_traced("INFO")]
    fn test_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal =
                create_empty_journal(context.with_label("first"), "close_pending").await;

            let expected_ops: Vec<_> = (0..20).map(|i| create_operation(i as u8)).collect();
            for (i, op) in expected_ops.iter().enumerate() {
                let loc = journal.append(op).await.unwrap();
                assert_eq!(loc, Location::new(i as u64),);
            }

            let commit_loc = journal
                .append(&Operation::CommitFloor(None, Location::new(0)))
                .await
                .unwrap();
            assert_eq!(
                commit_loc,
                Location::new(20),
                "commit should be at location 20"
            );
            journal.sync().await.unwrap();

            drop(journal);
            let journal = create_empty_journal(context.with_label("second"), "close_pending").await;
            assert_eq!(journal.size().await, 21);

            for (i, expected_op) in expected_ops.iter().enumerate() {
                let read_op = journal.read(Location::new(i as u64)).await.unwrap();
                assert_eq!(read_op, *expected_op);
            }
        });
    }
1338
1339 #[test_traced("INFO")]
1341 fn test_prune_empty_journal() {
1342 let executor = deterministic::Runner::default();
1343 executor.start(|context| async move {
1344 let mut journal = create_empty_journal(context, "prune_empty").await;
1345
1346 let boundary = journal.prune(Location::new(0)).await.unwrap();
1347
1348 assert_eq!(boundary, Location::new(0));
1349 });
1350 }
1351
1352 #[test_traced("INFO")]
1354 fn test_prune_to_location() {
1355 let executor = deterministic::Runner::default();
1356 executor.start(|context| async move {
1357 let mut journal = create_journal_with_ops(context, "prune_to", 100).await;
1358
1359 journal
1361 .append(&Operation::CommitFloor(None, Location::new(50)))
1362 .await
1363 .unwrap();
1364 journal.sync().await.unwrap();
1365
1366 let boundary = journal.prune(Location::new(50)).await.unwrap();
1367
1368 assert!(boundary <= Location::new(50));
1370 });
1371 }
1372
1373 #[test_traced("INFO")]
1375 fn test_prune_returns_actual_boundary() {
1376 let executor = deterministic::Runner::default();
1377 executor.start(|context| async move {
1378 let mut journal = create_journal_with_ops(context, "prune_boundary", 100).await;
1379
1380 journal
1381 .append(&Operation::CommitFloor(None, Location::new(50)))
1382 .await
1383 .unwrap();
1384 journal.sync().await.unwrap();
1385
1386 let requested = Location::new(50);
1387 let actual = journal.prune(requested).await.unwrap();
1388
1389 let bounds = journal.reader().await.bounds();
1391 assert!(!bounds.is_empty());
1392 assert_eq!(actual, bounds.start);
1393
1394 assert!(actual <= requested);
1396 });
1397 }
1398
1399 #[test_traced("INFO")]
1401 fn test_prune_preserves_operation_count() {
1402 let executor = deterministic::Runner::default();
1403 executor.start(|context| async move {
1404 let mut journal = create_journal_with_ops(context, "prune_count", 100).await;
1405
1406 journal
1407 .append(&Operation::CommitFloor(None, Location::new(50)))
1408 .await
1409 .unwrap();
1410 journal.sync().await.unwrap();
1411
1412 let count_before = journal.size().await;
1413 journal.prune(Location::new(50)).await.unwrap();
1414 let count_after = journal.size().await;
1415
1416 assert_eq!(count_before, count_after);
1417 });
1418 }
1419
    #[test_traced("INFO")]
    fn test_bounds_empty_and_pruned() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Case 1: an empty journal reports empty bounds.
            let journal = create_empty_journal(context.with_label("empty"), "oldest").await;
            assert!(journal.reader().await.bounds().is_empty());
            journal.destroy().await.unwrap();

            // Case 2: a populated, unpruned journal's bounds start at 0.
            let journal =
                create_journal_with_ops(context.with_label("no_prune"), "oldest", 100).await;
            let bounds = journal.reader().await.bounds();
            assert!(!bounds.is_empty());
            assert_eq!(bounds.start, 0);
            journal.destroy().await.unwrap();

            // Case 3: after pruning, bounds start at the actual prune boundary.
            let mut journal =
                create_journal_with_ops(context.with_label("pruned"), "oldest", 100).await;
            // A commit floor must be persisted before pruning below it.
            journal
                .append(&Operation::CommitFloor(None, Location::new(50)))
                .await
                .unwrap();
            journal.sync().await.unwrap();

            let pruned_boundary = journal.prune(Location::new(50)).await.unwrap();

            let bounds = journal.reader().await.bounds();
            assert!(!bounds.is_empty());
            assert_eq!(bounds.start, pruned_boundary);
            // Pruning never advances past the requested location.
            assert!(pruned_boundary <= 50);
            journal.destroy().await.unwrap();
        });
    }
1458
    #[test_traced("INFO")]
    fn test_bounds_start_after_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // An empty journal's bounds start at 0.
            let journal = create_empty_journal(context.with_label("empty"), "boundary").await;
            assert_eq!(journal.reader().await.bounds().start, 0);

            // A populated but unpruned journal also starts at 0.
            let journal =
                create_journal_with_ops(context.with_label("no_prune"), "boundary", 100).await;
            assert_eq!(journal.reader().await.bounds().start, 0);

            // After pruning, the start bound advances to the boundary that
            // prune() reported.
            let mut journal =
                create_journal_with_ops(context.with_label("pruned"), "boundary", 100).await;
            journal
                .append(&Operation::CommitFloor(None, Location::new(50)))
                .await
                .unwrap();
            journal.sync().await.unwrap();

            let pruned_boundary = journal.prune(Location::new(50)).await.unwrap();

            assert_eq!(journal.reader().await.bounds().start, pruned_boundary);
        });
    }
1487
    #[test_traced("INFO")]
    fn test_mmr_prunes_to_journal_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "mmr_boundary", 50).await;

            // Persist a commit floor so pruning up to location 25 is permitted.
            journal
                .append(&Operation::CommitFloor(None, Location::new(25)))
                .await
                .unwrap();
            journal.sync().await.unwrap();

            let pruned_boundary = journal.prune(Location::new(25)).await.unwrap();

            // The reader's bounds must agree with the reported boundary.
            let bounds = journal.reader().await.bounds();
            assert!(!bounds.is_empty());
            assert_eq!(pruned_boundary, bounds.start);

            // Pruning never advances past the requested location.
            assert!(pruned_boundary <= Location::new(25));

            // Logical size is unchanged by pruning: 50 ops + 1 commit.
            assert_eq!(journal.size().await, 51);
        });
    }
1515
1516 #[test_traced("INFO")]
1518 fn test_proof_multiple_operations() {
1519 let executor = deterministic::Runner::default();
1520 executor.start(|context| async move {
1521 let journal = create_journal_with_ops(context, "proof_multi", 50).await;
1522
1523 let (proof, ops) = journal.proof(Location::new(0), NZU64!(50)).await.unwrap();
1524
1525 assert_eq!(ops.len(), 50);
1526 for (i, op) in ops.iter().enumerate() {
1527 assert_eq!(*op, create_operation(i as u8));
1528 }
1529
1530 let mut hasher = StandardHasher::new();
1532 let root = journal.root();
1533 assert!(verify_proof(
1534 &proof,
1535 &ops,
1536 Location::new(0),
1537 &root,
1538 &mut hasher
1539 ));
1540 });
1541 }
1542
    #[test_traced("INFO")]
    fn test_historical_proof_limited_by_max_ops() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_limit", 50).await;

            // Request a proof over the current size, capped at 20 operations.
            let size = journal.size().await;
            let (proof, ops) = journal
                .historical_proof(size, Location::new(0), NZU64!(20))
                .await
                .unwrap();

            // Only the first 20 operations are returned.
            assert_eq!(ops.len(), 20);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation(i as u8));
            }

            // The capped proof still verifies against the current root.
            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new(0),
                &root,
                &mut hasher
            ));
        });
    }
1574
    #[test_traced("INFO")]
    fn test_historical_proof_at_end_of_journal() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_end", 50).await;

            // Request 20 operations starting at location 40 of a 50-op
            // journal: only the last 10 exist, so the range is truncated.
            let size = journal.size().await;
            let (proof, ops) = journal
                .historical_proof(size, Location::new(40), NZU64!(20))
                .await
                .unwrap();

            assert_eq!(ops.len(), 10);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation((40 + i) as u8));
            }

            // The truncated proof verifies against the current root.
            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new(40),
                &root,
                &mut hasher
            ));
        });
    }
1607
1608 #[test_traced("INFO")]
1610 fn test_historical_proof_out_of_range_returns_error() {
1611 let executor = deterministic::Runner::default();
1612 executor.start(|context| async move {
1613 let journal = create_journal_with_ops(context, "proof_oob", 5).await;
1614
1615 let result = journal
1617 .historical_proof(Location::new(10), Location::new(0), NZU64!(1))
1618 .await;
1619
1620 assert!(matches!(
1621 result,
1622 Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
1623 ));
1624 });
1625 }
1626
1627 #[test_traced("INFO")]
1629 fn test_historical_proof_start_too_large_returns_error() {
1630 let executor = deterministic::Runner::default();
1631 executor.start(|context| async move {
1632 let journal = create_journal_with_ops(context, "proof_start_oob", 5).await;
1633
1634 let size = journal.size().await;
1635 let result = journal.historical_proof(size, size, NZU64!(1)).await;
1637
1638 assert!(matches!(
1639 result,
1640 Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
1641 ));
1642 });
1643 }
1644
    #[test_traced("INFO")]
    fn test_historical_proof_truly_historical() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "proof_historical", 50).await;

            // Capture the root and size before growing the journal further.
            let mut hasher = StandardHasher::new();
            let historical_root = journal.root();
            let historical_size = journal.size().await;

            // Grow the journal well past the captured state.
            for i in 50..100 {
                journal.append(&create_operation(i as u8)).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Prove the original 50 operations against the historical size.
            let (proof, ops) = journal
                .historical_proof(historical_size, Location::new(0), NZU64!(50))
                .await
                .unwrap();

            assert_eq!(ops.len(), 50);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation(i as u8));
            }

            // The proof must verify against the OLD root, not the current one.
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new(0),
                &historical_root,
                &mut hasher
            ));
        });
    }
1686
    #[test_traced("INFO")]
    fn test_historical_proof_pruned_location_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "proof_pruned", 50).await;

            // Persist a commit floor, then prune up to location 25.
            journal
                .append(&Operation::CommitFloor(None, Location::new(25)))
                .await
                .unwrap();
            journal.sync().await.unwrap();
            let pruned_boundary = journal.prune(Location::new(25)).await.unwrap();

            // Proving a location below the prune boundary must fail.
            // NOTE(review): the boundary can land before the request (it may
            // even be 0), in which case the guard below makes this test pass
            // vacuously — consider asserting the boundary actually advanced.
            let size = journal.size().await;
            let start_loc = Location::new(0);
            if start_loc < pruned_boundary {
                let result = journal.historical_proof(size, start_loc, NZU64!(1)).await;
                assert!(result.is_err());
            }
        });
    }
1712
    #[test_traced("INFO")]
    fn test_replay_operations() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Replaying an empty journal yields an empty stream.
            let journal = create_empty_journal(context.with_label("empty"), "replay").await;
            let reader = journal.reader().await;
            let stream = reader.replay(NZUsize!(10), 0).await.unwrap();
            futures::pin_mut!(stream);
            assert!(stream.next().await.is_none());

            // Replaying from position 0 returns every operation in order,
            // each tagged with its position.
            let journal =
                create_journal_with_ops(context.with_label("with_ops"), "replay", 50).await;
            let reader = journal.reader().await;
            let stream = reader.replay(NZUsize!(100), 0).await.unwrap();
            futures::pin_mut!(stream);

            for i in 0..50 {
                let (pos, op) = stream.next().await.unwrap().unwrap();
                assert_eq!(pos, i);
                assert_eq!(op, create_operation(i as u8));
            }

            // The stream terminates after the last operation.
            assert!(stream.next().await.is_none());
        });
    }
1741
1742 #[test_traced("INFO")]
1744 fn test_replay_from_middle() {
1745 let executor = deterministic::Runner::default();
1746 executor.start(|context| async move {
1747 let journal = create_journal_with_ops(context, "replay_middle", 50).await;
1748 let reader = journal.reader().await;
1749 let stream = reader.replay(NZUsize!(100), 25).await.unwrap();
1750 futures::pin_mut!(stream);
1751
1752 let mut count = 0;
1753 while let Some(result) = stream.next().await {
1754 let (pos, op) = result.unwrap();
1755 assert_eq!(pos, 25 + count);
1756 assert_eq!(op, create_operation((25 + count) as u8));
1757 count += 1;
1758 }
1759
1760 assert_eq!(count, 25);
1762 });
1763 }
1764
1765 #[test_traced("INFO")]
1767 fn test_speculative_batch() {
1768 let executor = deterministic::Runner::default();
1769 executor.start(|context| async move {
1770 let mut journal = create_journal_with_ops(context, "speculative_batch", 10).await;
1771 let original_root = journal.root();
1772
1773 let mut b1 = journal.new_batch();
1775 let mut b2 = journal.new_batch();
1776
1777 let op_a = create_operation(100);
1779 let op_b = create_operation(200);
1780 b1.add(op_a.clone());
1781 b2.add(op_b);
1782
1783 let m1 = b1.merkleize();
1785 let m2 = b2.merkleize();
1786 assert_ne!(m1.root(), m2.root());
1787 assert_ne!(m1.root(), original_root);
1788 assert_ne!(m2.root(), original_root);
1789
1790 assert_eq!(journal.root(), original_root);
1792
1793 let expected_root = m1.root();
1795 let finalized = m1.finalize();
1796 drop(m2); journal.apply_batch(finalized).await.unwrap();
1798
1799 assert_eq!(journal.root(), expected_root);
1801 assert_eq!(*journal.size().await, 11);
1802 });
1803 }
1804
    #[test_traced("INFO")]
    fn test_speculative_batch_stacking() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "batch_stacking", 10).await;

            let op_a = create_operation(100);
            let op_b = create_operation(200);

            // Stack a second batch on top of the first (merkleized) batch and
            // finalize only the child; it carries both operations.
            let (expected_root, finalized) = {
                let mut batch_a = journal.new_batch();
                batch_a.add(op_a.clone());
                let merkleized_a = batch_a.merkleize();

                let mut batch_b = merkleized_a.new_batch();
                batch_b.add(op_b.clone());
                let merkleized_b = batch_b.merkleize();

                let root = merkleized_b.root();
                (root, merkleized_b.finalize())
            };

            journal.apply_batch(finalized).await.unwrap();

            // The journal adopts the stacked root and grows by two operations.
            assert_eq!(journal.root(), expected_root);
            assert_eq!(*journal.size().await, 12);

            // Both operations are readable at their expected locations.
            let read_a = journal.read(Location::new(10)).await.unwrap();
            assert_eq!(read_a, op_a);
            let read_b = journal.read(Location::new(11)).await.unwrap();
            assert_eq!(read_b, op_b);
        });
    }
1843
1844 #[test_traced("INFO")]
1845 fn test_stale_batch_sibling() {
1846 let executor = deterministic::Runner::default();
1847 executor.start(|context| async move {
1848 let mut journal = create_empty_journal(context, "stale-sibling").await;
1849 let op_a = create_operation(1);
1850 let op_b = create_operation(2);
1851
1852 let finalized_a = {
1854 let mut batch = journal.new_batch();
1855 batch.add(op_a.clone());
1856 batch.merkleize().finalize()
1857 };
1858 let finalized_b = {
1859 let mut batch = journal.new_batch();
1860 batch.add(op_b);
1861 batch.merkleize().finalize()
1862 };
1863
1864 journal.apply_batch(finalized_a).await.unwrap();
1866 let expected_root = journal.root();
1867 let expected_size = journal.size().await;
1868
1869 let result = journal.apply_batch(finalized_b).await;
1871 assert!(
1872 matches!(
1873 result,
1874 Err(super::Error::Mmr(crate::mmr::Error::StaleChangeset { .. }))
1875 ),
1876 "expected StaleChangeset, got {result:?}"
1877 );
1878
1879 assert_eq!(journal.root(), expected_root);
1881 assert_eq!(journal.size().await, expected_size);
1882 let (_, ops) = journal.proof(Location::new(0), NZU64!(1)).await.unwrap();
1883 assert_eq!(ops, vec![op_a]);
1884 });
1885 }
1886
1887 #[test_traced("INFO")]
1888 fn test_stale_batch_chained() {
1889 let executor = deterministic::Runner::default();
1890 executor.start(|context| async move {
1891 let mut journal = create_journal_with_ops(context, "stale-chained", 5).await;
1892
1893 let parent = {
1895 let mut batch = journal.new_batch();
1896 batch.add(create_operation(10));
1897 batch.merkleize()
1898 };
1899 let child_a = {
1900 let mut batch = parent.new_batch();
1901 batch.add(create_operation(20));
1902 batch.merkleize().finalize()
1903 };
1904 let child_b = {
1905 let mut batch = parent.new_batch();
1906 batch.add(create_operation(30));
1907 batch.merkleize().finalize()
1908 };
1909 drop(parent);
1910
1911 journal.apply_batch(child_a).await.unwrap();
1913 let result = journal.apply_batch(child_b).await;
1914 assert!(
1915 matches!(
1916 result,
1917 Err(super::Error::Mmr(crate::mmr::Error::StaleChangeset { .. }))
1918 ),
1919 "expected StaleChangeset for sibling, got {result:?}"
1920 );
1921 });
1922 }
1923
1924 #[test_traced("INFO")]
1925 fn test_stale_batch_parent_before_child() {
1926 let executor = deterministic::Runner::default();
1927 executor.start(|context| async move {
1928 let mut journal = create_empty_journal(context, "stale-parent-first").await;
1929
1930 let (parent_finalized, child_finalized) = {
1932 let parent = {
1933 let mut batch = journal.new_batch();
1934 batch.add(create_operation(1));
1935 batch.merkleize()
1936 };
1937 let child = {
1938 let mut batch = parent.new_batch();
1939 batch.add(create_operation(2));
1940 batch.merkleize().finalize()
1941 };
1942 (parent.finalize(), child)
1943 };
1944
1945 journal.apply_batch(parent_finalized).await.unwrap();
1947 let result = journal.apply_batch(child_finalized).await;
1948 assert!(
1949 matches!(
1950 result,
1951 Err(super::Error::Mmr(crate::mmr::Error::StaleChangeset { .. }))
1952 ),
1953 "expected StaleChangeset for child after parent applied, got {result:?}"
1954 );
1955 });
1956 }
1957
1958 #[test_traced("INFO")]
1959 fn test_stale_batch_child_before_parent() {
1960 let executor = deterministic::Runner::default();
1961 executor.start(|context| async move {
1962 let mut journal = create_empty_journal(context, "stale-child-first").await;
1963
1964 let (parent_finalized, child_finalized) = {
1966 let parent = {
1967 let mut batch = journal.new_batch();
1968 batch.add(create_operation(1));
1969 batch.merkleize()
1970 };
1971 let child = {
1972 let mut batch = parent.new_batch();
1973 batch.add(create_operation(2));
1974 batch.merkleize().finalize()
1975 };
1976 (parent.finalize(), child)
1977 };
1978
1979 journal.apply_batch(child_finalized).await.unwrap();
1981 let result = journal.apply_batch(parent_finalized).await;
1982 assert!(
1983 matches!(
1984 result,
1985 Err(super::Error::Mmr(crate::mmr::Error::StaleChangeset { .. }))
1986 ),
1987 "expected StaleChangeset for parent after child applied, got {result:?}"
1988 );
1989 });
1990 }
1991}