1use super::{metrics, AckManager, Config, Mailbox, Message, TipManager};
2use crate::{
3 linked::{namespace, parsed, prover::Prover, serializer, Context, Epoch},
4 Application, Collector, ThresholdCoordinator,
5};
6use commonware_cryptography::{
7 bls12381::primitives::{
8 group::{self},
9 ops,
10 poly::{self},
11 },
12 Scheme,
13};
14use commonware_macros::select;
15use commonware_p2p::{Receiver, Recipients, Sender};
16use commonware_runtime::{
17 telemetry::{
18 histogram,
19 status::{CounterExt, Status},
20 },
21 Blob, Clock, Handle, Metrics, Spawner, Storage,
22};
23use commonware_storage::journal::{self, variable::Journal};
24use commonware_utils::{futures::Pool as FuturesPool, Array};
25use futures::{
26 channel::{mpsc, oneshot},
27 future::{self, Either},
28 pin_mut, StreamExt,
29};
30use std::{
31 collections::BTreeMap,
32 marker::PhantomData,
33 time::{Duration, SystemTime},
34};
35use thiserror::Error;
36use tracing::{debug, error, info, warn};
37
38struct Verify<C: Scheme, D: Array, E: Clock> {
40 timer: histogram::Timer<E>,
41 context: Context<C::PublicKey>,
42 payload: D,
43 result: Result<bool, Error>,
44}
45
46pub struct Actor<
48 B: Blob,
49 E: Clock + Spawner + Storage<B> + Metrics,
50 C: Scheme,
51 D: Array,
52 A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
53 Z: Collector<Digest = D>,
54 S: ThresholdCoordinator<
55 Index = Epoch,
56 Share = group::Share,
57 Identity = poly::Public,
58 PublicKey = C::PublicKey,
59 >,
60 NetS: Sender<PublicKey = C::PublicKey>,
61 NetR: Receiver<PublicKey = C::PublicKey>,
62> {
63 context: E,
67 crypto: C,
68 coordinator: S,
69 application: A,
70 collector: Z,
71 _sender: PhantomData<NetS>,
72 _receiver: PhantomData<NetR>,
73
74 chunk_namespace: Vec<u8>,
80
81 ack_namespace: Vec<u8>,
83
84 refresh_epoch_timeout: Duration,
90 refresh_epoch_deadline: Option<SystemTime>,
91
92 rebroadcast_timeout: Duration,
94 rebroadcast_deadline: Option<SystemTime>,
95
96 epoch_bounds: (u64, u64),
108
109 height_bound: u64,
115
116 #[allow(clippy::type_complexity)]
124 pending_verifies: FuturesPool<Verify<C, D, E>>,
125
126 verify_concurrent: usize,
128
129 mailbox_receiver: mpsc::Receiver<Message<D>>,
131
132 journal_heights_per_section: u64,
138
139 journal_replay_concurrency: usize,
141
142 journal_name_prefix: String,
145
146 journals: BTreeMap<C::PublicKey, Journal<B, E>>,
148
149 tip_manager: TipManager<C, D>,
158
159 ack_manager: AckManager<D, C::PublicKey>,
162
163 epoch: Epoch,
165
166 metrics: metrics::Metrics<E>,
172
173 broadcast_timer: Option<histogram::Timer<E>>,
175}
176
177impl<
178 B: Blob,
179 E: Clock + Spawner + Storage<B> + Metrics,
180 C: Scheme,
181 D: Array,
182 A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
183 Z: Collector<Digest = D>,
184 S: ThresholdCoordinator<
185 Index = Epoch,
186 Share = group::Share,
187 Identity = poly::Public,
188 PublicKey = C::PublicKey,
189 >,
190 NetS: Sender<PublicKey = C::PublicKey>,
191 NetR: Receiver<PublicKey = C::PublicKey>,
192 > Actor<B, E, C, D, A, Z, S, NetS, NetR>
193{
194 pub fn new(context: E, cfg: Config<C, D, A, Z, S>) -> (Self, Mailbox<D>) {
197 let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
198 let mailbox = Mailbox::new(mailbox_sender);
199 let metrics = metrics::Metrics::init(context.clone());
200
201 let result = Self {
202 context,
203 crypto: cfg.crypto,
204 _sender: PhantomData,
205 _receiver: PhantomData,
206 coordinator: cfg.coordinator,
207 application: cfg.application,
208 collector: cfg.collector,
209 chunk_namespace: namespace::chunk(&cfg.namespace),
210 ack_namespace: namespace::ack(&cfg.namespace),
211 refresh_epoch_timeout: cfg.refresh_epoch_timeout,
212 refresh_epoch_deadline: None,
213 rebroadcast_timeout: cfg.rebroadcast_timeout,
214 rebroadcast_deadline: None,
215 epoch_bounds: cfg.epoch_bounds,
216 height_bound: cfg.height_bound,
217 pending_verifies: FuturesPool::default(),
218 verify_concurrent: cfg.verify_concurrent,
219 mailbox_receiver,
220 journal_heights_per_section: cfg.journal_heights_per_section,
221 journal_replay_concurrency: cfg.journal_replay_concurrency,
222 journal_name_prefix: cfg.journal_name_prefix,
223 journals: BTreeMap::new(),
224 tip_manager: TipManager::<C, D>::new(),
225 ack_manager: AckManager::<D, C::PublicKey>::new(),
226 epoch: 0,
227 metrics,
228 broadcast_timer: None,
229 };
230
231 (result, mailbox)
232 }
233
234 pub fn start(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) -> Handle<()> {
247 self.context.spawn_ref()(self.run(chunk_network, ack_network))
248 }
249
250 async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
252 let (mut node_sender, mut node_receiver) = chunk_network;
253 let (mut ack_sender, mut ack_receiver) = ack_network;
254 let mut shutdown = self.context.stopped();
255
256 self.refresh_epoch();
259 self.journal_prepare(&self.crypto.public_key()).await;
260 if let Err(err) = self.rebroadcast(&mut node_sender).await {
261 info!(?err, "initial rebroadcast failed");
263 }
264
265 loop {
266 self.refresh_epoch();
268
269 let refresh_epoch = match self.refresh_epoch_deadline {
272 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
273 None => Either::Right(future::pending()),
274 };
275 let rebroadcast = match self.rebroadcast_deadline {
276 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
277 None => Either::Right(future::pending()),
278 };
279
280 select! {
281 _ = &mut shutdown => {
283 debug!("shutdown");
284 self.pending_verifies.cancel_all();
285 while let Some((_, journal)) = self.journals.pop_first() {
286 journal.close().await.expect("unable to close journal");
287 }
288 return;
289 },
290
291 _ = refresh_epoch => {
293 debug!("refresh epoch");
294 continue;
296 },
297
298 _ = rebroadcast => {
300 debug!("rebroadcast");
301 if let Err(err) = self.rebroadcast(&mut node_sender).await {
302 info!(?err, "rebroadcast failed");
303 continue;
304 }
305 },
306
307 msg = node_receiver.recv() => {
309 debug!("node network");
310 let (sender, msg) = match msg {
312 Ok(r) => r,
313 Err(err) => {
314 error!(?err, "node receiver failed");
315 break;
316 }
317 };
318 let mut guard = self.metrics.nodes.guard(Status::Invalid);
319 let node = match parsed::Node::<C, D>::decode(&msg) {
320 Ok(node) => node,
321 Err(err) => {
322 warn!(?err, ?sender, "node decode failed");
323 continue;
324 }
325 };
326 if let Err(err) = self.validate_node(&node, &sender) {
327 warn!(?err, ?node, ?sender, "node validate failed");
328 continue;
329 };
330
331 self.journal_prepare(&sender).await;
333
334 if let Some(parent) = node.parent.as_ref() {
336 self.handle_threshold(&node.chunk, parent.epoch, parent.threshold).await;
337 }
338
339 self.handle_node(&node).await;
341 guard.set(Status::Success);
342 },
343
344 msg = ack_receiver.recv() => {
346 debug!("ack network");
347 let (sender, msg) = match msg {
349 Ok(r) => r,
350 Err(err) => {
351 warn!(?err, "ack receiver failed");
352 break;
353 }
354 };
355 let mut guard = self.metrics.acks.guard(Status::Invalid);
356 let ack = match parsed::Ack::decode(&msg) {
357 Ok(ack) => ack,
358 Err(err) => {
359 warn!(?err, ?sender, "ack decode failed");
360 continue;
361 }
362 };
363 if let Err(err) = self.validate_ack(&ack, &sender) {
364 warn!(?err, ?ack, ?sender, "ack validate failed");
365 continue;
366 };
367 if let Err(err) = self.handle_ack(&ack).await {
368 warn!(?err, ?ack, "ack handle failed");
369 guard.set(Status::Failure);
370 continue;
371 }
372 guard.set(Status::Success);
373 },
374
375 verify = self.pending_verifies.next_completed() => {
377 let Verify { timer, context, payload, result } = verify;
378 drop(timer); match result {
380 Err(err) => {
381 warn!(?err, ?context, ?payload, "verified returned error");
382 self.metrics.verify.inc(Status::Dropped);
383 }
384 Ok(false) => {
385 warn!(?context, ?payload, "verified was false");
386 self.metrics.verify.inc(Status::Failure);
387 }
388 Ok(true) => {
389 debug!(?context, ?payload, "verified");
390 self.metrics.verify.inc(Status::Success);
391 if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await {
392 warn!(?err, ?context, ?payload, "verified handle failed");
393 }
394 },
395 }
396 },
397
398 mail = self.mailbox_receiver.next() => {
400 let Some(msg) = mail else {
401 error!("mailbox receiver failed");
402 break;
403 };
404 match msg {
405 Message::Broadcast{ payload, result } => {
406 debug!("broadcast");
407 if self.coordinator.is_sequencer(self.epoch, &self.crypto.public_key()).is_none() {
408 warn!(epoch=?self.epoch, ?payload, "not a sequencer");
409 continue;
410 }
411
412 if let Err(err) = self.broadcast_new(payload, result, &mut node_sender).await {
414 warn!(?err, "broadcast new failed");
415 continue;
416 }
417 }
418 }
419 }
420 }
421 }
422 }
423
424 async fn handle_app_verified(
433 &mut self,
434 context: &Context<C::PublicKey>,
435 payload: &D,
436 ack_sender: &mut NetS,
437 ) -> Result<(), Error> {
438 let Some(tip) = self.tip_manager.get(&context.sequencer) else {
440 return Err(Error::AppVerifiedNoTip);
441 };
442
443 if tip.chunk.height != context.height {
445 return Err(Error::AppVerifiedHeightMismatch);
446 }
447
448 if tip.chunk.payload != *payload {
450 return Err(Error::AppVerifiedPayloadMismatch);
451 }
452
453 let Some(share) = self.coordinator.share(self.epoch) else {
455 return Err(Error::UnknownShare(self.epoch));
456 };
457 let partial = ops::partial_sign_message(
458 share,
459 Some(&self.ack_namespace),
460 &serializer::ack(&tip.chunk, self.epoch),
461 );
462
463 self.journal_sync(&context.sequencer, context.height).await;
466
467 let recipients = {
470 let Some(signers) = self.coordinator.signers(self.epoch) else {
471 return Err(Error::UnknownSigners(self.epoch));
472 };
473 let mut recipients = signers.clone();
474 if self
475 .coordinator
476 .is_signer(self.epoch, &tip.chunk.sequencer)
477 .is_none()
478 {
479 recipients.push(tip.chunk.sequencer.clone());
480 }
481 recipients
482 };
483
484 let ack = parsed::Ack {
486 chunk: tip.chunk,
487 epoch: self.epoch,
488 partial,
489 };
490 ack_sender
491 .send(Recipients::Some(recipients), ack.encode().into(), false)
492 .await
493 .map_err(|_| Error::UnableToSendMessage)?;
494
495 self.handle_ack(&ack).await?;
497
498 Ok(())
499 }
500
501 async fn handle_threshold(
507 &mut self,
508 chunk: &parsed::Chunk<D, C::PublicKey>,
509 epoch: Epoch,
510 threshold: group::Signature,
511 ) {
512 if !self
514 .ack_manager
515 .add_threshold(&chunk.sequencer, chunk.height, epoch, threshold)
516 {
517 return;
518 }
519
520 if chunk.sequencer == self.crypto.public_key() {
522 self.broadcast_timer.take();
523 }
524
525 let context = Context {
527 sequencer: chunk.sequencer.clone(),
528 height: chunk.height,
529 };
530 let proof =
531 Prover::<C, D>::serialize_threshold(&context, &chunk.payload, epoch, &threshold);
532 self.collector
533 .acknowledged(proof, chunk.payload.clone())
534 .await;
535 }
536
537 async fn handle_ack(&mut self, ack: &parsed::Ack<D, C::PublicKey>) -> Result<(), Error> {
542 let Some(identity) = self.coordinator.identity(ack.epoch) else {
544 return Err(Error::UnknownIdentity(ack.epoch));
545 };
546 let quorum = identity.required();
547
548 if let Some(threshold) = self.ack_manager.add_ack(ack, quorum) {
550 self.metrics.threshold.inc();
551 self.handle_threshold(&ack.chunk, ack.epoch, threshold)
552 .await;
553 }
554
555 Ok(())
556 }
557
558 async fn handle_node(&mut self, node: &parsed::Node<C, D>) {
562 let is_new = self.tip_manager.put(node);
564
565 if is_new {
567 self.metrics
569 .sequencer_heights
570 .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
571 .set(node.chunk.height as i64);
572
573 self.journal_append(node).await;
577 self.journal_sync(&node.chunk.sequencer, node.chunk.height)
578 .await;
579 }
580
581 let n = self.pending_verifies.len();
583 if n >= self.verify_concurrent {
584 warn!(?n, "too many pending verifies");
585 return;
586 }
587
588 let context = Context {
590 sequencer: node.chunk.sequencer.clone(),
591 height: node.chunk.height,
592 };
593 let payload = node.chunk.payload.clone();
594 let mut application = self.application.clone();
595 let timer = self.metrics.verify_duration.timer();
596 self.pending_verifies.push(async move {
597 let receiver = application.verify(context.clone(), payload.clone()).await;
598 let result = receiver.await.map_err(Error::AppVerifyCanceled);
599 Verify {
600 timer,
601 context,
602 payload,
603 result,
604 }
605 });
606 }
607
608 async fn broadcast_new(
617 &mut self,
618 payload: D,
619 result: oneshot::Sender<bool>,
620 node_sender: &mut NetS,
621 ) -> Result<(), Error> {
622 let mut guard = self.metrics.new_broadcast.guard(Status::Dropped);
623 let me = self.crypto.public_key();
624
625 let mut height = 0;
627 let mut parent = None;
628 if let Some(tip) = self.tip_manager.get(&me) {
629 let Some((epoch, threshold)) = self.ack_manager.get_threshold(&me, tip.chunk.height)
631 else {
632 let _ = result.send(false);
633 return Err(Error::NoThresholdForTip(tip.chunk.height));
634 };
635
636 height = tip.chunk.height + 1;
638 parent = Some(parsed::Parent {
639 payload: tip.chunk.payload,
640 threshold,
641 epoch,
642 });
643 }
644
645 let chunk = parsed::Chunk {
647 sequencer: me.clone(),
648 height,
649 payload,
650 };
651 let signature = self
652 .crypto
653 .sign(Some(&self.chunk_namespace), &serializer::chunk(&chunk));
654 let node = parsed::Node::<C, D> {
655 chunk,
656 signature,
657 parent,
658 };
659
660 self.handle_node(&node).await;
662
663 self.journal_sync(&me, height).await;
666
667 self.broadcast_timer = Some(self.metrics.e2e_duration.timer());
669
670 if let Err(err) = self.broadcast(&node, node_sender, self.epoch).await {
672 let _ = result.send(false);
673 guard.set(Status::Failure);
674 return Err(err);
675 };
676
677 let _ = result.send(true);
679 guard.set(Status::Success);
680 Ok(())
681 }
682
683 async fn rebroadcast(&mut self, node_sender: &mut NetS) -> Result<(), Error> {
690 let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);
691
692 self.rebroadcast_deadline = None;
694
695 let me = self.crypto.public_key();
697 if self.coordinator.is_sequencer(self.epoch, &me).is_none() {
698 return Err(Error::IAmNotASequencer(self.epoch));
699 }
700
701 let Some(tip) = self.tip_manager.get(&me) else {
703 return Err(Error::NothingToRebroadcast);
704 };
705
706 if self
708 .ack_manager
709 .get_threshold(&me, tip.chunk.height)
710 .is_some()
711 {
712 return Err(Error::AlreadyBroadcast);
713 }
714
715 guard.set(Status::Failure);
717 self.broadcast(&tip, node_sender, self.epoch).await?;
718 guard.set(Status::Success);
719 Ok(())
720 }
721
722 async fn broadcast(
724 &mut self,
725 node: &parsed::Node<C, D>,
726 node_sender: &mut NetS,
727 epoch: Epoch,
728 ) -> Result<(), Error> {
729 let Some(signers) = self.coordinator.signers(epoch) else {
731 return Err(Error::UnknownSigners(epoch));
732 };
733 node_sender
734 .send(
735 Recipients::Some(signers.clone()),
736 node.encode().into(),
737 false,
738 )
739 .await
740 .map_err(|_| Error::BroadcastFailed)?;
741
742 self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);
744
745 Ok(())
746 }
747
748 fn validate_node(
757 &mut self,
758 node: &parsed::Node<C, D>,
759 sender: &C::PublicKey,
760 ) -> Result<(), Error> {
761 if node.chunk.sequencer != *sender {
763 return Err(Error::PeerMismatch);
764 }
765
766 if let Some(tip) = self.tip_manager.get(sender) {
769 if tip == *node {
770 return Ok(());
771 }
772 }
773
774 self.validate_chunk(&node.chunk, self.epoch)?;
776
777 if !C::verify(
779 Some(&self.chunk_namespace),
780 &serializer::chunk(&node.chunk),
781 sender,
782 &node.signature,
783 ) {
784 return Err(Error::InvalidNodeSignature);
785 }
786
787 if node.chunk.height == 0 {
789 if node.parent.is_some() {
790 return Err(Error::GenesisChunkMustNotHaveParent);
791 }
792 return Ok(());
793 }
794
795 let Some(parent) = &node.parent else {
797 return Err(Error::NodeMissingParent);
798 };
799 let parent_chunk = parsed::Chunk {
800 sequencer: sender.clone(),
801 height: node.chunk.height.checked_sub(1).unwrap(),
802 payload: parent.payload.clone(),
803 };
804
805 let Some(identity) = self.coordinator.identity(parent.epoch) else {
807 return Err(Error::UnknownIdentity(parent.epoch));
808 };
809 let public_key = poly::public(identity);
810 ops::verify_message(
811 &public_key,
812 Some(&self.ack_namespace),
813 &serializer::ack(&parent_chunk, parent.epoch),
814 &parent.threshold,
815 )
816 .map_err(|_| Error::InvalidThresholdSignature)?;
817
818 Ok(())
819 }
820
821 fn validate_ack(
826 &self,
827 ack: &parsed::Ack<D, C::PublicKey>,
828 sender: &C::PublicKey,
829 ) -> Result<(), Error> {
830 self.validate_chunk(&ack.chunk, ack.epoch)?;
832
833 let Some(signer_index) = self.coordinator.is_signer(ack.epoch, sender) else {
835 return Err(Error::UnknownSigner(ack.epoch, sender.to_string()));
836 };
837 if signer_index != ack.partial.index {
838 return Err(Error::PeerMismatch);
839 }
840
841 {
843 let (eb_lo, eb_hi) = self.epoch_bounds;
844 let bound_lo = self.epoch.saturating_sub(eb_lo);
845 let bound_hi = self.epoch.saturating_add(eb_hi);
846 if ack.epoch < bound_lo || ack.epoch > bound_hi {
847 return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
848 }
849 }
850
851 {
853 let bound_lo = self
854 .tip_manager
855 .get(&ack.chunk.sequencer)
856 .map(|t| t.chunk.height)
857 .unwrap_or(0);
858 let bound_hi = bound_lo + self.height_bound;
859 if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
860 return Err(Error::AckHeightOutsideBounds(
861 ack.chunk.height,
862 bound_lo,
863 bound_hi,
864 ));
865 }
866 }
867
868 let Some(identity) = self.coordinator.identity(ack.epoch) else {
871 return Err(Error::UnknownIdentity(ack.epoch));
872 };
873 ops::partial_verify_message(
874 identity,
875 Some(&self.ack_namespace),
876 &serializer::ack(&ack.chunk, ack.epoch),
877 &ack.partial,
878 )
879 .map_err(|_| Error::InvalidPartialSignature)?;
880
881 Ok(())
882 }
883
884 fn validate_chunk(
889 &self,
890 chunk: &parsed::Chunk<D, C::PublicKey>,
891 epoch: Epoch,
892 ) -> Result<(), Error> {
893 if self
895 .coordinator
896 .is_sequencer(epoch, &chunk.sequencer)
897 .is_none()
898 {
899 return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
900 }
901
902 if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
904 match chunk.height.cmp(&tip.chunk.height) {
906 std::cmp::Ordering::Less => {
907 return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
908 }
909 std::cmp::Ordering::Equal => {
910 if tip.chunk.payload != chunk.payload {
912 return Err(Error::ChunkMismatch(
913 chunk.sequencer.to_string(),
914 chunk.height,
915 ));
916 }
917 }
918 std::cmp::Ordering::Greater => {}
919 }
920 }
921
922 Ok(())
923 }
924
925 fn get_journal_section(&self, height: u64) -> u64 {
931 height / self.journal_heights_per_section
932 }
933
934 async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
938 if self.journals.contains_key(sequencer) {
940 return;
941 }
942
943 let cfg = journal::variable::Config {
945 partition: format!("{}{}", &self.journal_name_prefix, sequencer),
946 };
947 let mut journal = Journal::init(self.context.clone(), cfg)
948 .await
949 .expect("unable to init journal");
950
951 {
953 debug!(?sequencer, "journal replay begin");
954
955 let stream = journal
957 .replay(self.journal_replay_concurrency, None)
958 .await
959 .expect("unable to replay journal");
960 pin_mut!(stream);
961
962 let mut tip: Option<parsed::Node<C, D>> = None;
965 let mut num_items = 0;
966 while let Some(msg) = stream.next().await {
967 num_items += 1;
968 let (_, _, _, msg) = msg.expect("unable to decode journal message");
969 let node = parsed::Node::<C, D>::decode(&msg)
970 .expect("journal message is unexpected format");
971 let height = node.chunk.height;
972 match tip {
973 None => {
974 tip = Some(node);
975 }
976 Some(ref t) => {
977 if height > t.chunk.height {
978 tip = Some(node);
979 }
980 }
981 }
982 }
983
984 if let Some(node) = tip.take() {
987 let is_new = self.tip_manager.put(&node);
988 assert!(is_new);
989 }
990
991 debug!(?sequencer, ?num_items, "journal replay end");
992 }
993
994 self.journals.insert(sequencer.clone(), journal);
996 }
997
998 async fn journal_append(&mut self, node: &parsed::Node<C, D>) {
1003 let section = self.get_journal_section(node.chunk.height);
1004 self.journals
1005 .get_mut(&node.chunk.sequencer)
1006 .expect("journal does not exist")
1007 .append(section, node.encode().into())
1008 .await
1009 .expect("unable to append to journal");
1010 }
1011
1012 async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
1014 let section = self.get_journal_section(height);
1015
1016 let journal = self
1018 .journals
1019 .get_mut(sequencer)
1020 .expect("journal does not exist");
1021
1022 journal.sync(section).await.expect("unable to sync journal");
1024
1025 let _ = journal.prune(section).await;
1027 }
1028
1029 fn refresh_epoch(&mut self) {
1035 self.refresh_epoch_deadline = Some(self.context.current() + self.refresh_epoch_timeout);
1037
1038 let epoch = self.coordinator.index();
1040 assert!(epoch >= self.epoch);
1041
1042 self.epoch = epoch;
1044 }
1045}
1046
/// Errors produced by the actor's internal handlers. The run loop logs them;
/// none of them stops the actor by itself.
#[derive(Error, Debug)]
enum Error {
    // Application verification
    /// The application's verification response channel was canceled.
    #[error("Application verify error: {0}")]
    AppVerifyCanceled(oneshot::Canceled),
    /// A verification finished but the sequencer no longer has a tip.
    #[error("Application verified no tip")]
    AppVerifiedNoTip,
    /// A verification finished for a height that is not the tip's height.
    #[error("Application verified height mismatch")]
    AppVerifiedHeightMismatch,
    /// A verification finished for a payload that is not the tip's payload.
    #[error("Application verified payload mismatch")]
    AppVerifiedPayloadMismatch,

