1use super::{
11 metrics,
12 types::{Ack, Activity, Chunk, Context, Epoch, Error, Lock, Node, Parent, Proposal},
13 AckManager, Config, TipManager,
14};
15use crate::{Automaton, Monitor, Relay, Reporter, Supervisor, ThresholdSupervisor};
16use commonware_cryptography::{
17 bls12381::primitives::{group, poly, variant::Variant},
18 Digest, PublicKey, Signer,
19};
20use commonware_macros::select;
21use commonware_p2p::{
22 utils::codec::{wrap, WrappedSender},
23 Receiver, Recipients, Sender,
24};
25use commonware_runtime::{
26 telemetry::metrics::{
27 histogram,
28 status::{CounterExt, Status},
29 },
30 Clock, Handle, Metrics, Spawner, Storage,
31};
32use commonware_storage::journal::{self, variable::Journal};
33use commonware_utils::futures::Pool as FuturesPool;
34use futures::{
35 channel::oneshot,
36 future::{self, Either},
37 pin_mut, StreamExt,
38};
39use std::{
40 collections::BTreeMap,
41 marker::PhantomData,
42 time::{Duration, SystemTime},
43};
44use tracing::{debug, error, info, warn};
45
/// Outcome record for one application-level payload verification.
///
/// Produced by the `pending_verifies` pool once the automaton has answered a
/// `verify` request; carries the original context and payload so the engine
/// can act on the outcome in its main loop.
struct Verify<C: PublicKey, D: Digest, E: Clock> {
    /// Measures the duration of the verification; the duration is recorded
    /// when this timer is dropped by the consumer.
    timer: histogram::Timer<E>,
    /// The context (sequencer and height) the payload was verified against.
    context: Context<C>,
    /// The digest that was submitted for verification.
    payload: D,
    /// `Ok(true)` if the automaton approved the payload, `Ok(false)` if it
    /// rejected it, or `Err(AppVerifyCanceled)` if the automaton dropped the
    /// response channel.
    result: Result<bool, Error>,
}
53
/// Engine that disseminates sequencer chunks, collects partial-signature
/// acks from validators, and recovers threshold signatures over them.
///
/// Generic over the runtime (`E`), signing scheme (`C`), BLS variant (`V`),
/// digest type (`D`), application hooks (`A`, `R`, `Z`), epoch monitor (`M`),
/// sequencer/validator supervisors (`Su`, `TSu`), and network channel types
/// (`NetS`, `NetR`).
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics,
    C: Signer,
    V: Variant,
    D: Digest,
    A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
    R: Relay<Digest = D>,
    Z: Reporter<Activity = Activity<C::PublicKey, V, D>>,
    M: Monitor<Index = Epoch>,
    Su: Supervisor<Index = Epoch, PublicKey = C::PublicKey>,
    TSu: ThresholdSupervisor<
        Index = Epoch,
        PublicKey = C::PublicKey,
        Identity = V::Public,
        Polynomial = poly::Public<V>,
        Share = group::Share,
    >,
    NetS: Sender<PublicKey = C::PublicKey>,
    NetR: Receiver<PublicKey = C::PublicKey>,
