1use super::{
4 metrics,
5 safe_tip::SafeTip,
6 types::{Ack, Activity, Error, Item, TipAck},
7 Config,
8};
9use crate::{
10 aggregation::{scheme, types::Certificate},
11 types::{Epoch, EpochDelta, Height, HeightDelta, Participant},
12 Automaton, Monitor, Reporter,
13};
14use commonware_cryptography::{
15 certificate::{Provider, Scheme},
16 Digest,
17};
18use commonware_macros::select_loop;
19use commonware_p2p::{
20 utils::codec::{wrap, WrappedSender},
21 Blocker, Receiver, Recipients, Sender,
22};
23use commonware_parallel::Strategy;
24use commonware_runtime::{
25 buffer::paged::CacheRef,
26 spawn_cell,
27 telemetry::metrics::{histogram, status::Status, GaugeExt},
28 BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
29};
30use commonware_storage::journal::segmented::variable::{Config as JConfig, Journal};
31use commonware_utils::{futures::Pool as FuturesPool, ordered::Quorum, N3f1, PrioritySet};
32use futures::{
33 future::{self, Either},
34 pin_mut, StreamExt,
35};
36use rand_core::CryptoRngCore;
37use std::{
38 cmp::max,
39 collections::BTreeMap,
40 num::{NonZeroU64, NonZeroUsize},
41 sync::Arc,
42 time::{Duration, SystemTime},
43};
44use tracing::{debug, error, info, trace, warn};
45
/// Acks collected for a single height, grouped by epoch and then by signer.
enum Pending<S: Scheme, D: Digest> {
    /// The digest for this height has not been obtained from the automaton yet,
    /// so acks are stored without checking which digest they carry.
    Unverified(BTreeMap<Epoch, BTreeMap<Participant, Ack<S, D>>>),

    /// The digest for this height is known (first field). Acks carrying a
    /// different digest are rejected by `handle_ack` and `validate_ack`.
    Verified(D, BTreeMap<Epoch, BTreeMap<Participant, Ack<S, D>>>),
}
55
/// Outcome of a digest request issued to the automaton by `get_digest`.
struct DigestRequest<D: Digest> {
    /// Height the digest was requested for.
    height: Height,

    /// Digest returned by the automaton, or the error produced while waiting
    /// for its response.
    result: Result<D, Error>,

    /// Timer started when the request was issued; the run loop observes it
    /// when a digest is returned successfully.
    timer: histogram::Timer,
}
68
/// Engine that collects acks over per-height digests, assembles them into
/// certificates, and journals its activity so it can resume after a restart.
pub struct Engine<
    E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore,
    P: Provider<Scope = Epoch>,
    D: Digest,
    A: Automaton<Context = Height, Digest = D>,
    Z: Reporter<Activity = Activity<P::Scheme, D>>,
    M: Monitor<Index = Epoch>,
    B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
    T: Strategy,
