1use super::{metrics, AckManager, Config, Mailbox, Message, TipManager};
11use crate::{
12 linked::{namespace, parsed, prover::Prover, serializer, Context, Epoch},
13 Application, Collector, ThresholdCoordinator,
14};
15use commonware_cryptography::{
16 bls12381::primitives::{
17 group::{self},
18 ops,
19 poly::{self},
20 },
21 Scheme,
22};
23use commonware_macros::select;
24use commonware_p2p::{Receiver, Recipients, Sender};
25use commonware_runtime::{
26 telemetry::{
27 histogram,
28 status::{CounterExt, Status},
29 },
30 Blob, Clock, Handle, Metrics, Spawner, Storage,
31};
32use commonware_storage::journal::{self, variable::Journal};
33use commonware_utils::{futures::Pool as FuturesPool, Array};
34use futures::{
35 channel::{mpsc, oneshot},
36 future::{self, Either},
37 pin_mut, StreamExt,
38};
39use std::{
40 collections::BTreeMap,
41 marker::PhantomData,
42 time::{Duration, SystemTime},
43};
44use thiserror::Error;
45use tracing::{debug, error, info, warn};
46
47struct Verify<C: Scheme, D: Array, E: Clock> {
49 timer: histogram::Timer<E>,
50 context: Context<C::PublicKey>,
51 payload: D,
52 result: Result<bool, Error>,
53}
54
55pub struct Engine<
57 B: Blob,
58 E: Clock + Spawner + Storage<B> + Metrics,
59 C: Scheme,
60 D: Array,
61 A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
62 Z: Collector<Digest = D>,
63 S: ThresholdCoordinator<
64 Index = Epoch,
65 Share = group::Share,
66 Identity = poly::Public,
67 PublicKey = C::PublicKey,
68 >,
69 NetS: Sender<PublicKey = C::PublicKey>,
70 NetR: Receiver<PublicKey = C::PublicKey>,
71> {
72 context: E,
76 crypto: C,
77 coordinator: S,
78 application: A,
79 collector: Z,
80 _sender: PhantomData<NetS>,
81 _receiver: PhantomData<NetR>,
82
83 chunk_namespace: Vec<u8>,
89
90 ack_namespace: Vec<u8>,
92
93 refresh_epoch_timeout: Duration,
99 refresh_epoch_deadline: Option<SystemTime>,
100
101 rebroadcast_timeout: Duration,
103 rebroadcast_deadline: Option<SystemTime>,
104
105 epoch_bounds: (u64, u64),
117
118 height_bound: u64,
124
125 #[allow(clippy::type_complexity)]
133 pending_verifies: FuturesPool<Verify<C, D, E>>,
134
135 verify_concurrent: usize,
137
138 mailbox_receiver: mpsc::Receiver<Message<D>>,
140
141 journal_heights_per_section: u64,
147
148 journal_replay_concurrency: usize,
150
151 journal_name_prefix: String,
154
155 journals: BTreeMap<C::PublicKey, Journal<B, E>>,
157
158 tip_manager: TipManager<C, D>,
167
168 ack_manager: AckManager<D, C::PublicKey>,
171
172 epoch: Epoch,
174
175 metrics: metrics::Metrics<E>,
181
182 broadcast_timer: Option<histogram::Timer<E>>,
184}
185
186impl<
187 B: Blob,
188 E: Clock + Spawner + Storage<B> + Metrics,
189 C: Scheme,
190 D: Array,
191 A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
192 Z: Collector<Digest = D>,
193 S: ThresholdCoordinator<
194 Index = Epoch,
195 Share = group::Share,
196 Identity = poly::Public,
197 PublicKey = C::PublicKey,
198 >,
199 NetS: Sender<PublicKey = C::PublicKey>,
200 NetR: Receiver<PublicKey = C::PublicKey>,
201 > Engine<B, E, C, D, A, Z, S, NetS, NetR>
202{
203 pub fn new(context: E, cfg: Config<C, D, A, Z, S>) -> (Self, Mailbox<D>) {
206 let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
207 let mailbox = Mailbox::new(mailbox_sender);
208 let metrics = metrics::Metrics::init(context.clone());
209
210 let result = Self {
211 context,
212 crypto: cfg.crypto,
213 _sender: PhantomData,
214 _receiver: PhantomData,
215 coordinator: cfg.coordinator,
216 application: cfg.application,
217 collector: cfg.collector,
218 chunk_namespace: namespace::chunk(&cfg.namespace),
219 ack_namespace: namespace::ack(&cfg.namespace),
220 refresh_epoch_timeout: cfg.refresh_epoch_timeout,
221 refresh_epoch_deadline: None,
222 rebroadcast_timeout: cfg.rebroadcast_timeout,
223 rebroadcast_deadline: None,
224 epoch_bounds: cfg.epoch_bounds,
225 height_bound: cfg.height_bound,
226 pending_verifies: FuturesPool::default(),
227 verify_concurrent: cfg.verify_concurrent,
228 mailbox_receiver,
229 journal_heights_per_section: cfg.journal_heights_per_section,
230 journal_replay_concurrency: cfg.journal_replay_concurrency,
231 journal_name_prefix: cfg.journal_name_prefix,
232 journals: BTreeMap::new(),
233 tip_manager: TipManager::<C, D>::new(),
234 ack_manager: AckManager::<D, C::PublicKey>::new(),
235 epoch: 0,
236 metrics,
237 broadcast_timer: None,
238 };
239
240 (result, mailbox)
241 }
242
243 pub fn start(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) -> Handle<()> {
256 self.context.spawn_ref()(self.run(chunk_network, ack_network))
257 }
258
259 async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
261 let (mut node_sender, mut node_receiver) = chunk_network;
262 let (mut ack_sender, mut ack_receiver) = ack_network;
263 let mut shutdown = self.context.stopped();
264
265 self.refresh_epoch();
268 self.journal_prepare(&self.crypto.public_key()).await;
269 if let Err(err) = self.rebroadcast(&mut node_sender).await {
270 info!(?err, "initial rebroadcast failed");
272 }
273
274 loop {
275 self.refresh_epoch();
277
278 let refresh_epoch = match self.refresh_epoch_deadline {
281 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
282 None => Either::Right(future::pending()),
283 };
284 let rebroadcast = match self.rebroadcast_deadline {
285 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
286 None => Either::Right(future::pending()),
287 };
288
289 select! {
290 _ = &mut shutdown => {
292 debug!("shutdown");
293 self.pending_verifies.cancel_all();
294 while let Some((_, journal)) = self.journals.pop_first() {
295 journal.close().await.expect("unable to close journal");
296 }
297 return;
298 },
299
300 _ = refresh_epoch => {
302 debug!("refresh epoch");
303 continue;
305 },
306
307 _ = rebroadcast => {
309 debug!("rebroadcast");
310 if let Err(err) = self.rebroadcast(&mut node_sender).await {
311 info!(?err, "rebroadcast failed");
312 continue;
313 }
314 },
315
316 msg = node_receiver.recv() => {
318 debug!("node network");
319 let (sender, msg) = match msg {
321 Ok(r) => r,
322 Err(err) => {
323 error!(?err, "node receiver failed");
324 break;
325 }
326 };
327 let mut guard = self.metrics.nodes.guard(Status::Invalid);
328 let node = match parsed::Node::<C, D>::decode(&msg) {
329 Ok(node) => node,
330 Err(err) => {
331 warn!(?err, ?sender, "node decode failed");
332 continue;
333 }
334 };
335 if let Err(err) = self.validate_node(&node, &sender) {
336 warn!(?err, ?node, ?sender, "node validate failed");
337 continue;
338 };
339
340 self.journal_prepare(&sender).await;
342
343 if let Some(parent) = node.parent.as_ref() {
345 self.handle_threshold(&node.chunk, parent.epoch, parent.threshold).await;
346 }
347
348 self.handle_node(&node).await;
350 guard.set(Status::Success);
351 },
352
353 msg = ack_receiver.recv() => {
355 debug!("ack network");
356 let (sender, msg) = match msg {
358 Ok(r) => r,
359 Err(err) => {
360 warn!(?err, "ack receiver failed");
361 break;
362 }
363 };
364 let mut guard = self.metrics.acks.guard(Status::Invalid);
365 let ack = match parsed::Ack::decode(&msg) {
366 Ok(ack) => ack,
367 Err(err) => {
368 warn!(?err, ?sender, "ack decode failed");
369 continue;
370 }
371 };
372 if let Err(err) = self.validate_ack(&ack, &sender) {
373 warn!(?err, ?ack, ?sender, "ack validate failed");
374 continue;
375 };
376 if let Err(err) = self.handle_ack(&ack).await {
377 warn!(?err, ?ack, "ack handle failed");
378 guard.set(Status::Failure);
379 continue;
380 }
381 guard.set(Status::Success);
382 },
383
384 verify = self.pending_verifies.next_completed() => {
386 let Verify { timer, context, payload, result } = verify;
387 drop(timer); match result {
389 Err(err) => {
390 warn!(?err, ?context, ?payload, "verified returned error");
391 self.metrics.verify.inc(Status::Dropped);
392 }
393 Ok(false) => {
394 warn!(?context, ?payload, "verified was false");
395 self.metrics.verify.inc(Status::Failure);
396 }
397 Ok(true) => {
398 debug!(?context, ?payload, "verified");
399 self.metrics.verify.inc(Status::Success);
400 if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await {
401 warn!(?err, ?context, ?payload, "verified handle failed");
402 }
403 },
404 }
405 },
406
407 mail = self.mailbox_receiver.next() => {
409 let Some(msg) = mail else {
410 error!("mailbox receiver failed");
411 break;
412 };
413 match msg {
414 Message::Broadcast{ payload, result } => {
415 debug!("broadcast");
416 if self.coordinator.is_sequencer(self.epoch, &self.crypto.public_key()).is_none() {
417 warn!(epoch=?self.epoch, ?payload, "not a sequencer");
418 continue;
419 }
420
421 if let Err(err) = self.broadcast_new(payload, result, &mut node_sender).await {
423 warn!(?err, "broadcast new failed");
424 continue;
425 }
426 }
427 }
428 }
429 }
430 }
431 }
432
433 async fn handle_app_verified(
442 &mut self,
443 context: &Context<C::PublicKey>,
444 payload: &D,
445 ack_sender: &mut NetS,
446 ) -> Result<(), Error> {
447 let Some(tip) = self.tip_manager.get(&context.sequencer) else {
449 return Err(Error::AppVerifiedNoTip);
450 };
451
452 if tip.chunk.height != context.height {
454 return Err(Error::AppVerifiedHeightMismatch);
455 }
456
457 if tip.chunk.payload != *payload {
459 return Err(Error::AppVerifiedPayloadMismatch);
460 }
461
462 let Some(share) = self.coordinator.share(self.epoch) else {
464 return Err(Error::UnknownShare(self.epoch));
465 };
466 let partial = ops::partial_sign_message(
467 share,
468 Some(&self.ack_namespace),
469 &serializer::ack(&tip.chunk, self.epoch),
470 );
471
472 self.journal_sync(&context.sequencer, context.height).await;
475
476 let recipients = {
479 let Some(signers) = self.coordinator.signers(self.epoch) else {
480 return Err(Error::UnknownSigners(self.epoch));
481 };
482 let mut recipients = signers.clone();
483 if self
484 .coordinator
485 .is_signer(self.epoch, &tip.chunk.sequencer)
486 .is_none()
487 {
488 recipients.push(tip.chunk.sequencer.clone());
489 }
490 recipients
491 };
492
493 let ack = parsed::Ack {
495 chunk: tip.chunk,
496 epoch: self.epoch,
497 partial,
498 };
499 ack_sender
500 .send(Recipients::Some(recipients), ack.encode().into(), false)
501 .await
502 .map_err(|_| Error::UnableToSendMessage)?;
503
504 self.handle_ack(&ack).await?;
506
507 Ok(())
508 }
509
510 async fn handle_threshold(
516 &mut self,
517 chunk: &parsed::Chunk<D, C::PublicKey>,
518 epoch: Epoch,
519 threshold: group::Signature,
520 ) {
521 if !self
523 .ack_manager
524 .add_threshold(&chunk.sequencer, chunk.height, epoch, threshold)
525 {
526 return;
527 }
528
529 if chunk.sequencer == self.crypto.public_key() {
531 self.broadcast_timer.take();
532 }
533
534 let context = Context {
536 sequencer: chunk.sequencer.clone(),
537 height: chunk.height,
538 };
539 let proof =
540 Prover::<C, D>::serialize_threshold(&context, &chunk.payload, epoch, &threshold);
541 self.collector
542 .acknowledged(proof, chunk.payload.clone())
543 .await;
544 }
545
546 async fn handle_ack(&mut self, ack: &parsed::Ack<D, C::PublicKey>) -> Result<(), Error> {
551 let Some(identity) = self.coordinator.identity(ack.epoch) else {
553 return Err(Error::UnknownIdentity(ack.epoch));
554 };
555 let quorum = identity.required();
556
557 if let Some(threshold) = self.ack_manager.add_ack(ack, quorum) {
559 self.metrics.threshold.inc();
560 self.handle_threshold(&ack.chunk, ack.epoch, threshold)
561 .await;
562 }
563
564 Ok(())
565 }
566
567 async fn handle_node(&mut self, node: &parsed::Node<C, D>) {
571 let is_new = self.tip_manager.put(node);
573
574 if is_new {
576 self.metrics
578 .sequencer_heights
579 .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
580 .set(node.chunk.height as i64);
581
582 self.journal_append(node).await;
586 self.journal_sync(&node.chunk.sequencer, node.chunk.height)
587 .await;
588 }
589
590 let n = self.pending_verifies.len();
592 if n >= self.verify_concurrent {
593 warn!(?n, "too many pending verifies");
594 return;
595 }
596
597 let context = Context {
599 sequencer: node.chunk.sequencer.clone(),
600 height: node.chunk.height,
601 };
602 let payload = node.chunk.payload.clone();
603 let mut application = self.application.clone();
604 let timer = self.metrics.verify_duration.timer();
605 self.pending_verifies.push(async move {
606 let receiver = application.verify(context.clone(), payload.clone()).await;
607 let result = receiver.await.map_err(Error::AppVerifyCanceled);
608 Verify {
609 timer,
610 context,
611 payload,
612 result,
613 }
614 });
615 }
616
617 async fn broadcast_new(
626 &mut self,
627 payload: D,
628 result: oneshot::Sender<bool>,
629 node_sender: &mut NetS,
630 ) -> Result<(), Error> {
631 let mut guard = self.metrics.new_broadcast.guard(Status::Dropped);
632 let me = self.crypto.public_key();
633
634 let mut height = 0;
636 let mut parent = None;
637 if let Some(tip) = self.tip_manager.get(&me) {
638 let Some((epoch, threshold)) = self.ack_manager.get_threshold(&me, tip.chunk.height)
640 else {
641 let _ = result.send(false);
642 return Err(Error::NoThresholdForTip(tip.chunk.height));
643 };
644
645 height = tip.chunk.height + 1;
647 parent = Some(parsed::Parent {
648 payload: tip.chunk.payload,
649 threshold,
650 epoch,
651 });
652 }
653
654 let chunk = parsed::Chunk {
656 sequencer: me.clone(),
657 height,
658 payload,
659 };
660 let signature = self
661 .crypto
662 .sign(Some(&self.chunk_namespace), &serializer::chunk(&chunk));
663 let node = parsed::Node::<C, D> {
664 chunk,
665 signature,
666 parent,
667 };
668
669 self.handle_node(&node).await;
671
672 self.journal_sync(&me, height).await;
675
676 self.broadcast_timer = Some(self.metrics.e2e_duration.timer());
678
679 if let Err(err) = self.broadcast(&node, node_sender, self.epoch).await {
681 let _ = result.send(false);
682 guard.set(Status::Failure);
683 return Err(err);
684 };
685
686 let _ = result.send(true);
688 guard.set(Status::Success);
689 Ok(())
690 }
691
692 async fn rebroadcast(&mut self, node_sender: &mut NetS) -> Result<(), Error> {
699 let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);
700
701 self.rebroadcast_deadline = None;
703
704 let me = self.crypto.public_key();
706 if self.coordinator.is_sequencer(self.epoch, &me).is_none() {
707 return Err(Error::IAmNotASequencer(self.epoch));
708 }
709
710 let Some(tip) = self.tip_manager.get(&me) else {
712 return Err(Error::NothingToRebroadcast);
713 };
714
715 if self
717 .ack_manager
718 .get_threshold(&me, tip.chunk.height)
719 .is_some()
720 {
721 return Err(Error::AlreadyBroadcast);
722 }
723
724 guard.set(Status::Failure);
726 self.broadcast(&tip, node_sender, self.epoch).await?;
727 guard.set(Status::Success);
728 Ok(())
729 }
730
731 async fn broadcast(
733 &mut self,
734 node: &parsed::Node<C, D>,
735 node_sender: &mut NetS,
736 epoch: Epoch,
737 ) -> Result<(), Error> {
738 let Some(signers) = self.coordinator.signers(epoch) else {
740 return Err(Error::UnknownSigners(epoch));
741 };
742 node_sender
743 .send(
744 Recipients::Some(signers.clone()),
745 node.encode().into(),
746 false,
747 )
748 .await
749 .map_err(|_| Error::BroadcastFailed)?;
750
751 self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);
753
754 Ok(())
755 }
756
757 fn validate_node(
766 &mut self,
767 node: &parsed::Node<C, D>,
768 sender: &C::PublicKey,
769 ) -> Result<(), Error> {
770 if node.chunk.sequencer != *sender {
772 return Err(Error::PeerMismatch);
773 }
774
775 if let Some(tip) = self.tip_manager.get(sender) {
778 if tip == *node {
779 return Ok(());
780 }
781 }
782
783 self.validate_chunk(&node.chunk, self.epoch)?;
785
786 if !C::verify(
788 Some(&self.chunk_namespace),
789 &serializer::chunk(&node.chunk),
790 sender,
791 &node.signature,
792 ) {
793 return Err(Error::InvalidNodeSignature);
794 }
795
796 if node.chunk.height == 0 {
798 if node.parent.is_some() {
799 return Err(Error::GenesisChunkMustNotHaveParent);
800 }
801 return Ok(());
802 }
803
804 let Some(parent) = &node.parent else {
806 return Err(Error::NodeMissingParent);
807 };
808 let parent_chunk = parsed::Chunk {
809 sequencer: sender.clone(),
810 height: node.chunk.height.checked_sub(1).unwrap(),
811 payload: parent.payload.clone(),
812 };
813
814 let Some(identity) = self.coordinator.identity(parent.epoch) else {
816 return Err(Error::UnknownIdentity(parent.epoch));
817 };
818 let public_key = poly::public(identity);
819 ops::verify_message(
820 &public_key,
821 Some(&self.ack_namespace),
822 &serializer::ack(&parent_chunk, parent.epoch),
823 &parent.threshold,
824 )
825 .map_err(|_| Error::InvalidThresholdSignature)?;
826
827 Ok(())
828 }
829
830 fn validate_ack(
835 &self,
836 ack: &parsed::Ack<D, C::PublicKey>,
837 sender: &C::PublicKey,
838 ) -> Result<(), Error> {
839 self.validate_chunk(&ack.chunk, ack.epoch)?;
841
842 let Some(signer_index) = self.coordinator.is_signer(ack.epoch, sender) else {
844 return Err(Error::UnknownSigner(ack.epoch, sender.to_string()));
845 };
846 if signer_index != ack.partial.index {
847 return Err(Error::PeerMismatch);
848 }
849
850 {
852 let (eb_lo, eb_hi) = self.epoch_bounds;
853 let bound_lo = self.epoch.saturating_sub(eb_lo);
854 let bound_hi = self.epoch.saturating_add(eb_hi);
855 if ack.epoch < bound_lo || ack.epoch > bound_hi {
856 return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
857 }
858 }
859
860 {
862 let bound_lo = self
863 .tip_manager
864 .get(&ack.chunk.sequencer)
865 .map(|t| t.chunk.height)
866 .unwrap_or(0);
867 let bound_hi = bound_lo + self.height_bound;
868 if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
869 return Err(Error::AckHeightOutsideBounds(
870 ack.chunk.height,
871 bound_lo,
872 bound_hi,
873 ));
874 }
875 }
876
877 let Some(identity) = self.coordinator.identity(ack.epoch) else {
880 return Err(Error::UnknownIdentity(ack.epoch));
881 };
882 ops::partial_verify_message(
883 identity,
884 Some(&self.ack_namespace),
885 &serializer::ack(&ack.chunk, ack.epoch),
886 &ack.partial,
887 )
888 .map_err(|_| Error::InvalidPartialSignature)?;
889
890 Ok(())
891 }
892
893 fn validate_chunk(
898 &self,
899 chunk: &parsed::Chunk<D, C::PublicKey>,
900 epoch: Epoch,
901 ) -> Result<(), Error> {
902 if self
904 .coordinator
905 .is_sequencer(epoch, &chunk.sequencer)
906 .is_none()
907 {
908 return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
909 }
910
911 if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
913 match chunk.height.cmp(&tip.chunk.height) {
915 std::cmp::Ordering::Less => {
916 return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
917 }
918 std::cmp::Ordering::Equal => {
919 if tip.chunk.payload != chunk.payload {
921 return Err(Error::ChunkMismatch(
922 chunk.sequencer.to_string(),
923 chunk.height,
924 ));
925 }
926 }
927 std::cmp::Ordering::Greater => {}
928 }
929 }
930
931 Ok(())
932 }
933
934 fn get_journal_section(&self, height: u64) -> u64 {
940 height / self.journal_heights_per_section
941 }
942
943 async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
947 if self.journals.contains_key(sequencer) {
949 return;
950 }
951
952 let cfg = journal::variable::Config {
954 partition: format!("{}{}", &self.journal_name_prefix, sequencer),
955 };
956 let mut journal = Journal::init(self.context.clone(), cfg)
957 .await
958 .expect("unable to init journal");
959
960 {
962 debug!(?sequencer, "journal replay begin");
963
964 let stream = journal
966 .replay(self.journal_replay_concurrency, None)
967 .await
968 .expect("unable to replay journal");
969 pin_mut!(stream);
970
971 let mut tip: Option<parsed::Node<C, D>> = None;
974 let mut num_items = 0;
975 while let Some(msg) = stream.next().await {
976 num_items += 1;
977 let (_, _, _, msg) = msg.expect("unable to decode journal message");
978 let node = parsed::Node::<C, D>::decode(&msg)
979 .expect("journal message is unexpected format");
980 let height = node.chunk.height;
981 match tip {
982 None => {
983 tip = Some(node);
984 }
985 Some(ref t) => {
986 if height > t.chunk.height {
987 tip = Some(node);
988 }
989 }
990 }
991 }
992
993 if let Some(node) = tip.take() {
996 let is_new = self.tip_manager.put(&node);
997 assert!(is_new);
998 }
999
1000 debug!(?sequencer, ?num_items, "journal replay end");
1001 }
1002
1003 self.journals.insert(sequencer.clone(), journal);
1005 }
1006
1007 async fn journal_append(&mut self, node: &parsed::Node<C, D>) {
1012 let section = self.get_journal_section(node.chunk.height);
1013 self.journals
1014 .get_mut(&node.chunk.sequencer)
1015 .expect("journal does not exist")
1016 .append(section, node.encode().into())
1017 .await
1018 .expect("unable to append to journal");
1019 }
1020
1021 async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
1023 let section = self.get_journal_section(height);
1024
1025 let journal = self
1027 .journals
1028 .get_mut(sequencer)
1029 .expect("journal does not exist");
1030
1031 journal.sync(section).await.expect("unable to sync journal");
1033
1034 let _ = journal.prune(section).await;
1036 }
1037
1038 fn refresh_epoch(&mut self) {
1044 self.refresh_epoch_deadline = Some(self.context.current() + self.refresh_epoch_timeout);
1046
1047 let epoch = self.coordinator.index();
1049 assert!(epoch >= self.epoch);
1050
1051 self.epoch = epoch;
1053 }
1054}
1055
1056#[derive(Error, Debug)]
1058enum Error {
1059 #[error("Application verify error: {0}")]
1061 AppVerifyCanceled(oneshot::Canceled),
1062 #[error("Application verified no tip")]
1063 AppVerifiedNoTip,
1064 #[error("Application verified height mismatch")]
1065 AppVerifiedHeightMismatch,
1066 #[error("Application verified payload mismatch")]
1067 AppVerifiedPayloadMismatch,
1068
1069 #[error("Unable to send message")]
1071 UnableToSendMessage,
1072
1073 #[error("Already broadcast")]
1075 AlreadyBroadcast,
1076 #[error("I am not a sequencer in epoch {0}")]
1077 IAmNotASequencer(u64),
1078 #[error("Nothing to rebroadcast")]
1079 NothingToRebroadcast,
1080 #[error("Broadcast failed")]
1081 BroadcastFailed,
1082 #[error("No threshold for tip")]
1083 NoThresholdForTip(u64),
1084
1085 #[error("Genesis chunk must not have a parent")]
1087 GenesisChunkMustNotHaveParent,
1088 #[error("Node missing parent")]
1089 NodeMissingParent,
1090
1091 #[error("Unknown identity at epoch {0}")]
1093 UnknownIdentity(u64),
1094 #[error("Unknown signers at epoch {0}")]
1095 UnknownSigners(u64),
1096 #[error("Epoch {0} has no sequencer {1}")]
1097 UnknownSequencer(u64, String),
1098 #[error("Epoch {0} has no signer {1}")]
1099 UnknownSigner(u64, String),
1100 #[error("Unknown share at epoch {0}")]
1101 UnknownShare(u64),
1102
1103 #[error("Peer mismatch")]
1105 PeerMismatch,
1106
1107 #[error("Invalid threshold signature")]
1109 InvalidThresholdSignature,
1110 #[error("Invalid partial signature")]
1111 InvalidPartialSignature,
1112 #[error("Invalid node signature")]
1113 InvalidNodeSignature,
1114
1115 #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
1117 AckEpochOutsideBounds(u64, u64, u64),
1118 #[error("Invalid ack height {0} outside bounds {1} - {2}")]
1119 AckHeightOutsideBounds(u64, u64, u64),
1120 #[error("Chunk height {0} lower than tip height {1}")]
1121 ChunkHeightTooLow(u64, u64),
1122
1123 #[error("Chunk mismatch from sender {0} with height {1}")]
1125 ChunkMismatch(String, u64),
1126}