> {
    /// Runtime context providing spawning, time, storage, and metrics.
    context: E,
    /// Our signing key; its public key is our identity on the network.
    crypto: C,
    /// Application that proposes and verifies payloads.
    automaton: A,
    /// Application hook notified of every broadcast payload.
    relay: R,
    /// Source of epoch updates.
    monitor: M,
    /// Supervisor describing the sequencer set per epoch.
    sequencers: Su,
    /// Threshold supervisor providing validators, the public polynomial,
    /// the group identity, and our share per epoch.
    validators: TSu,
    /// Sink for engine activity (verified tips and threshold locks).
    reporter: Z,

    /// Namespace mixed into all signatures to prevent cross-protocol reuse.
    namespace: Vec<u8>,

    /// How long to wait before rebroadcasting an un-thresholded tip.
    rebroadcast_timeout: Duration,
    /// When set, the instant at which the next rebroadcast is attempted.
    rebroadcast_deadline: Option<SystemTime>,

    /// Acceptable ack epoch window (behind, ahead) relative to our epoch.
    epoch_bounds: (u64, u64),

    /// Maximum ack height above a sequencer's tip we will accept.
    height_bound: u64,

    /// In-flight application payload verifications.
    pending_verifies: FuturesPool<Verify<C::PublicKey, D, E>>,

    /// Number of heights stored per journal section.
    journal_heights_per_section: u64,

    /// Buffer size used when replaying a journal.
    journal_replay_buffer: usize,

    /// Write buffer size for journal appends.
    journal_write_buffer: usize,

    /// Prefix for per-sequencer journal partition names.
    journal_name_prefix: String,

    /// Optional compression level for journal entries.
    journal_compression: Option<u8>,

    /// One journal per sequencer, keyed by the sequencer's public key.
    #[allow(clippy::type_complexity)]
    journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, V, D>>>,

    /// Highest known node per sequencer.
    tip_manager: TipManager<C::PublicKey, V, D>,

    /// Partial acks and recovered threshold signatures per chunk.
    ack_manager: AckManager<C::PublicKey, V, D>,

    /// The current epoch (refreshed from the monitor while running).
    epoch: Epoch,

    /// Whether node broadcasts are sent with network priority.
    priority_proposals: bool,

    /// Whether acks are sent with network priority.
    priority_acks: bool,

    /// Binds the network channel types without storing them.
    _phantom: PhantomData<(NetS, NetR)>,

    /// Engine metrics.
    metrics: metrics::Metrics<E>,

    /// Measures time from our proposal's broadcast until its threshold is
    /// recovered (stopped in `handle_threshold`).
    propose_timer: Option<histogram::Timer<E>>,
}
199
impl<
        E: Clock + Spawner + Storage + Metrics,
        C: Signer,
        V: Variant,
        D: Digest,
        A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
        R: Relay<Digest = D>,
        Z: Reporter<Activity = Activity<C::PublicKey, V, D>>,
        M: Monitor<Index = Epoch>,
        Su: Supervisor<Index = Epoch, PublicKey = C::PublicKey>,
        TSu: ThresholdSupervisor<
            Index = Epoch,
            PublicKey = C::PublicKey,
            Identity = V::Public,
            Polynomial = poly::Public<V>,
            Share = group::Share,
        >,
        NetS: Sender<PublicKey = C::PublicKey>,
        NetR: Receiver<PublicKey = C::PublicKey>,
    > Engine<E, C, V, D, A, R, Z, M, Su, TSu, NetS, NetR>
{
    /// Creates a new engine from `cfg`, using `context` for spawning, time,
    /// storage, and metrics.
    ///
    /// No background work is started here: the epoch starts at 0 (refreshed
    /// from the monitor once running) and no journals are opened until the
    /// main loop begins. Call [`Engine::start`] to run the engine.
    pub fn new(context: E, cfg: Config<C, V, D, A, R, Z, M, Su, TSu>) -> Self {
        // Register metrics against the runtime context before it is moved.
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context,
            crypto: cfg.crypto,
            automaton: cfg.automaton,
            relay: cfg.relay,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            sequencers: cfg.sequencers,
            validators: cfg.validators,
            namespace: cfg.namespace,
            rebroadcast_timeout: cfg.rebroadcast_timeout,
            rebroadcast_deadline: None,
            epoch_bounds: cfg.epoch_bounds,
            height_bound: cfg.height_bound,
            pending_verifies: FuturesPool::default(),
            journal_heights_per_section: cfg.journal_heights_per_section,
            journal_replay_buffer: cfg.journal_replay_buffer,
            journal_write_buffer: cfg.journal_write_buffer,
            journal_name_prefix: cfg.journal_name_prefix,
            journal_compression: cfg.journal_compression,
            journals: BTreeMap::new(),
            tip_manager: TipManager::<C::PublicKey, V, D>::new(),
            ack_manager: AckManager::<C::PublicKey, V, D>::new(),
            epoch: 0,
            priority_proposals: cfg.priority_proposals,
            priority_acks: cfg.priority_acks,
            _phantom: PhantomData,
            metrics,
            propose_timer: None,
        }
    }