> {
    // ---------- Interfaces ----------
    /// Runtime context (clock, storage, spawner, metrics, randomness).
    context: ContextCell<E>,
    /// Asked (via `propose`) for the digest of each height.
    automaton: A,
    /// Source of the current epoch and of subsequent epoch updates.
    monitor: M,
    /// Supplies the signing scheme scoped to a given epoch.
    provider: P,
    /// Receives every `Activity` (acks, certificates, tip updates).
    reporter: Z,
    /// Used to block peers that send undecodable or blockable-invalid acks.
    blocker: B,
    /// Passed to ack verification and certificate assembly.
    strategy: T,

    // ---------- Configuration ----------
    /// Acks are only accepted for epochs within
    /// `[epoch - epoch_bounds.0, epoch + epoch_bounds.1]` (saturating). On an
    /// epoch change, pending acks older than `epoch - epoch_bounds.0` are dropped.
    epoch_bounds: (EpochDelta, EpochDelta),

    /// Number of heights at or above the tip that may be in flight: new digests
    /// are requested while `next - tip < window`, and acks for heights at
    /// `tip + window` or beyond are rejected.
    window: HeightDelta,

    /// Number of heights below the tip for which acks are still accepted and
    /// for which pending/confirmed entries and journal sections are retained.
    activity_timeout: HeightDelta,

    // ---------- State ----------
    /// In-flight digest requests to the automaton.
    digest_requests: FuturesPool<DigestRequest<D>>,

    /// Current epoch, as last reported by the monitor.
    epoch: Epoch,

    /// Current tip. Advanced when the height at the tip is certified or when
    /// the safe tip derived from peers moves ahead of it.
    tip: Height,

    /// Tracks the tips reported by peers alongside their acks.
    safe_tip: SafeTip<<P::Scheme as Scheme>::PublicKey>,

    /// Acks collected per height for which no certificate has been stored yet.
    pending: BTreeMap<Height, Pending<P::Scheme, D>>,

    /// Certificates per height, kept until they fall below the activity threshold.
    confirmed: BTreeMap<Height, Certificate<P::Scheme, D>>,

    // ---------- Rebroadcasting ----------
    /// Delay between (re)broadcasts of our own ack for a height.
    rebroadcast_timeout: Duration,

    /// Next rebroadcast time for each height we have acked.
    rebroadcast_deadlines: PrioritySet<Height, SystemTime>,

    // ---------- Journal ----------
    /// Journal of recorded activity. `None` until `run` initializes it and
    /// again after it is taken on shutdown.
    journal: Option<Journal<E, Activity<P::Scheme, D>>>,
    /// Storage partition used for the journal.
    journal_partition: String,
    /// Write buffer size handed to the journal.
    journal_write_buffer: NonZeroUsize,
    /// Buffer size used when replaying the journal at startup.
    journal_replay_buffer: NonZeroUsize,
    /// Number of heights stored in each journal section.
    journal_heights_per_section: NonZeroU64,
    /// Optional compression setting handed to the journal.
    journal_compression: Option<u8>,
    /// Page cache handed to the journal.
    journal_page_cache: CacheRef,

    // ---------- Network ----------
    /// Priority flag passed to the sender when broadcasting acks.
    priority_acks: bool,

    // ---------- Metrics ----------
    /// Metrics recorded by the engine.
    metrics: metrics::Metrics,
}
152
153impl<
154 E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore,
155 P: Provider<Scope = Epoch, Scheme: scheme::Scheme<D>>,
156 D: Digest,
157 A: Automaton<Context = Height, Digest = D>,
158 Z: Reporter<Activity = Activity<P::Scheme, D>>,
159 M: Monitor<Index = Epoch>,
160 B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
161 T: Strategy,
162 > Engine<E, P, D, A, Z, M, B, T>
163{
164 pub fn new(context: E, cfg: Config<P, D, A, Z, M, B, T>) -> Self {
166 let metrics = metrics::Metrics::init(&context);
167
168 Self {
169 context: ContextCell::new(context),
170 automaton: cfg.automaton,
171 reporter: cfg.reporter,
172 monitor: cfg.monitor,
173 provider: cfg.provider,
174 blocker: cfg.blocker,
175 strategy: cfg.strategy,
176 epoch_bounds: cfg.epoch_bounds,
177 window: HeightDelta::new(cfg.window.into()),
178 activity_timeout: cfg.activity_timeout,
179 epoch: Epoch::zero(),
180 tip: Height::zero(),
181 safe_tip: SafeTip::default(),
182 digest_requests: FuturesPool::default(),
183 pending: BTreeMap::new(),
184 confirmed: BTreeMap::new(),
185 rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
186 rebroadcast_deadlines: PrioritySet::new(),
187 journal: None,
188 journal_partition: cfg.journal_partition,
189 journal_write_buffer: cfg.journal_write_buffer,
190 journal_replay_buffer: cfg.journal_replay_buffer,
191 journal_heights_per_section: cfg.journal_heights_per_section,
192 journal_compression: cfg.journal_compression,
193 journal_page_cache: cfg.journal_page_cache,
194 priority_acks: cfg.priority_acks,
195 metrics,
196 }
197 }
198
199 fn scheme(&self, epoch: Epoch) -> Result<Arc<P::Scheme>, Error> {
201 self.provider
202 .scoped(epoch)
203 .ok_or(Error::UnknownEpoch(epoch))
204 }
205
    /// Consumes the engine and spawns its run loop using the engine's context.
    ///
    /// `network` is the (sender, receiver) pair used to exchange acks with
    /// peers. Returns the handle produced by spawning the run loop.
    pub fn start(
        mut self,
        network: (
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network))
    }
