1use super::{
4 metrics,
5 safe_tip::SafeTip,
6 types::{Ack, Activity, Error, Item, TipAck},
7 Config,
8};
9use crate::{
10 aggregation::{scheme, types::Certificate},
11 types::{Epoch, EpochDelta, Height, HeightDelta, Participant},
12 Automaton, Monitor, Reporter,
13};
14use commonware_cryptography::{
15 certificate::{Provider, Scheme},
16 Digest,
17};
18use commonware_macros::select_loop;
19use commonware_p2p::{
20 utils::codec::{wrap, WrappedSender},
21 Blocker, Receiver, Recipients, Sender,
22};
23use commonware_parallel::Strategy;
24use commonware_runtime::{
25 buffer::paged::CacheRef,
26 spawn_cell,
27 telemetry::metrics::{
28 histogram,
29 status::{CounterExt, GaugeExt, Status},
30 },
31 BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
32};
33use commonware_storage::journal::segmented::variable::{Config as JConfig, Journal};
34use commonware_utils::{futures::Pool as FuturesPool, ordered::Quorum, N3f1, PrioritySet};
35use futures::{
36 future::{self, Either},
37 pin_mut, StreamExt,
38};
39use rand_core::CryptoRngCore;
40use std::{
41 cmp::max,
42 collections::BTreeMap,
43 num::{NonZeroU64, NonZeroUsize},
44 sync::Arc,
45 time::{Duration, SystemTime},
46};
47use tracing::{debug, error, info, trace, warn};
48
/// Ack bookkeeping for a single height, keyed first by epoch and then by
/// participant (signer).
enum Pending<S: Scheme, D: Digest> {
    /// The digest for this height is not yet known: acks are buffered here
    /// (possibly for conflicting digests) until the automaton responds.
    Unverified(BTreeMap<Epoch, BTreeMap<Participant, Ack<S, D>>>),

    /// The automaton returned a digest for this height; buffered acks that
    /// matched it were replayed and only matching acks are retained.
    Verified(D, BTreeMap<Epoch, BTreeMap<Participant, Ack<S, D>>>),
}
58
/// Outcome of one digest request issued to the automaton, delivered through
/// the engine's `digest_requests` pool.
struct DigestRequest<D: Digest, E: Clock> {
    // Height the digest was requested for.
    height: Height,

    // The digest, or the error produced when the automaton canceled the
    // request.
    result: Result<D, Error>,

    // Latency timer started at request time; observation is recorded when
    // this is dropped.
    timer: histogram::Timer<E>,
}
71
/// Aggregation engine: requests digests from the automaton, exchanges acks
/// with peers, assembles certificates once a quorum of matching acks exists,
/// and journals all activity for crash recovery.
pub struct Engine<
    E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore,
    P: Provider<Scope = Epoch>,
    D: Digest,
    A: Automaton<Context = Height, Digest = D> + Clone,
    Z: Reporter<Activity = Activity<P::Scheme, D>>,
    M: Monitor<Index = Epoch>,
    B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
    T: Strategy,