256
    /// Spawns the engine's main loop on the runtime and returns its handle.
    ///
    /// `chunk_network` carries node (chunk) messages and `ack_network`
    /// carries acknowledgements. Consumes the engine; the returned [`Handle`]
    /// resolves when the loop exits.
    pub fn start(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) -> Handle<()> {
        self.context.spawn_ref()(self.run(chunk_network, ack_network))
    }
270
    /// The engine's main event loop.
    ///
    /// Wraps the raw networks with codecs, initializes the epoch, replays our
    /// own journal, and then multiplexes over: shutdown, epoch updates,
    /// rebroadcast timeouts, completed proposal requests, incoming nodes,
    /// incoming acks, and completed payload verifications. On exit, cancels
    /// pending verifications and closes all journals.
    async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
        let (mut node_sender, mut node_receiver) = wrap((), chunk_network.0, chunk_network.1);
        let (mut ack_sender, mut ack_receiver) = wrap((), ack_network.0, ack_network.1);
        let mut shutdown = self.context.stopped();

        // At most one outstanding proposal request to the automaton.
        let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;

        // Initialize the epoch from the monitor before processing messages.
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Replay our own journal (recovering our tip, if any) and attempt an
        // initial rebroadcast; failure is expected when there is no tip yet.
        self.journal_prepare(&self.crypto.public_key()).await;
        if let Err(err) = self.rebroadcast(&mut node_sender).await {
            info!(?err, "initial rebroadcast failed");
        }

        loop {
            // Request a new proposal from the automaton when eligible and no
            // request is already outstanding.
            if pending.is_none() {
                if let Some(context) = self.should_propose() {
                    let receiver = self.automaton.propose(context.clone()).await;
                    pending = Some((context, receiver));
                }
            }

            // Arm the rebroadcast timer only when a deadline is set;
            // otherwise wait forever on that branch.
            let rebroadcast = match self.rebroadcast_deadline {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };
            // Wait on the proposal receiver only when a request is pending.
            let propose = match &mut pending {
                Some((_context, receiver)) => Either::Left(receiver),
                None => Either::Right(futures::future::pending()),
            };

            select! {
                // Runtime shutdown: exit the loop (cleanup happens below).
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                // Epoch refresh from the monitor.
                epoch = epoch_updates.next() => {
                    let Some(epoch) = epoch else {
                        error!("epoch subscription failed");
                        break;
                    };

                    debug!(current=self.epoch, new=epoch, "refresh epoch");
                    // Epochs are expected to be monotonically non-decreasing.
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;
                    continue;
                },

                // Rebroadcast deadline elapsed: resend our tip.
                _ = rebroadcast => {
                    debug!(epoch = self.epoch, sender=?self.crypto.public_key(), "rebroadcast");
                    if let Err(err) = self.rebroadcast(&mut node_sender).await {
                        info!(?err, "rebroadcast failed");
                        continue;
                    }
                },

                // The automaton answered our proposal request.
                receiver = propose => {
                    // Clear the slot so a new request can be issued next turn.
                    let (context, _) = pending.take().unwrap();
                    debug!(height = context.height, "propose");

                    let Ok(payload) = receiver else {
                        warn!(?context, "automaton dropped proposal");
                        continue;
                    };

                    // The context may be stale by now; `propose` re-validates.
                    if let Err(err) = self.propose(context.clone(), payload, &mut node_sender).await {
                        warn!(?err, ?context, "propose new failed");
                        continue;
                    }
                },

                // Incoming node (chunk) message from a sequencer.
                msg = node_receiver.recv() => {
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            error!(?err, "node receiver failed");
                            break;
                        }
                    };
                    // Guard defaults to Invalid; upgraded on success.
                    let mut guard = self.metrics.nodes.guard(Status::Invalid);
                    let node = match msg {
                        Ok(node) => node,
                        Err(err) => {
                            debug!(?err, ?sender, "node decode failed");
                            continue;
                        }
                    };
                    let result = match self.validate_node(&node, &sender) {
                        Ok(result) => result,
                        Err(err) => {
                            debug!(?err, ?sender, "node validate failed");
                            continue;
                        }
                    };

                    // Ensure a journal exists before any persistence below.
                    self.journal_prepare(&sender).await;

                    // A verified parent carries a threshold for the previous
                    // chunk; record it.
                    if let Some(parent_chunk) = result {
                        let parent = node.parent.as_ref().unwrap();
                        self.handle_threshold(&parent_chunk, parent.epoch, parent.signature).await;
                    }

                    self.handle_node(&node).await;
                    debug!(?sender, height=node.chunk.height, "node");
                    guard.set(Status::Success);
                },

                // Incoming ack (partial signature) from a validator.
                msg = ack_receiver.recv() => {
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let ack = match msg {
                        Ok(ack) => ack,
                        Err(err) => {
                            debug!(?err, ?sender, "ack decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        debug!(?err, ?sender, "ack validate failed");
                        continue;
                    };
                    if let Err(err) = self.handle_ack(&ack).await {
                        debug!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }
                    debug!(?sender, epoch=ack.epoch, sequencer=?ack.chunk.sequencer, height=ack.chunk.height, "ack");
                    guard.set(Status::Success);
                },

                // An application payload verification completed.
                verify = self.pending_verifies.next_completed() => {
                    let Verify { timer, context, payload, result } = verify;
                    // Stop the verification timer (records its duration).
                    drop(timer);
                    match result {
                        Err(err) => {
                            warn!(?err, ?context, "verified returned error");
                            self.metrics.verify.inc(Status::Dropped);
                        }
                        Ok(false) => {
                            debug!(?context, "verified was false");
                            self.metrics.verify.inc(Status::Failure);
                        }
                        Ok(true) => {
                            debug!(?context, "verified");
                            self.metrics.verify.inc(Status::Success);
                            if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await {
                                debug!(?err, ?context, ?payload, "verified handle failed");
                            }
                        },
                    }
                },
            }
        }

        // Graceful teardown: drop outstanding verifications and close every
        // journal (flushing buffered writes).
        self.pending_verifies.cancel_all();
        while let Some((_, journal)) = self.journals.pop_first() {
            journal.close().await.expect("unable to close journal");
        }
    }