224
    /// Run loop spawned by `start`.
    ///
    /// Subscribes to epoch updates, opens and replays the journal, then loops:
    /// requesting digests while the window has room, reacting to epoch changes,
    /// completed digest requests, incoming acks and rebroadcast deadlines. The
    /// journal is synced once the loop exits.
    ///
    /// # Panics
    ///
    /// Panics if the journal cannot be initialized or synced, or if the
    /// provider has no scheme for the current epoch.
    async fn run(
        mut self,
        network: (
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        ),
    ) {
        // Wrap the raw channels; the wrapped receiver yields per-message decode
        // results (handled in the ack branch below).
        let (mut sender, mut receiver) = wrap(
            (),
            self.context.network_buffer_pool().clone(),
            network.0,
            network.1,
        );

        // Initialize the epoch from the monitor.
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Open the journal and rebuild in-memory state from it.
        let journal_cfg = JConfig {
            partition: self.journal_partition.clone(),
            compression: self.journal_compression,
            codec_config: P::Scheme::certificate_codec_config_unbounded(),
            page_cache: self.journal_page_cache.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::init(self.context.child("journal"), journal_cfg)
            .await
            .expect("init failed");
        let unverified_heights = self.replay(&journal).await;
        self.journal = Some(journal);

        // Request digests for every height that replay left unverified.
        for height in unverified_heights {
            trace!(%height, "requesting digest for unverified height from replay");
            self.get_digest(height);
        }

        // Seed the safe-tip tracker with the current epoch's participants.
        let scheme = self
            .scheme(self.epoch)
            .expect("current epoch scheme must exist");
        self.safe_tip.init(scheme.participants());

        select_loop! {
            self.context,
            // Per-iteration setup; `rebroadcast` bound here is awaited by the
            // last branch below.
            on_start => {
                let _ = self.metrics.tip.try_set(self.tip.get());

                // Request a new digest while fewer than `window` heights are in flight.
                let next = self.next();

                // `next()` never returns a height below the tip, so the delta
                // is expected to exist.
                if next.delta_from(self.tip).unwrap() < self.window {
                    trace!(%next, "requesting new digest");
                    assert!(self
                        .pending
                        .insert(next, Pending::Unverified(BTreeMap::new()))
                        .is_none());
                    self.get_digest(next);
                    continue;
                }

                // Sleep until the earliest rebroadcast deadline, or forever if
                // there is none.
                let rebroadcast = match self.rebroadcast_deadlines.peek() {
                    Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };
            },
            on_stopped => {
                debug!("shutdown");
            },
            // Epoch update from the monitor; a closed subscription ends the loop.
            Some(epoch) = epoch_updates.recv() else {
                error!("epoch subscription failed");
                break;
            } => {
                // Epochs never move backwards.
                debug!(current = %self.epoch, new = %epoch, "refresh epoch");
                assert!(epoch >= self.epoch);
                self.epoch = epoch;

                // Reconcile the safe-tip tracker with the new participant set.
                let scheme = self
                    .scheme(self.epoch)
                    .expect("current epoch scheme must exist");
                self.safe_tip.reconcile(scheme.participants());

                // Drop pending acks from epochs that are now below the lower bound.
                let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
                self.pending
                    .iter_mut()
                    .for_each(|(_, pending)| match pending {
                        self::Pending::Unverified(acks) => {
                            acks.retain(|epoch, _| *epoch >= min_epoch);
                        }
                        self::Pending::Verified(_, acks) => {
                            acks.retain(|epoch, _| *epoch >= min_epoch);
                        }
                    });

                continue;
            },

            // A digest request to the automaton completed.
            request = self.digest_requests.next_completed() => {
                let DigestRequest {
                    height,
                    result,
                    timer,
                } = request;
                match result {
                    Err(err) => {
                        // The timer is not observed for failed requests.
                        warn!(?err, %height, "automaton returned error");
                        self.metrics.digest.inc(Status::Dropped);
                    }
                    Ok(digest) => {
                        timer.observe(self.context.as_ref());
                        if let Err(err) = self.handle_digest(height, digest, &mut sender).await {
                            debug!(?err, %height, "handle_digest failed");
                            continue;
                        }
                    }
                }
            },

            // Incoming message from a peer.
            msg = receiver.recv() => {
                // A receiver failure ends the loop. Note that `sender` below is
                // the peer's public key and shadows the network sender within
                // this branch.
                let (sender, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        warn!(?err, "ack receiver failed");
                        break;
                    }
                };
                // Status starts as `Invalid` and is overwritten below on
                // handling failure or success.
                let mut guard = self.metrics.acks.guard(Status::Invalid);
                let TipAck { ack, tip } = match msg {
                    Ok(peer_ack) => peer_ack,
                    Err(err) => {
                        commonware_p2p::block!(self.blocker, sender, ?err, "ack decode failed");
                        continue;
                    }
                };

                // Record the peer's tip and fast-forward ours if the safe tip
                // moved ahead of it.
                if self.safe_tip.update(sender.clone(), tip).is_some() {
                    let safe_tip = self.safe_tip.get();
                    if safe_tip > self.tip {
                        self.fast_forward_tip(safe_tip).await;
                    }
                }

                // Validate the ack; only blockable errors get the peer blocked.
                if let Err(err) = self.validate_ack(&ack, &sender) {
                    if err.blockable() {
                        commonware_p2p::block!(
                            self.blocker,
                            sender,
                            ?err,
                            "ack validation failure"
                        );
                    } else {
                        debug!(?sender, ?err, "ack validate failed");
                    }
                    continue;
                };

                // Store the ack (this may assemble a certificate).
                if let Err(err) = self.handle_ack(&ack).await {
                    debug!(?err, ?sender, "ack handle failed");
                    guard.set(Status::Failure);
                    continue;
                }

                debug!(?sender, epoch = %ack.epoch, height = %ack.item.height, "ack");
                guard.set(Status::Success);
            },

            // The earliest rebroadcast deadline elapsed.
            _ = rebroadcast => {
                // A deadline must exist, otherwise `rebroadcast` would be the
                // never-resolving future.
                let (height, _) = self
                    .rebroadcast_deadlines
                    .pop()
                    .expect("no rebroadcast deadline");
                trace!(%height, "rebroadcasting");
                if let Err(err) = self.handle_rebroadcast(height, &mut sender).await {
                    warn!(?err, %height, "rebroadcast failed");
                };
            },
        }

        // Sync the journal once the loop has exited.
        if let Some(journal) = self.journal.take() {
            journal
                .sync_all()
                .await
                .expect("unable to close aggregation journal");
        }
    }