> {
    context: ContextCell<E>,
    automaton: A,
    monitor: M,
    provider: P,
    reporter: Z,
    blocker: B,
    strategy: T,

    // Accepted epoch range around the current epoch: (epochs behind, epochs
    // ahead). Acks outside this range are rejected in `validate_ack`.
    epoch_bounds: (EpochDelta, EpochDelta),

    // Bounds how far ahead of the tip new heights are opened for work.
    window: HeightDelta,

    // Heights more than this far below the tip are pruned and their acks
    // rejected.
    activity_timeout: HeightDelta,

    // In-flight digest requests to the automaton.
    digest_requests: FuturesPool<DigestRequest<D, E>>,

    // Current epoch, refreshed from the monitor's subscription.
    epoch: Epoch,

    // Current tip height; advanced locally over contiguous certificates or
    // forward via peers' quorum-safe tip.
    tip: Height,

    // Tracks peers' reported tips to derive a quorum-safe tip.
    safe_tip: SafeTip<<P::Scheme as Scheme>::PublicKey>,

    // Per-height ack state awaiting a certificate.
    pending: BTreeMap<Height, Pending<P::Scheme, D>>,

    // Certificates by height; pruned below the activity threshold.
    confirmed: BTreeMap<Height, Certificate<P::Scheme, D>>,

    // Interval between rebroadcasts of our own ack for a height.
    rebroadcast_timeout: Duration,

    // Next rebroadcast deadline per height; `peek`/`pop` yield the earliest.
    rebroadcast_deadlines: PrioritySet<Height, SystemTime>,

    // Activity journal; `None` until `run` initializes and replays it.
    journal: Option<Journal<E, Activity<P::Scheme, D>>>,
    journal_partition: String,
    journal_write_buffer: NonZeroUsize,
    journal_replay_buffer: NonZeroUsize,
    journal_heights_per_section: NonZeroU64,
    journal_compression: Option<u8>,
    journal_page_cache: CacheRef,

    // Whether acks are sent to peers as priority messages.
    priority_acks: bool,

    metrics: metrics::Metrics<E>,
}
155
156impl<
157 E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore,
158 P: Provider<Scope = Epoch, Scheme: scheme::Scheme<D>>,
159 D: Digest,
160 A: Automaton<Context = Height, Digest = D> + Clone,
161 Z: Reporter<Activity = Activity<P::Scheme, D>>,
162 M: Monitor<Index = Epoch>,
163 B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
164 T: Strategy,
165 > Engine<E, P, D, A, Z, M, B, T>
166{
167 pub fn new(context: E, cfg: Config<P, D, A, Z, M, B, T>) -> Self {
169 let metrics = metrics::Metrics::init(context.clone());
171
172 Self {
173 context: ContextCell::new(context),
174 automaton: cfg.automaton,
175 reporter: cfg.reporter,
176 monitor: cfg.monitor,
177 provider: cfg.provider,
178 blocker: cfg.blocker,
179 strategy: cfg.strategy,
180 epoch_bounds: cfg.epoch_bounds,
181 window: HeightDelta::new(cfg.window.into()),
182 activity_timeout: cfg.activity_timeout,
183 epoch: Epoch::zero(),
184 tip: Height::zero(),
185 safe_tip: SafeTip::default(),
186 digest_requests: FuturesPool::default(),
187 pending: BTreeMap::new(),
188 confirmed: BTreeMap::new(),
189 rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
190 rebroadcast_deadlines: PrioritySet::new(),
191 journal: None,
192 journal_partition: cfg.journal_partition,
193 journal_write_buffer: cfg.journal_write_buffer,
194 journal_replay_buffer: cfg.journal_replay_buffer,
195 journal_heights_per_section: cfg.journal_heights_per_section,
196 journal_compression: cfg.journal_compression,
197 journal_page_cache: cfg.journal_page_cache,
198 priority_acks: cfg.priority_acks,
199 metrics,
200 }
201 }
202
203 fn scheme(&self, epoch: Epoch) -> Result<Arc<P::Scheme>, Error> {
205 self.provider
206 .scoped(epoch)
207 .ok_or(Error::UnknownEpoch(epoch))
208 }
209
    /// Starts the engine on the provided network channel, consuming `self`
    /// and returning a handle to the spawned task.
    pub fn start(
        mut self,
        network: (
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        ),
    ) -> Handle<()> {
        // `spawn_cell!` moves the context out of the cell and runs `run` as a
        // task within it.
        spawn_cell!(self.context, self.run(network).await)
    }
228
    /// Main event loop: initializes the journal (replaying prior activity),
    /// then reacts to epoch updates, completed digest requests, incoming
    /// acks, and rebroadcast deadlines until shutdown.
    async fn run(
        mut self,
        network: (
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        ),
    ) {
        // Wrap the raw channel in codec-aware sender/receiver halves.
        let (mut sender, mut receiver) = wrap(
            (),
            self.context.network_buffer_pool().clone(),
            network.0,
            network.1,
        );

        // Seed the current epoch and subscribe to future epoch changes.
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Open the activity journal and rebuild in-memory state from it.
        let journal_cfg = JConfig {
            partition: self.journal_partition.clone(),
            compression: self.journal_compression,
            codec_config: P::Scheme::certificate_codec_config_unbounded(),
            page_cache: self.journal_page_cache.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::init(
            self.context.with_label("journal").into_present(),
            journal_cfg,
        )
        .await
        .expect("init failed");
        let unverified_heights = self.replay(&journal).await;
        self.journal = Some(journal);

        // Re-request digests for heights replay could not verify.
        for height in unverified_heights {
            trace!(%height, "requesting digest for unverified height from replay");
            self.get_digest(height);
        }

        // Initialize safe-tip tracking with the current participant set.
        let scheme = self
            .scheme(self.epoch)
            .expect("current epoch scheme must exist");
        self.safe_tip.init(scheme.participants());

        select_loop! {
            self.context,
            // Runs before each select round: opens new heights up to the
            // window and (re)computes the next rebroadcast timer.
            on_start => {
                let _ = self.metrics.tip.try_set(self.tip.get());

                let next = self.next();

                // Keep opening heights until the window ahead of the tip is
                // full; `continue` restarts the round after each insert.
                if next.delta_from(self.tip).unwrap() < self.window {
                    trace!(%next, "requesting new digest");
                    assert!(self
                        .pending
                        .insert(next, Pending::Unverified(BTreeMap::new()))
                        .is_none());
                    self.get_digest(next);
                    continue;
                }

                // Sleep until the earliest rebroadcast deadline, or forever
                // when none is pending (consumed by the `rebroadcast` arm).
                let rebroadcast = match self.rebroadcast_deadlines.peek() {
                    Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };
            },
            on_stopped => {
                debug!("shutdown");
            },
            // Epoch advanced: refresh participants and drop acks from epochs
            // that fell behind the accepted lower bound.
            Some(epoch) = epoch_updates.recv() else {
                error!("epoch subscription failed");
                break;
            } => {
                debug!(current = %self.epoch, new = %epoch, "refresh epoch");
                assert!(epoch >= self.epoch);
                self.epoch = epoch;

                let scheme = self
                    .scheme(self.epoch)
                    .expect("current epoch scheme must exist");
                self.safe_tip.reconcile(scheme.participants());

                let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
                self.pending
                    .iter_mut()
                    .for_each(|(_, pending)| match pending {
                        self::Pending::Unverified(acks) => {
                            acks.retain(|epoch, _| *epoch >= min_epoch);
                        }
                        self::Pending::Verified(_, acks) => {
                            acks.retain(|epoch, _| *epoch >= min_epoch);
                        }
                    });

                continue;
            },

            // A digest request completed (successfully or not).
            request = self.digest_requests.next_completed() => {
                let DigestRequest {
                    height,
                    result,
                    timer,
                } = request;
                // Record the request latency regardless of outcome.
                drop(timer); match result {
                    Err(err) => {
                        warn!(?err, %height, "automaton returned error");
                        self.metrics.digest.inc(Status::Dropped);
                    }
                    Ok(digest) => {
                        if let Err(err) = self.handle_digest(height, digest, &mut sender).await {
                            debug!(?err, %height, "handle_digest failed");
                            continue;
                        }
                    }
                }
            },

            // An ack (with the peer's tip) arrived from the network.
            msg = receiver.recv() => {
                let (sender, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        warn!(?err, "ack receiver failed");
                        break;
                    }
                };
                let mut guard = self.metrics.acks.guard(Status::Invalid);
                let TipAck { ack, tip } = match msg {
                    Ok(peer_ack) => peer_ack,
                    Err(err) => {
                        // Undecodable messages get the peer blocked.
                        commonware_p2p::block!(self.blocker, sender, ?err, "ack decode failed");
                        continue;
                    }
                };

                // Fold the peer's tip into safe-tip tracking; fast-forward if
                // a quorum of peers is ahead of us.
                if self.safe_tip.update(sender.clone(), tip).is_some() {
                    let safe_tip = self.safe_tip.get();
                    if safe_tip > self.tip {
                        self.fast_forward_tip(safe_tip).await;
                    }
                }

                // Validate before handling; blockable failures (provable
                // misbehavior) get the peer blocked.
                if let Err(err) = self.validate_ack(&ack, &sender) {
                    if err.blockable() {
                        commonware_p2p::block!(
                            self.blocker,
                            sender,
                            ?err,
                            "ack validation failure"
                        );
                    } else {
                        debug!(?sender, ?err, "ack validate failed");
                    }
                    continue;
                };

                if let Err(err) = self.handle_ack(&ack).await {
                    debug!(?err, ?sender, "ack handle failed");
                    guard.set(Status::Failure);
                    continue;
                }

                debug!(?sender, epoch = %ack.epoch, height = %ack.item.height, "ack");
                guard.set(Status::Success);
            },

            // The earliest rebroadcast deadline (armed in `on_start`) fired.
            _ = rebroadcast => {
                let (height, _) = self
                    .rebroadcast_deadlines
                    .pop()
                    .expect("no rebroadcast deadline");
                trace!(%height, "rebroadcasting");
                if let Err(err) = self.handle_rebroadcast(height, &mut sender).await {
                    warn!(?err, %height, "rebroadcast failed");
                };
            },
        }

        // Flush the journal on shutdown so no recorded activity is lost.
        if let Some(journal) = self.journal.take() {
            journal
                .sync_all()
                .await
                .expect("unable to close aggregation journal");
        }
    }
436
437 async fn handle_digest(
441 &mut self,
442 height: Height,
443 digest: D,
444 sender: &mut WrappedSender<
445 impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
446 TipAck<P::Scheme, D>,
447 >,
448 ) -> Result<(), Error> {
449 if !matches!(self.pending.get(&height), Some(Pending::Unverified(_))) {
451 return Err(Error::AckHeight(height));
452 };
453
454 let Some(Pending::Unverified(acks)) = self.pending.remove(&height) else {
456 panic!("Pending::Unverified entry not found");
457 };
458 self.pending
459 .insert(height, Pending::Verified(digest, BTreeMap::new()));
460
461 for epoch_acks in acks.values() {
465 for epoch_ack in epoch_acks.values() {
466 if epoch_ack.item.digest != digest {
468 continue;
469 }
470
471 let _ = self.handle_ack(epoch_ack).await;
473 }
474 if self.confirmed.contains_key(&height) {
476 break;
477 }
478 }
479
480 let ack = self.sign_ack(height, digest).await?;
482
483 self.rebroadcast_deadlines
485 .put(height, self.context.current() + self.rebroadcast_timeout);
486
487 let _ = self.handle_ack(&ack).await;
489
490 self.broadcast(ack, sender).await?;
492
493 Ok(())
494 }
495
496 async fn handle_ack(&mut self, ack: &Ack<P::Scheme, D>) -> Result<(), Error> {
501 let scheme = self.scheme(ack.epoch)?;
503 let quorum = scheme.participants().quorum::<N3f1>();
504
505 let acks_by_epoch = match self.pending.get_mut(&ack.item.height) {
507 None => {
508 return Err(Error::AckHeight(ack.item.height));
511 }
512 Some(Pending::Unverified(acks)) => acks,
513 Some(Pending::Verified(digest, acks)) => {
514 if ack.item.digest != *digest {
516 return Err(Error::AckDigest(ack.item.height));
517 }
518 acks
519 }
520 };
521
522 let acks = acks_by_epoch.entry(ack.epoch).or_default();
524 if acks.contains_key(&ack.attestation.signer) {
525 return Ok(());
526 }
527 acks.insert(ack.attestation.signer, ack.clone());
528
529 let filtered = acks
531 .values()
532 .filter(|a| a.item.digest == ack.item.digest)
533 .collect::<Vec<_>>();
534 if filtered.len() >= quorum as usize {
535 if let Some(certificate) = Certificate::from_acks(&*scheme, filtered, &self.strategy) {
536 self.metrics.certificates.inc();
537 self.handle_certificate(certificate).await;
538 }
539 }
540
541 Ok(())
542 }
543
544 async fn handle_certificate(&mut self, certificate: Certificate<P::Scheme, D>) {
546 let height = certificate.item.height;
548 if self.confirmed.contains_key(&height) {
549 return;
550 }
551
552 self.confirmed.insert(height, certificate.clone());
554
555 let certified = Activity::Certified(certificate);
557 self.record(certified.clone()).await;
558 self.sync(height).await;
559 self.reporter.report(certified).await;
560
561 if height == self.tip {
563 let mut new_tip = height.next();
565 while self.confirmed.contains_key(&new_tip) && new_tip.get() < u64::MAX {
566 new_tip = new_tip.next();
567 }
568
569 if new_tip > self.tip {
571 self.fast_forward_tip(new_tip).await;
572 }
573 }
574 }
575
576 async fn handle_rebroadcast(
578 &mut self,
579 height: Height,
580 sender: &mut WrappedSender<
581 impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
582 TipAck<P::Scheme, D>,
583 >,
584 ) -> Result<(), Error> {
585 let Some(Pending::Verified(digest, acks)) = self.pending.get(&height) else {
586 return Ok(());
588 };
589
590 let scheme = self.scheme(self.epoch)?;
592 let Some(signer) = scheme.me() else {
593 return Err(Error::NotSigner(self.epoch));
594 };
595 let ack = acks
596 .get(&self.epoch)
597 .and_then(|acks| acks.get(&signer).cloned());
598 let ack = match ack {
599 Some(ack) => ack,
600 None => self.sign_ack(height, *digest).await?,
601 };
602
603 self.rebroadcast_deadlines
605 .put(height, self.context.current() + self.rebroadcast_timeout);
606
607 self.broadcast(ack, sender).await
609 }
610
    /// Validates an incoming peer ack before it is handled.
    ///
    /// Checks, in order: epoch bounds, signer membership and identity,
    /// activity window, height window, duplicate/conflict status, and finally
    /// the signature (last, since verification is the expensive step).
    ///
    /// # Errors
    ///
    /// Returns a specific [`Error`] describing the first failed check; some
    /// variants are `blockable()` and cause the peer to be blocked upstream.
    fn validate_ack(
        &mut self,
        ack: &Ack<P::Scheme, D>,
        sender: &<P::Scheme as Scheme>::PublicKey,
    ) -> Result<(), Error> {
        // Reject acks from epochs outside the accepted window around ours.
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // The sender must be a participant in the ack's epoch, and the ack's
        // claimed signer index must match the sender's own index.
        let scheme = self.scheme(ack.epoch)?;
        let participants = scheme.participants();
        let Some(signer) = participants.index(sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if signer != ack.attestation.signer {
            return Err(Error::PeerMismatch);
        }

        // Heights below the activity threshold have been pruned already.
        let activity_threshold = self.tip.saturating_sub(self.activity_timeout);
        if ack.item.height < activity_threshold {
            return Err(Error::AckCertified(ack.item.height));
        }

        // Reject heights at or beyond the concurrency window above the tip.
        if ack
            .item
            .height
            .delta_from(self.tip)
            .is_some_and(|d| d >= self.window)
        {
            return Err(Error::AckHeight(ack.item.height));
        }

        // Already certified heights need no further acks.
        if self.confirmed.contains_key(&ack.item.height) {
            return Err(Error::AckCertified(ack.item.height));
        }
        // Duplicate check; for verified heights the digest must also agree.
        let have_ack = match self.pending.get(&ack.item.height) {
            None => false,
            Some(Pending::Unverified(epoch_map)) => epoch_map
                .get(&ack.epoch)
                .is_some_and(|acks| acks.contains_key(&ack.attestation.signer)),
            Some(Pending::Verified(digest, epoch_map)) => {
                if ack.item.digest != *digest {
                    return Err(Error::AckDigest(ack.item.height));
                }
                epoch_map
                    .get(&ack.epoch)
                    .is_some_and(|acks| acks.contains_key(&ack.attestation.signer))
            }
        };
        if have_ack {
            return Err(Error::AckDuplicate(sender.to_string(), ack.item.height));
        }

        // Cryptographic verification last: it is the costly check.
        if !ack.verify(&mut self.context, &*scheme, &self.strategy) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }
688
689 fn get_digest(&mut self, height: Height) {
695 assert!(self.pending.contains_key(&height));
696 let mut automaton = self.automaton.clone();
697 let timer = self.metrics.digest_duration.timer();
698 self.digest_requests.push(async move {
699 let receiver = automaton.propose(height).await;
700 let result = receiver.await.map_err(Error::AppProposeCanceled);
701 DigestRequest {
702 height,
703 result,
704 timer,
705 }
706 });
707 }
708
709 async fn sign_ack(&mut self, height: Height, digest: D) -> Result<Ack<P::Scheme, D>, Error> {
712 let scheme = self.scheme(self.epoch)?;
713 if scheme.me().is_none() {
714 return Err(Error::NotSigner(self.epoch));
715 }
716
717 let item = Item { height, digest };
719 let ack = Ack::sign(&*scheme, self.epoch, item).ok_or(Error::NotSigner(self.epoch))?;
720
721 self.record(Activity::Ack(ack.clone())).await;
723 self.sync(height).await;
724
725 Ok(ack)
726 }
727
728 async fn broadcast(
732 &mut self,
733 ack: Ack<P::Scheme, D>,
734 sender: &mut WrappedSender<
735 impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
736 TipAck<P::Scheme, D>,
737 >,
738 ) -> Result<(), Error> {
739 sender
740 .send(
741 Recipients::All,
742 TipAck { ack, tip: self.tip },
743 self.priority_acks,
744 )
745 .await
746 .map_err(|err| {
747 warn!(?err, "failed to send ack");
748 Error::UnableToSendMessage
749 })?;
750 Ok(())
751 }
752
753 fn next(&self) -> Height {
756 let max_pending = self
757 .pending
758 .last_key_value()
759 .map(|(k, _)| k.next())
760 .unwrap_or_default();
761 let max_confirmed = self
762 .confirmed
763 .last_key_value()
764 .map(|(k, _)| k.next())
765 .unwrap_or_default();
766 max(self.tip, max(max_pending, max_confirmed))
767 }
768
769 async fn fast_forward_tip(&mut self, tip: Height) {
775 assert!(tip > self.tip);
776
777 let activity_threshold = tip.saturating_sub(self.activity_timeout);
779 self.pending
780 .retain(|height, _| *height >= activity_threshold);
781 self.confirmed
782 .retain(|height, _| *height >= activity_threshold);
783
784 self.record(Activity::Tip(tip)).await;
786 self.sync(tip).await;
787 self.reporter.report(Activity::Tip(tip)).await;
788
789 let section = self.get_journal_section(activity_threshold);
791 let journal = self.journal.as_mut().expect("journal must be initialized");
792 let _ = journal.prune(section).await;
793
794 self.tip = tip;
796 }
797
    /// Maps a height to its journal section: heights are grouped into
    /// fixed-size sections of `journal_heights_per_section`.
    const fn get_journal_section(&self, height: Height) -> u64 {
        height.get() / self.journal_heights_per_section.get()
    }
804
    /// Rebuilds in-memory state from the journal after a restart.
    ///
    /// Re-reports every journaled activity, restores the tip and surviving
    /// certificates, regroups surviving acks into `pending`, and returns the
    /// heights whose digests must be re-requested from the automaton.
    async fn replay(&mut self, journal: &Journal<E, Activity<P::Scheme, D>>) -> Vec<Height> {
        // First pass: stream every journaled activity, re-reporting each and
        // collecting them by kind.
        let mut tip = Height::default();
        let mut certified = Vec::new();
        let mut acks = Vec::new();
        let stream = journal
            .replay(0, 0, self.journal_replay_buffer)
            .await
            .expect("replay failed");
        pin_mut!(stream);
        while let Some(msg) = stream.next().await {
            let (_, _, _, activity) = msg.expect("replay failed");
            match activity {
                Activity::Tip(height) => {
                    tip = max(tip, height);
                    self.reporter.report(Activity::Tip(height)).await;
                }
                Activity::Certified(certificate) => {
                    certified.push(certificate.clone());
                    self.reporter.report(Activity::Certified(certificate)).await;
                }
                Activity::Ack(ack) => {
                    acks.push(ack.clone());
                    self.reporter.report(Activity::Ack(ack)).await;
                }
            }
        }

        // Restore the tip; everything below the activity threshold is stale.
        self.tip = tip;
        let activity_threshold = tip.saturating_sub(self.activity_timeout);

        // Restore certificates that are still within the activity window.
        certified
            .iter()
            .filter(|certificate| certificate.item.height >= activity_threshold)
            .for_each(|certificate| {
                self.confirmed
                    .insert(certificate.item.height, certificate.clone());
            });

        // Group surviving acks (in-window, not already certified) by height.
        let mut acks_by_height: BTreeMap<Height, Vec<Ack<P::Scheme, D>>> = BTreeMap::new();
        for ack in acks {
            if ack.item.height >= activity_threshold
                && !self.confirmed.contains_key(&ack.item.height)
            {
                acks_by_height.entry(ack.item.height).or_default().push(ack);
            }
        }

        // Second pass: rebuild `pending` per height. If we find our own ack
        // for a height (current epoch), its digest is authoritative; the
        // height is restored as Verified. Otherwise it is Unverified and its
        // digest must be re-requested.
        let mut unverified = Vec::new();
        for (height, mut acks_group) in acks_by_height {
            let current_scheme = self.scheme(self.epoch).ok();
            let our_signer = current_scheme.as_ref().and_then(|s| s.me());
            let our_digest = our_signer.and_then(|signer| {
                acks_group
                    .iter()
                    .find(|ack| ack.epoch == self.epoch && ack.attestation.signer == signer)
                    .map(|ack| ack.item.digest)
            });

            // With a known digest, conflicting acks are dropped.
            if let Some(digest) = our_digest {
                acks_group.retain(|other| other.item.digest == digest);
            }

            let mut epoch_map = BTreeMap::new();
            for ack in acks_group {
                epoch_map
                    .entry(ack.epoch)
                    .or_insert_with(BTreeMap::new)
                    .insert(ack.attestation.signer, ack);
            }

            match our_digest {
                Some(digest) => {
                    self.pending
                        .insert(height, Pending::Verified(digest, epoch_map));

                    // Rebroadcast immediately: peers may have missed our ack.
                    self.rebroadcast_deadlines
                        .put(height, self.context.current());
                }
                None => {
                    self.pending.insert(height, Pending::Unverified(epoch_map));

                    unverified.push(height);
                }
            }
        }

        // Fill holes between the tip and the furthest known height so every
        // in-flight height has a pending entry.
        let next = self.next();
        for height in Height::range(self.tip, next) {
            if self.pending.contains_key(&height) || self.confirmed.contains_key(&height) {
                continue;
            }

            self.pending
                .insert(height, Pending::Unverified(BTreeMap::new()));
            unverified.push(height);
        }
        info!(tip = %self.tip, %next, ?unverified, "replayed journal");

        unverified
    }
922
923 async fn record(&mut self, activity: Activity<P::Scheme, D>) {
925 let height = match activity {
926 Activity::Ack(ref ack) => ack.item.height,
927 Activity::Certified(ref certificate) => certificate.item.height,
928 Activity::Tip(h) => h,
929 };
930 let section = self.get_journal_section(height);
931 self.journal
932 .as_mut()
933 .expect("journal must be initialized")
934 .append(section, &activity)
935 .await
936 .expect("unable to append to journal");
937 }
938
939 async fn sync(&mut self, height: Height) {
941 let section = self.get_journal_section(height);
942 let journal = self.journal.as_mut().expect("journal must be initialized");
943 journal.sync(section).await.expect("unable to sync journal");
944 }
945}