470
    /// Handles a successful application verification of `payload` for
    /// `context`: reports the tip, signs a partial ack with our share,
    /// persists the chunk, processes our own ack locally, and sends the ack
    /// to the epoch's validators.
    ///
    /// # Errors
    ///
    /// Fails if the tip is missing or no longer matches the verified
    /// context/payload (the verification may have raced with a tip change),
    /// if we hold no share for the current epoch, if the validator set is
    /// unknown, or if sending the ack fails.
    async fn handle_app_verified(
        &mut self,
        context: &Context<C::PublicKey>,
        payload: &D,
        ack_sender: &mut WrappedSender<NetS, Ack<C::PublicKey, V, D>>,
    ) -> Result<(), Error> {
        // Re-check that the tip still matches what was verified.
        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
            return Err(Error::AppVerifiedNoTip);
        };

        if tip.chunk.height != context.height {
            return Err(Error::AppVerifiedHeightMismatch);
        }

        if tip.chunk.payload != *payload {
            return Err(Error::AppVerifiedPayloadMismatch);
        }

        // Surface the verified tip to the application.
        self.reporter
            .report(Activity::Tip(Proposal::new(
                tip.chunk.clone(),
                tip.signature.clone(),
            )))
            .await;

        // Sign a partial ack with our share for the current epoch.
        let Some(share) = self.validators.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };
        let ack = Ack::sign(&self.namespace, share, tip.chunk.clone(), self.epoch);

        // Persist the chunk before acknowledging it.
        self.journal_sync(&context.sequencer, context.height).await;

        // Send to all validators; include the sequencer too when it is not
        // itself a validator, so it still learns of our ack.
        let recipients = {
            let Some(validators) = self.validators.participants(self.epoch) else {
                return Err(Error::UnknownValidators(self.epoch));
            };
            let mut recipients = validators.clone();
            if self
                .validators
                .is_participant(self.epoch, &tip.chunk.sequencer)
                .is_none()
            {
                recipients.push(tip.chunk.sequencer.clone());
            }
            recipients
        };

        // Process our own ack first (it may complete a quorum locally).
        self.handle_ack(&ack).await?;

        ack_sender
            .send(Recipients::Some(recipients), ack, self.priority_acks)
            .await
            .map_err(|_| Error::UnableToSendMessage)?;

        Ok(())
    }
