1use super::{metrics, AckManager, Config, Mailbox, Message, TipManager};
11use crate::{
12 linked::{namespace, parsed, prover::Prover, serializer, Context, Epoch},
13 Application, Collector, ThresholdCoordinator,
14};
15use commonware_cryptography::{
16 bls12381::primitives::{
17 group::{self},
18 ops,
19 poly::{self},
20 },
21 Digest, Scheme,
22};
23use commonware_macros::select;
24use commonware_p2p::{Receiver, Recipients, Sender};
25use commonware_runtime::{
26 telemetry::{
27 histogram,
28 status::{CounterExt, Status},
29 },
30 Blob, Clock, Handle, Metrics, Spawner, Storage,
31};
32use commonware_storage::journal::{self, variable::Journal};
33use commonware_utils::futures::Pool as FuturesPool;
34use futures::{
35 channel::{mpsc, oneshot},
36 future::{self, Either},
37 pin_mut, StreamExt,
38};
39use std::{
40 collections::BTreeMap,
41 marker::PhantomData,
42 time::{Duration, SystemTime},
43};
44use thiserror::Error;
45use tracing::{debug, error, info, warn};
46
47struct Verify<C: Scheme, D: Digest, E: Clock> {
49 timer: histogram::Timer<E>,
50 context: Context<C::PublicKey>,
51 payload: D,
52 result: Result<bool, Error>,
53}
54
55pub struct Engine<
57 B: Blob,
58 E: Clock + Spawner + Storage<B> + Metrics,
59 C: Scheme,
60 D: Digest,
61 A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
62 Z: Collector<Digest = D>,
63 S: ThresholdCoordinator<
64 Index = Epoch,
65 Share = group::Share,
66 Identity = poly::Public,
67 PublicKey = C::PublicKey,
68 >,
69 NetS: Sender<PublicKey = C::PublicKey>,
70 NetR: Receiver<PublicKey = C::PublicKey>,
71> {
72 context: E,
76 crypto: C,
77 coordinator: S,
78 application: A,
79 collector: Z,
80 _sender: PhantomData<NetS>,
81 _receiver: PhantomData<NetR>,
82
83 chunk_namespace: Vec<u8>,
89
90 ack_namespace: Vec<u8>,
92
93 refresh_epoch_timeout: Duration,
99 refresh_epoch_deadline: Option<SystemTime>,
100
101 rebroadcast_timeout: Duration,
103 rebroadcast_deadline: Option<SystemTime>,
104
105 epoch_bounds: (u64, u64),
117
118 height_bound: u64,
124
125 #[allow(clippy::type_complexity)]
133 pending_verifies: FuturesPool<Verify<C, D, E>>,
134
135 verify_concurrent: usize,
137
138 mailbox_receiver: mpsc::Receiver<Message<D>>,
140
141 journal_heights_per_section: u64,
147
148 journal_replay_concurrency: usize,
150
151 journal_name_prefix: String,
154
155 journals: BTreeMap<C::PublicKey, Journal<B, E>>,
157
158 tip_manager: TipManager<C, D>,
167
168 ack_manager: AckManager<D, C::PublicKey>,
171
172 epoch: Epoch,
174
175 metrics: metrics::Metrics<E>,
181
182 broadcast_timer: Option<histogram::Timer<E>>,
184}
185
186impl<
187 B: Blob,
188 E: Clock + Spawner + Storage<B> + Metrics,
189 C: Scheme,
190 D: Digest,
191 A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
192 Z: Collector<Digest = D>,
193 S: ThresholdCoordinator<
194 Index = Epoch,
195 Share = group::Share,
196 Identity = poly::Public,
197 PublicKey = C::PublicKey,
198 >,
199 NetS: Sender<PublicKey = C::PublicKey>,
200 NetR: Receiver<PublicKey = C::PublicKey>,
201 > Engine<B, E, C, D, A, Z, S, NetS, NetR>
202{
203 pub fn new(context: E, cfg: Config<C, D, A, Z, S>) -> (Self, Mailbox<D>) {
206 let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
207 let mailbox = Mailbox::new(mailbox_sender);
208 let metrics = metrics::Metrics::init(context.clone());
209
210 let result = Self {
211 context,
212 crypto: cfg.crypto,
213 _sender: PhantomData,
214 _receiver: PhantomData,
215 coordinator: cfg.coordinator,
216 application: cfg.application,
217 collector: cfg.collector,
218 chunk_namespace: namespace::chunk(&cfg.namespace),
219 ack_namespace: namespace::ack(&cfg.namespace),
220 refresh_epoch_timeout: cfg.refresh_epoch_timeout,
221 refresh_epoch_deadline: None,
222 rebroadcast_timeout: cfg.rebroadcast_timeout,
223 rebroadcast_deadline: None,
224 epoch_bounds: cfg.epoch_bounds,
225 height_bound: cfg.height_bound,
226 pending_verifies: FuturesPool::default(),
227 verify_concurrent: cfg.verify_concurrent,
228 mailbox_receiver,
229 journal_heights_per_section: cfg.journal_heights_per_section,
230 journal_replay_concurrency: cfg.journal_replay_concurrency,
231 journal_name_prefix: cfg.journal_name_prefix,
232 journals: BTreeMap::new(),
233 tip_manager: TipManager::<C, D>::new(),
234 ack_manager: AckManager::<D, C::PublicKey>::new(),
235 epoch: 0,
236 metrics,
237 broadcast_timer: None,
238 };
239
240 (result, mailbox)
241 }
242
243 pub fn start(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) -> Handle<()> {
256 self.context.spawn_ref()(self.run(chunk_network, ack_network))
257 }
258
259 async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
261 let (mut node_sender, mut node_receiver) = chunk_network;
262 let (mut ack_sender, mut ack_receiver) = ack_network;
263 let mut shutdown = self.context.stopped();
264
265 self.refresh_epoch();
268 self.journal_prepare(&self.crypto.public_key()).await;
269 if let Err(err) = self.rebroadcast(&mut node_sender).await {
270 info!(?err, "initial rebroadcast failed");
272 }
273
274 loop {
275 self.refresh_epoch();
277
278 let refresh_epoch = match self.refresh_epoch_deadline {
281 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
282 None => Either::Right(future::pending()),
283 };
284 let rebroadcast = match self.rebroadcast_deadline {
285 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
286 None => Either::Right(future::pending()),
287 };
288
289 select! {
290 _ = &mut shutdown => {
292 debug!("shutdown");
293 self.pending_verifies.cancel_all();
294 while let Some((_, journal)) = self.journals.pop_first() {
295 journal.close().await.expect("unable to close journal");
296 }
297 return;
298 },
299
300 _ = refresh_epoch => {
302 debug!("refresh epoch");
303 continue;
305 },
306
307 _ = rebroadcast => {
309 debug!("rebroadcast");
310 if let Err(err) = self.rebroadcast(&mut node_sender).await {
311 info!(?err, "rebroadcast failed");
312 continue;
313 }
314 },
315
316 msg = node_receiver.recv() => {
318 debug!("node network");
319 let (sender, msg) = match msg {
321 Ok(r) => r,
322 Err(err) => {
323 error!(?err, "node receiver failed");
324 break;
325 }
326 };
327 let mut guard = self.metrics.nodes.guard(Status::Invalid);
328 let node = match parsed::Node::<C, D>::decode(&msg) {
329 Ok(node) => node,
330 Err(err) => {
331 warn!(?err, ?sender, "node decode failed");
332 continue;
333 }
334 };
335 if let Err(err) = self.validate_node(&node, &sender) {
336 warn!(?err, ?node, ?sender, "node validate failed");
337 continue;
338 };
339
340 self.journal_prepare(&sender).await;
342
343 if let Some(parent) = node.parent.as_ref() {
345 self.handle_threshold(&node.chunk, parent.epoch, parent.threshold).await;
346 }
347
348 self.handle_node(&node).await;
350 guard.set(Status::Success);
351 },
352
353 msg = ack_receiver.recv() => {
355 debug!("ack network");
356 let (sender, msg) = match msg {
358 Ok(r) => r,
359 Err(err) => {
360 warn!(?err, "ack receiver failed");
361 break;
362 }
363 };
364 let mut guard = self.metrics.acks.guard(Status::Invalid);
365 let ack = match parsed::Ack::decode(&msg) {
366 Ok(ack) => ack,
367 Err(err) => {
368 warn!(?err, ?sender, "ack decode failed");
369 continue;
370 }
371 };
372 if let Err(err) = self.validate_ack(&ack, &sender) {
373 warn!(?err, ?ack, ?sender, "ack validate failed");
374 continue;
375 };
376 if let Err(err) = self.handle_ack(&ack).await {
377 warn!(?err, ?ack, "ack handle failed");
378 guard.set(Status::Failure);
379 continue;
380 }
381 guard.set(Status::Success);
382 },
383
384 verify = self.pending_verifies.next_completed() => {
386 let Verify { timer, context, payload, result } = verify;
387 drop(timer); match result {
389 Err(err) => {
390 warn!(?err, ?context, ?payload, "verified returned error");
391 self.metrics.verify.inc(Status::Dropped);
392 }
393 Ok(false) => {
394 warn!(?context, ?payload, "verified was false");
395 self.metrics.verify.inc(Status::Failure);
396 }
397 Ok(true) => {
398 debug!(?context, ?payload, "verified");
399 self.metrics.verify.inc(Status::Success);
400 if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await {
401 warn!(?err, ?context, ?payload, "verified handle failed");
402 }
403 },
404 }
405 },
406
407 mail = self.mailbox_receiver.next() => {
409 let Some(msg) = mail else {
410 error!("mailbox receiver failed");
411 break;
412 };
413 match msg {
414 Message::Broadcast{ payload, result } => {
415 debug!("broadcast");
416 if self.coordinator.is_sequencer(self.epoch, &self.crypto.public_key()).is_none() {
417 warn!(epoch=?self.epoch, ?payload, "not a sequencer");
418 continue;
419 }
420
421 if let Err(err) = self.broadcast_new(payload, result, &mut node_sender).await {
423 warn!(?err, "broadcast new failed");
424 continue;
425 }
426 }
427 }
428 }
429 }
430 }
431 }
432
433 async fn handle_app_verified(
442 &mut self,
443 context: &Context<C::PublicKey>,
444 payload: &D,
445 ack_sender: &mut NetS,
446 ) -> Result<(), Error> {
447 let Some(tip) = self.tip_manager.get(&context.sequencer) else {
449 return Err(Error::AppVerifiedNoTip);
450 };
451
452 if tip.chunk.height != context.height {
454 return Err(Error::AppVerifiedHeightMismatch);
455 }
456
457 if tip.chunk.payload != *payload {
459 return Err(Error::AppVerifiedPayloadMismatch);
460 }
461
462 let Some(share) = self.coordinator.share(self.epoch) else {
464 return Err(Error::UnknownShare(self.epoch));
465 };
466 let partial = ops::partial_sign_message(
467 share,
468 Some(&self.ack_namespace),
469 &serializer::ack(&tip.chunk, self.epoch),
470 );
471
472 self.journal_sync(&context.sequencer, context.height).await;
475
476 let recipients = {
479 let Some(signers) = self.coordinator.signers(self.epoch) else {
480 return Err(Error::UnknownSigners(self.epoch));
481 };
482 let mut recipients = signers.clone();
483 if self
484 .coordinator
485 .is_signer(self.epoch, &tip.chunk.sequencer)
486 .is_none()
487 {
488 recipients.push(tip.chunk.sequencer.clone());
489 }
490 recipients
491 };
492
493 let ack = parsed::Ack {
495 chunk: tip.chunk,
496 epoch: self.epoch,
497 partial,
498 };
499 ack_sender
500 .send(Recipients::Some(recipients), ack.encode().into(), false)
501 .await
502 .map_err(|_| Error::UnableToSendMessage)?;
503
504 self.handle_ack(&ack).await?;
506
507 Ok(())
508 }
509
510 async fn handle_threshold(
516 &mut self,
517 chunk: &parsed::Chunk<D, C::PublicKey>,
518 epoch: Epoch,
519 threshold: group::Signature,
520 ) {
521 if !self
523 .ack_manager
524 .add_threshold(&chunk.sequencer, chunk.height, epoch, threshold)
525 {
526 return;
527 }
528
529 if chunk.sequencer == self.crypto.public_key() {
531 self.broadcast_timer.take();
532 }
533
534 let context = Context {
536 sequencer: chunk.sequencer.clone(),
537 height: chunk.height,
538 };
539 let proof =
540 Prover::<C, D>::serialize_threshold(&context, &chunk.payload, epoch, &threshold);
541 self.collector.acknowledged(proof, chunk.payload).await;
542 }
543
544 async fn handle_ack(&mut self, ack: &parsed::Ack<D, C::PublicKey>) -> Result<(), Error> {
549 let Some(identity) = self.coordinator.identity(ack.epoch) else {
551 return Err(Error::UnknownIdentity(ack.epoch));
552 };
553 let quorum = identity.required();
554
555 if let Some(threshold) = self.ack_manager.add_ack(ack, quorum) {
557 self.metrics.threshold.inc();
558 self.handle_threshold(&ack.chunk, ack.epoch, threshold)
559 .await;
560 }
561
562 Ok(())
563 }
564
565 async fn handle_node(&mut self, node: &parsed::Node<C, D>) {
569 let is_new = self.tip_manager.put(node);
571
572 if is_new {
574 self.metrics
576 .sequencer_heights
577 .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
578 .set(node.chunk.height as i64);
579
580 self.journal_append(node).await;
584 self.journal_sync(&node.chunk.sequencer, node.chunk.height)
585 .await;
586 }
587
588 let n = self.pending_verifies.len();
590 if n >= self.verify_concurrent {
591 warn!(?n, "too many pending verifies");
592 return;
593 }
594
595 let context = Context {
597 sequencer: node.chunk.sequencer.clone(),
598 height: node.chunk.height,
599 };
600 let payload = node.chunk.payload;
601 let mut application = self.application.clone();
602 let timer = self.metrics.verify_duration.timer();
603 self.pending_verifies.push(async move {
604 let receiver = application.verify(context.clone(), payload).await;
605 let result = receiver.await.map_err(Error::AppVerifyCanceled);
606 Verify {
607 timer,
608 context,
609 payload,
610 result,
611 }
612 });
613 }
614
615 async fn broadcast_new(
624 &mut self,
625 payload: D,
626 result: oneshot::Sender<bool>,
627 node_sender: &mut NetS,
628 ) -> Result<(), Error> {
629 let mut guard = self.metrics.new_broadcast.guard(Status::Dropped);
630 let me = self.crypto.public_key();
631
632 let mut height = 0;
634 let mut parent = None;
635 if let Some(tip) = self.tip_manager.get(&me) {
636 let Some((epoch, threshold)) = self.ack_manager.get_threshold(&me, tip.chunk.height)
638 else {
639 let _ = result.send(false);
640 return Err(Error::NoThresholdForTip(tip.chunk.height));
641 };
642
643 height = tip.chunk.height + 1;
645 parent = Some(parsed::Parent {
646 payload: tip.chunk.payload,
647 threshold,
648 epoch,
649 });
650 }
651
652 let chunk = parsed::Chunk {
654 sequencer: me.clone(),
655 height,
656 payload,
657 };
658 let signature = self
659 .crypto
660 .sign(Some(&self.chunk_namespace), &serializer::chunk(&chunk));
661 let node = parsed::Node::<C, D> {
662 chunk,
663 signature,
664 parent,
665 };
666
667 self.handle_node(&node).await;
669
670 self.journal_sync(&me, height).await;
673
674 self.broadcast_timer = Some(self.metrics.e2e_duration.timer());
676
677 if let Err(err) = self.broadcast(&node, node_sender, self.epoch).await {
679 let _ = result.send(false);
680 guard.set(Status::Failure);
681 return Err(err);
682 };
683
684 let _ = result.send(true);
686 guard.set(Status::Success);
687 Ok(())
688 }
689
690 async fn rebroadcast(&mut self, node_sender: &mut NetS) -> Result<(), Error> {
697 let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);
698
699 self.rebroadcast_deadline = None;
701
702 let me = self.crypto.public_key();
704 if self.coordinator.is_sequencer(self.epoch, &me).is_none() {
705 return Err(Error::IAmNotASequencer(self.epoch));
706 }
707
708 let Some(tip) = self.tip_manager.get(&me) else {
710 return Err(Error::NothingToRebroadcast);
711 };
712
713 if self
715 .ack_manager
716 .get_threshold(&me, tip.chunk.height)
717 .is_some()
718 {
719 return Err(Error::AlreadyBroadcast);
720 }
721
722 guard.set(Status::Failure);
724 self.broadcast(&tip, node_sender, self.epoch).await?;
725 guard.set(Status::Success);
726 Ok(())
727 }
728
729 async fn broadcast(
731 &mut self,
732 node: &parsed::Node<C, D>,
733 node_sender: &mut NetS,
734 epoch: Epoch,
735 ) -> Result<(), Error> {
736 let Some(signers) = self.coordinator.signers(epoch) else {
738 return Err(Error::UnknownSigners(epoch));
739 };
740 node_sender
741 .send(
742 Recipients::Some(signers.clone()),
743 node.encode().into(),
744 false,
745 )
746 .await
747 .map_err(|_| Error::BroadcastFailed)?;
748
749 self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);
751
752 Ok(())
753 }
754
755 fn validate_node(
764 &mut self,
765 node: &parsed::Node<C, D>,
766 sender: &C::PublicKey,
767 ) -> Result<(), Error> {
768 if node.chunk.sequencer != *sender {
770 return Err(Error::PeerMismatch);
771 }
772
773 if let Some(tip) = self.tip_manager.get(sender) {
776 if tip == *node {
777 return Ok(());
778 }
779 }
780
781 self.validate_chunk(&node.chunk, self.epoch)?;
783
784 if !C::verify(
786 Some(&self.chunk_namespace),
787 &serializer::chunk(&node.chunk),
788 sender,
789 &node.signature,
790 ) {
791 return Err(Error::InvalidNodeSignature);
792 }
793
794 if node.chunk.height == 0 {
796 if node.parent.is_some() {
797 return Err(Error::GenesisChunkMustNotHaveParent);
798 }
799 return Ok(());
800 }
801
802 let Some(parent) = &node.parent else {
804 return Err(Error::NodeMissingParent);
805 };
806 let parent_chunk = parsed::Chunk {
807 sequencer: sender.clone(),
808 height: node.chunk.height.checked_sub(1).unwrap(),
809 payload: parent.payload,
810 };
811
812 let Some(identity) = self.coordinator.identity(parent.epoch) else {
814 return Err(Error::UnknownIdentity(parent.epoch));
815 };
816 let public_key = poly::public(identity);
817 ops::verify_message(
818 &public_key,
819 Some(&self.ack_namespace),
820 &serializer::ack(&parent_chunk, parent.epoch),
821 &parent.threshold,
822 )
823 .map_err(|_| Error::InvalidThresholdSignature)?;
824
825 Ok(())
826 }
827
828 fn validate_ack(
833 &self,
834 ack: &parsed::Ack<D, C::PublicKey>,
835 sender: &C::PublicKey,
836 ) -> Result<(), Error> {
837 self.validate_chunk(&ack.chunk, ack.epoch)?;
839
840 let Some(signer_index) = self.coordinator.is_signer(ack.epoch, sender) else {
842 return Err(Error::UnknownSigner(ack.epoch, sender.to_string()));
843 };
844 if signer_index != ack.partial.index {
845 return Err(Error::PeerMismatch);
846 }
847
848 {
850 let (eb_lo, eb_hi) = self.epoch_bounds;
851 let bound_lo = self.epoch.saturating_sub(eb_lo);
852 let bound_hi = self.epoch.saturating_add(eb_hi);
853 if ack.epoch < bound_lo || ack.epoch > bound_hi {
854 return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
855 }
856 }
857
858 {
860 let bound_lo = self
861 .tip_manager
862 .get(&ack.chunk.sequencer)
863 .map(|t| t.chunk.height)
864 .unwrap_or(0);
865 let bound_hi = bound_lo + self.height_bound;
866 if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
867 return Err(Error::AckHeightOutsideBounds(
868 ack.chunk.height,
869 bound_lo,
870 bound_hi,
871 ));
872 }
873 }
874
875 let Some(identity) = self.coordinator.identity(ack.epoch) else {
878 return Err(Error::UnknownIdentity(ack.epoch));
879 };
880 ops::partial_verify_message(
881 identity,
882 Some(&self.ack_namespace),
883 &serializer::ack(&ack.chunk, ack.epoch),
884 &ack.partial,
885 )
886 .map_err(|_| Error::InvalidPartialSignature)?;
887
888 Ok(())
889 }
890
891 fn validate_chunk(
896 &self,
897 chunk: &parsed::Chunk<D, C::PublicKey>,
898 epoch: Epoch,
899 ) -> Result<(), Error> {
900 if self
902 .coordinator
903 .is_sequencer(epoch, &chunk.sequencer)
904 .is_none()
905 {
906 return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
907 }
908
909 if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
911 match chunk.height.cmp(&tip.chunk.height) {
913 std::cmp::Ordering::Less => {
914 return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
915 }
916 std::cmp::Ordering::Equal => {
917 if tip.chunk.payload != chunk.payload {
919 return Err(Error::ChunkMismatch(
920 chunk.sequencer.to_string(),
921 chunk.height,
922 ));
923 }
924 }
925 std::cmp::Ordering::Greater => {}
926 }
927 }
928
929 Ok(())
930 }
931
932 fn get_journal_section(&self, height: u64) -> u64 {
938 height / self.journal_heights_per_section
939 }
940
941 async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
945 if self.journals.contains_key(sequencer) {
947 return;
948 }
949
950 let cfg = journal::variable::Config {
952 partition: format!("{}{}", &self.journal_name_prefix, sequencer),
953 };
954 let mut journal = Journal::init(self.context.clone(), cfg)
955 .await
956 .expect("unable to init journal");
957
958 {
960 debug!(?sequencer, "journal replay begin");
961
962 let stream = journal
964 .replay(self.journal_replay_concurrency, None)
965 .await
966 .expect("unable to replay journal");
967 pin_mut!(stream);
968
969 let mut tip: Option<parsed::Node<C, D>> = None;
972 let mut num_items = 0;
973 while let Some(msg) = stream.next().await {
974 num_items += 1;
975 let (_, _, _, msg) = msg.expect("unable to decode journal message");
976 let node = parsed::Node::<C, D>::decode(&msg)
977 .expect("journal message is unexpected format");
978 let height = node.chunk.height;
979 match tip {
980 None => {
981 tip = Some(node);
982 }
983 Some(ref t) => {
984 if height > t.chunk.height {
985 tip = Some(node);
986 }
987 }
988 }
989 }
990
991 if let Some(node) = tip.take() {
994 let is_new = self.tip_manager.put(&node);
995 assert!(is_new);
996 }
997
998 debug!(?sequencer, ?num_items, "journal replay end");
999 }
1000
1001 self.journals.insert(sequencer.clone(), journal);
1003 }
1004
1005 async fn journal_append(&mut self, node: &parsed::Node<C, D>) {
1010 let section = self.get_journal_section(node.chunk.height);
1011 self.journals
1012 .get_mut(&node.chunk.sequencer)
1013 .expect("journal does not exist")
1014 .append(section, node.encode().into())
1015 .await
1016 .expect("unable to append to journal");
1017 }
1018
1019 async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
1021 let section = self.get_journal_section(height);
1022
1023 let journal = self
1025 .journals
1026 .get_mut(sequencer)
1027 .expect("journal does not exist");
1028
1029 journal.sync(section).await.expect("unable to sync journal");
1031
1032 let _ = journal.prune(section).await;
1034 }
1035
1036 fn refresh_epoch(&mut self) {
1042 self.refresh_epoch_deadline = Some(self.context.current() + self.refresh_epoch_timeout);
1044
1045 let epoch = self.coordinator.index();
1047 assert!(epoch >= self.epoch);
1048
1049 self.epoch = epoch;
1051 }
1052}
1053
1054#[derive(Error, Debug)]
1056enum Error {
1057 #[error("Application verify error: {0}")]
1059 AppVerifyCanceled(oneshot::Canceled),
1060 #[error("Application verified no tip")]
1061 AppVerifiedNoTip,
1062 #[error("Application verified height mismatch")]
1063 AppVerifiedHeightMismatch,
1064 #[error("Application verified payload mismatch")]
1065 AppVerifiedPayloadMismatch,
1066
1067 #[error("Unable to send message")]
1069 UnableToSendMessage,
1070
1071 #[error("Already broadcast")]
1073 AlreadyBroadcast,
1074 #[error("I am not a sequencer in epoch {0}")]
1075 IAmNotASequencer(u64),
1076 #[error("Nothing to rebroadcast")]
1077 NothingToRebroadcast,
1078 #[error("Broadcast failed")]
1079 BroadcastFailed,
1080 #[error("No threshold for tip")]
1081 NoThresholdForTip(u64),
1082
1083 #[error("Genesis chunk must not have a parent")]
1085 GenesisChunkMustNotHaveParent,
1086 #[error("Node missing parent")]
1087 NodeMissingParent,
1088
1089 #[error("Unknown identity at epoch {0}")]
1091 UnknownIdentity(u64),
1092 #[error("Unknown signers at epoch {0}")]
1093 UnknownSigners(u64),
1094 #[error("Epoch {0} has no sequencer {1}")]
1095 UnknownSequencer(u64, String),
1096 #[error("Epoch {0} has no signer {1}")]
1097 UnknownSigner(u64, String),
1098 #[error("Unknown share at epoch {0}")]
1099 UnknownShare(u64),
1100
1101 #[error("Peer mismatch")]
1103 PeerMismatch,
1104
1105 #[error("Invalid threshold signature")]
1107 InvalidThresholdSignature,
1108 #[error("Invalid partial signature")]
1109 InvalidPartialSignature,
1110 #[error("Invalid node signature")]
1111 InvalidNodeSignature,
1112
1113 #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
1115 AckEpochOutsideBounds(u64, u64, u64),
1116 #[error("Invalid ack height {0} outside bounds {1} - {2}")]
1117 AckHeightOutsideBounds(u64, u64, u64),
1118 #[error("Chunk height {0} lower than tip height {1}")]
1119 ChunkHeightTooLow(u64, u64),
1120
1121 #[error("Chunk mismatch from sender {0} with height {1}")]
1123 ChunkMismatch(String, u64),
1124}