1use super::{AckManager, Config, Mailbox, Message, TipManager};
2use crate::{
3 linked::{namespace, parsed, prover::Prover, serializer, Context, Epoch},
4 Application, Collector, ThresholdCoordinator,
5};
6use commonware_cryptography::{
7 bls12381::primitives::{
8 group::{self},
9 ops,
10 poly::{self},
11 },
12 Array, Scheme,
13};
14use commonware_macros::select;
15use commonware_p2p::{Receiver, Recipients, Sender};
16use commonware_runtime::{Blob, Clock, Spawner, Storage};
17use commonware_storage::journal::{self, variable::Journal};
18use futures::{
19 channel::{mpsc, oneshot},
20 future::{self, Either},
21 pin_mut,
22 stream::FuturesUnordered,
23 StreamExt,
24};
25use prometheus_client::registry::Registry;
26use std::{
27 collections::BTreeMap,
28 future::Future,
29 marker::PhantomData,
30 pin::Pin,
31 sync::{Arc, Mutex},
32 time::{Duration, SystemTime},
33};
34use thiserror::Error;
35use tracing::{debug, error, info, warn};
36
37type VerifyFuture<D, P> =
39 Pin<Box<dyn Future<Output = (Context<P>, D, Result<bool, Error>)> + Send>>;
40
41pub struct Actor<
43 B: Blob,
44 E: Clock + Spawner + Storage<B>,
45 C: Scheme,
46 D: Array,
47 A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
48 Z: Collector<Digest = D>,
49 S: ThresholdCoordinator<
50 Index = Epoch,
51 Share = group::Share,
52 Identity = poly::Public,
53 PublicKey = C::PublicKey,
54 >,
55 NetS: Sender<PublicKey = C::PublicKey>,
56 NetR: Receiver<PublicKey = C::PublicKey>,
57> {
58 runtime: E,
62 crypto: C,
63 coordinator: S,
64 application: A,
65 collector: Z,
66 _sender: PhantomData<NetS>,
67 _receiver: PhantomData<NetR>,
68
69 chunk_namespace: Vec<u8>,
75
76 ack_namespace: Vec<u8>,
78
79 refresh_epoch_timeout: Duration,
85 refresh_epoch_deadline: Option<SystemTime>,
86
87 rebroadcast_timeout: Duration,
89 rebroadcast_deadline: Option<SystemTime>,
90
91 epoch_bounds: (u64, u64),
103
104 height_bound: u64,
110
111 pending_verifies: FuturesUnordered<VerifyFuture<D, C::PublicKey>>,
119
120 pending_verify_size: usize,
122
123 mailbox_receiver: mpsc::Receiver<Message<D>>,
125
126 journal_heights_per_section: u64,
132
133 journal_replay_concurrency: usize,
135
136 journal_name_prefix: String,
139
140 journals: BTreeMap<C::PublicKey, Journal<B, E>>,
142
143 tip_manager: TipManager<C, D>,
152
153 ack_manager: AckManager<D, C::PublicKey>,
156
157 epoch: Epoch,
159}
160
161impl<
162 B: Blob,
163 E: Clock + Spawner + Storage<B>,
164 C: Scheme,
165 D: Array,
166 A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
167 Z: Collector<Digest = D>,
168 S: ThresholdCoordinator<
169 Index = Epoch,
170 Share = group::Share,
171 Identity = poly::Public,
172 PublicKey = C::PublicKey,
173 >,
174 NetS: Sender<PublicKey = C::PublicKey>,
175 NetR: Receiver<PublicKey = C::PublicKey>,
176 > Actor<B, E, C, D, A, Z, S, NetS, NetR>
177{
178 pub fn new(runtime: E, cfg: Config<C, D, A, Z, S>) -> (Self, Mailbox<D>) {
181 let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
182 let mailbox = Mailbox::new(mailbox_sender);
183
184 let pending_verifies = FuturesUnordered::new();
186 {
187 let dummy: VerifyFuture<D, C::PublicKey> = Box::pin(async {
191 future::pending::<(Context<C::PublicKey>, D, Result<bool, Error>)>().await
192 });
193 pending_verifies.push(dummy);
194 }
195
196 let result = Self {
197 runtime,
198 crypto: cfg.crypto,
199 _sender: PhantomData,
200 _receiver: PhantomData,
201 coordinator: cfg.coordinator,
202 application: cfg.application,
203 collector: cfg.collector,
204 chunk_namespace: namespace::chunk(&cfg.namespace),
205 ack_namespace: namespace::ack(&cfg.namespace),
206 refresh_epoch_timeout: cfg.refresh_epoch_timeout,
207 refresh_epoch_deadline: None,
208 rebroadcast_timeout: cfg.rebroadcast_timeout,
209 rebroadcast_deadline: None,
210 epoch_bounds: cfg.epoch_bounds,
211 height_bound: cfg.height_bound,
212 pending_verifies,
213 pending_verify_size: cfg.pending_verify_size,
214 mailbox_receiver,
215 journal_heights_per_section: cfg.journal_heights_per_section,
216 journal_replay_concurrency: cfg.journal_replay_concurrency,
217 journal_name_prefix: cfg.journal_name_prefix,
218 journals: BTreeMap::new(),
219 tip_manager: TipManager::<C, D>::new(),
220 ack_manager: AckManager::<D, C::PublicKey>::new(),
221 epoch: 0,
222 };
223
224 (result, mailbox)
225 }
226
227 pub async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
240 let (mut node_sender, mut node_receiver) = chunk_network;
241 let (mut ack_sender, mut ack_receiver) = ack_network;
242 let mut shutdown = self.runtime.stopped();
243
244 self.refresh_epoch();
247 self.journal_prepare(&self.crypto.public_key()).await;
248 if let Err(err) = self.rebroadcast(&mut node_sender).await {
249 info!(?err, "initial rebroadcast failed");
251 }
252
253 loop {
254 self.refresh_epoch();
256
257 let refresh_epoch = match self.refresh_epoch_deadline {
260 Some(deadline) => Either::Left(self.runtime.sleep_until(deadline)),
261 None => Either::Right(future::pending()),
262 };
263 let rebroadcast = match self.rebroadcast_deadline {
264 Some(deadline) => Either::Left(self.runtime.sleep_until(deadline)),
265 None => Either::Right(future::pending()),
266 };
267
268 select! {
269 _ = &mut shutdown => {
271 debug!("shutdown");
272 while let Some((_, journal)) = self.journals.pop_first() {
273 journal.close().await.expect("unable to close journal");
274 }
275 return;
276 },
277
278 _ = refresh_epoch => {
280 debug!("refresh epoch");
281 continue;
283 },
284
285 _ = rebroadcast => {
287 debug!("rebroadcast");
288 if let Err(err) = self.rebroadcast(&mut node_sender).await {
289 info!(?err, "rebroadcast failed");
290 continue;
291 }
292 },
293
294 msg = node_receiver.recv() => {
296 debug!("node network");
297 let (sender, msg) = match msg {
299 Ok(r) => r,
300 Err(err) => {
301 error!(?err, "node receiver failed");
302 break;
303 }
304 };
305 let node = match parsed::Node::<C, D>::decode(&msg) {
306 Ok(node) => node,
307 Err(err) => {
308 warn!(?err, ?sender, "node decode failed");
309 continue;
310 }
311 };
312 if let Err(err) = self.validate_node(&node, &sender) {
313 warn!(?err, ?node, ?sender, "node validate failed");
314 continue;
315 };
316
317 self.journal_prepare(&sender).await;
319
320 if let Some(parent) = node.parent.as_ref() {
322 self.handle_threshold(&node.chunk, parent.epoch, parent.threshold).await;
323 }
324
325 self.handle_node(&node).await;
327 },
328
329 msg = ack_receiver.recv() => {
331 debug!("ack network");
332 let (sender, msg) = match msg {
334 Ok(r) => r,
335 Err(err) => {
336 warn!(?err, "ack receiver failed");
337 break;
338 }
339 };
340 let ack = match parsed::Ack::decode(&msg) {
341 Ok(ack) => ack,
342 Err(err) => {
343 warn!(?err, ?sender, "ack decode failed");
344 continue;
345 }
346 };
347 if let Err(err) = self.validate_ack(&ack, &sender) {
348 warn!(?err, ?ack, ?sender, "ack validate failed");
349 continue;
350 };
351 if let Err(err) = self.handle_ack(&ack).await {
352 warn!(?err, ?ack, "ack handle failed");
353 continue;
354 }
355 },
356
357 maybe_verified = self.pending_verifies.select_next_some() => {
359 let (context, digest, verify_result) = maybe_verified;
360 match verify_result {
361 Ok(true) => {
362 debug!(?context, ?digest, "verified");
363 if let Err(err) = self.handle_app_verified(&context, &digest, &mut ack_sender).await {
364 warn!(?err, ?context, ?digest, "verified handle failed");
365 }
366 },
367 Ok(false) => {
368 warn!(?context, ?digest, "verified was false");
369 },
370 Err(err) => {
371 warn!(?err, ?context, ?digest, "verified returned error");
372 },
373 }
374 },
375
376 mail = self.mailbox_receiver.next() => {
378 let Some(msg) = mail else {
379 error!("mailbox receiver failed");
380 break;
381 };
382 match msg {
383 Message::Broadcast{ payload, result } => {
384 debug!("broadcast");
385 if self.coordinator.is_sequencer(self.epoch, &self.crypto.public_key()).is_none() {
386 warn!(epoch=?self.epoch, ?payload, "not a sequencer");
387 continue;
388 }
389
390 if let Err(err) = self.broadcast_new(payload, result, &mut node_sender).await {
392 warn!(?err, "broadcast new failed");
393 continue;
394 }
395 }
396 }
397 }
398 }
399 }
400 }
401
402 async fn handle_app_verified(
411 &mut self,
412 context: &Context<C::PublicKey>,
413 payload: &D,
414 ack_sender: &mut NetS,
415 ) -> Result<(), Error> {
416 let Some(tip) = self.tip_manager.get(&context.sequencer) else {
418 return Err(Error::AppVerifiedNoTip);
419 };
420
421 if tip.chunk.height != context.height {
423 return Err(Error::AppVerifiedHeightMismatch);
424 }
425
426 if tip.chunk.payload != *payload {
428 return Err(Error::AppVerifiedPayloadMismatch);
429 }
430
431 let Some(share) = self.coordinator.share(self.epoch) else {
433 return Err(Error::UnknownShare(self.epoch));
434 };
435 let partial = ops::partial_sign_message(
436 share,
437 Some(&self.ack_namespace),
438 &serializer::ack(&tip.chunk, self.epoch),
439 );
440
441 self.journal_sync(&context.sequencer, context.height).await;
444
445 let recipients = {
448 let Some(signers) = self.coordinator.signers(self.epoch) else {
449 return Err(Error::UnknownSigners(self.epoch));
450 };
451 let mut recipients = signers.clone();
452 if self
453 .coordinator
454 .is_signer(self.epoch, &tip.chunk.sequencer)
455 .is_none()
456 {
457 recipients.push(tip.chunk.sequencer.clone());
458 }
459 recipients
460 };
461
462 let ack = parsed::Ack {
464 chunk: tip.chunk,
465 epoch: self.epoch,
466 partial,
467 };
468 ack_sender
469 .send(Recipients::Some(recipients), ack.encode().into(), false)
470 .await
471 .map_err(|_| Error::UnableToSendMessage)?;
472
473 self.handle_ack(&ack).await?;
475
476 Ok(())
477 }
478
479 async fn handle_threshold(
485 &mut self,
486 chunk: &parsed::Chunk<D, C::PublicKey>,
487 epoch: Epoch,
488 threshold: group::Signature,
489 ) {
490 if !self
492 .ack_manager
493 .add_threshold(&chunk.sequencer, chunk.height, epoch, threshold)
494 {
495 return;
496 }
497
498 let context = Context {
500 sequencer: chunk.sequencer.clone(),
501 height: chunk.height,
502 };
503 let proof =
504 Prover::<C, D>::serialize_threshold(&context, &chunk.payload, epoch, &threshold);
505 self.collector
506 .acknowledged(proof, chunk.payload.clone())
507 .await;
508 }
509
510 async fn handle_ack(&mut self, ack: &parsed::Ack<D, C::PublicKey>) -> Result<(), Error> {
515 let Some(identity) = self.coordinator.identity(ack.epoch) else {
517 return Err(Error::UnknownIdentity(ack.epoch));
518 };
519 let quorum = identity.required();
520
521 if let Some(threshold) = self.ack_manager.add_ack(ack, quorum) {
523 self.handle_threshold(&ack.chunk, ack.epoch, threshold)
525 .await;
526 }
527
528 Ok(())
529 }
530
531 async fn handle_node(&mut self, node: &parsed::Node<C, D>) {
535 let is_new = self.tip_manager.put(node);
537
538 if is_new {
542 self.journal_append(node).await;
543 self.journal_sync(&node.chunk.sequencer, node.chunk.height)
544 .await;
545 }
546
547 if self.pending_verifies.len() > self.pending_verify_size {
551 warn!(n=?self.pending_verifies.len(), "too many pending verifies");
552 return;
553 }
554
555 let context = Context {
557 sequencer: node.chunk.sequencer.clone(),
558 height: node.chunk.height,
559 };
560 let payload = node.chunk.payload.clone();
561 let mut app_clone = self.application.clone();
562 let verify_future = Box::pin(async move {
563 let receiver = app_clone.verify(context.clone(), payload.clone()).await;
564 let result = receiver.await.map_err(Error::AppVerifyCanceled);
565 (context, payload, result)
566 });
567
568 self.pending_verifies.push(verify_future);
570 }
571
572 async fn broadcast_new(
581 &mut self,
582 payload: D,
583 result: oneshot::Sender<bool>,
584 node_sender: &mut NetS,
585 ) -> Result<(), Error> {
586 let me = self.crypto.public_key();
587
588 let mut height = 0;
590 let mut parent = None;
591 if let Some(tip) = self.tip_manager.get(&me) {
592 let Some((epoch, threshold)) = self.ack_manager.get_threshold(&me, tip.chunk.height)
594 else {
595 let _ = result.send(false);
596 return Err(Error::NoThresholdForTip(tip.chunk.height));
597 };
598
599 height = tip.chunk.height + 1;
601 parent = Some(parsed::Parent {
602 payload: tip.chunk.payload,
603 threshold,
604 epoch,
605 });
606 }
607
608 let chunk = parsed::Chunk {
610 sequencer: me.clone(),
611 height,
612 payload,
613 };
614 let signature = self
615 .crypto
616 .sign(Some(&self.chunk_namespace), &serializer::chunk(&chunk));
617 let node = parsed::Node::<C, D> {
618 chunk,
619 signature,
620 parent,
621 };
622
623 self.handle_node(&node).await;
625
626 self.journal_sync(&me, height).await;
629
630 if let Err(err) = self.broadcast(&node, node_sender, self.epoch).await {
632 let _ = result.send(false);
633 return Err(err);
634 };
635
636 let _ = result.send(true);
638 Ok(())
639 }
640
641 async fn rebroadcast(&mut self, node_sender: &mut NetS) -> Result<(), Error> {
648 self.rebroadcast_deadline = None;
650
651 let me = self.crypto.public_key();
653 if self.coordinator.is_sequencer(self.epoch, &me).is_none() {
654 return Err(Error::IAmNotASequencer(self.epoch));
655 }
656
657 let Some(tip) = self.tip_manager.get(&me) else {
659 return Err(Error::NothingToRebroadcast);
660 };
661
662 if self
664 .ack_manager
665 .get_threshold(&me, tip.chunk.height)
666 .is_some()
667 {
668 return Err(Error::AlreadyBroadcast);
669 }
670
671 self.broadcast(&tip, node_sender, self.epoch).await?;
673
674 Ok(())
675 }
676
677 async fn broadcast(
679 &mut self,
680 node: &parsed::Node<C, D>,
681 node_sender: &mut NetS,
682 epoch: Epoch,
683 ) -> Result<(), Error> {
684 let Some(signers) = self.coordinator.signers(epoch) else {
686 return Err(Error::UnknownSigners(epoch));
687 };
688 node_sender
689 .send(
690 Recipients::Some(signers.clone()),
691 node.encode().into(),
692 false,
693 )
694 .await
695 .map_err(|_| Error::BroadcastFailed)?;
696
697 self.rebroadcast_deadline = Some(self.runtime.current() + self.rebroadcast_timeout);
699
700 Ok(())
701 }
702
703 fn validate_node(
712 &mut self,
713 node: &parsed::Node<C, D>,
714 sender: &C::PublicKey,
715 ) -> Result<(), Error> {
716 if node.chunk.sequencer != *sender {
718 return Err(Error::PeerMismatch);
719 }
720
721 if let Some(tip) = self.tip_manager.get(sender) {
724 if tip == *node {
725 return Ok(());
726 }
727 }
728
729 self.validate_chunk(&node.chunk, self.epoch)?;
731
732 if !C::verify(
734 Some(&self.chunk_namespace),
735 &serializer::chunk(&node.chunk),
736 sender,
737 &node.signature,
738 ) {
739 return Err(Error::InvalidNodeSignature);
740 }
741
742 if node.chunk.height == 0 {
744 if node.parent.is_some() {
745 return Err(Error::GenesisChunkMustNotHaveParent);
746 }
747 return Ok(());
748 }
749
750 let Some(parent) = &node.parent else {
752 return Err(Error::NodeMissingParent);
753 };
754 let parent_chunk = parsed::Chunk {
755 sequencer: sender.clone(),
756 height: node.chunk.height.checked_sub(1).unwrap(),
757 payload: parent.payload.clone(),
758 };
759
760 let Some(identity) = self.coordinator.identity(parent.epoch) else {
762 return Err(Error::UnknownIdentity(parent.epoch));
763 };
764 let public_key = poly::public(identity);
765 ops::verify_message(
766 &public_key,
767 Some(&self.ack_namespace),
768 &serializer::ack(&parent_chunk, parent.epoch),
769 &parent.threshold,
770 )
771 .map_err(|_| Error::InvalidThresholdSignature)?;
772
773 Ok(())
774 }
775
776 fn validate_ack(
781 &self,
782 ack: &parsed::Ack<D, C::PublicKey>,
783 sender: &C::PublicKey,
784 ) -> Result<(), Error> {
785 self.validate_chunk(&ack.chunk, ack.epoch)?;
787
788 let Some(signer_index) = self.coordinator.is_signer(ack.epoch, sender) else {
790 return Err(Error::UnknownSigner(ack.epoch, sender.to_string()));
791 };
792 if signer_index != ack.partial.index {
793 return Err(Error::PeerMismatch);
794 }
795
796 {
798 let (eb_lo, eb_hi) = self.epoch_bounds;
799 let bound_lo = self.epoch.saturating_sub(eb_lo);
800 let bound_hi = self.epoch.saturating_add(eb_hi);
801 if ack.epoch < bound_lo || ack.epoch > bound_hi {
802 return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
803 }
804 }
805
806 {
808 let bound_lo = self
809 .tip_manager
810 .get(&ack.chunk.sequencer)
811 .map(|t| t.chunk.height)
812 .unwrap_or(0);
813 let bound_hi = bound_lo + self.height_bound;
814 if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
815 return Err(Error::AckHeightOutsideBounds(
816 ack.chunk.height,
817 bound_lo,
818 bound_hi,
819 ));
820 }
821 }
822
823 let Some(identity) = self.coordinator.identity(ack.epoch) else {
826 return Err(Error::UnknownIdentity(ack.epoch));
827 };
828 ops::partial_verify_message(
829 identity,
830 Some(&self.ack_namespace),
831 &serializer::ack(&ack.chunk, ack.epoch),
832 &ack.partial,
833 )
834 .map_err(|_| Error::InvalidPartialSignature)?;
835
836 Ok(())
837 }
838
839 fn validate_chunk(
844 &self,
845 chunk: &parsed::Chunk<D, C::PublicKey>,
846 epoch: Epoch,
847 ) -> Result<(), Error> {
848 if self
850 .coordinator
851 .is_sequencer(epoch, &chunk.sequencer)
852 .is_none()
853 {
854 return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
855 }
856
857 if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
859 match chunk.height.cmp(&tip.chunk.height) {
861 std::cmp::Ordering::Less => {
862 return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
863 }
864 std::cmp::Ordering::Equal => {
865 if tip.chunk.payload != chunk.payload {
867 return Err(Error::ChunkMismatch(
868 chunk.sequencer.to_string(),
869 chunk.height,
870 ));
871 }
872 }
873 std::cmp::Ordering::Greater => {}
874 }
875 }
876
877 Ok(())
878 }
879
880 fn get_journal_section(&self, height: u64) -> u64 {
886 height / self.journal_heights_per_section
887 }
888
889 async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
893 if self.journals.contains_key(sequencer) {
895 return;
896 }
897
898 let cfg = journal::variable::Config {
900 registry: Arc::new(Mutex::new(Registry::default())),
901 partition: format!("{}{}", &self.journal_name_prefix, sequencer),
902 };
903 let mut journal = Journal::init(self.runtime.clone(), cfg)
904 .await
905 .expect("unable to init journal");
906
907 {
909 debug!(?sequencer, "journal replay begin");
910
911 let stream = journal
913 .replay(self.journal_replay_concurrency, None)
914 .await
915 .expect("unable to replay journal");
916 pin_mut!(stream);
917
918 let mut tip: Option<parsed::Node<C, D>> = None;
921 let mut num_items = 0;
922 while let Some(msg) = stream.next().await {
923 num_items += 1;
924 let (_, _, _, msg) = msg.expect("unable to decode journal message");
925 let node = parsed::Node::<C, D>::decode(&msg)
926 .expect("journal message is unexpected format");
927 let height = node.chunk.height;
928 match tip {
929 None => {
930 tip = Some(node);
931 }
932 Some(ref t) => {
933 if height > t.chunk.height {
934 tip = Some(node);
935 }
936 }
937 }
938 }
939
940 if let Some(node) = tip.take() {
943 let is_new = self.tip_manager.put(&node);
944 assert!(is_new);
945 }
946
947 debug!(?sequencer, ?num_items, "journal replay end");
948 }
949
950 self.journals.insert(sequencer.clone(), journal);
952 }
953
954 async fn journal_append(&mut self, node: &parsed::Node<C, D>) {
959 let section = self.get_journal_section(node.chunk.height);
960 self.journals
961 .get_mut(&node.chunk.sequencer)
962 .expect("journal does not exist")
963 .append(section, node.encode().into())
964 .await
965 .expect("unable to append to journal");
966 }
967
968 async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
970 let section = self.get_journal_section(height);
971
972 let journal = self
974 .journals
975 .get_mut(sequencer)
976 .expect("journal does not exist");
977
978 journal.sync(section).await.expect("unable to sync journal");
980
981 let _ = journal.prune(section).await;
983 }
984
985 fn refresh_epoch(&mut self) {
991 self.refresh_epoch_deadline = Some(self.runtime.current() + self.refresh_epoch_timeout);
993
994 let epoch = self.coordinator.index();
996 assert!(epoch >= self.epoch);
997
998 self.epoch = epoch;
1000 }
1001}
1002
1003#[derive(Error, Debug)]
1005enum Error {
1006 #[error("Application verify error: {0}")]
1008 AppVerifyCanceled(oneshot::Canceled),
1009 #[error("Application verified no tip")]
1010 AppVerifiedNoTip,
1011 #[error("Application verified height mismatch")]
1012 AppVerifiedHeightMismatch,
1013 #[error("Application verified payload mismatch")]
1014 AppVerifiedPayloadMismatch,
1015
1016 #[error("Unable to send message")]
1018 UnableToSendMessage,
1019
1020 #[error("Already broadcast")]
1022 AlreadyBroadcast,
1023 #[error("I am not a sequencer in epoch {0}")]
1024 IAmNotASequencer(u64),
1025 #[error("Nothing to rebroadcast")]
1026 NothingToRebroadcast,
1027 #[error("Broadcast failed")]
1028 BroadcastFailed,
1029 #[error("No threshold for tip")]
1030 NoThresholdForTip(u64),
1031
1032 #[error("Genesis chunk must not have a parent")]
1034 GenesisChunkMustNotHaveParent,
1035 #[error("Node missing parent")]
1036 NodeMissingParent,
1037
1038 #[error("Unknown identity at epoch {0}")]
1040 UnknownIdentity(u64),
1041 #[error("Unknown signers at epoch {0}")]
1042 UnknownSigners(u64),
1043 #[error("Epoch {0} has no sequencer {1}")]
1044 UnknownSequencer(u64, String),
1045 #[error("Epoch {0} has no signer {1}")]
1046 UnknownSigner(u64, String),
1047 #[error("Unknown share at epoch {0}")]
1048 UnknownShare(u64),
1049
1050 #[error("Peer mismatch")]
1052 PeerMismatch,
1053
1054 #[error("Invalid threshold signature")]
1056 InvalidThresholdSignature,
1057 #[error("Invalid partial signature")]
1058 InvalidPartialSignature,
1059 #[error("Invalid node signature")]
1060 InvalidNodeSignature,
1061
1062 #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
1064 AckEpochOutsideBounds(u64, u64, u64),
1065 #[error("Invalid ack height {0} outside bounds {1} - {2}")]
1066 AckHeightOutsideBounds(u64, u64, u64),
1067 #[error("Chunk height {0} lower than tip height {1}")]
1068 ChunkHeightTooLow(u64, u64),
1069
1070 #[error("Chunk mismatch from sender {0} with height {1}")]
1072 ChunkMismatch(String, u64),
1073}