546
547 async fn handle_threshold(
553 &mut self,
554 chunk: &Chunk<C::PublicKey, D>,
555 epoch: Epoch,
556 threshold: V::Signature,
557 ) {
558 if !self
560 .ack_manager
561 .add_threshold(&chunk.sequencer, chunk.height, epoch, threshold)
562 {
563 return;
564 }
565
566 if chunk.sequencer == self.crypto.public_key() {
568 self.propose_timer.take();
569 }
570
571 self.reporter
573 .report(Activity::Lock(Lock::new(chunk.clone(), epoch, threshold)))
574 .await;
575 }
576
577 async fn handle_ack(&mut self, ack: &Ack<C::PublicKey, V, D>) -> Result<(), Error> {
582 let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
584 return Err(Error::UnknownPolynomial(ack.epoch));
585 };
586 let quorum = polynomial.required();
587
588 if let Some(threshold) = self.ack_manager.add_ack(ack, quorum) {
590 debug!(epoch=ack.epoch, sequencer=?ack.chunk.sequencer, height=ack.chunk.height, "recovered threshold");
591 self.metrics.threshold.inc();
592 self.handle_threshold(&ack.chunk, ack.epoch, threshold)
593 .await;
594 }
595
596 Ok(())
597 }
598
    /// Ingests a validated node: updates the sequencer's tip, persists new
    /// tips to the journal, and kicks off asynchronous application
    /// verification of the payload.
    ///
    /// The verification outcome is consumed from `pending_verifies` in the
    /// main loop (see `run`).
    async fn handle_node(&mut self, node: &Node<C::PublicKey, V, D>) {
        // Store the tip; `put` reports whether this node advanced it.
        let is_new = self.tip_manager.put(node);

        if is_new {
            // Track the latest height observed per sequencer.
            self.metrics
                .sequencer_heights
                .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
                .set(node.chunk.height as i64);

            // Persist and flush the node before anything references it.
            self.journal_append(node.clone()).await;
            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
                .await;
        }

        // Ask the application to verify the payload asynchronously.
        let context = Context {
            sequencer: node.chunk.sequencer.clone(),
            height: node.chunk.height,
        };
        let payload = node.chunk.payload;
        let mut automaton = self.automaton.clone();
        let timer = self.metrics.verify_duration.timer();
        self.pending_verifies.push(async move {
            let receiver = automaton.verify(context.clone(), payload).await;
            // A dropped response channel counts as a canceled verification.
            let result = receiver.await.map_err(Error::AppVerifyCanceled);
            Verify {
                timer,
                context,
                payload,
                result,
            }
        });
    }
