use super::{
    metrics,
    types::{Ack, Activity, Chunk, Context, Epoch, Error, Lock, Node, Parent, Proposal},
    AckManager, Config, TipManager,
};
use crate::{Automaton, Monitor, Relay, Reporter, Supervisor, ThresholdSupervisor};
use commonware_cryptography::{
    bls12381::primitives::{group, poly, variant::Variant},
    Digest, PublicKey, Signer,
};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Receiver, Recipients, Sender,
};
use commonware_runtime::{
    telemetry::metrics::{
        histogram,
        status::{CounterExt, Status},
    },
    Clock, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::{self, variable::Journal};
use commonware_utils::futures::Pool as FuturesPool;
use futures::{
    channel::oneshot,
    future::{self, Either},
    pin_mut, StreamExt,
};
use std::{
    collections::BTreeMap,
    marker::PhantomData,
    time::{Duration, SystemTime},
};
use tracing::{debug, error, info, warn};
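
/// Outcome of an application verification request: the context and payload that were
/// verified, the result, and a timer measuring how long verification took.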
struct Verify<C: PublicKey, D: Digest, E: Clock> {
    timer: histogram::Timer<E>,
    context: Context<C>,
    payload: D,
    result: Result<bool, Error>,
}
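
/// Engine that drives the broadcast protocol: it proposes and rebroadcasts chunks as a
/// sequencer, verifies and acknowledges chunks as a validator, recovers threshold
/// signatures from acks, and persists nodes to per-sequencer journals.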
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics,
    C: Signer,
    V: Variant,
    D: Digest,
    A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
    R: Relay<Digest = D>,
    Z: Reporter<Activity = Activity<C::PublicKey, V, D>>,
    M: Monitor<Index = Epoch>,
    Su: Supervisor<Index = Epoch, PublicKey = C::PublicKey>,
    TSu: ThresholdSupervisor<
        Index = Epoch,
        PublicKey = C::PublicKey,
        Identity = V::Public,
        Polynomial = poly::Public<V>,
        Share = group::Share,
    >,
    NetS: Sender<PublicKey = C::PublicKey>,
    NetR: Receiver<PublicKey = C::PublicKey>,
> {
    // Core dependencies
    context: E,
    crypto: C,
    automaton: A,
    relay: R,
    monitor: M,
    sequencers: Su,
    validators: TSu,
    reporter: Z,

    // Namespace used to domain-separate signatures
    namespace: Vec<u8>,

    // How long to wait before rebroadcasting the tip if no threshold has been recovered
    rebroadcast_timeout: Duration,
    rebroadcast_deadline: Option<SystemTime>,

    // Number of epochs (below, above) the current epoch for which acks are accepted
    epoch_bounds: (u64, u64),

    // Number of heights above a sequencer's tip for which acks are accepted
    height_bound: u64,

    // Outstanding application verification requests
    pending_verifies: FuturesPool<Verify<C::PublicKey, D, E>>,

    // Journal configuration
    journal_heights_per_section: u64,
    journal_replay_concurrency: usize,
    journal_replay_buffer: usize,
    journal_write_buffer: usize,
    journal_name_prefix: String,
    journal_compression: Option<u8>,

    // One journal per sequencer, persisting the nodes received from that sequencer
    #[allow(clippy::type_complexity)]
    journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, V, D>>>,

    // Highest-seen node (tip) per sequencer
    tip_manager: TipManager<C::PublicKey, V, D>,

    // Acks and recovered threshold signatures per chunk
    ack_manager: AckManager<C::PublicKey, V, D>,

    // Current epoch
    epoch: Epoch,

    // Whether node and ack messages are sent as priority traffic
    priority_proposals: bool,
    priority_acks: bool,

    _phantom: PhantomData<(NetS, NetR)>,

    // Engine metrics
    metrics: metrics::Metrics<E>,

    // Measures the time from proposing a chunk to recovering its threshold signature
    propose_timer: Option<histogram::Timer<E>>,
}
impl<
        E: Clock + Spawner + Storage + Metrics,
        C: Signer,
        V: Variant,
        D: Digest,
        A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
        R: Relay<Digest = D>,
        Z: Reporter<Activity = Activity<C::PublicKey, V, D>>,
        M: Monitor<Index = Epoch>,
        Su: Supervisor<Index = Epoch, PublicKey = C::PublicKey>,
        TSu: ThresholdSupervisor<
            Index = Epoch,
            PublicKey = C::PublicKey,
            Identity = V::Public,
            Polynomial = poly::Public<V>,
            Share = group::Share,
        >,
        NetS: Sender<PublicKey = C::PublicKey>,
        NetR: Receiver<PublicKey = C::PublicKey>,
    > Engine<E, C, V, D, A, R, Z, M, Su, TSu, NetS, NetR>
{
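    /// Creates a new engine from the given runtime context and configuration.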
    pub fn new(context: E, cfg: Config<C, V, D, A, R, Z, M, Su, TSu>) -> Self {
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context,
            crypto: cfg.crypto,
            automaton: cfg.automaton,
            relay: cfg.relay,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            sequencers: cfg.sequencers,
            validators: cfg.validators,
            namespace: cfg.namespace,
            rebroadcast_timeout: cfg.rebroadcast_timeout,
            rebroadcast_deadline: None,
            epoch_bounds: cfg.epoch_bounds,
            height_bound: cfg.height_bound,
            pending_verifies: FuturesPool::default(),
            journal_heights_per_section: cfg.journal_heights_per_section,
            journal_replay_concurrency: cfg.journal_replay_concurrency,
            journal_replay_buffer: cfg.journal_replay_buffer,
            journal_write_buffer: cfg.journal_write_buffer,
            journal_name_prefix: cfg.journal_name_prefix,
            journal_compression: cfg.journal_compression,
            journals: BTreeMap::new(),
            tip_manager: TipManager::<C::PublicKey, V, D>::new(),
            ack_manager: AckManager::<C::PublicKey, V, D>::new(),
            epoch: 0,
            priority_proposals: cfg.priority_proposals,
            priority_acks: cfg.priority_acks,
            _phantom: PhantomData,
            metrics,
            propose_timer: None,
        }
    }
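
    /// Starts the engine with the given chunk and ack networks, returning a handle
    /// to the spawned task.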
    pub fn start(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) -> Handle<()> {
        self.context.spawn_ref()(self.run(chunk_network, ack_network))
    }
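
    /// Main loop of the engine: proposes and rebroadcasts chunks, and processes epoch
    /// updates, incoming nodes, incoming acks, and completed verification requests
    /// until shutdown.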
    async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
        let (mut node_sender, mut node_receiver) = wrap((), chunk_network.0, chunk_network.1);
        let (mut ack_sender, mut ack_receiver) = wrap((), ack_network.0, ack_network.1);
        let mut shutdown = self.context.stopped();

        // An in-flight proposal request to the automaton, if any
        let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;

        // Initialize the epoch from the monitor and subscribe to updates
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Prepare our own journal and rebroadcast our tip (if any)
        self.journal_prepare(&self.crypto.public_key()).await;
        if let Err(err) = self.rebroadcast(&mut node_sender).await {
            info!(?err, "initial rebroadcast failed");
        }

        loop {
            // Request a new proposal from the automaton if none is in flight
            if pending.is_none() {
                if let Some(context) = self.should_propose() {
                    let receiver = self.automaton.propose(context.clone()).await;
                    pending = Some((context, receiver));
                }
            }

            let rebroadcast = match self.rebroadcast_deadline {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };
            let propose = match &mut pending {
                Some((_context, receiver)) => Either::Left(receiver),
                None => Either::Right(future::pending()),
            };

            select! {
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                epoch = epoch_updates.next() => {
                    let Some(epoch) = epoch else {
                        error!("epoch subscription failed");
                        break;
                    };

                    debug!(current=self.epoch, new=epoch, "refresh epoch");
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;
                    continue;
                },

                _ = rebroadcast => {
                    debug!(epoch = self.epoch, sender=?self.crypto.public_key(), "rebroadcast");
                    if let Err(err) = self.rebroadcast(&mut node_sender).await {
                        info!(?err, "rebroadcast failed");
                        continue;
                    }
                },

                receiver = propose => {
                    let (context, _) = pending.take().unwrap();
                    debug!(height = context.height, "propose");

                    let Ok(payload) = receiver else {
                        warn!(?context, "automaton dropped proposal");
                        continue;
                    };

                    if let Err(err) = self.propose(context.clone(), payload, &mut node_sender).await {
                        warn!(?err, ?context, "propose new failed");
                        continue;
                    }
                },

                msg = node_receiver.recv() => {
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            error!(?err, "node receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.nodes.guard(Status::Invalid);
                    let node = match msg {
                        Ok(node) => node,
                        Err(err) => {
                            warn!(?err, ?sender, "node decode failed");
                            continue;
                        }
                    };
                    let result = match self.validate_node(&node, &sender) {
                        Ok(result) => result,
                        Err(err) => {
                            warn!(?err, ?sender, "node validate failed");
                            continue;
                        }
                    };

                    self.journal_prepare(&sender).await;

                    // If the node carries a parent, it proves a threshold signature
                    // over the parent chunk
                    if let Some(parent_chunk) = result {
                        let parent = node.parent.as_ref().unwrap();
                        self.handle_threshold(&parent_chunk, parent.epoch, parent.signature).await;
                    }

                    self.handle_node(&node).await;
                    debug!(?sender, height=node.chunk.height, "node");
                    guard.set(Status::Success);
                },

                msg = ack_receiver.recv() => {
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let ack = match msg {
                        Ok(ack) => ack,
                        Err(err) => {
                            warn!(?err, ?sender, "ack decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        warn!(?err, ?sender, "ack validate failed");
                        continue;
                    }
                    if let Err(err) = self.handle_ack(&ack).await {
                        warn!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }
                    debug!(?sender, epoch=ack.epoch, sequencer=?ack.chunk.sequencer, height=ack.chunk.height, "ack");
                    guard.set(Status::Success);
                },

                verify = self.pending_verifies.next_completed() => {
                    let Verify { timer, context, payload, result } = verify;
                    // Record the verification duration
                    drop(timer);
                    match result {
                        Err(err) => {
                            warn!(?err, ?context, "verified returned error");
                            self.metrics.verify.inc(Status::Dropped);
                        }
                        Ok(false) => {
                            warn!(?context, "verified was false");
                            self.metrics.verify.inc(Status::Failure);
                        }
                        Ok(true) => {
                            debug!(?context, "verified");
                            self.metrics.verify.inc(Status::Success);
                            if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await {
                                warn!(?err, ?context, ?payload, "verified handle failed");
                            }
                        },
                    }
                },
            }
        }

        // Cleanup on shutdown: cancel outstanding verifications and close journals
        self.pending_verifies.cancel_all();
        while let Some((_, journal)) = self.journals.pop_first() {
            journal.close().await.expect("unable to close journal");
        }
    }
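
    /// Handles a payload that the application has verified: checks that it still matches
    /// the sequencer's tip, reports the tip, signs an ack over the chunk with our share,
    /// syncs the journal, and sends the ack to the current validators (and to the
    /// sequencer if it is not itself a validator).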
    async fn handle_app_verified(
        &mut self,
        context: &Context<C::PublicKey>,
        payload: &D,
        ack_sender: &mut WrappedSender<NetS, Ack<C::PublicKey, V, D>>,
    ) -> Result<(), Error> {
        // The tip may have changed while verification was in flight
        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
            return Err(Error::AppVerifiedNoTip);
        };

        if tip.chunk.height != context.height {
            return Err(Error::AppVerifiedHeightMismatch);
        }

        if tip.chunk.payload != *payload {
            return Err(Error::AppVerifiedPayloadMismatch);
        }

        self.reporter
            .report(Activity::Tip(Proposal::new(
                tip.chunk.clone(),
                tip.signature.clone(),
            )))
            .await;

        // Sign an ack over the chunk with our share for the current epoch
        let Some(share) = self.validators.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };
        let ack = Ack::sign(&self.namespace, share, tip.chunk.clone(), self.epoch);

        // Persist the tip before acking it
        self.journal_sync(&context.sequencer, context.height).await;

        // Send the ack to all current validators, and to the sequencer if it is not one
        let recipients = {
            let Some(validators) = self.validators.participants(self.epoch) else {
                return Err(Error::UnknownValidators(self.epoch));
            };
            let mut recipients = validators.clone();
            if self
                .validators
                .is_participant(self.epoch, &tip.chunk.sequencer)
                .is_none()
            {
                recipients.push(tip.chunk.sequencer.clone());
            }
            recipients
        };

        // Handle the ack ourselves before broadcasting it
        self.handle_ack(&ack).await?;

        ack_sender
            .send(Recipients::Some(recipients), ack, self.priority_acks)
            .await
            .map_err(|_| Error::UnableToSendMessage)?;

        Ok(())
    }
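
    /// Handles a newly observed threshold signature for `chunk` at `epoch`: records it,
    /// stops the end-to-end propose timer if the chunk is ours, and reports a `Lock`.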
    async fn handle_threshold(
        &mut self,
        chunk: &Chunk<C::PublicKey, D>,
        epoch: Epoch,
        threshold: V::Signature,
    ) {
        if !self
            .ack_manager
            .add_threshold(&chunk.sequencer, chunk.height, epoch, threshold)
        {
            return;
        }

        if chunk.sequencer == self.crypto.public_key() {
            self.propose_timer.take();
        }

        self.reporter
            .report(Activity::Lock(Lock::new(chunk.clone(), epoch, threshold)))
            .await;
    }
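
    /// Handles an incoming ack: adds its partial signature to the ack manager and,
    /// if a quorum is reached, recovers and handles the threshold signature.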
    async fn handle_ack(&mut self, ack: &Ack<C::PublicKey, V, D>) -> Result<(), Error> {
        let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
            return Err(Error::UnknownPolynomial(ack.epoch));
        };
        let quorum = polynomial.required();

        if let Some(threshold) = self.ack_manager.add_ack(ack, quorum) {
            debug!(epoch=ack.epoch, sequencer=?ack.chunk.sequencer, height=ack.chunk.height, "recovered threshold");
            self.metrics.threshold.inc();
            self.handle_threshold(&ack.chunk, ack.epoch, threshold)
                .await;
        }

        Ok(())
    }
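
    /// Handles an incoming node: updates the tip (persisting new tips to the journal)
    /// and asks the application to verify the node's payload.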
    async fn handle_node(&mut self, node: &Node<C::PublicKey, V, D>) {
        // Store the node as the sequencer's tip; only persist it if it is new
        let is_new = self.tip_manager.put(node);

        if is_new {
            self.metrics
                .sequencer_heights
                .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
                .set(node.chunk.height as i64);

            self.journal_append(node.clone()).await;
            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
                .await;
        }

        // Ask the application to verify the payload; the result is collected
        // asynchronously via `pending_verifies`
        let context = Context {
            sequencer: node.chunk.sequencer.clone(),
            height: node.chunk.height,
        };
        let payload = node.chunk.payload;
        let mut automaton = self.automaton.clone();
        let timer = self.metrics.verify_duration.timer();
        self.pending_verifies.push(async move {
            let receiver = automaton.verify(context.clone(), payload).await;
            let result = receiver.await.map_err(Error::AppVerifyCanceled);
            Verify {
                timer,
                context,
                payload,
                result,
            }
        });
    }
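
    /// Returns the context at which we should propose a new chunk, or `None` if we are
    /// not a sequencer in the current epoch or our latest chunk has no threshold yet.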
    fn should_propose(&self) -> Option<Context<C::PublicKey>> {
        let me = self.crypto.public_key();

        // Only sequencers may propose
        self.sequencers.is_participant(self.epoch, &me)?;

        match self.tip_manager.get(&me) {
            // No tip yet: propose at height 0
            None => Some(Context {
                sequencer: me,
                height: 0,
            }),
            // A tip exists: only propose the next height once the tip has a threshold
            Some(tip) => self
                .ack_manager
                .get_threshold(&me, tip.chunk.height)
                .map(|_| Context {
                    sequencer: me,
                    height: tip.chunk.height.checked_add(1).unwrap(),
                }),
        }
    }
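
    /// Builds, persists, and broadcasts a new node for `payload` at `context`, returning
    /// an error if the context is stale or we are not a sequencer in the current epoch.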
    async fn propose(
        &mut self,
        context: Context<C::PublicKey>,
        payload: D,
        node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.propose.guard(Status::Dropped);
        let me = self.crypto.public_key();

        if context.sequencer != me {
            return Err(Error::ContextSequencer);
        }

        if self.sequencers.is_participant(self.epoch, &me).is_none() {
            return Err(Error::IAmNotASequencer(self.epoch));
        }

        // Derive the height and parent from our tip (if any); the tip must already
        // have a threshold signature before we can build on it
        let mut height = 0;
        let mut parent = None;
        if let Some(tip) = self.tip_manager.get(&me) {
            let Some((epoch, threshold)) = self.ack_manager.get_threshold(&me, tip.chunk.height)
            else {
                return Err(Error::MissingThreshold);
            };

            height = tip.chunk.height + 1;
            parent = Some(Parent::new(tip.chunk.payload, epoch, threshold));
        }

        // The context may be stale if the tip advanced since the proposal was requested
        if context.height != height {
            return Err(Error::ContextHeight);
        }

        let node = Node::sign(&self.namespace, &mut self.crypto, height, payload, parent);

        // Process the node ourselves and persist it before broadcasting
        self.handle_node(&node).await;
        self.journal_sync(&me, height).await;

        // Start the end-to-end timer (stopped when a threshold is recovered for this chunk)
        self.propose_timer = Some(self.metrics.e2e_duration.timer());

        if let Err(err) = self.broadcast(node, node_sender, self.epoch).await {
            guard.set(Status::Failure);
            return Err(err);
        }

        guard.set(Status::Success);
        Ok(())
    }
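
    /// Rebroadcasts our tip to the current validators if it does not yet have a
    /// threshold signature, rescheduling the next rebroadcast on success.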
    async fn rebroadcast(
        &mut self,
        node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);

        // Clear the deadline; `broadcast` sets a new one on success
        self.rebroadcast_deadline = None;

        let me = self.crypto.public_key();
        if self.sequencers.is_participant(self.epoch, &me).is_none() {
            return Err(Error::IAmNotASequencer(self.epoch));
        }

        let Some(tip) = self.tip_manager.get(&me) else {
            return Err(Error::NothingToRebroadcast);
        };

        // If the tip already has a threshold signature, there is nothing to rebroadcast
        if self
            .ack_manager
            .get_threshold(&me, tip.chunk.height)
            .is_some()
        {
            return Err(Error::AlreadyThresholded);
        }

        // Record a failure if the broadcast itself errors out
        guard.set(Status::Failure);
        self.broadcast(tip, node_sender, self.epoch).await?;
        guard.set(Status::Success);
        Ok(())
    }
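
    /// Sends `node` to the validators of `epoch` (after notifying the relay of its
    /// payload) and schedules the next rebroadcast.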
    async fn broadcast(
        &mut self,
        node: Node<C::PublicKey, V, D>,
        node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
        epoch: Epoch,
    ) -> Result<(), Error> {
        let Some(validators) = self.validators.participants(epoch) else {
            return Err(Error::UnknownValidators(epoch));
        };

        self.relay.broadcast(node.chunk.payload).await;

        node_sender
            .send(
                Recipients::Some(validators.clone()),
                node,
                self.priority_proposals,
            )
            .await
            .map_err(|_| Error::BroadcastFailed)?;

        self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);

        Ok(())
    }
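
    /// Validates an incoming node from `sender`: the sender must be the sequencer, the
    /// chunk must pass `validate_chunk`, and the node's signatures must verify. Returns
    /// the parent chunk (whose threshold the node carries) if there is one, or `None`
    /// if the node has no parent or already matches our recorded tip.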
    fn validate_node(
        &mut self,
        node: &Node<C::PublicKey, V, D>,
        sender: &C::PublicKey,
    ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
        if node.chunk.sequencer != *sender {
            return Err(Error::PeerMismatch);
        }

        // Skip verification if the node is identical to our recorded tip
        if let Some(tip) = self.tip_manager.get(sender) {
            if tip == *node {
                return Ok(None);
            }
        }

        self.validate_chunk(&node.chunk, self.epoch)?;

        node.verify(&self.namespace, self.validators.identity())
            .map_err(|_| Error::InvalidNodeSignature)
    }
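
    /// Validates an incoming ack from `sender`: the chunk must be acceptable, the sender
    /// must be the validator that produced the partial signature, the epoch and height
    /// must fall within the configured bounds, and the partial signature must verify.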
    fn validate_ack(
        &self,
        ack: &Ack<C::PublicKey, V, D>,
        sender: &C::PublicKey,
    ) -> Result<(), Error> {
        self.validate_chunk(&ack.chunk, ack.epoch)?;

        let Some(index) = self.validators.is_participant(ack.epoch, sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if index != ack.signature.index {
            return Err(Error::PeerMismatch);
        }

        // The ack's epoch must be within `epoch_bounds` of the current epoch
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // The ack's height must be within `height_bound` of the sequencer's tip
        {
            let bound_lo = self
                .tip_manager
                .get(&ack.chunk.sequencer)
                .map(|t| t.chunk.height)
                .unwrap_or(0);
            let bound_hi = bound_lo + self.height_bound;
            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
                return Err(Error::AckHeightOutsideBounds(
                    ack.chunk.height,
                    bound_lo,
                    bound_hi,
                ));
            }
        }

        // Verify the partial signature against the epoch's public polynomial
        let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
            return Err(Error::UnknownPolynomial(ack.epoch));
        };
        if !ack.verify(&self.namespace, polynomial) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }
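
    /// Validates a chunk at `epoch`: its sequencer must be a participant, its height
    /// must not be below our tip for that sequencer, and it must not conflict with the
    /// tip at the same height.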
    fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
        if self
            .sequencers
            .is_participant(epoch, &chunk.sequencer)
            .is_none()
        {
            return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
        }

        if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
            match chunk.height.cmp(&tip.chunk.height) {
                std::cmp::Ordering::Less => {
                    return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
                }
                std::cmp::Ordering::Equal => {
                    if tip.chunk.payload != chunk.payload {
                        return Err(Error::ChunkMismatch(
                            chunk.sequencer.to_string(),
                            chunk.height,
                        ));
                    }
                }
                std::cmp::Ordering::Greater => {}
            }
        }

        Ok(())
    }
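
    /// Returns the journal section for the given height.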
    fn get_journal_section(&self, height: u64) -> u64 {
        height / self.journal_heights_per_section
    }
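
    /// Ensures a journal exists for `sequencer`, initializing it and replaying its
    /// contents (restoring the sequencer's tip) if it has not been opened yet.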
    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
        // Nothing to do if the journal is already open
        if self.journals.contains_key(sequencer) {
            return;
        }

        let cfg = journal::variable::Config {
            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
            compression: self.journal_compression,
            codec_config: (),
            write_buffer: self.journal_write_buffer,
        };
        let journal =
            Journal::<_, Node<C::PublicKey, V, D>>::init(self.context.with_label("journal"), cfg)
                .await
                .expect("unable to init journal");

        // Replay the journal to restore the sequencer's tip
        {
            debug!(?sequencer, "journal replay begin");

            let stream = journal
                .replay(self.journal_replay_concurrency, self.journal_replay_buffer)
                .await
                .expect("unable to replay journal");
            pin_mut!(stream);

            // Keep only the highest node seen during replay
            let mut tip: Option<Node<C::PublicKey, V, D>> = None;
            let mut num_items = 0;
            while let Some(msg) = stream.next().await {
                let (_, _, _, node) = msg.expect("unable to read from journal");
                num_items += 1;
                let height = node.chunk.height;
                match tip {
                    None => {
                        tip = Some(node);
                    }
                    Some(ref t) => {
                        if height > t.chunk.height {
                            tip = Some(node);
                        }
                    }
                }
            }

            // The replayed tip must be new since the journal had not been opened before
            if let Some(node) = tip.take() {
                let is_new = self.tip_manager.put(&node);
                assert!(is_new);
            }

            debug!(?sequencer, ?num_items, "journal replay end");
        }

        self.journals.insert(sequencer.clone(), journal);
    }
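
    /// Appends `node` to its sequencer's journal in the section for its height.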
    async fn journal_append(&mut self, node: Node<C::PublicKey, V, D>) {
        let section = self.get_journal_section(node.chunk.height);
        self.journals
            .get_mut(&node.chunk.sequencer)
            .expect("journal does not exist")
            .append(section, node)
            .await
            .expect("unable to append to journal");
    }
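
    /// Syncs the sequencer's journal to disk at the section for `height` and prunes it.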
    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
        let section = self.get_journal_section(height);

        let journal = self
            .journals
            .get_mut(sequencer)
            .expect("journal does not exist");

        journal.sync(section).await.expect("unable to sync journal");

        // Prune the journal, ignoring errors
        let _ = journal.prune(section).await;
    }
}