use crate::{
    journal::{
        contiguous::{fixed, variable, Contiguous, MutableContiguous},
        Error as JournalError,
    },
    mmr::{
        journaled::{CleanMmr, DirtyMmr, Mmr},
        mem::{Clean, Dirty, State},
        Location, Position, Proof, StandardHasher,
    },
    Persistable,
};
use commonware_codec::{CodecFixedShared, CodecShared, Encode, EncodeShared};
use commonware_cryptography::{DigestOf, Hasher};
use commonware_runtime::{Clock, Metrics, Storage};
use core::num::{NonZeroU64, NonZeroUsize};
use futures::{future::try_join_all, try_join, TryFutureExt as _};
use thiserror::Error;
use tracing::{debug, warn};

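/// Errors that can arise from operations on an authenticated [Journal], wrapping the
/// underlying MMR and journal error types.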
#[derive(Error, Debug)]
pub enum Error {
    #[error("mmr error: {0}")]
    Mmr(#[from] crate::mmr::Error),

    #[error("journal error: {0}")]
    Journal(#[from] super::Error),
}
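
/// An append-only journal of items paired with a journaled MMR over their encodings,
/// so that any contiguous range of items can be proven against a single root digest.
///
/// The `S` parameter tracks the MMR's merkleization state: [Clean] exposes roots and
/// proofs, while [Dirty] accepts appends that have not yet been merkleized. Use
/// [Journal::into_dirty] and [Journal::merkleize] to move between the two states.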
pub struct Journal<E, C, H, S: State<H::Digest> + Send + Sync = Dirty>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
    pub(crate) mmr: Mmr<E, H::Digest, S>,

    pub(crate) journal: C,

    pub(crate) hasher: StandardHasher<H>,
}

impl<E, C, H, S> Journal<E, C, H, S>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
    S: State<DigestOf<H>> + Send + Sync,
{
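    /// Returns the current item bounds (inclusive start, exclusive end) as [Location]s,
    /// where the start reflects any pruning that has occurred.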
    pub fn bounds(&self) -> std::ops::Range<Location> {
        let inner = self.journal.bounds();
        Location::new_unchecked(inner.start)..Location::new_unchecked(inner.end)
    }

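    /// Returns the total number of items ever appended (the journal's end bound).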
    pub fn size(&self) -> Location {
        Location::new_unchecked(self.journal.bounds().end)
    }

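    /// Reads the item at `loc`, returning an error if it has been pruned or is out of range.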
    pub async fn read(&self, loc: Location) -> Result<C::Item, Error> {
        self.journal.read(*loc).await.map_err(Error::Journal)
    }
}

impl<E, C, H, S> Journal<E, C, H, S>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
    H: Hasher,
    S: State<DigestOf<H>> + Send + Sync,
{
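    /// Commits the underlying journal, making all appended items durable.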
    pub async fn commit(&mut self) -> Result<(), Error> {
        self.journal.commit().await.map_err(Error::Journal)
    }
}

impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    C: MutableContiguous<Item: EncodeShared>,
    H: Hasher,
{
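    /// Builds an authenticated journal from an existing MMR and journal, aligning the MMR
    /// to the journal's contents (see [Self::align]) and syncing it before returning.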
    pub async fn from_components(
        mmr: CleanMmr<E, H::Digest>,
        journal: C,
        mut hasher: StandardHasher<H>,
        apply_batch_size: u64,
    ) -> Result<Self, Error> {
        let mut mmr =
            Self::align(mmr.into_dirty(), &journal, &mut hasher, apply_batch_size).await?;

        mmr.sync().await?;

        Ok(Self {
            mmr,
            journal,
            hasher,
        })
    }

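    /// Reconciles the MMR with the journal after a potentially unclean shutdown: excess MMR
    /// leaves are popped, and missing leaves are rebuilt by replaying journal items,
    /// merkleizing every `apply_batch_size` additions to bound memory usage. On return the
    /// MMR has exactly one leaf per journal item.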
    async fn align(
        mut mmr: DirtyMmr<E, H::Digest>,
        journal: &C,
        hasher: &mut StandardHasher<H>,
        apply_batch_size: u64,
    ) -> Result<CleanMmr<E, H::Digest>, Error> {
        let journal_size = journal.size();
        let mut mmr_size = mmr.leaves();
        if mmr_size > journal_size {
            let pop_count = mmr_size - journal_size;
            warn!(journal_size, ?pop_count, "popping MMR items");
            mmr.pop(*pop_count as usize).await?;
            mmr_size = Location::new_unchecked(journal_size);
        }

        if mmr_size < journal_size {
            let replay_count = journal_size - *mmr_size;
            warn!(
                journal_size,
                replay_count, "MMR lags behind journal, replaying journal to catch up"
            );

            let mut batch_size = 0;
            while mmr_size < journal_size {
                let op = journal.read(*mmr_size).await?;
                mmr.add(hasher, &op.encode()).await?;
                mmr_size += 1;
                batch_size += 1;
                if batch_size >= apply_batch_size {
                    mmr = mmr.merkleize(hasher).into_dirty();
                    batch_size = 0;
                }
            }
            return Ok(mmr.merkleize(hasher));
        }

        assert_eq!(journal.size(), mmr.leaves());

        Ok(mmr.merkleize(hasher))
    }

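    /// Prunes items before `prune_loc` from the journal and then prunes the MMR to the
    /// resulting boundary. Returns the actual boundary, which may be less than `prune_loc`
    /// because the underlying journal prunes at blob granularity.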
    pub async fn prune(&mut self, prune_loc: Location) -> Result<Location, Error> {
        if self.mmr.size() == 0 {
            return Ok(self.bounds().start);
        }

        self.mmr.sync().await?;

        if !self.journal.prune(*prune_loc).await? {
            return Ok(self.bounds().start);
        }

        let bounds = self.bounds();
        debug!(size = ?bounds.end, ?prune_loc, boundary = ?bounds.start, "pruned inactive ops");

        self.mmr
            .prune_to_pos(Position::try_from(bounds.start)?)
            .await?;

        Ok(bounds.start)
    }
}

impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
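    /// Generates a proof of inclusion for up to `max_ops` items starting at `start_loc`
    /// against the current root, along with the items themselves.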
    pub async fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
        self.historical_proof(self.size(), start_loc, max_ops).await
    }

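    /// Like [Self::proof], but generates the proof against the root the MMR had when it
    /// contained only `historical_leaves` items.
    ///
    /// A minimal verification sketch (hypothetical bindings; it assumes the proof's
    /// `verify_range_inclusion` helper used by this module's tests):
    ///
    /// ```ignore
    /// let (proof, ops) = journal.historical_proof(size, start, max_ops).await?;
    /// let encoded: Vec<_> = ops.iter().map(|op| op.encode()).collect();
    /// assert!(proof.verify_range_inclusion(&mut hasher, &encoded, start, &root));
    /// ```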
    pub async fn historical_proof(
        &self,
        historical_leaves: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<C::Item>), Error> {
        let leaves = self.size();
        if historical_leaves > leaves {
            return Err(crate::mmr::Error::RangeOutOfBounds(leaves).into());
        }
        if start_loc >= historical_leaves {
            return Err(crate::mmr::Error::RangeOutOfBounds(start_loc).into());
        }
        let end_loc = std::cmp::min(historical_leaves, start_loc.saturating_add(max_ops.get()));

        let proof = self
            .mmr
            .historical_range_proof(historical_leaves, start_loc..end_loc)
            .await?;

        let mut ops = Vec::with_capacity((*end_loc - *start_loc) as usize);
        let futures = (*start_loc..*end_loc)
            .map(|i| self.journal.read(i))
            .collect::<Vec<_>>();
        try_join_all(futures)
            .await?
            .into_iter()
            .for_each(|op| ops.push(op));

        Ok((proof, ops))
    }

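    /// Returns the current root digest of the MMR, available only in the clean state.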
    pub const fn root(&self) -> H::Digest {
        self.mmr.root()
    }

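    /// Converts this journal into its [Dirty] form so that new items can be appended.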
    pub fn into_dirty(self) -> Journal<E, C, H, Dirty> {
        Journal {
            mmr: self.mmr.into_dirty(),
            journal: self.journal,
            hasher: self.hasher,
        }
    }
}

impl<E, C, H> Journal<E, C, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
    H: Hasher,
{
    pub async fn destroy(self) -> Result<(), Error> {
        try_join!(
            self.journal.destroy().map_err(Error::Journal),
            self.mmr.destroy().map_err(Error::Mmr),
        )?;
        Ok(())
    }

    pub async fn sync(&mut self) -> Result<(), Error> {
        try_join!(
            self.journal.sync().map_err(Error::Journal),
            self.mmr.sync().map_err(Into::into)
        )?;

        Ok(())
    }
}

impl<E, C, H> Journal<E, C, H, Dirty>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared>,
    H: Hasher,
{
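    /// Merkleizes all pending appends, returning the [Clean] form of the journal from
    /// which roots and proofs can be generated.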
    pub fn merkleize(self) -> Journal<E, C, H, Clean<H::Digest>> {
        let Self {
            mmr,
            journal,
            mut hasher,
        } = self;
        Journal {
            mmr: mmr.merkleize(&mut hasher),
            journal,
            hasher,
        }
    }
}

impl<E, C, H> Journal<E, C, H, Dirty>
where
    E: Storage + Clock + Metrics,
    C: MutableContiguous<Item: EncodeShared>,
    H: Hasher,
{
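    /// Appends `item` to the journal and adds its encoding as a new MMR leaf, returning the
    /// item's [Location]. The append is not durable until a sync or commit.
    ///
    /// A minimal usage sketch, mirroring the flow used in this module's tests (the
    /// construction of `journal` and `item` is assumed):
    ///
    /// ```ignore
    /// let mut journal = journal.into_dirty();
    /// let loc = journal.append(item).await?;
    /// let journal = journal.merkleize();
    /// let root = journal.root();
    /// ```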
    pub async fn append(&mut self, item: C::Item) -> Result<Location, Error> {
        let encoded_item = item.encode();

        let (_, loc) = try_join!(
            self.mmr
                .add(&mut self.hasher, &encoded_item)
                .map_err(Error::Mmr),
            self.journal.append(item).map_err(Into::into)
        )?;

        Ok(Location::new_unchecked(loc))
    }

    pub async fn from_components(
        mmr: CleanMmr<E, H::Digest>,
        journal: C,
        hasher: StandardHasher<H>,
        apply_batch_size: u64,
    ) -> Result<Self, Error> {
        let clean = Journal::<E, C, H, Clean<H::Digest>>::from_components(
            mmr,
            journal,
            hasher,
            apply_batch_size,
        )
        .await?;
        Ok(clean.into_dirty())
    }
}

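/// Number of replayed items to apply between merkleizations when aligning the MMR with the
/// journal during recovery.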
const APPLY_BATCH_SIZE: u64 = 1 << 16;

impl<E, O, H> Journal<E, fixed::Journal<E, O>, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    O: CodecFixedShared,
    H: Hasher,
{
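    /// Initializes an authenticated journal over a fixed-size-item journal, rewinding the
    /// journal to its last item matching `rewind_predicate` (e.g. the last commit) and then
    /// aligning the MMR with the surviving items.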
    pub async fn new(
        context: E,
        mmr_cfg: crate::mmr::journaled::Config,
        journal_cfg: fixed::Config,
        rewind_predicate: fn(&O) -> bool,
    ) -> Result<Self, Error> {
        let mut journal = fixed::Journal::init(context.with_label("journal"), journal_cfg).await?;

        journal.rewind_to(rewind_predicate).await?;

        let mut hasher = StandardHasher::<H>::new();
        let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
        let mut mmr =
            Self::align(mmr.into_dirty(), &journal, &mut hasher, APPLY_BATCH_SIZE).await?;

        journal.sync().await?;
        mmr.sync().await?;

        Ok(Self {
            mmr,
            journal,
            hasher,
        })
    }
}

impl<E, O, H> Journal<E, variable::Journal<E, O>, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    O: CodecShared,
    H: Hasher,
{
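    /// Same as the fixed-size constructor, but for journals whose items have variable-length
    /// encodings.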
    pub async fn new(
        context: E,
        mmr_cfg: crate::mmr::journaled::Config,
        journal_cfg: variable::Config<O::Cfg>,
        rewind_predicate: fn(&O) -> bool,
    ) -> Result<Self, Error> {
        let mut hasher = StandardHasher::<H>::new();
        let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_cfg).await?;
        let mut journal =
            variable::Journal::init(context.with_label("journal"), journal_cfg).await?;

        journal.rewind_to(rewind_predicate).await?;

        let mut mmr =
            Self::align(mmr.into_dirty(), &journal, &mut hasher, APPLY_BATCH_SIZE).await?;

        journal.sync().await?;
        mmr.sync().await?;

        Ok(Self {
            mmr,
            journal,
            hasher,
        })
    }
}

impl<E, C, H, S> Contiguous for Journal<E, C, H, S>
where
    E: Storage + Clock + Metrics,
    C: MutableContiguous<Item: EncodeShared>,
    H: Hasher,
    S: State<DigestOf<H>> + Send + Sync,
{
    type Item = C::Item;

    fn bounds(&self) -> std::ops::Range<u64> {
        self.journal.bounds()
    }

    async fn replay(
        &self,
        start_pos: u64,
        buffer: NonZeroUsize,
    ) -> Result<
        impl futures::Stream<Item = Result<(u64, Self::Item), JournalError>> + '_,
        JournalError,
    > {
        self.journal.replay(start_pos, buffer).await
    }

    async fn read(&self, position: u64) -> Result<Self::Item, JournalError> {
        self.journal.read(position).await
    }
}

impl<E, C, H> MutableContiguous for Journal<E, C, H, Dirty>
where
    E: Storage + Clock + Metrics,
    C: MutableContiguous<Item: EncodeShared>,
    H: Hasher,
{
    async fn append(&mut self, item: Self::Item) -> Result<u64, JournalError> {
        let res = self.append(item).await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })?;

        Ok(*res)
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
        self.journal.prune(min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
        self.journal.rewind(size).await?;

        let leaves = *self.mmr.leaves();
        if leaves > size {
            self.mmr
                .pop((leaves - size) as usize)
                .await
                .map_err(|error| JournalError::Mmr(anyhow::Error::from(error)))?;
        }

        Ok(())
    }
}

impl<E, C, H> Persistable for Journal<E, C, H, Clean<H::Digest>>
where
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
    H: Hasher,
{
    type Error = JournalError;

    async fn commit(&mut self) -> Result<(), JournalError> {
        self.commit().await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })
    }

    async fn sync(&mut self) -> Result<(), JournalError> {
        self.sync().await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })
    }

    async fn destroy(self) -> Result<(), JournalError> {
        self.destroy().await.map_err(|e| match e {
            Error::Journal(inner) => inner,
            Error::Mmr(inner) => JournalError::Mmr(anyhow::Error::from(inner)),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        journal::contiguous::fixed::{Config as JConfig, Journal as ContiguousJournal},
        mmr::{
            journaled::{Config as MmrConfig, Mmr},
            Location,
        },
        qmdb::{
            any::unordered::{fixed::Operation, Update},
            operation::Committable,
        },
    };
    use commonware_codec::Encode;
    use commonware_cryptography::{sha256, sha256::Digest, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        buffer::paged::CacheRef,
        deterministic::{self, Context},
        Metrics, Runner as _,
    };
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use futures::StreamExt as _;
    use std::num::NonZeroU16;

    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);

    fn mmr_config(suffix: &str) -> MmrConfig {
        MmrConfig {
            journal_partition: format!("mmr_journal_{suffix}"),
            metadata_partition: format!("mmr_metadata_{suffix}"),
            items_per_blob: NZU64!(11),
            write_buffer: NZUsize!(1024),
            thread_pool: None,
            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    fn journal_config(suffix: &str) -> JConfig {
        JConfig {
            partition: format!("journal_{suffix}"),
            items_per_blob: NZU64!(7),
            write_buffer: NZUsize!(1024),
            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    type AuthenticatedJournal = Journal<
        deterministic::Context,
        ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
        Sha256,
        Clean<sha256::Digest>,
    >;

    async fn create_empty_journal(context: Context, suffix: &str) -> AuthenticatedJournal {
        AuthenticatedJournal::new(
            context,
            mmr_config(suffix),
            journal_config(suffix),
            |op: &Operation<Digest, Digest>| op.is_commit(),
        )
        .await
        .unwrap()
    }

    fn create_operation(index: u8) -> Operation<Digest, Digest> {
        Operation::Update(Update(
            Sha256::fill(index),
            Sha256::fill(index.wrapping_add(1)),
        ))
    }

    async fn create_journal_with_ops(
        context: Context,
        suffix: &str,
        count: usize,
    ) -> AuthenticatedJournal {
        let mut journal = create_empty_journal(context, suffix).await.into_dirty();

        for i in 0..count {
            let op = create_operation(i as u8);
            let loc = journal.append(op).await.unwrap();
            assert_eq!(loc, Location::new_unchecked(i as u64));
        }

        let mut journal = journal.merkleize();
        journal.sync().await.unwrap();
        journal
    }

    async fn create_components(
        context: Context,
        suffix: &str,
    ) -> (
        CleanMmr<deterministic::Context, sha256::Digest>,
        ContiguousJournal<deterministic::Context, Operation<Digest, Digest>>,
        StandardHasher<Sha256>,
    ) {
        let mut hasher = StandardHasher::new();
        let mmr = Mmr::init(context.with_label("mmr"), &mut hasher, mmr_config(suffix))
            .await
            .unwrap();
        let journal =
            ContiguousJournal::init(context.with_label("journal"), journal_config(suffix))
                .await
                .unwrap();
        (mmr, journal, hasher)
    }

    fn verify_proof(
        proof: &crate::mmr::Proof<<Sha256 as commonware_cryptography::Hasher>::Digest>,
        operations: &[Operation<Digest, Digest>],
        start_loc: Location,
        root: &<Sha256 as commonware_cryptography::Hasher>::Digest,
        hasher: &mut StandardHasher<Sha256>,
    ) -> bool {
        let encoded_ops: Vec<_> = operations.iter().map(|op| op.encode()).collect();
        proof.verify_range_inclusion(hasher, &encoded_ops, start_loc, root)
    }

    #[test_traced("INFO")]
    fn test_new_creates_empty_journal() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_empty_journal(context, "new_empty").await;

            let bounds = journal.bounds();
            assert_eq!(bounds.end, Location::new_unchecked(0));
            assert_eq!(bounds.start, Location::new_unchecked(0));
            assert!(bounds.is_empty());
        });
    }

    #[test_traced("INFO")]
    fn test_align_with_empty_mmr_and_journal() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (mmr, journal, mut hasher) = create_components(context, "align_empty").await;

            let mmr = Journal::align(mmr.into_dirty(), &journal, &mut hasher, APPLY_BATCH_SIZE)
                .await
                .unwrap();

            assert_eq!(mmr.leaves(), Location::new_unchecked(0));
            assert_eq!(journal.size(), Location::new_unchecked(0));
        });
    }

    #[test_traced("WARN")]
    fn test_align_when_mmr_ahead() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (mmr, mut journal, mut hasher) = create_components(context, "mmr_ahead").await;

            let mut mmr = mmr.into_dirty();
            for i in 0..20 {
                let op = create_operation(i as u8);
                let encoded = op.encode();
                mmr.add(&mut hasher, &encoded).await.unwrap();
                journal.append(op).await.unwrap();
            }
            let mmr = mmr.merkleize(&mut hasher);

            let commit_op = Operation::CommitFloor(None, Location::new_unchecked(0));
            journal.append(commit_op).await.unwrap();
            journal.sync().await.unwrap();

            let mmr = Journal::align(mmr.into_dirty(), &journal, &mut hasher, APPLY_BATCH_SIZE)
                .await
                .unwrap();

            assert_eq!(mmr.leaves(), Location::new_unchecked(21));
            assert_eq!(journal.size(), Location::new_unchecked(21));
        });
    }

    #[test_traced("WARN")]
    fn test_align_when_journal_ahead() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (mmr, mut journal, mut hasher) = create_components(context, "journal_ahead").await;

            for i in 0..20 {
                let op = create_operation(i as u8);
                journal.append(op).await.unwrap();
            }

            let commit_op = Operation::CommitFloor(None, Location::new_unchecked(0));
            journal.append(commit_op).await.unwrap();
            journal.sync().await.unwrap();

            let mmr = Journal::align(mmr.into_dirty(), &journal, &mut hasher, APPLY_BATCH_SIZE)
                .await
                .unwrap();

            assert_eq!(mmr.leaves(), Location::new_unchecked(21));
            assert_eq!(journal.size(), Location::new_unchecked(21));
        });
    }

    #[test_traced("INFO")]
    fn test_align_with_mismatched_committed_ops() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_empty_journal(context.with_label("first"), "mismatched")
                .await
                .into_dirty();

            for i in 0..20 {
                let loc = journal.append(create_operation(i as u8)).await.unwrap();
                assert_eq!(loc, Location::new_unchecked(i as u64));
            }
            let mut journal = journal.merkleize();

            let size_before = journal.size();
            assert_eq!(size_before, 20);

            journal.sync().await.unwrap();
            drop(journal);
            let journal = create_empty_journal(context.with_label("second"), "mismatched").await;

            assert_eq!(journal.size(), 0);
        });
    }

    #[test_traced("INFO")]
    fn test_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_match"),
                    journal_config("rewind_match"),
                )
                .await
                .unwrap();

                for i in 0..3 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                    .await
                    .unwrap();
                for i in 4..7 {
                    journal.append(create_operation(i)).await.unwrap();
                }

                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 4);
                assert_eq!(journal.size(), 4);

                let op = journal.read(3).await.unwrap();
                assert!(op.is_commit());
            }

            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_multiple"),
                    journal_config("rewind_multiple"),
                )
                .await
                .unwrap();

                journal.append(create_operation(0)).await.unwrap();
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                    .await
                    .unwrap();
                journal.append(create_operation(2)).await.unwrap();
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(1)))
                    .await
                    .unwrap();
                journal.append(create_operation(4)).await.unwrap();

                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 4);

                let op = journal.read(3).await.unwrap();
                assert!(op.is_commit());

                assert!(journal.read(4).await.is_err());
            }

            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_no_match"),
                    journal_config("rewind_no_match"),
                )
                .await
                .unwrap();

                for i in 0..10 {
                    journal.append(create_operation(i)).await.unwrap();
                }

                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 0, "Should rewind to pruning boundary (0)");
                assert_eq!(journal.size(), 0);
            }

            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_with_pruning"),
                    journal_config("rewind_with_pruning"),
                )
                .await
                .unwrap();

                for i in 0..10 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                    .await
                    .unwrap();
                for i in 11..15 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal.sync().await.unwrap();

                journal.prune(8).await.unwrap();
                assert_eq!(journal.bounds().start, Location::new_unchecked(7));

                for i in 15..20 {
                    journal.append(create_operation(i)).await.unwrap();
                }

                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 11);

                let op = journal.read(10).await.unwrap();
                assert!(op.is_commit());
            }

            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_no_match_pruned"),
                    journal_config("rewind_no_match_pruned"),
                )
                .await
                .unwrap();

                for i in 0..5 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                    .await
                    .unwrap();
                for i in 6..10 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal.sync().await.unwrap();

                journal.prune(8).await.unwrap();
                assert_eq!(journal.bounds().start, Location::new_unchecked(7));

                for i in 10..14 {
                    journal.append(create_operation(i)).await.unwrap();
                }

                let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
                assert_eq!(final_size, 7);
            }

            {
                let mut journal = ContiguousJournal::init(
                    context.with_label("rewind_empty"),
                    journal_config("rewind_empty"),
                )
                .await
                .unwrap();

                let final_size = journal
                    .rewind_to(|op: &Operation<Digest, Digest>| op.is_commit())
                    .await
                    .unwrap();
                assert_eq!(final_size, 0);
                assert_eq!(journal.size(), 0);
            }

            {
                let mut journal = AuthenticatedJournal::new(
                    context,
                    mmr_config("rewind"),
                    journal_config("rewind"),
                    |op| op.is_commit(),
                )
                .await
                .unwrap()
                .into_dirty();

                for i in 0..5 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                journal
                    .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                    .await
                    .unwrap();
                for i in 6..10 {
                    journal.append(create_operation(i)).await.unwrap();
                }
                let journal = journal.merkleize();
                assert_eq!(journal.size(), 10);

                let mut journal = journal.into_dirty();
                journal.rewind(2).await.unwrap();
                assert_eq!(journal.size(), 2);
                assert_eq!(journal.mmr.leaves(), 2);
                assert_eq!(journal.mmr.size(), 3);
                let bounds = journal.bounds();
                assert_eq!(bounds.start, Location::new_unchecked(0));
                assert!(!bounds.is_empty());

                assert!(matches!(
                    journal.rewind(3).await,
                    Err(JournalError::InvalidRewind(_))
                ));

                journal.rewind(0).await.unwrap();
                let journal = journal.merkleize();
                assert_eq!(journal.size(), 0);
                assert_eq!(journal.mmr.leaves(), 0);
                assert_eq!(journal.mmr.size(), 0);
                let bounds = journal.bounds();
                assert_eq!(bounds.start, Location::new_unchecked(0));
                assert!(bounds.is_empty());

                let mut journal = journal.into_dirty();
                for i in 0..255 {
                    journal.append(create_operation(i)).await.unwrap();
                }

                let mut journal = journal.merkleize();
                journal.prune(Location::new_unchecked(100)).await.unwrap();
                assert_eq!(journal.bounds().start, Location::new_unchecked(98));
                let mut journal = journal.into_dirty();
                let res = journal.rewind(97).await;
                assert!(matches!(res, Err(JournalError::InvalidRewind(97))));
                journal.rewind(98).await.unwrap();
                let bounds = journal.bounds();
                assert_eq!(bounds.end, Location::new_unchecked(98));
                assert_eq!(journal.mmr.leaves(), 98);
                assert_eq!(bounds.start, Location::new_unchecked(98));
                assert!(bounds.is_empty());
            }
        });
    }

    #[test_traced("INFO")]
    fn test_apply_op_and_read_operations() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_empty_journal(context, "apply_op").await.into_dirty();

            assert_eq!(journal.size(), 0);

            let expected_ops: Vec<_> = (0..50).map(|i| create_operation(i as u8)).collect();
            for (i, op) in expected_ops.iter().enumerate() {
                let loc = journal.append(op.clone()).await.unwrap();
                assert_eq!(loc, Location::new_unchecked(i as u64));
                assert_eq!(journal.size(), (i + 1) as u64);
            }
            let mut journal = journal.merkleize();

            assert_eq!(journal.size(), 50);

            journal.sync().await.unwrap();
            for (i, expected_op) in expected_ops.iter().enumerate() {
                let read_op = journal
                    .read(Location::new_unchecked(i as u64))
                    .await
                    .unwrap();
                assert_eq!(read_op, *expected_op);
            }
        });
    }

    #[test_traced("INFO")]
    fn test_read_operations_at_various_positions() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "read", 50).await;

            let first_op = journal.read(Location::new_unchecked(0)).await.unwrap();
            assert_eq!(first_op, create_operation(0));

            let middle_op = journal.read(Location::new_unchecked(25)).await.unwrap();
            assert_eq!(middle_op, create_operation(25));

            let last_op = journal.read(Location::new_unchecked(49)).await.unwrap();
            assert_eq!(last_op, create_operation(49));

            for i in 0..50 {
                let op = journal.read(Location::new_unchecked(i)).await.unwrap();
                assert_eq!(op, create_operation(i as u8));
            }
        });
    }

    #[test_traced("INFO")]
    fn test_read_pruned_operation_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "read_pruned", 100)
                .await
                .into_dirty();

            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
                .await
                .unwrap();
            let mut journal = journal.merkleize();
            journal.sync().await.unwrap();
            let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();

            let read_loc = Location::new_unchecked(0);
            if read_loc < pruned_boundary {
                let result = journal.read(read_loc).await;
                assert!(matches!(
                    result,
                    Err(Error::Journal(crate::journal::Error::ItemPruned(_)))
                ));
            }
        });
    }

    #[test_traced("INFO")]
    fn test_read_out_of_range_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "read_oob", 3).await;

            let result = journal.read(Location::new_unchecked(10)).await;
            assert!(matches!(
                result,
                Err(Error::Journal(crate::journal::Error::ItemOutOfRange(_)))
            ));
        });
    }

    #[test_traced("INFO")]
    fn test_read_all_operations_back_correctly() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "read_all", 50).await;

            assert_eq!(journal.size(), 50);

            for i in 0..50 {
                let op = journal.read(Location::new_unchecked(i)).await.unwrap();
                assert_eq!(op, create_operation(i as u8));
            }
        });
    }

    #[test_traced("INFO")]
    fn test_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_empty_journal(context.with_label("first"), "close_pending")
                .await
                .into_dirty();

            let expected_ops: Vec<_> = (0..20).map(|i| create_operation(i as u8)).collect();
            for (i, op) in expected_ops.iter().enumerate() {
                let loc = journal.append(op.clone()).await.unwrap();
                assert_eq!(loc, Location::new_unchecked(i as u64));
            }

            let commit_loc = journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                .await
                .unwrap();
            let mut journal = journal.merkleize();
            assert_eq!(
                commit_loc,
                Location::new_unchecked(20),
                "commit should be at location 20"
            );
            journal.sync().await.unwrap();

            drop(journal);
            let journal = create_empty_journal(context.with_label("second"), "close_pending").await;
            assert_eq!(journal.size(), 21);

            for (i, expected_op) in expected_ops.iter().enumerate() {
                let read_op = journal
                    .read(Location::new_unchecked(i as u64))
                    .await
                    .unwrap();
                assert_eq!(read_op, *expected_op);
            }
        });
    }

    #[test_traced("INFO")]
    fn test_prune_empty_journal() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_empty_journal(context, "prune_empty").await;

            let boundary = journal.prune(Location::new_unchecked(0)).await.unwrap();

            assert_eq!(boundary, Location::new_unchecked(0));
        });
    }

    #[test_traced("INFO")]
    fn test_prune_to_location() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "prune_to", 100)
                .await
                .into_dirty();

            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
                .await
                .unwrap();
            let mut journal = journal.merkleize();
            journal.sync().await.unwrap();

            let boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();

            assert!(boundary <= Location::new_unchecked(50));
        });
    }

    #[test_traced("INFO")]
    fn test_prune_returns_actual_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "prune_boundary", 100)
                .await
                .into_dirty();

            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
                .await
                .unwrap();
            let mut journal = journal.merkleize();
            journal.sync().await.unwrap();

            let requested = Location::new_unchecked(50);
            let actual = journal.prune(requested).await.unwrap();

            let bounds = journal.bounds();
            assert!(!bounds.is_empty());
            assert_eq!(actual, bounds.start);

            assert!(actual <= requested);
        });
    }

    #[test_traced("INFO")]
    fn test_prune_preserves_operation_count() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "prune_count", 100)
                .await
                .into_dirty();

            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
                .await
                .unwrap();
            let mut journal = journal.merkleize();
            journal.sync().await.unwrap();

            let count_before = journal.size();
            journal.prune(Location::new_unchecked(50)).await.unwrap();
            let count_after = journal.size();

            assert_eq!(count_before, count_after);
        });
    }

    #[test_traced("INFO")]
    fn test_bounds_empty_and_pruned() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_empty_journal(context.with_label("empty"), "oldest").await;
            assert!(journal.bounds().is_empty());
            journal.destroy().await.unwrap();

            let journal =
                create_journal_with_ops(context.with_label("no_prune"), "oldest", 100).await;
            let bounds = journal.bounds();
            assert!(!bounds.is_empty());
            assert_eq!(bounds.start, Location::new_unchecked(0));
            journal.destroy().await.unwrap();

            let journal =
                create_journal_with_ops(context.with_label("pruned"), "oldest", 100).await;
            let mut journal = journal.into_dirty();
            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
                .await
                .unwrap();
            let mut journal = journal.merkleize();
            journal.sync().await.unwrap();

            let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();

            let bounds = journal.bounds();
            assert!(!bounds.is_empty());
            assert_eq!(bounds.start, pruned_boundary);
            assert!(pruned_boundary <= Location::new_unchecked(50));
            journal.destroy().await.unwrap();
        });
    }

    #[test_traced("INFO")]
    fn test_bounds_start_after_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_empty_journal(context.with_label("empty"), "boundary").await;
            assert_eq!(journal.bounds().start, Location::new_unchecked(0));

            let journal =
                create_journal_with_ops(context.with_label("no_prune"), "boundary", 100).await;
            assert_eq!(journal.bounds().start, Location::new_unchecked(0));

            let mut journal =
                create_journal_with_ops(context.with_label("pruned"), "boundary", 100)
                    .await
                    .into_dirty();
            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(50)))
                .await
                .unwrap();
            let mut journal = journal.merkleize();
            journal.sync().await.unwrap();

            let pruned_boundary = journal.prune(Location::new_unchecked(50)).await.unwrap();

            assert_eq!(journal.bounds().start, pruned_boundary);
        });
    }

    #[test_traced("INFO")]
    fn test_mmr_prunes_to_journal_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut journal = create_journal_with_ops(context, "mmr_boundary", 50)
                .await
                .into_dirty();

            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(25)))
                .await
                .unwrap();
            let mut journal = journal.merkleize();
            journal.sync().await.unwrap();

            let pruned_boundary = journal.prune(Location::new_unchecked(25)).await.unwrap();

            assert!(!journal.bounds().is_empty());
            assert_eq!(pruned_boundary, journal.bounds().start);

            assert!(pruned_boundary <= Location::new_unchecked(25));

            assert_eq!(journal.size(), 51);
        });
    }

    #[test_traced("INFO")]
    fn test_proof_multiple_operations() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_multi", 50).await;

            let (proof, ops) = journal
                .proof(Location::new_unchecked(0), NZU64!(50))
                .await
                .unwrap();

            assert_eq!(ops.len(), 50);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation(i as u8));
            }

            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(0),
                &root,
                &mut hasher
            ));
        });
    }

    #[test_traced("INFO")]
    fn test_historical_proof_limited_by_max_ops() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_limit", 50).await;

            let size = journal.size();
            let (proof, ops) = journal
                .historical_proof(size, Location::new_unchecked(0), NZU64!(20))
                .await
                .unwrap();

            assert_eq!(ops.len(), 20);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation(i as u8));
            }

            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(0),
                &root,
                &mut hasher
            ));
        });
    }

    #[test_traced("INFO")]
    fn test_historical_proof_at_end_of_journal() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_end", 50).await;

            let size = journal.size();
            let (proof, ops) = journal
                .historical_proof(size, Location::new_unchecked(40), NZU64!(20))
                .await
                .unwrap();

            assert_eq!(ops.len(), 10);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation((40 + i) as u8));
            }

            let mut hasher = StandardHasher::new();
            let root = journal.root();
            assert!(verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(40),
                &root,
                &mut hasher
            ));
        });
    }

    #[test_traced("INFO")]
    fn test_historical_proof_out_of_range_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_oob", 5).await;

            let result = journal
                .historical_proof(
                    Location::new_unchecked(10),
                    Location::new_unchecked(0),
                    NZU64!(1),
                )
                .await;

            assert!(matches!(
                result,
                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
            ));
        });
    }

    #[test_traced("INFO")]
    fn test_historical_proof_start_too_large_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_start_oob", 5).await;

            let size = journal.size();
            let result = journal.historical_proof(size, size, NZU64!(1)).await;

            assert!(matches!(
                result,
                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
            ));
        });
    }

    #[test_traced("INFO")]
    fn test_historical_proof_truly_historical() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_historical", 50).await;

            let mut hasher = StandardHasher::new();
            let historical_root = journal.root();
            let historical_size = journal.size();

            let mut journal = journal.into_dirty();
            for i in 50..100 {
                journal.append(create_operation(i as u8)).await.unwrap();
            }
            let mut journal = journal.merkleize();
            journal.sync().await.unwrap();

            let (proof, ops) = journal
                .historical_proof(historical_size, Location::new_unchecked(0), NZU64!(50))
                .await
                .unwrap();

            assert_eq!(ops.len(), 50);
            for (i, op) in ops.iter().enumerate() {
                assert_eq!(*op, create_operation(i as u8));
            }

            assert!(verify_proof(
                &proof,
                &ops,
                Location::new_unchecked(0),
                &historical_root,
                &mut hasher
            ));
        });
    }

    #[test_traced("INFO")]
    fn test_historical_proof_pruned_location_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "proof_pruned", 50).await;

            let mut journal = journal.into_dirty();
            journal
                .append(Operation::CommitFloor(None, Location::new_unchecked(25)))
                .await
                .unwrap();
            let mut journal = journal.merkleize();
            journal.sync().await.unwrap();
            let pruned_boundary = journal.prune(Location::new_unchecked(25)).await.unwrap();

            let size = journal.size();
            let start_loc = Location::new_unchecked(0);
            if start_loc < pruned_boundary {
                let result = journal.historical_proof(size, start_loc, NZU64!(1)).await;

                assert!(result.is_err());
            }
        });
    }

    #[test_traced("INFO")]
    fn test_replay_operations() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_empty_journal(context.with_label("empty"), "replay").await;
            let stream = journal.replay(0, NZUsize!(10)).await.unwrap();
            futures::pin_mut!(stream);
            assert!(stream.next().await.is_none());

            let journal =
                create_journal_with_ops(context.with_label("with_ops"), "replay", 50).await;
            let stream = journal.replay(0, NZUsize!(100)).await.unwrap();
            futures::pin_mut!(stream);

            for i in 0..50 {
                let (pos, op) = stream.next().await.unwrap().unwrap();
                assert_eq!(pos, i);
                assert_eq!(op, create_operation(i as u8));
            }

            assert!(stream.next().await.is_none());
        });
    }

    #[test_traced("INFO")]
    fn test_replay_from_middle() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let journal = create_journal_with_ops(context, "replay_middle", 50).await;
            let stream = journal.replay(25, NZUsize!(100)).await.unwrap();
            futures::pin_mut!(stream);

            let mut count = 0;
            while let Some(result) = stream.next().await {
                let (pos, op) = result.unwrap();
                assert_eq!(pos, 25 + count);
                assert_eq!(op, create_operation((25 + count) as u8));
                count += 1;
            }

            assert_eq!(count, 25);
        });
    }
}