1use super::{
11 metrics,
12 types::{Ack, Activity, Chunk, Context, Epoch, Error, Lock, Node, Parent, Proposal},
13 AckManager, Config, TipManager,
14};
15use crate::{Automaton, Monitor, Relay, Reporter, Supervisor, ThresholdSupervisor};
16use commonware_cryptography::{
17 bls12381::primitives::{group, poly, variant::Variant},
18 Digest, PublicKey, Signer,
19};
20use commonware_macros::select;
21use commonware_p2p::{
22 utils::codec::{wrap, WrappedSender},
23 Receiver, Recipients, Sender,
24};
25use commonware_runtime::{
26 buffer::PoolRef,
27 telemetry::metrics::{
28 histogram,
29 status::{CounterExt, Status},
30 },
31 Clock, Handle, Metrics, Spawner, Storage,
32};
33use commonware_storage::journal::{self, variable::Journal};
34use commonware_utils::futures::Pool as FuturesPool;
35use futures::{
36 channel::oneshot,
37 future::{self, Either},
38 pin_mut, StreamExt,
39};
40use std::{
41 collections::BTreeMap,
42 marker::PhantomData,
43 num::NonZeroUsize,
44 time::{Duration, SystemTime},
45};
46use tracing::{debug, error, info, warn};
47
48struct Verify<C: PublicKey, D: Digest, E: Clock> {
50 timer: histogram::Timer<E>,
51 context: Context<C>,
52 payload: D,
53 result: Result<bool, Error>,
54}
55
56pub struct Engine<
58 E: Clock + Spawner + Storage + Metrics,
59 C: Signer,
60 V: Variant,
61 D: Digest,
62 A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
63 R: Relay<Digest = D>,
64 Z: Reporter<Activity = Activity<C::PublicKey, V, D>>,
65 M: Monitor<Index = Epoch>,
66 Su: Supervisor<Index = Epoch, PublicKey = C::PublicKey>,
67 TSu: ThresholdSupervisor<
68 Index = Epoch,
69 PublicKey = C::PublicKey,
70 Identity = V::Public,
71 Polynomial = poly::Public<V>,
72 Share = group::Share,
73 >,
74 NetS: Sender<PublicKey = C::PublicKey>,
75 NetR: Receiver<PublicKey = C::PublicKey>,
76> {
77 context: E,
81 crypto: C,
82 automaton: A,
83 relay: R,
84 monitor: M,
85 sequencers: Su,
86 validators: TSu,
87 reporter: Z,
88
89 namespace: Vec<u8>,
95
96 rebroadcast_timeout: Duration,
102 rebroadcast_deadline: Option<SystemTime>,
103
104 epoch_bounds: (u64, u64),
116
117 height_bound: u64,
123
124 pending_verifies: FuturesPool<Verify<C::PublicKey, D, E>>,
136
137 journal_heights_per_section: u64,
143
144 journal_replay_buffer: NonZeroUsize,
146
147 journal_write_buffer: NonZeroUsize,
149
150 journal_name_prefix: String,
153
154 journal_compression: Option<u8>,
156
157 journal_buffer_pool: PoolRef,
159
160 #[allow(clippy::type_complexity)]
162 journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, V, D>>>,
163
164 tip_manager: TipManager<C::PublicKey, V, D>,
173
174 ack_manager: AckManager<C::PublicKey, V, D>,
177
178 epoch: Epoch,
180
181 priority_proposals: bool,
187
188 priority_acks: bool,
190
191 _phantom: PhantomData<(NetS, NetR)>,
193
194 metrics: metrics::Metrics<E>,
200
201 propose_timer: Option<histogram::Timer<E>>,
203}
204
205impl<
206 E: Clock + Spawner + Storage + Metrics,
207 C: Signer,
208 V: Variant,
209 D: Digest,
210 A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
211 R: Relay<Digest = D>,
212 Z: Reporter<Activity = Activity<C::PublicKey, V, D>>,
213 M: Monitor<Index = Epoch>,
214 Su: Supervisor<Index = Epoch, PublicKey = C::PublicKey>,
215 TSu: ThresholdSupervisor<
216 Index = Epoch,
217 PublicKey = C::PublicKey,
218 Identity = V::Public,
219 Polynomial = poly::Public<V>,
220 Share = group::Share,
221 >,
222 NetS: Sender<PublicKey = C::PublicKey>,
223 NetR: Receiver<PublicKey = C::PublicKey>,
224 > Engine<E, C, V, D, A, R, Z, M, Su, TSu, NetS, NetR>
225{
226 pub fn new(context: E, cfg: Config<C, V, D, A, R, Z, M, Su, TSu>) -> Self {
228 let metrics = metrics::Metrics::init(context.clone());
229
230 Self {
231 context,
232 crypto: cfg.crypto,
233 automaton: cfg.automaton,
234 relay: cfg.relay,
235 reporter: cfg.reporter,
236 monitor: cfg.monitor,
237 sequencers: cfg.sequencers,
238 validators: cfg.validators,
239 namespace: cfg.namespace,
240 rebroadcast_timeout: cfg.rebroadcast_timeout,
241 rebroadcast_deadline: None,
242 epoch_bounds: cfg.epoch_bounds,
243 height_bound: cfg.height_bound,
244 pending_verifies: FuturesPool::default(),
245 journal_heights_per_section: cfg.journal_heights_per_section,
246 journal_replay_buffer: cfg.journal_replay_buffer,
247 journal_write_buffer: cfg.journal_write_buffer,
248 journal_name_prefix: cfg.journal_name_prefix,
249 journal_compression: cfg.journal_compression,
250 journal_buffer_pool: cfg.journal_buffer_pool,
251 journals: BTreeMap::new(),
252 tip_manager: TipManager::<C::PublicKey, V, D>::new(),
253 ack_manager: AckManager::<C::PublicKey, V, D>::new(),
254 epoch: 0,
255 priority_proposals: cfg.priority_proposals,
256 priority_acks: cfg.priority_acks,
257 _phantom: PhantomData,
258 metrics,
259 propose_timer: None,
260 }
261 }
262
263 pub fn start(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) -> Handle<()> {
274 self.context.spawn_ref()(self.run(chunk_network, ack_network))
275 }
276
277 async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
279 let (mut node_sender, mut node_receiver) = wrap((), chunk_network.0, chunk_network.1);
280 let (mut ack_sender, mut ack_receiver) = wrap((), ack_network.0, ack_network.1);
281 let mut shutdown = self.context.stopped();
282
283 let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;
285
286 let (latest, mut epoch_updates) = self.monitor.subscribe().await;
288 self.epoch = latest;
289
290 self.journal_prepare(&self.crypto.public_key()).await;
293 if let Err(err) = self.rebroadcast(&mut node_sender).await {
294 info!(?err, "initial rebroadcast failed");
296 }
297
298 loop {
299 if pending.is_none() {
301 if let Some(context) = self.should_propose() {
302 let receiver = self.automaton.propose(context.clone()).await;
303 pending = Some((context, receiver));
304 }
305 }
306
307 let rebroadcast = match self.rebroadcast_deadline {
311 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
312 None => Either::Right(future::pending()),
313 };
314 let propose = match &mut pending {
315 Some((_context, receiver)) => Either::Left(receiver),
316 None => Either::Right(futures::future::pending()),
317 };
318
319 select! {
321 _ = &mut shutdown => {
323 debug!("shutdown");
324 break;
325 },
326
327 epoch = epoch_updates.next() => {
329 let Some(epoch) = epoch else {
331 error!("epoch subscription failed");
332 break;
333 };
334
335 debug!(current=self.epoch, new=epoch, "refresh epoch");
337 assert!(epoch >= self.epoch);
338 self.epoch = epoch;
339 continue;
340 },
341
342 _ = rebroadcast => {
344 debug!(epoch = self.epoch, sender=?self.crypto.public_key(), "rebroadcast");
345 if let Err(err) = self.rebroadcast(&mut node_sender).await {
346 info!(?err, "rebroadcast failed");
347 continue;
348 }
349 },
350
351 receiver = propose => {
353 let (context, _) = pending.take().unwrap();
355 debug!(height = context.height, "propose");
356
357 let Ok(payload) = receiver else {
359 warn!(?context, "automaton dropped proposal");
360 continue;
361 };
362
363 if let Err(err) = self.propose(context.clone(), payload, &mut node_sender).await {
365 warn!(?err, ?context, "propose new failed");
366 continue;
367 }
368 },
369
370 msg = node_receiver.recv() => {
372 let (sender, msg) = match msg {
374 Ok(r) => r,
375 Err(err) => {
376 error!(?err, "node receiver failed");
377 break;
378 }
379 };
380 let mut guard = self.metrics.nodes.guard(Status::Invalid);
381 let node = match msg {
382 Ok(node) => node,
383 Err(err) => {
384 debug!(?err, ?sender, "node decode failed");
385 continue;
386 }
387 };
388 let result = match self.validate_node(&node, &sender) {
389 Ok(result) => result,
390 Err(err) => {
391 debug!(?err, ?sender, "node validate failed");
392 continue;
393 }
394 };
395
396 self.journal_prepare(&sender).await;
398
399 if let Some(parent_chunk) = result {
401 let parent = node.parent.as_ref().unwrap();
402 self.handle_threshold(&parent_chunk, parent.epoch, parent.signature).await;
403 }
404
405 self.handle_node(&node).await;
410 debug!(?sender, height=node.chunk.height, "node");
411 guard.set(Status::Success);
412 },
413
414 msg = ack_receiver.recv() => {
416 let (sender, msg) = match msg {
418 Ok(r) => r,
419 Err(err) => {
420 warn!(?err, "ack receiver failed");
421 break;
422 }
423 };
424 let mut guard = self.metrics.acks.guard(Status::Invalid);
425 let ack = match msg {
426 Ok(ack) => ack,
427 Err(err) => {
428 debug!(?err, ?sender, "ack decode failed");
429 continue;
430 }
431 };
432 if let Err(err) = self.validate_ack(&ack, &sender) {
433 debug!(?err, ?sender, "ack validate failed");
434 continue;
435 };
436 if let Err(err) = self.handle_ack(&ack).await {
437 debug!(?err, ?sender, "ack handle failed");
438 guard.set(Status::Failure);
439 continue;
440 }
441 debug!(?sender, epoch=ack.epoch, sequencer=?ack.chunk.sequencer, height=ack.chunk.height, "ack");
442 guard.set(Status::Success);
443 },
444
445 verify = self.pending_verifies.next_completed() => {
447 let Verify { timer, context, payload, result } = verify;
448 drop(timer); match result {
450 Err(err) => {
451 warn!(?err, ?context, "verified returned error");
452 self.metrics.verify.inc(Status::Dropped);
453 }
454 Ok(false) => {
455 debug!(?context, "verified was false");
456 self.metrics.verify.inc(Status::Failure);
457 }
458 Ok(true) => {
459 debug!(?context, "verified");
460 self.metrics.verify.inc(Status::Success);
461 if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await {
462 debug!(?err, ?context, ?payload, "verified handle failed");
463 }
464 },
465 }
466 },
467 }
468 }
469
470 self.pending_verifies.cancel_all();
472 while let Some((_, journal)) = self.journals.pop_first() {
473 journal.close().await.expect("unable to close journal");
474 }
475 }
476
477 async fn handle_app_verified(
486 &mut self,
487 context: &Context<C::PublicKey>,
488 payload: &D,
489 ack_sender: &mut WrappedSender<NetS, Ack<C::PublicKey, V, D>>,
490 ) -> Result<(), Error> {
491 let Some(tip) = self.tip_manager.get(&context.sequencer) else {
493 return Err(Error::AppVerifiedNoTip);
494 };
495
496 if tip.chunk.height != context.height {
498 return Err(Error::AppVerifiedHeightMismatch);
499 }
500
501 if tip.chunk.payload != *payload {
503 return Err(Error::AppVerifiedPayloadMismatch);
504 }
505
506 self.reporter
508 .report(Activity::Tip(Proposal::new(
509 tip.chunk.clone(),
510 tip.signature.clone(),
511 )))
512 .await;
513
514 let Some(share) = self.validators.share(self.epoch) else {
516 return Err(Error::UnknownShare(self.epoch));
517 };
518 let ack = Ack::sign(&self.namespace, share, tip.chunk.clone(), self.epoch);
519
520 self.journal_sync(&context.sequencer, context.height).await;
523
524 let recipients = {
527 let Some(validators) = self.validators.participants(self.epoch) else {
528 return Err(Error::UnknownValidators(self.epoch));
529 };
530 let mut recipients = validators.clone();
531 if self
532 .validators
533 .is_participant(self.epoch, &tip.chunk.sequencer)
534 .is_none()
535 {
536 recipients.push(tip.chunk.sequencer.clone());
537 }
538 recipients
539 };
540
541 self.handle_ack(&ack).await?;
543
544 ack_sender
546 .send(Recipients::Some(recipients), ack, self.priority_acks)
547 .await
548 .map_err(|_| Error::UnableToSendMessage)?;
549
550 Ok(())
551 }
552
553 async fn handle_threshold(
559 &mut self,
560 chunk: &Chunk<C::PublicKey, D>,
561 epoch: Epoch,
562 threshold: V::Signature,
563 ) {
564 if !self
566 .ack_manager
567 .add_threshold(&chunk.sequencer, chunk.height, epoch, threshold)
568 {
569 return;
570 }
571
572 if chunk.sequencer == self.crypto.public_key() {
574 self.propose_timer.take();
575 }
576
577 self.reporter
579 .report(Activity::Lock(Lock::new(chunk.clone(), epoch, threshold)))
580 .await;
581 }
582
583 async fn handle_ack(&mut self, ack: &Ack<C::PublicKey, V, D>) -> Result<(), Error> {
588 let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
590 return Err(Error::UnknownPolynomial(ack.epoch));
591 };
592 let quorum = polynomial.required();
593
594 if let Some(threshold) = self.ack_manager.add_ack(ack, quorum) {
596 debug!(epoch=ack.epoch, sequencer=?ack.chunk.sequencer, height=ack.chunk.height, "recovered threshold");
597 self.metrics.threshold.inc();
598 self.handle_threshold(&ack.chunk, ack.epoch, threshold)
599 .await;
600 }
601
602 Ok(())
603 }
604
605 async fn handle_node(&mut self, node: &Node<C::PublicKey, V, D>) {
609 let is_new = self.tip_manager.put(node);
611
612 if is_new {
614 self.metrics
616 .sequencer_heights
617 .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
618 .set(node.chunk.height as i64);
619
620 self.journal_append(node.clone()).await;
624 self.journal_sync(&node.chunk.sequencer, node.chunk.height)
625 .await;
626 }
627
628 let context = Context {
630 sequencer: node.chunk.sequencer.clone(),
631 height: node.chunk.height,
632 };
633 let payload = node.chunk.payload;
634 let mut automaton = self.automaton.clone();
635 let timer = self.metrics.verify_duration.timer();
636 self.pending_verifies.push(async move {
637 let receiver = automaton.verify(context.clone(), payload).await;
638 let result = receiver.await.map_err(Error::AppVerifyCanceled);
639 Verify {
640 timer,
641 context,
642 payload,
643 result,
644 }
645 });
646 }
647
648 fn should_propose(&self) -> Option<Context<C::PublicKey>> {
656 let me = self.crypto.public_key();
657
658 self.sequencers.is_participant(self.epoch, &me)?;
660
661 match self.tip_manager.get(&me) {
663 None => Some(Context {
664 sequencer: me,
665 height: 0,
666 }),
667 Some(tip) => self
668 .ack_manager
669 .get_threshold(&me, tip.chunk.height)
670 .map(|_| Context {
671 sequencer: me,
672 height: tip.chunk.height.checked_add(1).unwrap(),
673 }),
674 }
675 }
676
677 async fn propose(
682 &mut self,
683 context: Context<C::PublicKey>,
684 payload: D,
685 node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
686 ) -> Result<(), Error> {
687 let mut guard = self.metrics.propose.guard(Status::Dropped);
688 let me = self.crypto.public_key();
689
690 if context.sequencer != me {
692 return Err(Error::ContextSequencer);
693 }
694
695 if self.sequencers.is_participant(self.epoch, &me).is_none() {
697 return Err(Error::IAmNotASequencer(self.epoch));
698 }
699
700 let mut height = 0;
702 let mut parent = None;
703 if let Some(tip) = self.tip_manager.get(&me) {
704 let Some((epoch, threshold)) = self.ack_manager.get_threshold(&me, tip.chunk.height)
706 else {
707 return Err(Error::MissingThreshold);
708 };
709
710 height = tip.chunk.height + 1;
712 parent = Some(Parent::new(tip.chunk.payload, epoch, threshold));
713 }
714
715 if context.height != height {
717 return Err(Error::ContextHeight);
718 }
719
720 let node = Node::sign(&self.namespace, &mut self.crypto, height, payload, parent);
722
723 self.handle_node(&node).await;
725
726 self.journal_sync(&me, height).await;
729
730 self.propose_timer = Some(self.metrics.e2e_duration.timer());
732
733 if let Err(err) = self.broadcast(node, node_sender, self.epoch).await {
735 guard.set(Status::Failure);
736 return Err(err);
737 };
738
739 guard.set(Status::Success);
741 Ok(())
742 }
743
744 async fn rebroadcast(
751 &mut self,
752 node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
753 ) -> Result<(), Error> {
754 let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);
755
756 self.rebroadcast_deadline = None;
758
759 let me = self.crypto.public_key();
761 if self.sequencers.is_participant(self.epoch, &me).is_none() {
762 return Err(Error::IAmNotASequencer(self.epoch));
763 }
764
765 let Some(tip) = self.tip_manager.get(&me) else {
767 return Err(Error::NothingToRebroadcast);
768 };
769
770 if self
772 .ack_manager
773 .get_threshold(&me, tip.chunk.height)
774 .is_some()
775 {
776 return Err(Error::AlreadyThresholded);
777 }
778
779 guard.set(Status::Failure);
781 self.broadcast(tip, node_sender, self.epoch).await?;
782 guard.set(Status::Success);
783 Ok(())
784 }
785
786 async fn broadcast(
788 &mut self,
789 node: Node<C::PublicKey, V, D>,
790 node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
791 epoch: Epoch,
792 ) -> Result<(), Error> {
793 let Some(validators) = self.validators.participants(epoch) else {
795 return Err(Error::UnknownValidators(epoch));
796 };
797
798 self.relay.broadcast(node.chunk.payload).await;
800
801 node_sender
803 .send(
804 Recipients::Some(validators.clone()),
805 node,
806 self.priority_proposals,
807 )
808 .await
809 .map_err(|_| Error::BroadcastFailed)?;
810
811 self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);
813
814 Ok(())
815 }
816
817 fn validate_node(
827 &mut self,
828 node: &Node<C::PublicKey, V, D>,
829 sender: &C::PublicKey,
830 ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
831 if node.chunk.sequencer != *sender {
833 return Err(Error::PeerMismatch);
834 }
835
836 if let Some(tip) = self.tip_manager.get(sender) {
839 if tip == *node {
840 return Ok(None);
841 }
842 }
843
844 self.validate_chunk(&node.chunk, self.epoch)?;
846
847 node.verify(&self.namespace, self.validators.identity())
849 .map_err(|_| Error::InvalidNodeSignature)
850 }
851
852 fn validate_ack(
857 &self,
858 ack: &Ack<C::PublicKey, V, D>,
859 sender: &C::PublicKey,
860 ) -> Result<(), Error> {
861 self.validate_chunk(&ack.chunk, ack.epoch)?;
863
864 let Some(index) = self.validators.is_participant(ack.epoch, sender) else {
866 return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
867 };
868 if index != ack.signature.index {
869 return Err(Error::PeerMismatch);
870 }
871
872 {
874 let (eb_lo, eb_hi) = self.epoch_bounds;
875 let bound_lo = self.epoch.saturating_sub(eb_lo);
876 let bound_hi = self.epoch.saturating_add(eb_hi);
877 if ack.epoch < bound_lo || ack.epoch > bound_hi {
878 return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
879 }
880 }
881
882 {
884 let bound_lo = self
885 .tip_manager
886 .get(&ack.chunk.sequencer)
887 .map(|t| t.chunk.height)
888 .unwrap_or(0);
889 let bound_hi = bound_lo + self.height_bound;
890 if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
891 return Err(Error::AckHeightOutsideBounds(
892 ack.chunk.height,
893 bound_lo,
894 bound_hi,
895 ));
896 }
897 }
898
899 let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
902 return Err(Error::UnknownPolynomial(ack.epoch));
903 };
904 if !ack.verify(&self.namespace, polynomial) {
905 return Err(Error::InvalidAckSignature);
906 }
907
908 Ok(())
909 }
910
911 fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
916 if self
918 .sequencers
919 .is_participant(epoch, &chunk.sequencer)
920 .is_none()
921 {
922 return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
923 }
924
925 if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
927 match chunk.height.cmp(&tip.chunk.height) {
929 std::cmp::Ordering::Less => {
930 return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
931 }
932 std::cmp::Ordering::Equal => {
933 if tip.chunk.payload != chunk.payload {
935 return Err(Error::ChunkMismatch(
936 chunk.sequencer.to_string(),
937 chunk.height,
938 ));
939 }
940 }
941 std::cmp::Ordering::Greater => {}
942 }
943 }
944
945 Ok(())
946 }
947
948 fn get_journal_section(&self, height: u64) -> u64 {
954 height / self.journal_heights_per_section
955 }
956
957 async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
961 if self.journals.contains_key(sequencer) {
963 return;
964 }
965
966 let cfg = journal::variable::Config {
968 partition: format!("{}{}", &self.journal_name_prefix, sequencer),
969 compression: self.journal_compression,
970 codec_config: (),
971 buffer_pool: self.journal_buffer_pool.clone(),
972 write_buffer: self.journal_write_buffer,
973 };
974 let journal =
975 Journal::<_, Node<C::PublicKey, V, D>>::init(self.context.with_label("journal"), cfg)
976 .await
977 .expect("unable to init journal");
978
979 {
981 debug!(?sequencer, "journal replay begin");
982
983 let stream = journal
985 .replay(0, 0, self.journal_replay_buffer)
986 .await
987 .expect("unable to replay journal");
988 pin_mut!(stream);
989
990 let mut tip: Option<Node<C::PublicKey, V, D>> = None;
993 let mut num_items = 0;
994 while let Some(msg) = stream.next().await {
995 let (_, _, _, node) = msg.expect("unable to read from journal");
996 num_items += 1;
997 let height = node.chunk.height;
998 match tip {
999 None => {
1000 tip = Some(node);
1001 }
1002 Some(ref t) => {
1003 if height > t.chunk.height {
1004 tip = Some(node);
1005 }
1006 }
1007 }
1008 }
1009
1010 if let Some(node) = tip.take() {
1013 let is_new = self.tip_manager.put(&node);
1014 assert!(is_new);
1015 }
1016
1017 debug!(?sequencer, ?num_items, "journal replay end");
1018 }
1019
1020 self.journals.insert(sequencer.clone(), journal);
1022 }
1023
1024 async fn journal_append(&mut self, node: Node<C::PublicKey, V, D>) {
1029 let section = self.get_journal_section(node.chunk.height);
1030 self.journals
1031 .get_mut(&node.chunk.sequencer)
1032 .expect("journal does not exist")
1033 .append(section, node)
1034 .await
1035 .expect("unable to append to journal");
1036 }
1037
1038 async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
1040 let section = self.get_journal_section(height);
1041
1042 let journal = self
1044 .journals
1045 .get_mut(sequencer)
1046 .expect("journal does not exist");
1047
1048 journal.sync(section).await.expect("unable to sync journal");
1050
1051 let _ = journal.prune(section).await;
1053 }
1054}