    // Network
    /// Sending an ack over the network failed.
    #[error("Unable to send message")]
    UnableToSendMessage,

    // Broadcast
    /// The tip already has a threshold signature, so it is not rebroadcast.
    #[error("Already broadcast")]
    AlreadyBroadcast,
    /// This node is not a sequencer in the given epoch.
    #[error("I am not a sequencer in epoch {0}")]
    IAmNotASequencer(u64),
    /// This node has no tip to rebroadcast.
    #[error("Nothing to rebroadcast")]
    NothingToRebroadcast,
    /// Sending a node over the network failed.
    #[error("Broadcast failed")]
    BroadcastFailed,
    /// The tip at the given height has no threshold signature yet, so it cannot
    /// be extended with a new chunk.
    #[error("No threshold for tip")]
    NoThresholdForTip(u64),

    // Parent links
    /// A node at height 0 carried a parent.
    #[error("Genesis chunk must not have a parent")]
    GenesisChunkMustNotHaveParent,
    /// A node above height 0 carried no parent.
    #[error("Node missing parent")]
    NodeMissingParent,

    // Epoch lookups
    /// The coordinator has no identity for the given epoch.
    #[error("Unknown identity at epoch {0}")]
    UnknownIdentity(u64),
    /// The coordinator has no signer set for the given epoch.
    #[error("Unknown signers at epoch {0}")]
    UnknownSigners(u64),
    /// The given key is not a sequencer in the given epoch.
    #[error("Epoch {0} has no sequencer {1}")]
    UnknownSequencer(u64, String),
    /// The given key is not a signer in the given epoch.
    #[error("Epoch {0} has no signer {1}")]
    UnknownSigner(u64, String),
    /// The coordinator has no share for the given epoch.
    #[error("Unknown share at epoch {0}")]
    UnknownShare(u64),

