use super::{
    metrics,
    safe_tip::SafeTip,
    types::{Ack, Activity, Error, Item, TipAck},
    Config,
};
use crate::{
    aggregation::{scheme, types::Certificate},
    types::{Epoch, EpochDelta, Height, HeightDelta, Participant},
    Automaton, Monitor, Reporter,
};
use commonware_cryptography::{
    certificate::{Provider, Scheme},
    Digest,
};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Blocker, Receiver, Recipients, Sender,
};
use commonware_parallel::Strategy;
use commonware_runtime::{
    buffer::PoolRef,
    spawn_cell,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, GaugeExt, Status},
    },
    Clock, ContextCell, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::segmented::variable::{Config as JConfig, Journal};
use commonware_utils::{futures::Pool as FuturesPool, ordered::Quorum, N3f1, PrioritySet};
use futures::{
    future::{self, Either},
    pin_mut, StreamExt,
};
use rand_core::CryptoRngCore;
use std::{
    cmp::max,
    collections::BTreeMap,
    num::{NonZeroU64, NonZeroUsize},
    sync::Arc,
    time::{Duration, SystemTime},
};
use tracing::{debug, error, info, trace, warn};

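/// The ack-collection state for a height that the engine is still tracking, before and
/// after the automaton has returned the digest for that height.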
enum Pending<S: Scheme, D: Digest> {
    /// Acks received before the automaton has returned a digest for this height,
    /// keyed by epoch and then by participant.
    Unverified(BTreeMap<Epoch, BTreeMap<Participant, Ack<S, D>>>),

    /// The digest returned by the automaton for this height, together with the acks
    /// collected so far, keyed by epoch and then by participant.
    Verified(D, BTreeMap<Epoch, BTreeMap<Participant, Ack<S, D>>>),
}

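/// The result of a digest request issued to the automaton for a given height.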
struct DigestRequest<D: Digest, E: Clock> {
    /// The height for which the digest was requested.
    height: Height,

    /// The digest returned by the automaton, or an error if the request was canceled.
    result: Result<D, Error>,

    /// Measures the time between issuing the request and its completion.
    timer: histogram::Timer<E>,
}

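/// Instance of the aggregation engine.
///
/// Requests digests from the automaton, exchanges acks with peers, assembles
/// certificates once a quorum of matching acks is collected, and persists activity to
/// a journal so state can be recovered after a restart.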
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics + CryptoRngCore,
    P: Provider<Scope = Epoch>,
    D: Digest,
    A: Automaton<Context = Height, Digest = D> + Clone,
    Z: Reporter<Activity = Activity<P::Scheme, D>>,
    M: Monitor<Index = Epoch>,
    B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
    T: Strategy,
> {
    context: ContextCell<E>,
    automaton: A,
    monitor: M,
    provider: P,
    reporter: Z,
    blocker: B,
    strategy: T,

    /// Number of epochs behind and ahead of the current epoch for which acks are
    /// still accepted.
    epoch_bounds: (EpochDelta, EpochDelta),

    /// Maximum number of heights at or above the tip that are worked on concurrently.
    window: HeightDelta,

    /// Number of heights below the tip for which state is retained and acks are
    /// still accepted.
    activity_timeout: HeightDelta,

    /// Outstanding digest requests to the automaton.
    digest_requests: FuturesPool<DigestRequest<D, E>>,

    /// The current epoch, as reported by the monitor.
    epoch: Epoch,

    /// The current tip height; heights below the tip are either certified or no
    /// longer tracked.
    tip: Height,

    /// Tracks the tips reported by peers to derive a tip that can be safely adopted.
    safe_tip: SafeTip<<P::Scheme as Scheme>::PublicKey>,

    /// Heights still being worked on, along with the acks collected for them.
    pending: BTreeMap<Height, Pending<P::Scheme, D>>,

    /// Heights for which a certificate has been assembled.
    confirmed: BTreeMap<Height, Certificate<P::Scheme, D>>,

    /// How long to wait before rebroadcasting an ack for a pending height.
    rebroadcast_timeout: Duration,

    /// Rebroadcast deadlines per height, ordered by earliest deadline.
    rebroadcast_deadlines: PrioritySet<Height, SystemTime>,

    /// Journal used to persist activity and replay it across restarts.
    journal: Option<Journal<E, Activity<P::Scheme, D>>>,
    journal_partition: String,
    journal_write_buffer: NonZeroUsize,
    journal_replay_buffer: NonZeroUsize,
    journal_heights_per_section: NonZeroU64,
    journal_compression: Option<u8>,
    journal_buffer_pool: PoolRef,

    /// Whether acks are sent over the network as priority messages.
    priority_acks: bool,

    metrics: metrics::Metrics<E>,
}

impl<
        E: Clock + Spawner + Storage + Metrics + CryptoRngCore,
        P: Provider<Scope = Epoch, Scheme: scheme::Scheme<D>>,
        D: Digest,
        A: Automaton<Context = Height, Digest = D> + Clone,
        Z: Reporter<Activity = Activity<P::Scheme, D>>,
        M: Monitor<Index = Epoch>,
        B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        T: Strategy,
    > Engine<E, P, D, A, Z, M, B, T>
{
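    /// Creates a new engine from the given runtime context and configuration.
    ///
    /// The engine does no work until [`Self::start`] is called.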
    pub fn new(context: E, cfg: Config<P, D, A, Z, M, B, T>) -> Self {
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context: ContextCell::new(context),
            automaton: cfg.automaton,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            provider: cfg.provider,
            blocker: cfg.blocker,
            strategy: cfg.strategy,
            epoch_bounds: cfg.epoch_bounds,
            window: HeightDelta::new(cfg.window.into()),
            activity_timeout: cfg.activity_timeout,
            epoch: Epoch::zero(),
            tip: Height::zero(),
            safe_tip: SafeTip::default(),
            digest_requests: FuturesPool::default(),
            pending: BTreeMap::new(),
            confirmed: BTreeMap::new(),
            rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
            rebroadcast_deadlines: PrioritySet::new(),
            journal: None,
            journal_partition: cfg.journal_partition,
            journal_write_buffer: cfg.journal_write_buffer,
            journal_replay_buffer: cfg.journal_replay_buffer,
            journal_heights_per_section: cfg.journal_heights_per_section,
            journal_compression: cfg.journal_compression,
            journal_buffer_pool: cfg.journal_buffer_pool,
            priority_acks: cfg.priority_acks,
            metrics,
        }
    }

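    /// Returns the scheme for the given epoch, or `Error::UnknownEpoch` if the
    /// provider has no scheme scoped to that epoch.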
    fn scheme(&self, epoch: Epoch) -> Result<Arc<P::Scheme>, Error> {
        self.provider
            .scoped(epoch)
            .ok_or(Error::UnknownEpoch(epoch))
    }

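    /// Starts the engine on the given network channels, returning a handle to the
    /// spawned task.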
    pub fn start(
        mut self,
        network: (
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

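    /// Runs the engine until the runtime signals shutdown.
    ///
    /// Initializes and replays the journal, then loops over epoch updates, completed
    /// digest requests, incoming acks, and rebroadcast deadlines.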
    async fn run(
        mut self,
        network: (
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        ),
    ) {
        let (mut sender, mut receiver) = wrap((), network.0, network.1);
        let mut shutdown = self.context.stopped();

        // Track the current epoch as reported by the monitor.
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Initialize the journal and replay any persisted activity.
        let journal_cfg = JConfig {
            partition: self.journal_partition.clone(),
            compression: self.journal_compression,
            codec_config: P::Scheme::certificate_codec_config_unbounded(),
            buffer_pool: self.journal_buffer_pool.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::init(
            self.context.with_label("journal").into_present(),
            journal_cfg,
        )
        .await
        .expect("init failed");
        let unverified_heights = self.replay(&journal).await;
        self.journal = Some(journal);

        // Request digests for replayed heights that do not yet have one.
        for height in unverified_heights {
            trace!(%height, "requesting digest for unverified height from replay");
            self.get_digest(height);
        }

        // Initialize the safe tip tracker with the current participant set.
        let scheme = self
            .scheme(self.epoch)
            .expect("current epoch scheme must exist");
        self.safe_tip.init(scheme.participants());

        loop {
            // Update the tip metric.
            let _ = self.metrics.tip.try_set(self.tip.get());

            // Request digests for all heights within the window of the tip.
            let next = self.next();
            if next.delta_from(self.tip).unwrap() < self.window {
                trace!(%next, "requesting new digest");
                assert!(self
                    .pending
                    .insert(next, Pending::Unverified(BTreeMap::new()))
                    .is_none());
                self.get_digest(next);
                continue;
            }

            // Sleep until the earliest rebroadcast deadline, if any.
            let rebroadcast = match self.rebroadcast_deadlines.peek() {
                Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            select! {
                // Shutdown requested by the runtime.
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                // The monitor reported a new epoch.
                epoch = epoch_updates.next() => {
                    let Some(epoch) = epoch else {
                        error!("epoch subscription failed");
                        break;
                    };

                    debug!(current = %self.epoch, new = %epoch, "refresh epoch");
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;

                    let scheme = self.scheme(self.epoch)
                        .expect("current epoch scheme must exist");
                    self.safe_tip.reconcile(scheme.participants());

                    // Drop any acks from epochs below the new lower bound.
                    let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
                    self.pending.iter_mut().for_each(|(_, pending)| {
                        match pending {
                            Pending::Unverified(acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                            Pending::Verified(_, acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                        }
                    });

                    continue;
                },

                // The automaton returned a digest (or an error) for a requested height.
                request = self.digest_requests.next_completed() => {
                    let DigestRequest { height, result, timer } = request;
                    drop(timer); // Records the digest request duration.
                    match result {
                        Err(err) => {
                            warn!(?err, %height, "automaton returned error");
                            self.metrics.digest.inc(Status::Dropped);
                        }
                        Ok(digest) => {
                            if let Err(err) = self.handle_digest(height, digest, &mut sender).await {
                                debug!(?err, %height, "handle_digest failed");
                                continue;
                            }
                        }
                    }
                },

                // A peer sent an ack.
                msg = receiver.recv() => {
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let TipAck { ack, tip } = match msg {
                        Ok(peer_ack) => peer_ack,
                        Err(err) => {
                            warn!(?err, ?sender, "ack decode failed, blocking peer");
                            self.blocker.block(sender).await;
                            continue;
                        }
                    };

                    // Update the peer's reported tip; fast-forward if the safe tip advanced.
                    if self.safe_tip.update(sender.clone(), tip).is_some() {
                        let safe_tip = self.safe_tip.get();
                        if safe_tip > self.tip {
                            self.fast_forward_tip(safe_tip).await;
                        }
                    }

                    // Validate the ack, blocking the peer if the failure is attributable to it.
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        if err.blockable() {
                            warn!(?sender, ?err, "blocking peer for validation failure");
                            self.blocker.block(sender).await;
                        } else {
                            debug!(?sender, ?err, "ack validate failed");
                        }
                        continue;
                    };

                    if let Err(err) = self.handle_ack(&ack).await {
                        debug!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }

                    debug!(?sender, epoch = %ack.epoch, height = %ack.item.height, "ack");
                    guard.set(Status::Success);
                },

                // A rebroadcast deadline expired.
                _ = rebroadcast => {
                    let (height, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline");
                    trace!(%height, "rebroadcasting");
                    if let Err(err) = self.handle_rebroadcast(height, &mut sender).await {
                        warn!(?err, %height, "rebroadcast failed");
                    };
                }
            }
        }

        // Sync and close the journal before exiting.
        if let Some(journal) = self.journal.take() {
            journal
                .sync_all()
                .await
                .expect("unable to close aggregation journal");
        }
    }

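    /// Handles a digest returned by the automaton for the given height.
    ///
    /// Marks the height as verified, processes any acks received before the digest was
    /// known, then signs, records, and broadcasts our own ack.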
    async fn handle_digest(
        &mut self,
        height: Height,
        digest: D,
        sender: &mut WrappedSender<
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            TipAck<P::Scheme, D>,
        >,
    ) -> Result<(), Error> {
        if !matches!(self.pending.get(&height), Some(Pending::Unverified(_))) {
            return Err(Error::AckHeight(height));
        };

        // Move the height from `Unverified` to `Verified`.
        let Some(Pending::Unverified(acks)) = self.pending.remove(&height) else {
            panic!("Pending::Unverified entry not found");
        };
        self.pending
            .insert(height, Pending::Verified(digest, BTreeMap::new()));

        // Process any acks received before the digest was known, skipping those that
        // do not match the digest.
        for epoch_acks in acks.values() {
            for epoch_ack in epoch_acks.values() {
                if epoch_ack.item.digest != digest {
                    continue;
                }

                let _ = self.handle_ack(epoch_ack).await;
            }
            // Stop early if a certificate was already assembled.
            if self.confirmed.contains_key(&height) {
                break;
            }
        }

        // Sign our own ack and schedule it for rebroadcast.
        let ack = self.sign_ack(height, digest).await?;

        self.rebroadcast_deadlines
            .put(height, self.context.current() + self.rebroadcast_timeout);

        let _ = self.handle_ack(&ack).await;

        self.broadcast(ack, sender).await?;

        Ok(())
    }

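    /// Adds an ack to the pending state for its height and, if a quorum of matching
    /// acks has been collected, assembles a certificate.
    ///
    /// Errors (e.g. unknown epoch, untracked height, or digest mismatch) are not
    /// fatal to the engine.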
    async fn handle_ack(&mut self, ack: &Ack<P::Scheme, D>) -> Result<(), Error> {
        let scheme = self.scheme(ack.epoch)?;
        let quorum = scheme.participants().quorum::<N3f1>();

        // Find the relevant ack map for the height, if any.
        let acks_by_epoch = match self.pending.get_mut(&ack.item.height) {
            None => {
                return Err(Error::AckHeight(ack.item.height));
            }
            Some(Pending::Unverified(acks)) => acks,
            Some(Pending::Verified(digest, acks)) => {
                if ack.item.digest != *digest {
                    return Err(Error::AckDigest(ack.item.height));
                }
                acks
            }
        };

        // Ignore duplicate acks from the same signer.
        let acks = acks_by_epoch.entry(ack.epoch).or_default();
        if acks.contains_key(&ack.attestation.signer) {
            return Ok(());
        }
        acks.insert(ack.attestation.signer, ack.clone());

        // Assemble a certificate once a quorum of acks matches this ack's digest.
        let filtered = acks
            .values()
            .filter(|a| a.item.digest == ack.item.digest)
            .collect::<Vec<_>>();
        if filtered.len() >= quorum as usize {
            if let Some(certificate) = Certificate::from_acks(&*scheme, filtered, &self.strategy) {
                self.metrics.certificates.inc();
                self.handle_certificate(certificate).await;
            }
        }

        Ok(())
    }

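    /// Records, reports, and stores a newly assembled certificate, advancing the tip
    /// if the certificate is at the current tip.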
    async fn handle_certificate(&mut self, certificate: Certificate<P::Scheme, D>) {
        // Ignore heights that are already certified.
        let height = certificate.item.height;
        if self.confirmed.contains_key(&height) {
            return;
        }

        self.confirmed.insert(height, certificate.clone());

        // Persist and report the certificate.
        let certified = Activity::Certified(certificate);
        self.record(certified.clone()).await;
        self.sync(height).await;
        self.reporter.report(certified).await;

        // If the certificate is at the tip, advance the tip past all consecutively
        // certified heights.
        if height == self.tip {
            let mut new_tip = height.next();
            while self.confirmed.contains_key(&new_tip) && new_tip.get() < u64::MAX {
                new_tip = new_tip.next();
            }

            if new_tip > self.tip {
                self.fast_forward_tip(new_tip).await;
            }
        }
    }

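    /// Rebroadcasts our ack for the given height, if the height still has a verified
    /// digest pending, and schedules the next rebroadcast.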
    async fn handle_rebroadcast(
        &mut self,
        height: Height,
        sender: &mut WrappedSender<
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            TipAck<P::Scheme, D>,
        >,
    ) -> Result<(), Error> {
        // Nothing to rebroadcast if the height no longer has a verified digest.
        let Some(Pending::Verified(digest, acks)) = self.pending.get(&height) else {
            return Ok(());
        };

        // Reuse our existing ack for the current epoch if present; otherwise sign a new one.
        let scheme = self.scheme(self.epoch)?;
        let Some(signer) = scheme.me() else {
            return Err(Error::NotSigner(self.epoch));
        };
        let ack = acks
            .get(&self.epoch)
            .and_then(|acks| acks.get(&signer).cloned());
        let ack = match ack {
            Some(ack) => ack,
            None => self.sign_ack(height, *digest).await?,
        };

        // Schedule the next rebroadcast.
        self.rebroadcast_deadlines
            .put(height, self.context.current() + self.rebroadcast_timeout);

        self.broadcast(ack, sender).await
    }

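    /// Validates an incoming ack against the current state before it is handled.
    ///
    /// Checks the epoch bounds, that the sender is a known participant and matches the
    /// signer, that the height is within the accepted range and not already certified,
    /// that the ack is not a duplicate, and that the signature verifies.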
    fn validate_ack(
        &mut self,
        ack: &Ack<P::Scheme, D>,
        sender: &<P::Scheme as Scheme>::PublicKey,
    ) -> Result<(), Error> {
        // The ack's epoch must be within the accepted bounds of the current epoch.
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // The sender must be a participant in the ack's epoch and must match the signer.
        let scheme = self.scheme(ack.epoch)?;
        let participants = scheme.participants();
        let Some(signer) = participants.index(sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if signer != ack.attestation.signer {
            return Err(Error::PeerMismatch);
        }

        // The ack's height must be at or above the activity threshold...
        let activity_threshold = self.tip.saturating_sub(self.activity_timeout);
        if ack.item.height < activity_threshold {
            return Err(Error::AckCertified(ack.item.height));
        }

        // ...and within the window of the tip.
        if ack
            .item
            .height
            .delta_from(self.tip)
            .is_some_and(|d| d >= self.window)
        {
            return Err(Error::AckHeight(ack.item.height));
        }

        // The height must not already be certified, and the ack must not be a duplicate.
        if self.confirmed.contains_key(&ack.item.height) {
            return Err(Error::AckCertified(ack.item.height));
        }
        let have_ack = match self.pending.get(&ack.item.height) {
            None => false,
            Some(Pending::Unverified(epoch_map)) => epoch_map
                .get(&ack.epoch)
                .is_some_and(|acks| acks.contains_key(&ack.attestation.signer)),
            Some(Pending::Verified(digest, epoch_map)) => {
                if ack.item.digest != *digest {
                    return Err(Error::AckDigest(ack.item.height));
                }
                epoch_map
                    .get(&ack.epoch)
                    .is_some_and(|acks| acks.contains_key(&ack.attestation.signer))
            }
        };
        if have_ack {
            return Err(Error::AckDuplicate(sender.to_string(), ack.item.height));
        }

        // Finally, verify the signature.
        if !ack.verify(&mut self.context, &*scheme, &self.strategy) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }

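    /// Requests the digest for the given height from the automaton.
    ///
    /// The request is tracked in `digest_requests` and completes asynchronously.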
    fn get_digest(&mut self, height: Height) {
        assert!(self.pending.contains_key(&height));
        let mut automaton = self.automaton.clone();
        let timer = self.metrics.digest_duration.timer();
        self.digest_requests.push(async move {
            let receiver = automaton.propose(height).await;
            let result = receiver.await.map_err(Error::AppProposeCanceled);
            DigestRequest {
                height,
                result,
                timer,
            }
        });
    }

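    /// Signs an ack over the given height and digest for the current epoch, recording
    /// it in the journal before returning it.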
    async fn sign_ack(&mut self, height: Height, digest: D) -> Result<Ack<P::Scheme, D>, Error> {
        let scheme = self.scheme(self.epoch)?;
        if scheme.me().is_none() {
            return Err(Error::NotSigner(self.epoch));
        }

        // Sign the item and persist the ack before returning it.
        let item = Item { height, digest };
        let ack = Ack::sign(&*scheme, self.epoch, item).ok_or(Error::NotSigner(self.epoch))?;

        self.record(Activity::Ack(ack.clone())).await;
        self.sync(height).await;

        Ok(ack)
    }

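    /// Broadcasts an ack (together with our current tip) to all peers.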
    async fn broadcast(
        &mut self,
        ack: Ack<P::Scheme, D>,
        sender: &mut WrappedSender<
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            TipAck<P::Scheme, D>,
        >,
    ) -> Result<(), Error> {
        sender
            .send(
                Recipients::All,
                TipAck { ack, tip: self.tip },
                self.priority_acks,
            )
            .await
            .map_err(|err| {
                warn!(?err, "failed to send ack");
                Error::UnableToSendMessage
            })?;
        Ok(())
    }

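    /// Returns the next height for which a digest should be requested: one past the
    /// highest tracked height, or the tip if nothing is tracked.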
    fn next(&self) -> Height {
        let max_pending = self
            .pending
            .last_key_value()
            .map(|(k, _)| k.next())
            .unwrap_or_default();
        let max_confirmed = self
            .confirmed
            .last_key_value()
            .map(|(k, _)| k.next())
            .unwrap_or_default();
        max(self.tip, max(max_pending, max_confirmed))
    }

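    /// Advances the tip to the given height, pruning state and journal sections below
    /// the new activity threshold.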
    async fn fast_forward_tip(&mut self, tip: Height) {
        assert!(tip > self.tip);

        // Drop state below the new activity threshold.
        let activity_threshold = tip.saturating_sub(self.activity_timeout);
        self.pending
            .retain(|height, _| *height >= activity_threshold);
        self.confirmed
            .retain(|height, _| *height >= activity_threshold);

        // Persist and report the new tip.
        self.record(Activity::Tip(tip)).await;
        self.sync(tip).await;
        self.reporter.report(Activity::Tip(tip)).await;

        // Prune journal sections below the activity threshold.
        let section = self.get_journal_section(activity_threshold);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        let _ = journal.prune(section).await;

        self.tip = tip;
    }

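    /// Returns the journal section that stores activity for the given height.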
    const fn get_journal_section(&self, height: Height) -> u64 {
        height.get() / self.journal_heights_per_section.get()
    }

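    /// Replays all activity from the journal to restore the engine's state after a
    /// restart, returning the heights for which a digest must be requested again.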
    async fn replay(&mut self, journal: &Journal<E, Activity<P::Scheme, D>>) -> Vec<Height> {
        // Replay all persisted activity, re-reporting it to the reporter.
        let mut tip = Height::default();
        let mut certified = Vec::new();
        let mut acks = Vec::new();
        let stream = journal
            .replay(0, 0, self.journal_replay_buffer)
            .await
            .expect("replay failed");
        pin_mut!(stream);
        while let Some(msg) = stream.next().await {
            let (_, _, _, activity) = msg.expect("replay failed");
            match activity {
                Activity::Tip(height) => {
                    tip = max(tip, height);
                    self.reporter.report(Activity::Tip(height)).await;
                }
                Activity::Certified(certificate) => {
                    certified.push(certificate.clone());
                    self.reporter.report(Activity::Certified(certificate)).await;
                }
                Activity::Ack(ack) => {
                    acks.push(ack.clone());
                    self.reporter.report(Activity::Ack(ack)).await;
                }
            }
        }

        self.tip = tip;
        let activity_threshold = tip.saturating_sub(self.activity_timeout);

        // Restore certificates at or above the activity threshold.
        certified
            .iter()
            .filter(|certificate| certificate.item.height >= activity_threshold)
            .for_each(|certificate| {
                self.confirmed
                    .insert(certificate.item.height, certificate.clone());
            });

        // Group replayed acks for heights that are still relevant and not yet certified.
        let mut acks_by_height: BTreeMap<Height, Vec<Ack<P::Scheme, D>>> = BTreeMap::new();
        for ack in acks {
            if ack.item.height >= activity_threshold
                && !self.confirmed.contains_key(&ack.item.height)
            {
                acks_by_height.entry(ack.item.height).or_default().push(ack);
            }
        }

        // Rebuild the pending state for each height with replayed acks.
        let mut unverified = Vec::new();
        for (height, mut acks_group) in acks_by_height {
            // If we already signed an ack for this height in the current epoch, treat
            // its digest as verified and keep only matching acks.
            let current_scheme = self.scheme(self.epoch).ok();
            let our_signer = current_scheme.as_ref().and_then(|s| s.me());
            let our_digest = our_signer.and_then(|signer| {
                acks_group
                    .iter()
                    .find(|ack| ack.epoch == self.epoch && ack.attestation.signer == signer)
                    .map(|ack| ack.item.digest)
            });

            if let Some(digest) = our_digest {
                acks_group.retain(|other| other.item.digest == digest);
            }

            let mut epoch_map = BTreeMap::new();
            for ack in acks_group {
                epoch_map
                    .entry(ack.epoch)
                    .or_insert_with(BTreeMap::new)
                    .insert(ack.attestation.signer, ack);
            }

            match our_digest {
                Some(digest) => {
                    self.pending
                        .insert(height, Pending::Verified(digest, epoch_map));

                    // Rebroadcast our ack immediately.
                    self.rebroadcast_deadlines
                        .put(height, self.context.current());
                }
                None => {
                    self.pending.insert(height, Pending::Unverified(epoch_map));

                    // The digest must be requested from the automaton again.
                    unverified.push(height);
                }
            }
        }

        // Track any remaining heights below `next` that have no replayed state.
        let next = self.next();
        for height in Height::range(self.tip, next) {
            if self.pending.contains_key(&height) || self.confirmed.contains_key(&height) {
                continue;
            }

            self.pending
                .insert(height, Pending::Unverified(BTreeMap::new()));
            unverified.push(height);
        }
        info!(tip = %self.tip, %next, ?unverified, "replayed journal");

        unverified
    }

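    /// Appends an activity to the journal section associated with its height.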
    async fn record(&mut self, activity: Activity<P::Scheme, D>) {
        let height = match activity {
            Activity::Ack(ref ack) => ack.item.height,
            Activity::Certified(ref certificate) => certificate.item.height,
            Activity::Tip(h) => h,
        };
        let section = self.get_journal_section(height);
        self.journal
            .as_mut()
            .expect("journal must be initialized")
            .append(section, activity)
            .await
            .expect("unable to append to journal");
    }

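    /// Syncs the journal section associated with the given height.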
    async fn sync(&mut self, height: Height) {
        let section = self.get_journal_section(height);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        journal.sync(section).await.expect("unable to sync journal");
    }
}