641
642 fn should_propose(&self) -> Option<Context<C::PublicKey>> {
650 let me = self.crypto.public_key();
651
652 self.sequencers.is_participant(self.epoch, &me)?;
654
655 match self.tip_manager.get(&me) {
657 None => Some(Context {
658 sequencer: me,
659 height: 0,
660 }),
661 Some(tip) => self
662 .ack_manager
663 .get_threshold(&me, tip.chunk.height)
664 .map(|_| Context {
665 sequencer: me,
666 height: tip.chunk.height.checked_add(1).unwrap(),
667 }),
668 }
669 }
670
671 async fn propose(
676 &mut self,
677 context: Context<C::PublicKey>,
678 payload: D,
679 node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
680 ) -> Result<(), Error> {
681 let mut guard = self.metrics.propose.guard(Status::Dropped);
682 let me = self.crypto.public_key();
683
684 if context.sequencer != me {
686 return Err(Error::ContextSequencer);
687 }
688
689 if self.sequencers.is_participant(self.epoch, &me).is_none() {
691 return Err(Error::IAmNotASequencer(self.epoch));
692 }
693
694 let mut height = 0;
696 let mut parent = None;
697 if let Some(tip) = self.tip_manager.get(&me) {
698 let Some((epoch, threshold)) = self.ack_manager.get_threshold(&me, tip.chunk.height)
700 else {
701 return Err(Error::MissingThreshold);
702 };
703
704 height = tip.chunk.height + 1;
706 parent = Some(Parent::new(tip.chunk.payload, epoch, threshold));
707 }
708
709 if context.height != height {
711 return Err(Error::ContextHeight);
712 }
713
714 let node = Node::sign(&self.namespace, &mut self.crypto, height, payload, parent);
716
717 self.handle_node(&node).await;
719
720 self.journal_sync(&me, height).await;
723
724 self.propose_timer = Some(self.metrics.e2e_duration.timer());
726
727 if let Err(err) = self.broadcast(node, node_sender, self.epoch).await {
729 guard.set(Status::Failure);
730 return Err(err);
731 };
732
733 guard.set(Status::Success);
735 Ok(())
736 }
737
    /// Attempts to rebroadcast our current tip; clears the pending
    /// rebroadcast deadline (a new one is armed by `broadcast` on success).
    ///
    /// # Errors
    ///
    /// Fails if we are not a sequencer this epoch, have no tip, the tip
    /// already has a threshold (so no rebroadcast is needed), or sending
    /// fails.
    async fn rebroadcast(
        &mut self,
        node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);

        // Consume the deadline; `broadcast` re-arms it if we actually send.
        self.rebroadcast_deadline = None;

        // Only sequencers rebroadcast.
        let me = self.crypto.public_key();
        if self.sequencers.is_participant(self.epoch, &me).is_none() {
            return Err(Error::IAmNotASequencer(self.epoch));
        }

        // Nothing to send without a tip.
        let Some(tip) = self.tip_manager.get(&me) else {
            return Err(Error::NothingToRebroadcast);
        };

        // A thresholded tip no longer needs rebroadcasting.
        if self
            .ack_manager
            .get_threshold(&me, tip.chunk.height)
            .is_some()
        {
            return Err(Error::AlreadyThresholded);
        }

        // From here on, an early exit via `?` should count as a failure
        // rather than a drop, so flip the guard before broadcasting.
        guard.set(Status::Failure);
        self.broadcast(tip, node_sender, self.epoch).await?;
        guard.set(Status::Success);
        Ok(())
    }