    // Peers
    /// The network sender does not match the node's sequencer, or the ack's
    /// partial-signature index does not match the sender's signer index.
    #[error("Peer mismatch")]
    PeerMismatch,

    // Signatures
    /// A parent's threshold signature failed verification.
    #[error("Invalid threshold signature")]
    InvalidThresholdSignature,
    /// An ack's partial signature failed verification.
    #[error("Invalid partial signature")]
    InvalidPartialSignature,
    /// A node's chunk signature failed verification.
    #[error("Invalid node signature")]
    InvalidNodeSignature,

    // Bounds
    /// Ack epoch (first field) lies outside the accepted range (second and
    /// third fields, inclusive).
    #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
    AckEpochOutsideBounds(u64, u64, u64),
    /// Ack height (first field) lies outside the accepted range (second and
    /// third fields, inclusive).
    #[error("Invalid ack height {0} outside bounds {1} - {2}")]
    AckHeightOutsideBounds(u64, u64, u64),
    /// Chunk height (first field) is below the sequencer's tip height (second
    /// field).
    #[error("Chunk height {0} lower than tip height {1}")]
    ChunkHeightTooLow(u64, u64),

    // Conflicts
    /// A chunk at the tip's height carries a different payload than the tip.
    #[error("Chunk mismatch from sender {0} with height {1}")]
    ChunkMismatch(String, u64),
}