429
430 async fn handle_digest(
434 &mut self,
435 height: Height,
436 digest: D,
437 sender: &mut WrappedSender<
438 impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
439 TipAck<P::Scheme, D>,
440 >,
441 ) -> Result<(), Error> {
442 if !matches!(self.pending.get(&height), Some(Pending::Unverified(_))) {
444 return Err(Error::AckHeight(height));
445 };
446
447 let Some(Pending::Unverified(acks)) = self.pending.remove(&height) else {
449 panic!("Pending::Unverified entry not found");
450 };
451 self.pending
452 .insert(height, Pending::Verified(digest, BTreeMap::new()));
453
454 for epoch_acks in acks.values() {
458 for epoch_ack in epoch_acks.values() {
459 if epoch_ack.item.digest != digest {
461 continue;
462 }
463
464 let _ = self.handle_ack(epoch_ack).await;
466 }
467 if self.confirmed.contains_key(&height) {
469 break;
470 }
471 }
472
473 let ack = self.sign_ack(height, digest).await?;
475
476 self.rebroadcast_deadlines
478 .put(height, self.context.current() + self.rebroadcast_timeout);
479
480 let _ = self.handle_ack(&ack).await;
482
483 self.broadcast(ack, sender);
485
486 Ok(())
487 }
488
489 async fn handle_ack(&mut self, ack: &Ack<P::Scheme, D>) -> Result<(), Error> {
494 let scheme = self.scheme(ack.epoch)?;
496 let quorum = scheme.participants().quorum::<N3f1>();
497
498 let acks_by_epoch = match self.pending.get_mut(&ack.item.height) {
500 None => {
501 return Err(Error::AckHeight(ack.item.height));
504 }
505 Some(Pending::Unverified(acks)) => acks,
506 Some(Pending::Verified(digest, acks)) => {
507 if ack.item.digest != *digest {
509 return Err(Error::AckDigest(ack.item.height));
510 }
511 acks
512 }
513 };
514
515 let acks = acks_by_epoch.entry(ack.epoch).or_default();
517 if acks.contains_key(&ack.attestation.signer) {
518 return Ok(());
519 }
520 acks.insert(ack.attestation.signer, ack.clone());
521
522 let filtered = acks
524 .values()
525 .filter(|a| a.item.digest == ack.item.digest)
526 .collect::<Vec<_>>();
527 if filtered.len() >= quorum as usize {
528 if let Some(certificate) = Certificate::from_acks(&*scheme, filtered, &self.strategy) {
529 self.metrics.certificates.inc();
530 self.handle_certificate(certificate).await;
531 }
532 }
533
534 Ok(())
535 }
536
537 async fn handle_certificate(&mut self, certificate: Certificate<P::Scheme, D>) {
539 let height = certificate.item.height;
541 if self.confirmed.contains_key(&height) {
542 return;
543 }
544
545 self.confirmed.insert(height, certificate.clone());
547
548 let certified = Activity::Certified(certificate);
550 self.record(certified.clone()).await;
551 self.sync(height).await;
552 self.reporter.report(certified);
553
554 if height == self.tip {
556 let mut new_tip = height.next();
558 while self.confirmed.contains_key(&new_tip) && new_tip.get() < u64::MAX {
559 new_tip = new_tip.next();
560 }
561
562 if new_tip > self.tip {
564 self.fast_forward_tip(new_tip).await;
565 }
566 }
567 }
568
569 async fn handle_rebroadcast(
571 &mut self,
572 height: Height,
573 sender: &mut WrappedSender<
574 impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
575 TipAck<P::Scheme, D>,
576 >,
577 ) -> Result<(), Error> {
578 let Some(Pending::Verified(digest, acks)) = self.pending.get(&height) else {
579 return Ok(());
581 };
582
583 let scheme = self.scheme(self.epoch)?;
585 let Some(signer) = scheme.me() else {
586 return Err(Error::NotSigner(self.epoch));
587 };
588 let ack = acks
589 .get(&self.epoch)
590 .and_then(|acks| acks.get(&signer).cloned());
591 let ack = match ack {
592 Some(ack) => ack,
593 None => self.sign_ack(height, *digest).await?,
594 };
595
596 self.rebroadcast_deadlines
598 .put(height, self.context.current() + self.rebroadcast_timeout);
599
600 self.broadcast(ack, sender);
602
603 Ok(())
604 }
605
606 fn validate_ack(
612 &mut self,
613 ack: &Ack<P::Scheme, D>,
614 sender: &<P::Scheme as Scheme>::PublicKey,
615 ) -> Result<(), Error> {
616 {
618 let (eb_lo, eb_hi) = self.epoch_bounds;
619 let bound_lo = self.epoch.saturating_sub(eb_lo);
620 let bound_hi = self.epoch.saturating_add(eb_hi);
621 if ack.epoch < bound_lo || ack.epoch > bound_hi {
622 return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
623 }
624 }
625
626 let scheme = self.scheme(ack.epoch)?;
628 let participants = scheme.participants();
629 let Some(signer) = participants.index(sender) else {
630 return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
631 };
632 if signer != ack.attestation.signer {
633 return Err(Error::PeerMismatch);
634 }
635
636 let activity_threshold = self.tip.saturating_sub(self.activity_timeout);
638 if ack.item.height < activity_threshold {
639 return Err(Error::AckCertified(ack.item.height));
640 }
641
642 if ack
644 .item
645 .height
646 .delta_from(self.tip)
647 .is_some_and(|d| d >= self.window)
648 {
649 return Err(Error::AckHeight(ack.item.height));
650 }
651
652 if self.confirmed.contains_key(&ack.item.height) {
654 return Err(Error::AckCertified(ack.item.height));
655 }
656 let have_ack = match self.pending.get(&ack.item.height) {
657 None => false,
658 Some(Pending::Unverified(epoch_map)) => epoch_map
659 .get(&ack.epoch)
660 .is_some_and(|acks| acks.contains_key(&ack.attestation.signer)),
661 Some(Pending::Verified(digest, epoch_map)) => {
662 if ack.item.digest != *digest {
665 return Err(Error::AckDigest(ack.item.height));
666 }
667 epoch_map
668 .get(&ack.epoch)
669 .is_some_and(|acks| acks.contains_key(&ack.attestation.signer))
670 }
671 };
672 if have_ack {
673 return Err(Error::AckDuplicate(sender.to_string(), ack.item.height));
674 }
675
676 if !ack.verify(self.context.as_mut(), &*scheme, &self.strategy) {
678 return Err(Error::InvalidAckSignature);
679 }
680
681 Ok(())
682 }
683
684 fn get_digest(&mut self, height: Height) {
690 assert!(self.pending.contains_key(&height));
691 let mut automaton = self.automaton.clone();
692 let timer = self.metrics.digest_duration.timer(self.context.as_ref());
693 self.digest_requests.push(async move {
694 let receiver = automaton.propose(height).await;
695 let result = receiver.await.map_err(Error::AppProposeCanceled);
696 DigestRequest {
697 height,
698 result,
699 timer,
700 }
701 });
702 }
703
704 async fn sign_ack(&mut self, height: Height, digest: D) -> Result<Ack<P::Scheme, D>, Error> {
707 let scheme = self.scheme(self.epoch)?;
708 if scheme.me().is_none() {
709 return Err(Error::NotSigner(self.epoch));
710 }
711
712 let item = Item { height, digest };
714 let ack = Ack::sign(&*scheme, self.epoch, item).ok_or(Error::NotSigner(self.epoch))?;
715
716 self.record(Activity::Ack(ack.clone())).await;
718 self.sync(height).await;
719
720 Ok(ack)
721 }
722
723 fn broadcast(
725 &mut self,
726 ack: Ack<P::Scheme, D>,
727 sender: &mut WrappedSender<
728 impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
729 TipAck<P::Scheme, D>,
730 >,
731 ) {
732 sender.send(
733 Recipients::All,
734 TipAck { ack, tip: self.tip },
735 self.priority_acks,
736 );
737 }
738
739 fn next(&self) -> Height {
742 let max_pending = self
743 .pending
744 .last_key_value()
745 .map(|(k, _)| k.next())
746 .unwrap_or_default();
747 let max_confirmed = self
748 .confirmed
749 .last_key_value()
750 .map(|(k, _)| k.next())
751 .unwrap_or_default();
752 max(self.tip, max(max_pending, max_confirmed))
753 }
754
755 async fn fast_forward_tip(&mut self, tip: Height) {
761 assert!(tip > self.tip);
762
763 let activity_threshold = tip.saturating_sub(self.activity_timeout);
765 self.pending
766 .retain(|height, _| *height >= activity_threshold);
767 self.confirmed
768 .retain(|height, _| *height >= activity_threshold);
769
770 self.record(Activity::Tip(tip)).await;
772 self.sync(tip).await;
773 self.reporter.report(Activity::Tip(tip));
774
775 let section = self.get_journal_section(activity_threshold);
777 let journal = self.journal.as_mut().expect("journal must be initialized");
778 let _ = journal.prune(section).await;
779
780 self.tip = tip;
782 }
783
784 const fn get_journal_section(&self, height: Height) -> u64 {
788 height.get() / self.journal_heights_per_section.get()
789 }
790
791 async fn replay(&mut self, journal: &Journal<E, Activity<P::Scheme, D>>) -> Vec<Height> {
794 let mut tip = Height::default();
795 let mut certified = Vec::new();
796 let mut acks = Vec::new();
797 let stream = journal
798 .replay(0, 0, self.journal_replay_buffer)
799 .await
800 .expect("replay failed");
801 pin_mut!(stream);
802 while let Some(msg) = stream.next().await {
803 let (_, _, _, activity) = msg.expect("replay failed");
804 match activity {
805 Activity::Tip(height) => {
806 tip = max(tip, height);
807 self.reporter.report(Activity::Tip(height));
808 }
809 Activity::Certified(certificate) => {
810 certified.push(certificate.clone());
811 self.reporter.report(Activity::Certified(certificate));
812 }
813 Activity::Ack(ack) => {
814 acks.push(ack.clone());
815 self.reporter.report(Activity::Ack(ack));
816 }
817 }
818 }
819
820 self.tip = tip;
822 let activity_threshold = tip.saturating_sub(self.activity_timeout);
823
824 certified
826 .iter()
827 .filter(|certificate| certificate.item.height >= activity_threshold)
828 .for_each(|certificate| {
829 self.confirmed
830 .insert(certificate.item.height, certificate.clone());
831 });
832
833 let mut acks_by_height: BTreeMap<Height, Vec<Ack<P::Scheme, D>>> = BTreeMap::new();
835 for ack in acks {
836 if ack.item.height >= activity_threshold
837 && !self.confirmed.contains_key(&ack.item.height)
838 {
839 acks_by_height.entry(ack.item.height).or_default().push(ack);
840 }
841 }
842
843 let mut unverified = Vec::new();
845 for (height, mut acks_group) in acks_by_height {
846 let current_scheme = self.scheme(self.epoch).ok();
848 let our_signer = current_scheme.as_ref().and_then(|s| s.me());
849 let our_digest = our_signer.and_then(|signer| {
850 acks_group
851 .iter()
852 .find(|ack| ack.epoch == self.epoch && ack.attestation.signer == signer)
853 .map(|ack| ack.item.digest)
854 });
855
856 if let Some(digest) = our_digest {
858 acks_group.retain(|other| other.item.digest == digest);
859 }
860
861 let mut epoch_map = BTreeMap::new();
863 for ack in acks_group {
864 epoch_map
865 .entry(ack.epoch)
866 .or_insert_with(BTreeMap::new)
867 .insert(ack.attestation.signer, ack);
868 }
869
870 match our_digest {
873 Some(digest) => {
874 self.pending
875 .insert(height, Pending::Verified(digest, epoch_map));
876
877 self.rebroadcast_deadlines
879 .put(height, self.context.current());
880 }
881 None => {
882 self.pending.insert(height, Pending::Unverified(epoch_map));
883
884 unverified.push(height);
886 }
887 }
888 }
889
890 let next = self.next();
893 for height in Height::range(self.tip, next) {
894 if self.pending.contains_key(&height) || self.confirmed.contains_key(&height) {
896 continue;
897 }
898
899 self.pending
901 .insert(height, Pending::Unverified(BTreeMap::new()));
902 unverified.push(height);
903 }
904 info!(tip = %self.tip, %next, ?unverified, "replayed journal");
905
906 unverified
907 }
908
909 async fn record(&mut self, activity: Activity<P::Scheme, D>) {
911 let height = match activity {
912 Activity::Ack(ref ack) => ack.item.height,
913 Activity::Certified(ref certificate) => certificate.item.height,
914 Activity::Tip(h) => h,
915 };
916 let section = self.get_journal_section(height);
917 self.journal
918 .as_mut()
919 .expect("journal must be initialized")
920 .append(section, &activity)
921 .await
922 .expect("unable to append to journal");
923 }
924
925 async fn sync(&mut self, height: Height) {
927 let section = self.get_journal_section(height);
928 let journal = self.journal.as_mut().expect("journal must be initialized");
929 journal.sync(section).await.expect("unable to sync journal");
930 }
931}