1use super::{AckManager, Config, Mailbox, Message, TipManager};
2use crate::{
3 linked::{namespace, parsed, prover::Prover, serializer, Context, Epoch},
4 Application, Collector, ThresholdCoordinator,
5};
6use commonware_cryptography::{
7 bls12381::primitives::{
8 group::{self},
9 ops,
10 poly::{self},
11 },
12 Scheme,
13};
14use commonware_macros::select;
15use commonware_p2p::{Receiver, Recipients, Sender};
16use commonware_runtime::{Blob, Clock, Handle, Metrics, Spawner, Storage};
17use commonware_storage::journal::{self, variable::Journal};
18use commonware_utils::Array;
19use futures::{
20 channel::{mpsc, oneshot},
21 future::{self, Either},
22 pin_mut,
23 stream::FuturesUnordered,
24 StreamExt,
25};
26use std::{
27 collections::BTreeMap,
28 future::Future,
29 marker::PhantomData,
30 pin::Pin,
31 time::{Duration, SystemTime},
32};
33use thiserror::Error;
34use tracing::{debug, error, info, warn};
35
36type VerifyFuture<D, P> =
38 Pin<Box<dyn Future<Output = (Context<P>, D, Result<bool, Error>)> + Send>>;
39
40pub struct Actor<
42 B: Blob,
43 E: Clock + Spawner + Storage<B> + Metrics,
44 C: Scheme,
45 D: Array,
46 A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
47 Z: Collector<Digest = D>,
48 S: ThresholdCoordinator<
49 Index = Epoch,
50 Share = group::Share,
51 Identity = poly::Public,
52 PublicKey = C::PublicKey,
53 >,
54 NetS: Sender<PublicKey = C::PublicKey>,
55 NetR: Receiver<PublicKey = C::PublicKey>,
56> {
57 context: E,
61 crypto: C,
62 coordinator: S,
63 application: A,
64 collector: Z,
65 _sender: PhantomData<NetS>,
66 _receiver: PhantomData<NetR>,
67
68 chunk_namespace: Vec<u8>,
74
75 ack_namespace: Vec<u8>,
77
78 refresh_epoch_timeout: Duration,
84 refresh_epoch_deadline: Option<SystemTime>,
85
86 rebroadcast_timeout: Duration,
88 rebroadcast_deadline: Option<SystemTime>,
89
90 epoch_bounds: (u64, u64),
102
103 height_bound: u64,
109
110 pending_verifies: FuturesUnordered<VerifyFuture<D, C::PublicKey>>,
118
119 pending_verify_size: usize,
121
122 mailbox_receiver: mpsc::Receiver<Message<D>>,
124
125 journal_heights_per_section: u64,
131
132 journal_replay_concurrency: usize,
134
135 journal_name_prefix: String,
138
139 journals: BTreeMap<C::PublicKey, Journal<B, E>>,
141
142 tip_manager: TipManager<C, D>,
151
152 ack_manager: AckManager<D, C::PublicKey>,
155
156 epoch: Epoch,
158}
159
160impl<
161 B: Blob,
162 E: Clock + Spawner + Storage<B> + Metrics,
163 C: Scheme,
164 D: Array,
165 A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
166 Z: Collector<Digest = D>,
167 S: ThresholdCoordinator<
168 Index = Epoch,
169 Share = group::Share,
170 Identity = poly::Public,
171 PublicKey = C::PublicKey,
172 >,
173 NetS: Sender<PublicKey = C::PublicKey>,
174 NetR: Receiver<PublicKey = C::PublicKey>,
175 > Actor<B, E, C, D, A, Z, S, NetS, NetR>
176{
177 pub fn new(context: E, cfg: Config<C, D, A, Z, S>) -> (Self, Mailbox<D>) {
180 let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
181 let mailbox = Mailbox::new(mailbox_sender);
182
183 let pending_verifies = FuturesUnordered::new();
185 {
186 let dummy: VerifyFuture<D, C::PublicKey> = Box::pin(async {
190 future::pending::<(Context<C::PublicKey>, D, Result<bool, Error>)>().await
191 });
192 pending_verifies.push(dummy);
193 }
194
195 let result = Self {
196 context,
197 crypto: cfg.crypto,
198 _sender: PhantomData,
199 _receiver: PhantomData,
200 coordinator: cfg.coordinator,
201 application: cfg.application,
202 collector: cfg.collector,
203 chunk_namespace: namespace::chunk(&cfg.namespace),
204 ack_namespace: namespace::ack(&cfg.namespace),
205 refresh_epoch_timeout: cfg.refresh_epoch_timeout,
206 refresh_epoch_deadline: None,
207 rebroadcast_timeout: cfg.rebroadcast_timeout,
208 rebroadcast_deadline: None,
209 epoch_bounds: cfg.epoch_bounds,
210 height_bound: cfg.height_bound,
211 pending_verifies,
212 pending_verify_size: cfg.pending_verify_size,
213 mailbox_receiver,
214 journal_heights_per_section: cfg.journal_heights_per_section,
215 journal_replay_concurrency: cfg.journal_replay_concurrency,
216 journal_name_prefix: cfg.journal_name_prefix,
217 journals: BTreeMap::new(),
218 tip_manager: TipManager::<C, D>::new(),
219 ack_manager: AckManager::<D, C::PublicKey>::new(),
220 epoch: 0,
221 };
222
223 (result, mailbox)
224 }
225
226 pub fn start(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) -> Handle<()> {
239 self.context.spawn_ref()(self.run(chunk_network, ack_network))
240 }
241
242 async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
244 let (mut node_sender, mut node_receiver) = chunk_network;
245 let (mut ack_sender, mut ack_receiver) = ack_network;
246 let mut shutdown = self.context.stopped();
247
248 self.refresh_epoch();
251 self.journal_prepare(&self.crypto.public_key()).await;
252 if let Err(err) = self.rebroadcast(&mut node_sender).await {
253 info!(?err, "initial rebroadcast failed");
255 }
256
257 loop {
258 self.refresh_epoch();
260
261 let refresh_epoch = match self.refresh_epoch_deadline {
264 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
265 None => Either::Right(future::pending()),
266 };
267 let rebroadcast = match self.rebroadcast_deadline {
268 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
269 None => Either::Right(future::pending()),
270 };
271
272 select! {
273 _ = &mut shutdown => {
275 debug!("shutdown");
276 while let Some((_, journal)) = self.journals.pop_first() {
277 journal.close().await.expect("unable to close journal");
278 }
279 return;
280 },
281
282 _ = refresh_epoch => {
284 debug!("refresh epoch");
285 continue;
287 },
288
289 _ = rebroadcast => {
291 debug!("rebroadcast");
292 if let Err(err) = self.rebroadcast(&mut node_sender).await {
293 info!(?err, "rebroadcast failed");
294 continue;
295 }
296 },
297
298 msg = node_receiver.recv() => {
300 debug!("node network");
301 let (sender, msg) = match msg {
303 Ok(r) => r,
304 Err(err) => {
305 error!(?err, "node receiver failed");
306 break;
307 }
308 };
309 let node = match parsed::Node::<C, D>::decode(&msg) {
310 Ok(node) => node,
311 Err(err) => {
312 warn!(?err, ?sender, "node decode failed");
313 continue;
314 }
315 };
316 if let Err(err) = self.validate_node(&node, &sender) {
317 warn!(?err, ?node, ?sender, "node validate failed");
318 continue;
319 };
320
321 self.journal_prepare(&sender).await;
323
324 if let Some(parent) = node.parent.as_ref() {
326 self.handle_threshold(&node.chunk, parent.epoch, parent.threshold).await;
327 }
328
329 self.handle_node(&node).await;
331 },
332
333 msg = ack_receiver.recv() => {
335 debug!("ack network");
336 let (sender, msg) = match msg {
338 Ok(r) => r,
339 Err(err) => {
340 warn!(?err, "ack receiver failed");
341 break;
342 }
343 };
344 let ack = match parsed::Ack::decode(&msg) {
345 Ok(ack) => ack,
346 Err(err) => {
347 warn!(?err, ?sender, "ack decode failed");
348 continue;
349 }
350 };
351 if let Err(err) = self.validate_ack(&ack, &sender) {
352 warn!(?err, ?ack, ?sender, "ack validate failed");
353 continue;
354 };
355 if let Err(err) = self.handle_ack(&ack).await {
356 warn!(?err, ?ack, "ack handle failed");
357 continue;
358 }
359 },
360
361 maybe_verified = self.pending_verifies.select_next_some() => {
363 let (context, digest, verify_result) = maybe_verified;
364 match verify_result {
365 Ok(true) => {
366 debug!(?context, ?digest, "verified");
367 if let Err(err) = self.handle_app_verified(&context, &digest, &mut ack_sender).await {
368 warn!(?err, ?context, ?digest, "verified handle failed");
369 }
370 },
371 Ok(false) => {
372 warn!(?context, ?digest, "verified was false");
373 },
374 Err(err) => {
375 warn!(?err, ?context, ?digest, "verified returned error");
376 },
377 }
378 },
379
380 mail = self.mailbox_receiver.next() => {
382 let Some(msg) = mail else {
383 error!("mailbox receiver failed");
384 break;
385 };
386 match msg {
387 Message::Broadcast{ payload, result } => {
388 debug!("broadcast");
389 if self.coordinator.is_sequencer(self.epoch, &self.crypto.public_key()).is_none() {
390 warn!(epoch=?self.epoch, ?payload, "not a sequencer");
391 continue;
392 }
393
394 if let Err(err) = self.broadcast_new(payload, result, &mut node_sender).await {
396 warn!(?err, "broadcast new failed");
397 continue;
398 }
399 }
400 }
401 }
402 }
403 }
404 }
405
406 async fn handle_app_verified(
415 &mut self,
416 context: &Context<C::PublicKey>,
417 payload: &D,
418 ack_sender: &mut NetS,
419 ) -> Result<(), Error> {
420 let Some(tip) = self.tip_manager.get(&context.sequencer) else {
422 return Err(Error::AppVerifiedNoTip);
423 };
424
425 if tip.chunk.height != context.height {
427 return Err(Error::AppVerifiedHeightMismatch);
428 }
429
430 if tip.chunk.payload != *payload {
432 return Err(Error::AppVerifiedPayloadMismatch);
433 }
434
435 let Some(share) = self.coordinator.share(self.epoch) else {
437 return Err(Error::UnknownShare(self.epoch));
438 };
439 let partial = ops::partial_sign_message(
440 share,
441 Some(&self.ack_namespace),
442 &serializer::ack(&tip.chunk, self.epoch),
443 );
444
445 self.journal_sync(&context.sequencer, context.height).await;
448
449 let recipients = {
452 let Some(signers) = self.coordinator.signers(self.epoch) else {
453 return Err(Error::UnknownSigners(self.epoch));
454 };
455 let mut recipients = signers.clone();
456 if self
457 .coordinator
458 .is_signer(self.epoch, &tip.chunk.sequencer)
459 .is_none()
460 {
461 recipients.push(tip.chunk.sequencer.clone());
462 }
463 recipients
464 };
465
466 let ack = parsed::Ack {
468 chunk: tip.chunk,
469 epoch: self.epoch,
470 partial,
471 };
472 ack_sender
473 .send(Recipients::Some(recipients), ack.encode().into(), false)
474 .await
475 .map_err(|_| Error::UnableToSendMessage)?;
476
477 self.handle_ack(&ack).await?;
479
480 Ok(())
481 }
482
483 async fn handle_threshold(
489 &mut self,
490 chunk: &parsed::Chunk<D, C::PublicKey>,
491 epoch: Epoch,
492 threshold: group::Signature,
493 ) {
494 if !self
496 .ack_manager
497 .add_threshold(&chunk.sequencer, chunk.height, epoch, threshold)
498 {
499 return;
500 }
501
502 let context = Context {
504 sequencer: chunk.sequencer.clone(),
505 height: chunk.height,
506 };
507 let proof =
508 Prover::<C, D>::serialize_threshold(&context, &chunk.payload, epoch, &threshold);
509 self.collector
510 .acknowledged(proof, chunk.payload.clone())
511 .await;
512 }
513
514 async fn handle_ack(&mut self, ack: &parsed::Ack<D, C::PublicKey>) -> Result<(), Error> {
519 let Some(identity) = self.coordinator.identity(ack.epoch) else {
521 return Err(Error::UnknownIdentity(ack.epoch));
522 };
523 let quorum = identity.required();
524
525 if let Some(threshold) = self.ack_manager.add_ack(ack, quorum) {
527 self.handle_threshold(&ack.chunk, ack.epoch, threshold)
529 .await;
530 }
531
532 Ok(())
533 }
534
535 async fn handle_node(&mut self, node: &parsed::Node<C, D>) {
539 let is_new = self.tip_manager.put(node);
541
542 if is_new {
546 self.journal_append(node).await;
547 self.journal_sync(&node.chunk.sequencer, node.chunk.height)
548 .await;
549 }
550
551 if self.pending_verifies.len() > self.pending_verify_size {
555 warn!(n=?self.pending_verifies.len(), "too many pending verifies");
556 return;
557 }
558
559 let context = Context {
561 sequencer: node.chunk.sequencer.clone(),
562 height: node.chunk.height,
563 };
564 let payload = node.chunk.payload.clone();
565 let mut app_clone = self.application.clone();
566 let verify_future = Box::pin(async move {
567 let receiver = app_clone.verify(context.clone(), payload.clone()).await;
568 let result = receiver.await.map_err(Error::AppVerifyCanceled);
569 (context, payload, result)
570 });
571
572 self.pending_verifies.push(verify_future);
574 }
575
576 async fn broadcast_new(
585 &mut self,
586 payload: D,
587 result: oneshot::Sender<bool>,
588 node_sender: &mut NetS,
589 ) -> Result<(), Error> {
590 let me = self.crypto.public_key();
591
592 let mut height = 0;
594 let mut parent = None;
595 if let Some(tip) = self.tip_manager.get(&me) {
596 let Some((epoch, threshold)) = self.ack_manager.get_threshold(&me, tip.chunk.height)
598 else {
599 let _ = result.send(false);
600 return Err(Error::NoThresholdForTip(tip.chunk.height));
601 };
602
603 height = tip.chunk.height + 1;
605 parent = Some(parsed::Parent {
606 payload: tip.chunk.payload,
607 threshold,
608 epoch,
609 });
610 }
611
612 let chunk = parsed::Chunk {
614 sequencer: me.clone(),
615 height,
616 payload,
617 };
618 let signature = self
619 .crypto
620 .sign(Some(&self.chunk_namespace), &serializer::chunk(&chunk));
621 let node = parsed::Node::<C, D> {
622 chunk,
623 signature,
624 parent,
625 };
626
627 self.handle_node(&node).await;
629
630 self.journal_sync(&me, height).await;
633
634 if let Err(err) = self.broadcast(&node, node_sender, self.epoch).await {
636 let _ = result.send(false);
637 return Err(err);
638 };
639
640 let _ = result.send(true);
642 Ok(())
643 }
644
645 async fn rebroadcast(&mut self, node_sender: &mut NetS) -> Result<(), Error> {
652 self.rebroadcast_deadline = None;
654
655 let me = self.crypto.public_key();
657 if self.coordinator.is_sequencer(self.epoch, &me).is_none() {
658 return Err(Error::IAmNotASequencer(self.epoch));
659 }
660
661 let Some(tip) = self.tip_manager.get(&me) else {
663 return Err(Error::NothingToRebroadcast);
664 };
665
666 if self
668 .ack_manager
669 .get_threshold(&me, tip.chunk.height)
670 .is_some()
671 {
672 return Err(Error::AlreadyBroadcast);
673 }
674
675 self.broadcast(&tip, node_sender, self.epoch).await?;
677
678 Ok(())
679 }
680
681 async fn broadcast(
683 &mut self,
684 node: &parsed::Node<C, D>,
685 node_sender: &mut NetS,
686 epoch: Epoch,
687 ) -> Result<(), Error> {
688 let Some(signers) = self.coordinator.signers(epoch) else {
690 return Err(Error::UnknownSigners(epoch));
691 };
692 node_sender
693 .send(
694 Recipients::Some(signers.clone()),
695 node.encode().into(),
696 false,
697 )
698 .await
699 .map_err(|_| Error::BroadcastFailed)?;
700
701 self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);
703
704 Ok(())
705 }
706
707 fn validate_node(
716 &mut self,
717 node: &parsed::Node<C, D>,
718 sender: &C::PublicKey,
719 ) -> Result<(), Error> {
720 if node.chunk.sequencer != *sender {
722 return Err(Error::PeerMismatch);
723 }
724
725 if let Some(tip) = self.tip_manager.get(sender) {
728 if tip == *node {
729 return Ok(());
730 }
731 }
732
733 self.validate_chunk(&node.chunk, self.epoch)?;
735
736 if !C::verify(
738 Some(&self.chunk_namespace),
739 &serializer::chunk(&node.chunk),
740 sender,
741 &node.signature,
742 ) {
743 return Err(Error::InvalidNodeSignature);
744 }
745
746 if node.chunk.height == 0 {
748 if node.parent.is_some() {
749 return Err(Error::GenesisChunkMustNotHaveParent);
750 }
751 return Ok(());
752 }
753
754 let Some(parent) = &node.parent else {
756 return Err(Error::NodeMissingParent);
757 };
758 let parent_chunk = parsed::Chunk {
759 sequencer: sender.clone(),
760 height: node.chunk.height.checked_sub(1).unwrap(),
761 payload: parent.payload.clone(),
762 };
763
764 let Some(identity) = self.coordinator.identity(parent.epoch) else {
766 return Err(Error::UnknownIdentity(parent.epoch));
767 };
768 let public_key = poly::public(identity);
769 ops::verify_message(
770 &public_key,
771 Some(&self.ack_namespace),
772 &serializer::ack(&parent_chunk, parent.epoch),
773 &parent.threshold,
774 )
775 .map_err(|_| Error::InvalidThresholdSignature)?;
776
777 Ok(())
778 }
779
780 fn validate_ack(
785 &self,
786 ack: &parsed::Ack<D, C::PublicKey>,
787 sender: &C::PublicKey,
788 ) -> Result<(), Error> {
789 self.validate_chunk(&ack.chunk, ack.epoch)?;
791
792 let Some(signer_index) = self.coordinator.is_signer(ack.epoch, sender) else {
794 return Err(Error::UnknownSigner(ack.epoch, sender.to_string()));
795 };
796 if signer_index != ack.partial.index {
797 return Err(Error::PeerMismatch);
798 }
799
800 {
802 let (eb_lo, eb_hi) = self.epoch_bounds;
803 let bound_lo = self.epoch.saturating_sub(eb_lo);
804 let bound_hi = self.epoch.saturating_add(eb_hi);
805 if ack.epoch < bound_lo || ack.epoch > bound_hi {
806 return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
807 }
808 }
809
810 {
812 let bound_lo = self
813 .tip_manager
814 .get(&ack.chunk.sequencer)
815 .map(|t| t.chunk.height)
816 .unwrap_or(0);
817 let bound_hi = bound_lo + self.height_bound;
818 if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
819 return Err(Error::AckHeightOutsideBounds(
820 ack.chunk.height,
821 bound_lo,
822 bound_hi,
823 ));
824 }
825 }
826
827 let Some(identity) = self.coordinator.identity(ack.epoch) else {
830 return Err(Error::UnknownIdentity(ack.epoch));
831 };
832 ops::partial_verify_message(
833 identity,
834 Some(&self.ack_namespace),
835 &serializer::ack(&ack.chunk, ack.epoch),
836 &ack.partial,
837 )
838 .map_err(|_| Error::InvalidPartialSignature)?;
839
840 Ok(())
841 }
842
843 fn validate_chunk(
848 &self,
849 chunk: &parsed::Chunk<D, C::PublicKey>,
850 epoch: Epoch,
851 ) -> Result<(), Error> {
852 if self
854 .coordinator
855 .is_sequencer(epoch, &chunk.sequencer)
856 .is_none()
857 {
858 return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
859 }
860
861 if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
863 match chunk.height.cmp(&tip.chunk.height) {
865 std::cmp::Ordering::Less => {
866 return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
867 }
868 std::cmp::Ordering::Equal => {
869 if tip.chunk.payload != chunk.payload {
871 return Err(Error::ChunkMismatch(
872 chunk.sequencer.to_string(),
873 chunk.height,
874 ));
875 }
876 }
877 std::cmp::Ordering::Greater => {}
878 }
879 }
880
881 Ok(())
882 }
883
884 fn get_journal_section(&self, height: u64) -> u64 {
890 height / self.journal_heights_per_section
891 }
892
893 async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
897 if self.journals.contains_key(sequencer) {
899 return;
900 }
901
902 let cfg = journal::variable::Config {
904 partition: format!("{}{}", &self.journal_name_prefix, sequencer),
905 };
906 let mut journal = Journal::init(self.context.clone(), cfg)
907 .await
908 .expect("unable to init journal");
909
910 {
912 debug!(?sequencer, "journal replay begin");
913
914 let stream = journal
916 .replay(self.journal_replay_concurrency, None)
917 .await
918 .expect("unable to replay journal");
919 pin_mut!(stream);
920
921 let mut tip: Option<parsed::Node<C, D>> = None;
924 let mut num_items = 0;
925 while let Some(msg) = stream.next().await {
926 num_items += 1;
927 let (_, _, _, msg) = msg.expect("unable to decode journal message");
928 let node = parsed::Node::<C, D>::decode(&msg)
929 .expect("journal message is unexpected format");
930 let height = node.chunk.height;
931 match tip {
932 None => {
933 tip = Some(node);
934 }
935 Some(ref t) => {
936 if height > t.chunk.height {
937 tip = Some(node);
938 }
939 }
940 }
941 }
942
943 if let Some(node) = tip.take() {
946 let is_new = self.tip_manager.put(&node);
947 assert!(is_new);
948 }
949
950 debug!(?sequencer, ?num_items, "journal replay end");
951 }
952
953 self.journals.insert(sequencer.clone(), journal);
955 }
956
957 async fn journal_append(&mut self, node: &parsed::Node<C, D>) {
962 let section = self.get_journal_section(node.chunk.height);
963 self.journals
964 .get_mut(&node.chunk.sequencer)
965 .expect("journal does not exist")
966 .append(section, node.encode().into())
967 .await
968 .expect("unable to append to journal");
969 }
970
971 async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
973 let section = self.get_journal_section(height);
974
975 let journal = self
977 .journals
978 .get_mut(sequencer)
979 .expect("journal does not exist");
980
981 journal.sync(section).await.expect("unable to sync journal");
983
984 let _ = journal.prune(section).await;
986 }
987
988 fn refresh_epoch(&mut self) {
994 self.refresh_epoch_deadline = Some(self.context.current() + self.refresh_epoch_timeout);
996
997 let epoch = self.coordinator.index();
999 assert!(epoch >= self.epoch);
1000
1001 self.epoch = epoch;
1003 }
1004}
1005
1006#[derive(Error, Debug)]
1008enum Error {
1009 #[error("Application verify error: {0}")]
1011 AppVerifyCanceled(oneshot::Canceled),
1012 #[error("Application verified no tip")]
1013 AppVerifiedNoTip,
1014 #[error("Application verified height mismatch")]
1015 AppVerifiedHeightMismatch,
1016 #[error("Application verified payload mismatch")]
1017 AppVerifiedPayloadMismatch,
1018
1019 #[error("Unable to send message")]
1021 UnableToSendMessage,
1022
1023 #[error("Already broadcast")]
1025 AlreadyBroadcast,
1026 #[error("I am not a sequencer in epoch {0}")]
1027 IAmNotASequencer(u64),
1028 #[error("Nothing to rebroadcast")]
1029 NothingToRebroadcast,
1030 #[error("Broadcast failed")]
1031 BroadcastFailed,
1032 #[error("No threshold for tip")]
1033 NoThresholdForTip(u64),
1034
1035 #[error("Genesis chunk must not have a parent")]
1037 GenesisChunkMustNotHaveParent,
1038 #[error("Node missing parent")]
1039 NodeMissingParent,
1040
1041 #[error("Unknown identity at epoch {0}")]
1043 UnknownIdentity(u64),
1044 #[error("Unknown signers at epoch {0}")]
1045 UnknownSigners(u64),
1046 #[error("Epoch {0} has no sequencer {1}")]
1047 UnknownSequencer(u64, String),
1048 #[error("Epoch {0} has no signer {1}")]
1049 UnknownSigner(u64, String),
1050 #[error("Unknown share at epoch {0}")]
1051 UnknownShare(u64),
1052
1053 #[error("Peer mismatch")]
1055 PeerMismatch,
1056
1057 #[error("Invalid threshold signature")]
1059 InvalidThresholdSignature,
1060 #[error("Invalid partial signature")]
1061 InvalidPartialSignature,
1062 #[error("Invalid node signature")]
1063 InvalidNodeSignature,
1064
1065 #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
1067 AckEpochOutsideBounds(u64, u64, u64),
1068 #[error("Invalid ack height {0} outside bounds {1} - {2}")]
1069 AckHeightOutsideBounds(u64, u64, u64),
1070 #[error("Chunk height {0} lower than tip height {1}")]
1071 ChunkHeightTooLow(u64, u64),
1072
1073 #[error("Chunk mismatch from sender {0} with height {1}")]
1075 ChunkMismatch(String, u64),
1076}