779
780 async fn broadcast(
782 &mut self,
783 node: Node<C::PublicKey, V, D>,
784 node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
785 epoch: Epoch,
786 ) -> Result<(), Error> {
787 let Some(validators) = self.validators.participants(epoch) else {
789 return Err(Error::UnknownValidators(epoch));
790 };
791
792 self.relay.broadcast(node.chunk.payload).await;
794
795 node_sender
797 .send(
798 Recipients::Some(validators.clone()),
799 node,
800 self.priority_proposals,
801 )
802 .await
803 .map_err(|_| Error::BroadcastFailed)?;
804
805 self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);
807
808 Ok(())
809 }
810
811 fn validate_node(
821 &mut self,
822 node: &Node<C::PublicKey, V, D>,
823 sender: &C::PublicKey,
824 ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
825 if node.chunk.sequencer != *sender {
827 return Err(Error::PeerMismatch);
828 }
829
830 if let Some(tip) = self.tip_manager.get(sender) {
833 if tip == *node {
834 return Ok(None);
835 }
836 }
837
838 self.validate_chunk(&node.chunk, self.epoch)?;
840
841 node.verify(&self.namespace, self.validators.identity())
843 .map_err(|_| Error::InvalidNodeSignature)
844 }
845
846 fn validate_ack(
851 &self,
852 ack: &Ack<C::PublicKey, V, D>,
853 sender: &C::PublicKey,
854 ) -> Result<(), Error> {
855 self.validate_chunk(&ack.chunk, ack.epoch)?;
857
858 let Some(index) = self.validators.is_participant(ack.epoch, sender) else {
860 return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
861 };
862 if index != ack.signature.index {
863 return Err(Error::PeerMismatch);
864 }
865
866 {
868 let (eb_lo, eb_hi) = self.epoch_bounds;
869 let bound_lo = self.epoch.saturating_sub(eb_lo);
870 let bound_hi = self.epoch.saturating_add(eb_hi);
871 if ack.epoch < bound_lo || ack.epoch > bound_hi {
872 return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
873 }
874 }
875
876 {
878 let bound_lo = self
879 .tip_manager
880 .get(&ack.chunk.sequencer)
881 .map(|t| t.chunk.height)
882 .unwrap_or(0);
883 let bound_hi = bound_lo + self.height_bound;
884 if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
885 return Err(Error::AckHeightOutsideBounds(
886 ack.chunk.height,
887 bound_lo,
888 bound_hi,
889 ));
890 }
891 }
892
893 let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
896 return Err(Error::UnknownPolynomial(ack.epoch));
897 };
898 if !ack.verify(&self.namespace, polynomial) {
899 return Err(Error::InvalidAckSignature);
900 }
901
902 Ok(())
903 }
904
905 fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
910 if self
912 .sequencers
913 .is_participant(epoch, &chunk.sequencer)
914 .is_none()
915 {
916 return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
917 }
918
919 if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
921 match chunk.height.cmp(&tip.chunk.height) {
923 std::cmp::Ordering::Less => {
924 return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
925 }
926 std::cmp::Ordering::Equal => {
927 if tip.chunk.payload != chunk.payload {
929 return Err(Error::ChunkMismatch(
930 chunk.sequencer.to_string(),
931 chunk.height,
932 ));
933 }
934 }
935 std::cmp::Ordering::Greater => {}
936 }
937 }
938
939 Ok(())
940 }
941
942 fn get_journal_section(&self, height: u64) -> u64 {
948 height / self.journal_heights_per_section
949 }
950
    /// Ensures a journal exists for `sequencer`, replaying it on first use.
    ///
    /// Replay scans every stored node and installs the highest one as the
    /// sequencer's tip. Must be called before `journal_append`/`journal_sync`
    /// for that sequencer.
    ///
    /// Panics if the journal cannot be initialized, replayed, or read.
    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
        // Already prepared: nothing to do.
        if self.journals.contains_key(sequencer) {
            return;
        }

        // One partition per sequencer, named by prefix + public key.
        let cfg = journal::variable::Config {
            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
            compression: self.journal_compression,
            codec_config: (),
            write_buffer: self.journal_write_buffer,
        };
        let journal =
            Journal::<_, Node<C::PublicKey, V, D>>::init(self.context.with_label("journal"), cfg)
                .await
                .expect("unable to init journal");

        // Replay the journal to recover the sequencer's latest node.
        {
            debug!(?sequencer, "journal replay begin");

            let stream = journal
                .replay(self.journal_replay_buffer)
                .await
                .expect("unable to replay journal");
            pin_mut!(stream);

            // Track the highest node seen; entries are not assumed ordered.
            let mut tip: Option<Node<C::PublicKey, V, D>> = None;
            let mut num_items = 0;
            while let Some(msg) = stream.next().await {
                let (_, _, _, node) = msg.expect("unable to read from journal");
                num_items += 1;
                let height = node.chunk.height;
                match tip {
                    None => {
                        tip = Some(node);
                    }
                    Some(ref t) => {
                        if height > t.chunk.height {
                            tip = Some(node);
                        }
                    }
                }
            }

            // Install the recovered tip. Since this journal had never been
            // prepared before, the tip manager cannot already know this node.
            if let Some(node) = tip.take() {
                let is_new = self.tip_manager.put(&node);
                assert!(is_new);
            }

            debug!(?sequencer, ?num_items, "journal replay end");
        }

        self.journals.insert(sequencer.clone(), journal);
    }
1016
1017 async fn journal_append(&mut self, node: Node<C::PublicKey, V, D>) {
1022 let section = self.get_journal_section(node.chunk.height);
1023 self.journals
1024 .get_mut(&node.chunk.sequencer)
1025 .expect("journal does not exist")
1026 .append(section, node)
1027 .await
1028 .expect("unable to append to journal");
1029 }
1030
1031 async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
1033 let section = self.get_journal_section(height);
1034
1035 let journal = self
1037 .journals
1038 .get_mut(sequencer)
1039 .expect("journal does not exist");
1040
1041 journal.sync(section).await.expect("unable to sync journal");
1043
1044 let _ = journal.prune(section).await;
1046 }
1047}