1use super::{
142 mailbox::{Mailbox, Message},
143 metrics::ShardMetrics,
144};
145use crate::{
146 marshal::coding::{
147 types::{CodedBlock, Shard},
148 validation::{validate_reconstruction, ReconstructionError as InvariantError},
149 },
150 types::{coding::Commitment, Epoch, Round},
151 Block, CertifiableBlock, Heightable,
152};
153use commonware_actor::mailbox;
154use commonware_codec::{Decode, Error as CodecError, Read};
155use commonware_coding::{Config as CodingConfig, Scheme as CodingScheme};
156use commonware_cryptography::{
157 certificate::{Provider, Scheme as CertificateScheme},
158 Committable, Digestible, Hasher, PublicKey,
159};
160use commonware_macros::select_loop;
161use commonware_p2p::{
162 utils::codec::{WrappedBackgroundReceiver, WrappedSender},
163 Blocker, Provider as PeerProvider, Receiver, Recipients, Sender,
164};
165use commonware_parallel::Strategy;
166use commonware_runtime::{
167 spawn_cell,
168 telemetry::metrics::{GaugeExt, HistogramExt},
169 BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
170};
171use commonware_utils::{
172 bitmap::BitMap,
173 channel::{fallible::OneshotExt, oneshot},
174 ordered::{Quorum, Set},
175};
176use rand::Rng;
177use std::{
178 collections::{BTreeMap, VecDeque},
179 num::NonZeroUsize,
180};
181use thiserror::Error;
182use tracing::{debug, warn};
183
/// Errors returned when reconstructing a block from checked shards fails.
#[derive(Debug, Error)]
pub enum Error<C: CodingScheme> {
    /// An error reported by the coding scheme `C` (e.g. while decoding shards).
    #[error(transparent)]
    Coding(C::Error),

    /// A codec error, e.g. raised while decoding the reconstructed bytes into a block.
    #[error(transparent)]
    Codec(#[from] CodecError),

    /// The reconstructed block does not match the digest in the commitment.
    #[error("block digest mismatch: reconstructed block does not match commitment digest")]
    DigestMismatch,

    /// The coding config decoded alongside the block does not match the commitment's config.
    #[error("block config mismatch: reconstructed config does not match commitment config")]
    ConfigMismatch,

    /// The reconstructed block's context does not match the context in the commitment.
    #[error("block context mismatch: reconstructed context does not match commitment context")]
    ContextMismatch,
}
207
/// Key under which block subscribers are registered: either the full
/// [`Commitment`] or only the block digest.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum BlockSubscriptionKey<D> {
    /// Subscription keyed by the block's commitment.
    Commitment(Commitment),
    /// Subscription keyed by the block's digest.
    Digest(D),
}
213
/// Configuration consumed by [`Engine::new`].
pub struct Config<P, S, X, D, C, H, B, T>
where
    P: PublicKey,
    S: Provider<Scope = Epoch>,
    X: Blocker<PublicKey = P>,
    D: PeerProvider<PublicKey = P>,
    C: CodingScheme,
    H: Hasher,
    B: CertifiableBlock,
    T: Strategy,
{
    /// Provider of the certificate scheme for a given epoch.
    pub scheme_provider: S,

    /// Handle used to block misbehaving peers.
    pub blocker: X,

    /// Codec configuration used when reading [`Shard`]s from the network.
    pub shard_codec_cfg: <Shard<C, H> as Read>::Cfg,

    /// Codec configuration used when decoding a reconstructed block.
    pub block_codec_cfg: B::Cfg,

    /// Strategy handed to coding operations (shard generation, checking, decoding).
    pub strategy: T,

    /// Size passed to the engine's mailbox on creation.
    pub mailbox_size: NonZeroUsize,

    /// Maximum number of shards buffered per peer; the oldest is evicted when full.
    pub peer_buffer_size: NonZeroUsize,

    /// Capacity passed to the background shard receiver.
    pub background_channel_capacity: NonZeroUsize,

    /// Provider the engine subscribes to for peer set updates.
    pub peer_provider: D,
}
270
/// A cached block together with the round it was cached for.
struct ReconstructedBlock<B, C, H>
where
    B: Block,
    C: CodingScheme,
    H: Hasher,
{
    /// Round recorded when the block was cached; used as a fallback when pruning.
    round: Round,
    /// The block itself.
    block: CodedBlock<B, C, H>,
}
281
/// Actor that distributes, collects and verifies shards of coded blocks and
/// reconstructs blocks from them.
///
/// Commands arrive through the mailbox, shards arrive from the network, and
/// peer set changes arrive through the peer provider subscription.
pub struct Engine<E, S, X, D, C, H, B, P, T>
where
    E: BufferPooler + Rng + Spawner + Metrics + Clock,
    S: Provider<Scope = Epoch>,
    S::Scheme: CertificateScheme<PublicKey = P>,
    X: Blocker,
    D: PeerProvider<PublicKey = P>,
    C: CodingScheme,
    H: Hasher,
    B: CertifiableBlock,
    P: PublicKey,
    T: Strategy,
{
    /// Runtime context (spawning, clock, buffer pool, metrics).
    context: ContextCell<E>,

    /// Receiver side of the engine's mailbox.
    mailbox: mailbox::Receiver<Message<B, C, H, P>>,

    /// Provider of the certificate scheme for a given epoch.
    scheme_provider: S,

    /// Handle used to block misbehaving peers.
    blocker: X,

    /// Codec configuration for shards read from the network.
    shard_codec_cfg: <Shard<C, H> as Read>::Cfg,

    /// Codec configuration for decoding reconstructed blocks.
    block_codec_cfg: B::Cfg,

    /// Strategy handed to coding operations.
    strategy: T,

    /// In-progress reconstruction state, keyed by commitment.
    state: BTreeMap<Commitment, ReconstructionState<P, C, H>>,

    /// Per-peer FIFO of shards that could not yet be handed to a
    /// reconstruction state (no state yet, or leader still unknown).
    peer_buffers: BTreeMap<P, VecDeque<Shard<C, H>>>,

    /// Maximum length of each queue in `peer_buffers`.
    peer_buffer_size: NonZeroUsize,

    /// Provider the engine subscribes to for peer set updates.
    peer_provider: D,

    /// Union of all peers reported by the most recent peer set update.
    aggregate_peers: Set<P>,

    /// Primary peers of the latest peer set; only their shards are buffered.
    latest_primary_peers: Set<P>,

    /// Capacity passed to the background shard receiver.
    background_channel_capacity: NonZeroUsize,

    /// Cache of blocks that were proposed locally or reconstructed, keyed by commitment.
    reconstructed_blocks: BTreeMap<Commitment, ReconstructedBlock<B, C, H>>,

    /// Waiters notified once the assigned shard for a commitment has been verified.
    assigned_shard_verified_subscriptions: BTreeMap<Commitment, Vec<oneshot::Sender<()>>>,

    /// Waiters notified once a block becomes available, keyed by commitment or digest.
    #[allow(clippy::type_complexity)]
    block_subscriptions:
        BTreeMap<BlockSubscriptionKey<B::Digest>, Vec<oneshot::Sender<CodedBlock<B, C, H>>>>,

    /// Metrics exported by the engine.
    metrics: ShardMetrics<P>,
}
371
372impl<E, S, X, D, C, H, B, P, T> Engine<E, S, X, D, C, H, B, P, T>
373where
374 E: BufferPooler + Rng + Spawner + Metrics + Clock,
375 S: Provider<Scope = Epoch>,
376 S::Scheme: CertificateScheme<PublicKey = P>,
377 X: Blocker<PublicKey = P>,
378 D: PeerProvider<PublicKey = P>,
379 C: CodingScheme,
380 H: Hasher,
381 B: CertifiableBlock,
382 P: PublicKey,
383 T: Strategy,
384{
385 pub fn new(context: E, config: Config<P, S, X, D, C, H, B, T>) -> (Self, Mailbox<B, C, H, P>) {
387 let metrics = ShardMetrics::new(&context);
388 let (sender, mailbox) = mailbox::new(context.child("mailbox"), config.mailbox_size);
389 (
390 Self {
391 context: ContextCell::new(context),
392 mailbox,
393 scheme_provider: config.scheme_provider,
394 blocker: config.blocker,
395 shard_codec_cfg: config.shard_codec_cfg,
396 block_codec_cfg: config.block_codec_cfg,
397 strategy: config.strategy,
398 state: BTreeMap::new(),
399 peer_buffers: BTreeMap::new(),
400 peer_buffer_size: config.peer_buffer_size,
401 peer_provider: config.peer_provider,
402 aggregate_peers: Set::default(),
403 latest_primary_peers: Set::default(),
404 background_channel_capacity: config.background_channel_capacity,
405 reconstructed_blocks: BTreeMap::new(),
406 assigned_shard_verified_subscriptions: BTreeMap::new(),
407 block_subscriptions: BTreeMap::new(),
408 metrics,
409 },
410 Mailbox::new(sender),
411 )
412 }
413
/// Spawns the engine's run loop on its context and returns the task handle.
///
/// `network` is the (sender, receiver) pair used to exchange shards with peers.
pub fn start(
    mut self,
    network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
) -> Handle<()> {
    spawn_cell!(self.context, self.run(network))
}
421
/// Main loop of the engine.
///
/// Wraps the network halves with the shard codec, subscribes to peer set
/// updates, and then services peer set updates, mailbox messages and
/// network shards until shutdown or until one of the sources closes.
async fn run(
    mut self,
    (sender, receiver): (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
) {
    // Typed sender that encodes `Shard`s using buffers from the network pool.
    let mut sender = WrappedSender::<_, Shard<C, H>>::new(
        self.context.network_buffer_pool().clone(),
        sender,
    );
    // Shard decoding is handed to a background receiver service that gets
    // the codec config, the blocker and the strategy.
    let (receiver_service, mut receiver) =
        WrappedBackgroundReceiver::<_, P, X, _, Shard<C, H>>::new(
            self.context.child("shard_ingress"),
            receiver,
            self.shard_codec_cfg.clone(),
            self.blocker.clone(),
            self.background_channel_capacity,
            &self.strategy,
        );
    // Keep the handle alive for the duration of the loop.
    let _receiver_handle = receiver_service.start();
    let mut peer_set_subscription = self.peer_provider.subscribe().await;

    select_loop! {
        self.context,
        on_start => {
            // Refresh gauges on every iteration.
            let _ = self
                .metrics
                .reconstruction_states_count
                .try_set(self.state.len());
            let _ = self
                .metrics
                .reconstructed_blocks_cache_count
                .try_set(self.reconstructed_blocks.len());

            // Drop subscribers whose receiving side has gone away, and
            // remove keys left without any subscriber.
            self.block_subscriptions.retain(|_, subscribers| {
                subscribers.retain(|tx| !tx.is_closed());
                !subscribers.is_empty()
            });
            self.assigned_shard_verified_subscriptions
                .retain(|_, subscribers| {
                    subscribers.retain(|tx| !tx.is_closed());
                    !subscribers.is_empty()
                });
        },
        on_stopped => {
            debug!("received shutdown signal, stopping shard engine");
        },
        // Peer set changed: refresh the primary set (which also prunes
        // buffers of departed peers) and the aggregate set.
        Some(update) = peer_set_subscription.recv() else {
            debug!("peer set subscription closed");
            return;
        } => {
            let all_peers = update.all.union();
            self.update_latest_primary_peers(update.latest.primary);
            self.aggregate_peers = all_peers;
        },
        // Command from the mailbox.
        Some(message) = self.mailbox.recv() else {
            debug!("shard mailbox closed, stopping shard engine");
            return;
        } => {
            // Skip requests whose response channel is already closed.
            if message.response_closed() {
                continue;
            }

            match message {
                Message::Proposed { block, round } => {
                    self.broadcast_shards(&mut sender, round, block);
                }
                Message::Discovered {
                    commitment,
                    leader,
                    round,
                } => {
                    self.handle_external_proposal(&mut sender, commitment, leader, round);
                }
                Message::Notarized { commitment, round } => {
                    self.handle_notarized_commitment(&mut sender, commitment, round);
                }
                Message::GetByCommitment {
                    commitment,
                    response,
                } => {
                    let block = self
                        .reconstructed_blocks
                        .get(&commitment)
                        .map(|entry| entry.block.clone());
                    response.send_lossy(block);
                }
                Message::GetByDigest { digest, response } => {
                    // The cache is keyed by commitment, so a digest lookup scans it.
                    let block = self
                        .reconstructed_blocks
                        .values()
                        .find_map(|entry| {
                            (entry.block.digest() == digest).then_some(entry.block.clone())
                        });
                    response.send_lossy(block);
                }
                Message::SubscribeAssignedShardVerified {
                    commitment,
                    response,
                } => {
                    self.handle_assigned_shard_verified_subscription(commitment, response);
                }
                Message::SubscribeByCommitment {
                    commitment,
                    response,
                } => {
                    self.handle_block_subscription(
                        BlockSubscriptionKey::Commitment(commitment),
                        response,
                    );
                }
                Message::SubscribeByDigest { digest, response } => {
                    self.handle_block_subscription(
                        BlockSubscriptionKey::Digest(digest),
                        response,
                    );
                }
                Message::Prune { through } => {
                    self.prune(through);
                }
            }
        },
        // Shard received from a peer.
        Some((peer, shard)) = receiver.recv() else {
            debug!("receiver closed, stopping shard engine");
            return;
        } => {
            self.handle_network_shard(&mut sender, peer, shard);
        },
    }
}
553
554 fn handle_network_shard<Sr: Sender<PublicKey = P>>(
556 &mut self,
557 sender: &mut WrappedSender<Sr, Shard<C, H>>,
558 peer: P,
559 shard: Shard<C, H>,
560 ) {
561 self.metrics.shards_received.get_or_create_by(&peer).inc();
562
563 let commitment = shard.commitment();
564 if !self.should_handle_network_shard(commitment) {
565 return;
566 }
567
568 if let Some(existing) = self.state.get(&commitment) {
569 let round = existing.round();
570 let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
571 warn!(%commitment, "no scheme for epoch, ignoring shard");
572 return;
573 };
574
575 if existing.leader().is_none() {
579 if let Some(sender_index) = scheme.participants().index(&peer) {
580 let expected_index: u16 = sender_index
581 .get()
582 .try_into()
583 .expect("participant index impossibly out of bounds");
584 if shard.index() != expected_index {
585 self.buffer_peer_shard(peer, shard);
589 return;
590 }
591 }
592 }
593
594 let state = self
595 .state
596 .get_mut(&commitment)
597 .expect("state checked as present");
598 let progressed = state.on_network_shard(
599 peer,
600 shard,
601 InsertCtx::new(scheme.as_ref(), &self.strategy),
602 &mut self.blocker,
603 );
604 if progressed {
605 self.try_advance(sender, commitment);
606 }
607 } else {
608 self.buffer_peer_shard(peer, shard);
609 }
610 }
611
612 fn should_handle_network_shard(&self, commitment: Commitment) -> bool {
619 if self.reconstructed_blocks.contains_key(&commitment) {
620 return self
625 .state
626 .get(&commitment)
627 .is_some_and(|s| !s.is_assigned_shard_verified());
628 }
629 true
630 }
631
632 #[allow(clippy::type_complexity)]
640 fn try_reconstruct(
641 &mut self,
642 commitment: Commitment,
643 ) -> Result<Option<CodedBlock<B, C, H>>, Error<C>> {
644 if let Some(entry) = self.reconstructed_blocks.get(&commitment) {
645 return Ok(Some(entry.block.clone()));
646 }
647 let Some(state) = self.state.get_mut(&commitment) else {
648 return Ok(None);
649 };
650 let round = state.round();
651 if state.checked_shards().len() < usize::from(commitment.config().minimum_shards.get()) {
652 debug!(%commitment, "not enough checked shards to reconstruct block");
653 return Ok(None);
654 }
655 let start = self.context.current();
657 let blob = C::decode(
658 &commitment.config(),
659 &commitment.root(),
660 state.checked_shards().iter(),
661 &self.strategy,
662 )
663 .map_err(Error::Coding)?;
664 self.metrics
665 .erasure_decode_duration
666 .observe_between(start, self.context.current());
667
668 let (inner, config): (B, CodingConfig) =
670 Decode::decode_cfg(&mut blob.as_slice(), &(self.block_codec_cfg.clone(), ()))?;
671
672 match validate_reconstruction::<H, _>(&inner, config, commitment) {
673 Ok(()) => {}
674 Err(InvariantError::BlockDigest) => {
675 return Err(Error::DigestMismatch);
676 }
677 Err(InvariantError::CodingConfig) => {
678 warn!(
679 %commitment,
680 expected_config = ?commitment.config(),
681 actual_config = ?config,
682 "reconstructed block config does not match commitment config, but digest matches"
683 );
684 return Err(Error::ConfigMismatch);
685 }
686 Err(InvariantError::ContextDigest(expected, actual)) => {
687 warn!(
688 %commitment,
689 expected_context_digest = ?expected,
690 actual_context_digest = ?actual,
691 "reconstructed block context digest does not match commitment context digest"
692 );
693 return Err(Error::ContextMismatch);
694 }
695 }
696
697 let block = CodedBlock::new_trusted(inner, commitment);
700 self.cache_block(round, block.clone());
701 self.metrics.blocks_reconstructed_total.inc();
702 Ok(Some(block))
703 }
704
705 fn handle_external_proposal<Sr: Sender<PublicKey = P>>(
707 &mut self,
708 sender: &mut WrappedSender<Sr, Shard<C, H>>,
709 commitment: Commitment,
710 leader: P,
711 round: Round,
712 ) {
713 if self.reconstructed_blocks.contains_key(&commitment)
718 && self
719 .state
720 .get(&commitment)
721 .is_none_or(|state| state.leader().is_some())
722 {
723 return;
724 }
725 let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
726 warn!(%commitment, "no scheme for epoch, ignoring external proposal");
727 return;
728 };
729 let participants = scheme.participants();
730 if participants.index(&leader).is_none() {
731 warn!(?leader, %commitment, "leader update for non-participant, ignoring");
732 return;
733 }
734 if let Some(state) = self.state.get_mut(&commitment) {
735 if let Some(existing) = state.leader() {
736 if existing != &leader {
737 warn!(
738 existing = ?existing,
739 ?leader,
740 %commitment,
741 "conflicting leader update, ignoring"
742 );
743 }
744 return;
745 }
746 state
747 .set_leader(leader)
748 .expect("leader was checked as absent");
749 } else {
750 let participants_len = u64::try_from(participants.len())
751 .expect("participant count impossibly out of bounds");
752 self.state.insert(
753 commitment,
754 ReconstructionState::new(Some(leader), round, participants_len),
755 );
756 }
757 let buffered_progress = self.ingest_buffered_shards(commitment);
758 if buffered_progress {
759 self.try_advance(sender, commitment);
760 }
761 }
762
763 fn handle_notarized_commitment<Sr: Sender<PublicKey = P>>(
769 &mut self,
770 sender: &mut WrappedSender<Sr, Shard<C, H>>,
771 commitment: Commitment,
772 round: Round,
773 ) {
774 if self.reconstructed_blocks.contains_key(&commitment) {
775 return;
776 }
777 if self.state.contains_key(&commitment) {
778 let buffered_progress = self.ingest_buffered_shards(commitment);
779 if buffered_progress {
780 self.try_advance(sender, commitment);
781 }
782 return;
783 }
784 let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
785 warn!(%commitment, "no scheme for epoch, ignoring notarized commitment");
786 return;
787 };
788 let participants_len = u64::try_from(scheme.participants().len())
789 .expect("participant count impossibly out of bounds");
790 self.state.insert(
791 commitment,
792 ReconstructionState::new(None, round, participants_len),
793 );
794 let buffered_progress = self.ingest_buffered_shards(commitment);
795 if buffered_progress {
796 self.try_advance(sender, commitment);
797 }
798 }
799
800 fn buffer_peer_shard(&mut self, peer: P, shard: Shard<C, H>) {
802 if self.latest_primary_peers.position(&peer).is_none() {
803 debug!(
804 ?peer,
805 "pre-leader shard from peer outside latest.primary not buffered"
806 );
807 return;
808 }
809 let queue = self.peer_buffers.entry(peer).or_default();
810 if queue.len() >= self.peer_buffer_size.get() {
811 let _ = queue.pop_front();
812 }
813 queue.push_back(shard);
814 }
815
816 fn update_latest_primary_peers(&mut self, peers: Set<P>) {
817 self.peer_buffers
818 .retain(|peer, _| peers.position(peer).is_some());
819 self.latest_primary_peers = peers;
820 }
821
822 fn ingest_buffered_shards(&mut self, commitment: Commitment) -> bool {
832 let state = self
833 .state
834 .get(&commitment)
835 .expect("buffered shards can only be ingested with reconstruction state");
836 let round = state.round();
837 let leader_known = state.leader().is_some();
838 let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
839 warn!(%commitment, "no scheme for epoch, dropping buffered shards");
840 return false;
841 };
842
843 let mut buffered = Vec::new();
844 for (peer, queue) in self.peer_buffers.iter_mut() {
845 let mut i = 0;
846 while i < queue.len() {
847 if queue[i].commitment() != commitment {
848 i += 1;
849 continue;
850 }
851 if !leader_known {
852 let Some(sender_index) = scheme.participants().index(peer) else {
853 i += 1;
854 continue;
855 };
856 let expected_index: u16 = sender_index
857 .get()
858 .try_into()
859 .expect("participant index impossibly out of bounds");
860 if queue[i].index() != expected_index {
861 i += 1;
862 continue;
863 }
864 }
865 let shard = queue.swap_remove_back(i).expect("index is valid");
866 buffered.push((peer.clone(), shard));
867 }
868 }
869
870 let state = self
871 .state
872 .get_mut(&commitment)
873 .expect("reconstruction state checked before buffered shard drain");
874
875 let mut progressed = false;
878 let ctx = InsertCtx::new(scheme.as_ref(), &self.strategy);
879 for (peer, shard) in buffered {
880 progressed |= state.on_network_shard(peer, shard, ctx, &mut self.blocker);
881 }
882 progressed
883 }
884
885 fn cache_block(&mut self, round: Round, block: CodedBlock<B, C, H>) {
887 let commitment = block.commitment();
888 self.reconstructed_blocks.insert(
889 commitment,
890 ReconstructedBlock {
891 round,
892 block: block.clone(),
893 },
894 );
895 self.notify_block_subscribers(block);
896 }
897
898 fn broadcast_shards<Sr: Sender<PublicKey = P>>(
903 &mut self,
904 sender: &mut WrappedSender<Sr, Shard<C, H>>,
905 round: Round,
906 mut block: CodedBlock<B, C, H>,
907 ) {
908 let commitment = block.commitment();
909
910 let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
911 warn!(%commitment, "no scheme available, cannot broadcast shards");
912 return;
913 };
914 let participants = scheme.participants();
915 let Some(me) = scheme.me() else {
916 warn!(
917 %commitment,
918 "cannot broadcast shards: local proposer is not a participant"
919 );
920 return;
921 };
922
923 let shard_count = block.shards(&self.strategy).len();
924 if shard_count != participants.len() {
925 warn!(
926 %commitment,
927 shard_count,
928 participants = participants.len(),
929 "cannot broadcast shards: participant/shard count mismatch"
930 );
931 return;
932 }
933
934 let my_index = me.get() as usize;
935 let leader_shard = block
936 .shard(my_index as u16)
937 .expect("proposer's shard must exist");
938
939 for (index, peer) in participants.iter().enumerate() {
941 if index == my_index {
942 continue;
943 }
944
945 let Some(shard) = block.shard(index as u16) else {
946 warn!(
947 %commitment,
948 index,
949 "cannot broadcast shards: missing shard for participant index"
950 );
951 return;
952 };
953 let _ = sender.send(Recipients::One(peer.clone()), shard, true);
954 }
955
956 let non_participants: Vec<P> = self
958 .aggregate_peers
959 .iter()
960 .filter(|peer| participants.index(peer).is_none())
961 .cloned()
962 .collect();
963 if !non_participants.is_empty() {
964 let _ = sender.send(Recipients::Some(non_participants), leader_shard, true);
965 }
966
967 self.cache_block(round, block);
969
970 self.notify_assigned_shard_verified_subscribers(commitment);
973
974 debug!(?commitment, "broadcasted shards");
975 }
976
977 fn broadcast_shard<Sr: Sender<PublicKey = P>>(
979 &mut self,
980 sender: &mut WrappedSender<Sr, Shard<C, H>>,
981 shard: Shard<C, H>,
982 ) {
983 let commitment = shard.commitment();
984 let peers = sender.send(Recipients::All, shard, true);
985 debug!(
986 ?commitment,
987 peers = peers.len(),
988 "broadcasted shard to all peers"
989 );
990 }
991
992 fn try_advance<Sr: Sender<PublicKey = P>>(
996 &mut self,
997 sender: &mut WrappedSender<Sr, Shard<C, H>>,
998 commitment: Commitment,
999 ) {
1000 if let Some(state) = self.state.get_mut(&commitment) {
1001 match state.take_pending_action() {
1002 Some(AssignedShardVerifiedAction::Broadcast(shard)) => {
1003 self.broadcast_shard(sender, shard);
1004 self.notify_assigned_shard_verified_subscribers(commitment);
1005 }
1006 Some(AssignedShardVerifiedAction::NotifyOnly) => {
1007 self.notify_assigned_shard_verified_subscribers(commitment);
1008 }
1009 None => {}
1010 }
1011 }
1012
1013 match self.try_reconstruct(commitment) {
1014 Ok(Some(block)) => {
1015 debug!(
1021 %commitment,
1022 parent = %block.parent(),
1023 height = %block.height(),
1024 "successfully reconstructed block from shards"
1025 );
1026 }
1027 Ok(None) => {
1028 debug!(%commitment, "not enough checked shards to reconstruct block");
1029 }
1030 Err(err) => {
1031 warn!(%commitment, ?err, "failed to reconstruct block from checked shards");
1032 self.state.remove(&commitment);
1033 self.drop_subscriptions(commitment);
1034 self.metrics.reconstruction_failures_total.inc();
1035 }
1036 }
1037 }
1038
1039 fn handle_assigned_shard_verified_subscription(
1044 &mut self,
1045 commitment: Commitment,
1046 response: oneshot::Sender<()>,
1047 ) {
1048 let has_shard = self
1050 .state
1051 .get(&commitment)
1052 .is_some_and(|state| state.is_assigned_shard_verified());
1053 if has_shard {
1054 response.send_lossy(());
1055 return;
1056 }
1057
1058 if !self.state.contains_key(&commitment)
1062 && self.reconstructed_blocks.contains_key(&commitment)
1063 {
1064 response.send_lossy(());
1065 return;
1066 }
1067
1068 self.assigned_shard_verified_subscriptions
1069 .entry(commitment)
1070 .or_default()
1071 .push(response);
1072 }
1073
1074 fn handle_block_subscription(
1076 &mut self,
1077 key: BlockSubscriptionKey<B::Digest>,
1078 response: oneshot::Sender<CodedBlock<B, C, H>>,
1079 ) {
1080 let block = match key {
1081 BlockSubscriptionKey::Commitment(commitment) => self
1082 .reconstructed_blocks
1083 .get(&commitment)
1084 .map(|entry| &entry.block),
1085 BlockSubscriptionKey::Digest(digest) => self
1086 .reconstructed_blocks
1087 .values()
1088 .find_map(|entry| (entry.block.digest() == digest).then_some(&entry.block)),
1089 };
1090
1091 if let Some(block) = block {
1093 response.send_lossy(block.clone());
1094 return;
1095 }
1096
1097 self.block_subscriptions
1098 .entry(key)
1099 .or_default()
1100 .push(response);
1101 }
1102
1103 fn notify_assigned_shard_verified_subscribers(&mut self, commitment: Commitment) {
1106 if let Some(mut subscribers) = self
1107 .assigned_shard_verified_subscriptions
1108 .remove(&commitment)
1109 {
1110 for subscriber in subscribers.drain(..) {
1111 subscriber.send_lossy(());
1112 }
1113 }
1114 }
1115
1116 fn notify_block_subscribers(&mut self, block: CodedBlock<B, C, H>) {
1118 let commitment = block.commitment();
1119 let digest = block.digest();
1120
1121 if let Some(mut subscribers) = self
1123 .block_subscriptions
1124 .remove(&BlockSubscriptionKey::Commitment(commitment))
1125 {
1126 for subscriber in subscribers.drain(..) {
1127 subscriber.send_lossy(block.clone());
1128 }
1129 }
1130
1131 if let Some(mut subscribers) = self
1133 .block_subscriptions
1134 .remove(&BlockSubscriptionKey::Digest(digest))
1135 {
1136 for subscriber in subscribers.drain(..) {
1137 subscriber.send_lossy(block.clone());
1138 }
1139 }
1140 }
1141
1142 fn drop_subscriptions(&mut self, commitment: Commitment) {
1147 self.assigned_shard_verified_subscriptions
1148 .remove(&commitment);
1149 self.block_subscriptions
1150 .remove(&BlockSubscriptionKey::Commitment(commitment));
1151 self.block_subscriptions
1152 .remove(&BlockSubscriptionKey::Digest(
1153 commitment.block::<B::Digest>(),
1154 ));
1155 }
1156
1157 fn prune(&mut self, through: Commitment) {
1167 let cached = self
1168 .reconstructed_blocks
1169 .get(&through)
1170 .map(|entry| (entry.round, entry.block.height()));
1171 if let Some((_, height)) = cached {
1172 self.reconstructed_blocks
1173 .retain(|_, entry| entry.block.height() > height);
1174 }
1175
1176 self.drop_subscriptions(through);
1180 let state_round = self.state.remove(&through).map(|state| state.round());
1181 let cached_round = cached.map(|(round, _)| round);
1182 let Some(round) = state_round.or(cached_round) else {
1183 return;
1184 };
1185
1186 let mut pruned_commitments = Vec::new();
1187 self.state.retain(|c, s| {
1188 let keep = s.round() > round;
1189 if !keep {
1190 pruned_commitments.push(*c);
1191 }
1192 keep
1193 });
1194 for pruned in pruned_commitments {
1195 self.drop_subscriptions(pruned);
1196 }
1197 }
1198}
1199
/// Per-commitment reconstruction progress.
enum ReconstructionState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Still collecting shards; peer shards are held unverified until
    /// enough are available.
    AwaitingQuorum(AwaitingQuorumState<P, C, H>),
    /// Enough shards have been checked to meet the commitment's minimum.
    Ready(ReadyState<P, C, H>),
}
1215
/// Follow-up the engine performs after the assigned shard has been verified.
enum AssignedShardVerifiedAction<C: CodingScheme, H: Hasher> {
    /// Broadcast the contained shard to peers, then notify subscribers.
    Broadcast(Shard<C, H>),
    /// Only notify subscribers.
    NotifyOnly,
}
1226
/// Shard data paired with its index.
struct IndexedShard<C: CodingScheme> {
    /// Index of the shard.
    index: u16,
    /// The shard payload.
    data: C::Shard,
}
1232
/// State shared by every phase of reconstruction.
struct CommonState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Leader of the commitment's round, once known.
    leader: Option<P>,
    /// Action for the engine to take after the assigned shard was verified.
    pending_action: Option<AssignedShardVerifiedAction<C, H>>,
    /// Shards that passed the coding scheme's check.
    checked_shards: Vec<C::CheckedShard>,
    /// One bit per shard index, set once a shard for that index was accepted.
    contributed: BitMap,
    /// Round the commitment belongs to.
    round: Round,
    /// Raw shard data by index, used to detect conflicting shards.
    received_shards: BTreeMap<u16, C::Shard>,
    /// Whether the shard delivered by the leader has been verified.
    assigned_shard_verified: bool,
}
1257
/// State while fewer than the minimum number of shards have been checked.
struct AwaitingQuorumState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// State shared with the other phases.
    common: CommonState<P, C, H>,
    /// Unverified shards by sending peer; checked in bulk once enough are available.
    pending_shards: BTreeMap<P, IndexedShard<C>>,
}
1274
/// State once enough shards have been checked to meet the commitment's minimum.
struct ReadyState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// State shared with the other phases.
    common: CommonState<P, C, H>,
}
1287
1288impl<P, C, H> CommonState<P, C, H>
1289where
1290 P: PublicKey,
1291 C: CodingScheme,
1292 H: Hasher,
1293{
1294 fn new(leader: Option<P>, round: Round, participants_len: u64) -> Self {
1296 Self {
1297 leader,
1298 pending_action: None,
1299 checked_shards: Vec::new(),
1300 contributed: BitMap::zeroes(participants_len),
1301 round,
1302 received_shards: BTreeMap::new(),
1303 assigned_shard_verified: false,
1304 }
1305 }
1306}
1307
1308impl<P, C, H> CommonState<P, C, H>
1309where
1310 P: PublicKey,
1311 C: CodingScheme,
1312 H: Hasher,
1313{
1314 fn verify_assigned_shard(
1323 &mut self,
1324 sender: P,
1325 commitment: Commitment,
1326 shard: IndexedShard<C>,
1327 is_participant: bool,
1328 blocker: &mut impl Blocker<PublicKey = P>,
1329 ) -> bool {
1330 self.received_shards.insert(shard.index, shard.data);
1334 let data = self.received_shards.get(&shard.index).unwrap();
1335 let Ok(checked) = C::check(&commitment.config(), &commitment.root(), shard.index, data)
1336 else {
1337 self.received_shards.remove(&shard.index);
1338 commonware_p2p::block!(blocker, sender, "invalid shard received from leader");
1339 return false;
1340 };
1341
1342 self.contributed.set(u64::from(shard.index), true);
1343 self.checked_shards.push(checked);
1344 self.assigned_shard_verified = true;
1345 self.pending_action = Some(if is_participant {
1346 AssignedShardVerifiedAction::Broadcast(Shard::new(
1347 commitment,
1348 shard.index,
1349 data.clone(),
1350 ))
1351 } else {
1352 AssignedShardVerifiedAction::NotifyOnly
1353 });
1354 true
1355 }
1356}
1357
1358impl<P, C, H> AwaitingQuorumState<P, C, H>
1359where
1360 P: PublicKey,
1361 C: CodingScheme,
1362 H: Hasher,
1363{
1364 fn try_transition(
1367 &mut self,
1368 commitment: Commitment,
1369 participants_len: u64,
1370 strategy: &impl Strategy,
1371 blocker: &mut impl Blocker<PublicKey = P>,
1372 ) -> Option<ReadyState<P, C, H>> {
1373 let minimum = usize::from(commitment.config().minimum_shards.get());
1374 if self.common.checked_shards.len() + self.pending_shards.len() < minimum {
1375 return None;
1376 }
1377
1378 let pending = std::mem::take(&mut self.pending_shards);
1380 let (new_checked, to_block) =
1381 strategy.map_partition_collect_vec(pending, |(peer, shard)| {
1382 let checked = C::check(
1383 &commitment.config(),
1384 &commitment.root(),
1385 shard.index,
1386 &shard.data,
1387 );
1388 (peer, checked.ok())
1389 });
1390
1391 for peer in to_block {
1392 commonware_p2p::block!(blocker, peer, "invalid shard received");
1393 }
1394 for checked in new_checked {
1395 self.common.checked_shards.push(checked);
1396 }
1397
1398 if self.common.checked_shards.len() < minimum {
1400 return None;
1401 }
1402
1403 let round = self.common.round;
1405 let leader = self.common.leader.clone();
1406 let common = std::mem::replace(
1407 &mut self.common,
1408 CommonState::new(leader, round, participants_len),
1409 );
1410 Some(ReadyState { common })
1411 }
1412}
1413
/// Borrowed context passed along when inserting a network shard into a
/// reconstruction state.
struct InsertCtx<'a, Sch, S>
where
    Sch: CertificateScheme,
    S: Strategy,
{
    /// Certificate scheme used to look up participants and the local index.
    scheme: &'a Sch,
    /// Strategy used when checking shards.
    strategy: &'a S,
    /// Number of participants in `scheme`, computed once at construction.
    participants_len: u64,
}
1424
// `Clone`/`Copy` are implemented by hand, so they carry only the bounds
// written here and do not require `Sch: Clone` or `S: Clone`.
impl<Sch: CertificateScheme, S: Strategy> Clone for InsertCtx<'_, Sch, S> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<Sch: CertificateScheme, S: Strategy> Copy for InsertCtx<'_, Sch, S> {}
1432
1433impl<'a, Sch: CertificateScheme, S: Strategy> InsertCtx<'a, Sch, S> {
1434 fn new(scheme: &'a Sch, strategy: &'a S) -> Self {
1435 let participants_len = u64::try_from(scheme.participants().len())
1436 .expect("participant count impossibly out of bounds");
1437 Self {
1438 scheme,
1439 strategy,
1440 participants_len,
1441 }
1442 }
1443}
1444
impl<P, C, H> ReconstructionState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Creates a state in the `AwaitingQuorum` phase with no pending shards.
    fn new(leader: Option<P>, round: Round, participants_len: u64) -> Self {
        Self::AwaitingQuorum(AwaitingQuorumState {
            common: CommonState::new(leader, round, participants_len),
            pending_shards: BTreeMap::new(),
        })
    }

    /// Shared state of whichever phase is active.
    const fn common(&self) -> &CommonState<P, C, H> {
        match self {
            Self::AwaitingQuorum(state) => &state.common,
            Self::Ready(state) => &state.common,
        }
    }

    /// Mutable access to the shared state of whichever phase is active.
    const fn common_mut(&mut self) -> &mut CommonState<P, C, H> {
        match self {
            Self::AwaitingQuorum(state) => &mut state.common,
            Self::Ready(state) => &mut state.common,
        }
    }

    /// The leader, if known.
    const fn leader(&self) -> Option<&P> {
        self.common().leader.as_ref()
    }

    /// Sets the leader if none is set yet; otherwise hands `leader` back as `Err`.
    fn set_leader(&mut self, leader: P) -> Result<(), P> {
        if self.common().leader.is_some() {
            return Err(leader);
        }
        self.common_mut().leader = Some(leader);
        Ok(())
    }

    /// Whether the shard delivered by the leader has been verified.
    const fn is_assigned_shard_verified(&self) -> bool {
        self.common().assigned_shard_verified
    }

    /// Round this state belongs to.
    const fn round(&self) -> Round {
        self.common().round
    }

    /// Shards that have passed the coding scheme's check so far.
    const fn checked_shards(&self) -> &[C::CheckedShard] {
        self.common().checked_shards.as_slice()
    }

    /// Takes the pending action, leaving `None` behind.
    const fn take_pending_action(&mut self) -> Option<AssignedShardVerifiedAction<C, H>> {
        self.common_mut().pending_action.take()
    }

    /// Handles a shard received from `sender`.
    ///
    /// Returns `true` when the shard was accepted (verified as the assigned
    /// shard, or stored as pending) and `false` when it was rejected or
    /// ignored. Depending on the failure, `sender` may be blocked.
    fn on_network_shard<Sch, S, X>(
        &mut self,
        sender: P,
        shard: Shard<C, H>,
        ctx: InsertCtx<'_, Sch, S>,
        blocker: &mut X,
    ) -> bool
    where
        Sch: CertificateScheme<PublicKey = P>,
        S: Strategy,
        X: Blocker<PublicKey = P>,
    {
        // Only participants of the scheme may contribute shards.
        let Some(sender_index) = ctx.scheme.participants().index(&sender) else {
            commonware_p2p::block!(blocker, sender, "shard sent by non-participant");
            return false;
        };
        let commitment = shard.commitment();
        let indexed = IndexedShard {
            index: shard.index(),
            data: shard.into_inner(),
        };

        // The leader must send the shard at our own index (or at its own
        // index when we are not a participant); every other peer must send
        // the shard at its own index.
        let leader = self.common().leader.as_ref();
        let is_from_leader = leader.is_some_and(|leader| leader == &sender);
        let expected_participant = if is_from_leader {
            ctx.scheme.me().unwrap_or(sender_index)
        } else {
            sender_index
        };
        let expected_index: u16 = expected_participant
            .get()
            .try_into()
            .expect("participant index impossibly out of bounds");
        if indexed.index != expected_index {
            // Only block once a leader is known; without one the mismatch
            // is dropped without penalizing the sender.
            if leader.is_some() {
                commonware_p2p::block!(
                    blocker,
                    sender,
                    shard_index = indexed.index,
                    expected_index,
                    "shard index does not match expected index"
                );
            }
            return false;
        }

        // A shard for this index was seen before: identical data is a
        // duplicate, different data is equivocation.
        if let Some(existing) = self.common().received_shards.get(&indexed.index) {
            if existing != &indexed.data {
                commonware_p2p::block!(blocker, sender, "shard equivocation");
            }
            return false;
        }

        // This index has already contributed.
        if self.common().contributed.get(u64::from(indexed.index)) {
            return false;
        }

        // The leader's delivery is verified immediately rather than queued.
        if is_from_leader && !self.common().assigned_shard_verified {
            let progressed = self.common_mut().verify_assigned_shard(
                sender,
                commitment,
                indexed,
                ctx.scheme.me().is_some(),
                blocker,
            );

            if progressed {
                if let Self::AwaitingQuorum(state) = self {
                    if let Some(ready) = state.try_transition(
                        commitment,
                        ctx.participants_len,
                        ctx.strategy,
                        blocker,
                    ) {
                        *self = Self::Ready(ready);
                    }
                }
            }
            return progressed;
        }

        // Peer shards are only collected while still awaiting quorum.
        let Self::AwaitingQuorum(state) = self else {
            return false;
        };

        // Record the shard as received and contributed, and queue it for
        // verification inside `try_transition`.
        state
            .common
            .received_shards
            .insert(indexed.index, indexed.data.clone());
        state.common.contributed.set(u64::from(indexed.index), true);
        state.pending_shards.insert(sender, indexed);
        if let Some(ready) =
            state.try_transition(commitment, ctx.participants_len, ctx.strategy, blocker)
        {
            *self = Self::Ready(ready);
        }

        true
    }
}
1668
1669#[cfg(test)]
1670mod tests {
1671 use super::*;
1672 use crate::{
1673 marshal::{
1674 coding::types::coding_config_for_participants, mocks::block::Block as MockBlock,
1675 },
1676 types::{Epoch, Height, View},
1677 };
1678 use bytes::Bytes;
1679 use commonware_codec::Encode;
1680 use commonware_coding::{
1681 CodecConfig, Config as CodingConfig, PhasedAsScheme, ReedSolomon, Zoda,
1682 };
1683 use commonware_cryptography::{
1684 certificate::Subject,
1685 ed25519::{PrivateKey, PublicKey},
1686 impl_certificate_ed25519,
1687 sha256::Digest as Sha256Digest,
1688 Committable, Digest, Sha256, Signer,
1689 };
1690 use commonware_macros::{select, test_traced};
1691 use commonware_p2p::{
1692 simulated::{self, Control, Link, Oracle},
1693 Manager as _, TrackedPeers,
1694 };
1695 use commonware_parallel::Sequential;
1696 use commonware_runtime::{deterministic, Quota, Runner, Supervisor as _};
1697 use commonware_utils::{
1698 channel::oneshot::error::TryRecvError, ordered::Set, NZUsize, Participant,
1699 };
1700 use std::{
1701 future::Future,
1702 marker::PhantomData,
1703 num::NonZeroU32,
1704 sync::{
1705 atomic::{AtomicIsize, Ordering},
1706 Arc,
1707 },
1708 time::Duration,
1709 };
1710
/// Subject type used to instantiate the ed25519 certificate scheme in these tests.
#[derive(Clone, Debug)]
pub struct TestSubject {
    /// Bytes returned by [`Subject::message`].
    pub message: Bytes,
}
1715
1716 impl Subject for TestSubject {
1717 type Namespace = Vec<u8>;
1718
1719 fn namespace<'a>(&self, derived: &'a Self::Namespace) -> &'a [u8] {
1720 derived
1721 }
1722
1723 fn message(&self) -> Bytes {
1724 self.message.clone()
1725 }
1726 }
1727
// Instantiates the ed25519 certificate implementation for `TestSubject` with a `Vec<u8>`
// namespace. NOTE(review): presumably this is what defines the `Scheme` type used by the
// providers and fixture below; confirm against the macro definition.
impl_certificate_ed25519!(TestSubject, Vec<u8>);
1729
/// Namespace passed to `Scheme::signer` and `Scheme::verifier` when building test schemes.
const SCHEME_NAMESPACE: &[u8] = b"_COMMONWARE_SHARD_ENGINE_TEST";

/// Size limit (1 MiB) used for both the simulated network's `max_size` and the shard
/// codec's `maximum_shard_size`.
const MAX_SHARD_SIZE: usize = 1024 * 1024;

/// Link that `Fixture::default` installs between every pair of peers: 50ms latency,
/// no jitter, and a success rate of 1.0.
const DEFAULT_LINK: Link = Link {
    latency: Duration::from_millis(50),
    jitter: Duration::ZERO,
    success_rate: 1.0,
};

/// Quota supplied when registering network channels, built from `NonZeroU32::MAX` per second.
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);

/// Strategy handed to block coding and to the engine configuration in tests.
const STRATEGY: Sequential = Sequential;
1747
/// Scheme provider backed by a map from epoch to scheme.
#[derive(Clone)]
struct MultiEpochProvider {
    /// Schemes keyed by the epoch they are returned for.
    schemes: BTreeMap<Epoch, Arc<Scheme>>,
}
1757
1758 impl MultiEpochProvider {
1759 fn single(scheme: Scheme) -> Self {
1760 let mut schemes = BTreeMap::new();
1761 schemes.insert(Epoch::zero(), Arc::new(scheme));
1762 Self { schemes }
1763 }
1764
1765 fn with_epoch(mut self, epoch: Epoch, scheme: Scheme) -> Self {
1766 self.schemes.insert(epoch, Arc::new(scheme));
1767 self
1768 }
1769 }
1770
1771 impl Provider for MultiEpochProvider {
1772 type Scope = Epoch;
1773 type Scheme = Scheme;
1774
1775 fn scoped(&self, scope: Epoch) -> Option<Arc<Scheme>> {
1776 self.schemes.get(&scope).cloned()
1777 }
1778 }
1779
/// Scheme provider that stops returning its scheme once a budget of successful lookups
/// is used up.
#[derive(Clone)]
struct ChurningProvider {
    /// Scheme returned for epoch zero while the budget lasts.
    scheme: Arc<Scheme>,
    /// Counter decremented on every epoch-zero lookup; shared between clones.
    remaining_successes: Arc<AtomicIsize>,
}
1787
1788 impl ChurningProvider {
1789 fn new(scheme: Scheme, successes: isize) -> Self {
1790 Self {
1791 scheme: Arc::new(scheme),
1792 remaining_successes: Arc::new(AtomicIsize::new(successes)),
1793 }
1794 }
1795 }
1796
1797 impl Provider for ChurningProvider {
1798 type Scope = Epoch;
1799 type Scheme = Scheme;
1800
1801 fn scoped(&self, scope: Epoch) -> Option<Arc<Scheme>> {
1802 if scope != Epoch::zero() {
1803 return None;
1804 }
1805 if self.remaining_successes.fetch_sub(1, Ordering::AcqRel) <= 0 {
1806 return None;
1807 }
1808 Some(Arc::clone(&self.scheme))
1809 }
1810 }
1811
/// Block type used by the tests.
type B = MockBlock<Sha256Digest, ()>;
/// Hasher used by the tests.
type H = Sha256;
/// Public key type identifying peers.
type P = PublicKey;
/// Default coding scheme (Reed-Solomon over `H`).
type C = ReedSolomon<H>;
/// Per-peer control handle of the simulated network; passed to the engine as its blocker.
type X = Control<P, deterministic::Context>;
/// Oracle of the simulated network.
type O = Oracle<P, deterministic::Context>;
/// Scheme provider used by the default engine alias.
type Prov = MultiEpochProvider;
/// Sender half of a channel registered on the simulated network.
type NetworkSender = simulated::Sender<P, deterministic::Context>;
/// Peer manager of the simulated network; passed to the engine as its peer provider.
type D = simulated::Manager<P, deterministic::Context>;
/// Engine under test, generic over the coding scheme `S`.
type ShardEngine<S> = Engine<deterministic::Context, Prov, X, D, S, H, B, P, Sequential>;
/// Engine under test backed by a [`ChurningProvider`] instead of [`MultiEpochProvider`].
type ChurningShardEngine<S> =
    Engine<deterministic::Context, ChurningProvider, X, D, S, H, B, P, Sequential>;
1825
1826 async fn assert_blocked(oracle: &O, blocker: &P, blocked: &P) {
1827 let blocked_peers = oracle.blocked().await.unwrap();
1828 let is_blocked = blocked_peers
1829 .iter()
1830 .any(|(a, b)| a == blocker && b == blocked);
1831 assert!(is_blocked, "expected {blocker} to have blocked {blocked}");
1832 }
1833
/// A participant started by [`Fixture::start`].
struct Peer<S: CodingScheme = C> {
    /// Public key of the peer.
    public_key: PublicKey,
    /// Participant index, equal to the peer's position in the sorted list of peer keys.
    index: Participant,
    /// Mailbox of the engine started for this peer.
    mailbox: Mailbox<B, S, H, P>,
    /// Clone of the network sender that was handed to the peer's engine.
    sender: NetworkSender,
}
1845
/// A non-participant started by [`Fixture::start`]; its engine is built with a
/// verifier-only scheme.
#[allow(dead_code)]
struct NonParticipant<S: CodingScheme = C> {
    /// Public key of the non-participant.
    public_key: PublicKey,
    /// Mailbox of the engine started for this non-participant.
    mailbox: Mailbox<B, S, H, P>,
    /// Clone of the network sender that was handed to the non-participant's engine.
    sender: NetworkSender,
}
1856
/// Test fixture describing the network that [`Fixture::start`] builds.
struct Fixture<S: CodingScheme = C> {
    /// Number of participants to start.
    num_peers: usize,
    /// Number of non-participants to start.
    num_non_participants: usize,
    /// Link installed between every pair of distinct nodes.
    link: Link,
    /// Ties the fixture to the coding scheme `S` without storing a value of it.
    _marker: PhantomData<S>,
}
1868
1869 impl<S: CodingScheme> Default for Fixture<S> {
1870 fn default() -> Self {
1871 Self {
1872 num_peers: 4,
1873 num_non_participants: 0,
1874 link: DEFAULT_LINK,
1875 _marker: PhantomData,
1876 }
1877 }
1878 }
1879
impl<S: CodingScheme> Fixture<S> {
    /// Builds the simulated network described by this fixture inside a deterministic
    /// runtime, starts one shard engine per node, and then runs `f`.
    ///
    /// `f` receives the fixture itself, the runtime context, the network oracle, the
    /// started participants, the started non-participants, and the coding configuration
    /// derived from `num_peers`.
    ///
    /// # Panics
    ///
    /// Panics if channel registration or link creation fails, if a scheme cannot be
    /// created, or if `num_peers` does not fit in a `u16`.
    pub fn start<F: Future<Output = ()>>(
        self,
        f: impl FnOnce(
            Self,
            deterministic::Context,
            O,
            Vec<Peer<S>>,
            Vec<NonParticipant<S>>,
            CodingConfig,
        ) -> F,
    ) {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Participant keys are derived from seeds `0..num_peers` and sorted by public
            // key, so the position in `peer_keys` is used as the participant index below.
            let mut private_keys = (0..self.num_peers)
                .map(|i| PrivateKey::from_seed(i as u64))
                .collect::<Vec<_>>();
            private_keys.sort_by_key(|s| s.public_key());
            let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();

            let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());

            // Non-participant seeds start at `num_peers` so they differ from peer seeds.
            let mut np_private_keys = (0..self.num_non_participants)
                .map(|i| PrivateKey::from_seed((self.num_peers + i) as u64))
                .collect::<Vec<_>>();
            np_private_keys.sort_by_key(|s| s.public_key());
            let np_keys: Vec<P> = np_private_keys.iter().map(|k| k.public_key()).collect();

            let (network, oracle) =
                simulated::Network::<deterministic::Context, P>::new_with_split_peers(
                    context.child("network"),
                    simulated::Config {
                        max_size: MAX_SHARD_SIZE as u32,
                        disconnect_on_block: true,
                        tracked_peer_sets: NZUsize!(1),
                    },
                    peer_keys.clone(),
                    np_keys.clone(),
                )
                .await;
            network.start();

            let all_keys: Vec<P> = peer_keys.iter().chain(np_keys.iter()).cloned().collect();

            // Register channel 0 for every node and keep the handles until its engine
            // is created.
            let mut registrations = BTreeMap::new();
            for key in all_keys.iter() {
                let control = oracle.control(key.clone());
                let (sender, receiver) = control
                    .register(0, TEST_QUOTA)
                    .await
                    .expect("registration should succeed");
                registrations.insert(key.clone(), (control, sender, receiver));
            }
            // Add a link for every ordered pair of distinct nodes.
            for p1 in all_keys.iter() {
                for p2 in all_keys.iter() {
                    if p2 == p1 {
                        continue;
                    }
                    oracle
                        .add_link(p1.clone(), p2.clone(), self.link.clone())
                        .await
                        .expect("link should be added");
                }
            }

            let coding_config =
                coding_config_for_participants(u16::try_from(self.num_peers).unwrap());

            // Participants run their engine with a signing scheme.
            let mut peers = Vec::with_capacity(self.num_peers);
            for (idx, peer_key) in peer_keys.iter().enumerate() {
                let (control, sender, receiver) = registrations
                    .remove(peer_key)
                    .expect("peer should be registered");

                let participant = Participant::new(idx as u32);
                let engine_context = context.child("peer").with_attribute("index", idx);

                let scheme = Scheme::signer(
                    SCHEME_NAMESPACE,
                    participants.clone(),
                    private_keys[idx].clone(),
                )
                .expect("signer scheme should be created");
                let scheme_provider: Prov = MultiEpochProvider::single(scheme);

                let config = Config {
                    scheme_provider,
                    blocker: control.clone(),
                    shard_codec_cfg: CodecConfig {
                        maximum_shard_size: MAX_SHARD_SIZE,
                    },
                    block_codec_cfg: (),
                    strategy: STRATEGY,
                    mailbox_size: NZUsize!(1024),
                    peer_buffer_size: NZUsize!(64),
                    background_channel_capacity: NZUsize!(1024),
                    peer_provider: oracle.manager(),
                };

                let (engine, mailbox) = ShardEngine::new(engine_context, config);
                // Keep a sender clone so tests can inject raw messages as this peer.
                let sender_clone = sender.clone();
                engine.start((sender, receiver));

                peers.push(Peer {
                    public_key: peer_key.clone(),
                    index: participant,
                    mailbox,
                    sender: sender_clone,
                });
            }

            // Non-participants run their engine with a verifier-only scheme.
            let mut non_participants = Vec::with_capacity(self.num_non_participants);
            for (idx, np_key) in np_keys.iter().enumerate() {
                let (control, sender, receiver) = registrations
                    .remove(np_key)
                    .expect("non-participant should be registered");

                let engine_context = context
                    .child("non_participant")
                    .with_attribute("index", idx);

                let scheme = Scheme::verifier(SCHEME_NAMESPACE, participants.clone());
                let scheme_provider: Prov = MultiEpochProvider::single(scheme);

                let config = Config {
                    scheme_provider,
                    blocker: control.clone(),
                    shard_codec_cfg: CodecConfig {
                        maximum_shard_size: MAX_SHARD_SIZE,
                    },
                    block_codec_cfg: (),
                    strategy: STRATEGY,
                    mailbox_size: NZUsize!(1024),
                    peer_buffer_size: NZUsize!(64),
                    background_channel_capacity: NZUsize!(1024),
                    peer_provider: oracle.manager(),
                };

                let (engine, mailbox) = ShardEngine::new(engine_context, config);
                let sender_clone = sender.clone();
                engine.start((sender, receiver));

                non_participants.push(NonParticipant {
                    public_key: np_key.clone(),
                    mailbox,
                    sender: sender_clone,
                });
            }

            // Hand everything to the test body.
            f(
                self,
                context,
                oracle,
                peers,
                non_participants,
                coding_config,
            )
            .await;
        });
    }
}
2041
2042 #[test_traced]
2043 fn test_e2e_broadcast_and_reconstruction() {
2044 let fixture = Fixture {
2045 num_peers: 10,
2046 ..Default::default()
2047 };
2048
2049 fixture.start(
2050 |config, context, _, mut peers, _, coding_config| async move {
2051 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2052 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2053 let commitment = coded_block.commitment();
2054
2055 let leader = peers[0].public_key.clone();
2056 let round = Round::new(Epoch::zero(), View::new(1));
2057 peers[0].mailbox.proposed(round, coded_block.clone());
2058
2059 for peer in peers[1..].iter_mut() {
2061 peer.mailbox.discovered(commitment, leader.clone(), round);
2062 }
2063 context.sleep(config.link.latency).await;
2064
2065 for peer in peers.iter_mut() {
2066 peer.mailbox
2067 .subscribe_assigned_shard_verified(commitment)
2068 .await
2069 .expect("shard subscription should complete");
2070 }
2071 context.sleep(config.link.latency).await;
2072
2073 for peer in peers.iter_mut() {
2074 let reconstructed = peer
2075 .mailbox
2076 .get(commitment)
2077 .await
2078 .expect("block should be reconstructed");
2079 assert_eq!(reconstructed.commitment(), commitment);
2080 assert_eq!(reconstructed.height(), coded_block.height());
2081 }
2082 },
2083 );
2084 }
2085
2086 #[test_traced]
2087 fn test_e2e_broadcast_and_reconstruction_zoda() {
2088 let fixture = Fixture {
2089 num_peers: 10,
2090 ..Default::default()
2091 };
2092
2093 fixture.start(
2094 |config, context, _, mut peers, _, coding_config| async move {
2095 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2096 let coded_block = CodedBlock::<B, PhasedAsScheme<Zoda<H>>, H>::new(
2097 inner,
2098 coding_config,
2099 &STRATEGY,
2100 );
2101 let commitment = coded_block.commitment();
2102
2103 let leader = peers[0].public_key.clone();
2104 let round = Round::new(Epoch::zero(), View::new(1));
2105 peers[0].mailbox.proposed(round, coded_block.clone());
2106
2107 for peer in peers[1..].iter_mut() {
2109 peer.mailbox.discovered(commitment, leader.clone(), round);
2110 }
2111 context.sleep(config.link.latency).await;
2112
2113 for peer in peers.iter_mut() {
2114 peer.mailbox
2115 .subscribe_assigned_shard_verified(commitment)
2116 .await
2117 .expect("shard subscription should complete");
2118 }
2119 context.sleep(config.link.latency).await;
2120
2121 for peer in peers.iter_mut() {
2122 let reconstructed = peer
2123 .mailbox
2124 .get(commitment)
2125 .await
2126 .expect("block should be reconstructed");
2127 assert_eq!(reconstructed.commitment(), commitment);
2128 assert_eq!(reconstructed.height(), coded_block.height());
2129 }
2130 },
2131 );
2132 }
2133
2134 #[test_traced]
2135 fn test_block_subscriptions() {
2136 let fixture = Fixture {
2137 num_peers: 10,
2138 ..Default::default()
2139 };
2140
2141 fixture.start(
2142 |config, context, _, mut peers, _, coding_config| async move {
2143 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2144 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2145 let commitment = coded_block.commitment();
2146 let digest = coded_block.digest();
2147
2148 let leader = peers[0].public_key.clone();
2149 let round = Round::new(Epoch::zero(), View::new(1));
2150
2151 let commitment_sub = peers[1].mailbox.subscribe(commitment);
2153 let digest_sub = peers[2].mailbox.subscribe_by_digest(digest);
2154
2155 peers[0].mailbox.proposed(round, coded_block.clone());
2156
2157 for peer in peers[1..].iter_mut() {
2159 peer.mailbox.discovered(commitment, leader.clone(), round);
2160 }
2161 context.sleep(config.link.latency * 2).await;
2162
2163 for peer in peers.iter_mut() {
2164 peer.mailbox
2165 .subscribe_assigned_shard_verified(commitment)
2166 .await
2167 .expect("shard subscription should complete");
2168 }
2169 context.sleep(config.link.latency).await;
2170
2171 let block_by_commitment =
2172 commitment_sub.await.expect("subscription should resolve");
2173 assert_eq!(block_by_commitment.commitment(), commitment);
2174 assert_eq!(block_by_commitment.height(), coded_block.height());
2175
2176 let block_by_digest = digest_sub.await.expect("subscription should resolve");
2177 assert_eq!(block_by_digest.commitment(), commitment);
2178 assert_eq!(block_by_digest.height(), coded_block.height());
2179 },
2180 );
2181 }
2182
2183 #[test_traced]
2184 fn test_proposer_preproposal_subscriptions_resolve_after_local_cache() {
2185 let fixture = Fixture {
2186 num_peers: 10,
2187 ..Default::default()
2188 };
2189
2190 fixture.start(|config, context, _, peers, _, coding_config| async move {
2191 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2192 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2193 let commitment = coded_block.commitment();
2194 let digest = coded_block.digest();
2195 let round = Round::new(Epoch::zero(), View::new(1));
2196
2197 let shard_sub = peers[0].mailbox.subscribe_assigned_shard_verified(commitment);
2199 let commitment_sub = peers[0].mailbox.subscribe(commitment);
2200 let digest_sub = peers[0].mailbox.subscribe_by_digest(digest);
2201
2202 peers[0].mailbox.proposed(round, coded_block.clone());
2203 context.sleep(config.link.latency).await;
2204
2205 select! {
2206 result = shard_sub => {
2207 result.expect("shard subscription should resolve");
2208 },
2209 _ = context.sleep(Duration::from_secs(5)) => {
2210 panic!("shard subscription did not resolve after local proposal cache");
2211 }
2212 }
2213
2214 let block_by_commitment = select! {
2215 result = commitment_sub => {
2216 result.expect("block subscription by commitment should resolve")
2217 },
2218 _ = context.sleep(Duration::from_secs(5)) => {
2219 panic!("block subscription by commitment did not resolve after local proposal cache");
2220 }
2221 };
2222 assert_eq!(block_by_commitment.commitment(), commitment);
2223 assert_eq!(block_by_commitment.height(), coded_block.height());
2224
2225 let block_by_digest = select! {
2226 result = digest_sub => {
2227 result.expect("block subscription by digest should resolve")
2228 },
2229 _ = context.sleep(Duration::from_secs(5)) => {
2230 panic!("block subscription by digest did not resolve after local proposal cache");
2231 }
2232 };
2233 assert_eq!(block_by_digest.commitment(), commitment);
2234 assert_eq!(block_by_digest.height(), coded_block.height());
2235 });
2236 }
2237
2238 #[test_traced]
2239 fn test_shard_subscription_rejects_invalid_shard() {
2240 let fixture = Fixture::<C>::default();
2241 fixture.start(
2242 |config, context, oracle, mut peers, _, coding_config| async move {
2243 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2248 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2249 let commitment = coded_block.commitment();
2250 let receiver_index = peers[2].index.get() as u16;
2251
2252 let valid_shard = coded_block.shard(receiver_index).expect("missing shard");
2253
2254 let mut invalid_shard = valid_shard.clone();
2257 invalid_shard.index = peers[3].index.get() as u16;
2258
2259 let receiver_pk = peers[2].public_key.clone();
2261 let leader = peers[1].public_key.clone();
2262 peers[2].mailbox.discovered(
2263 commitment,
2264 leader,
2265 Round::new(Epoch::zero(), View::new(1)),
2266 );
2267 let mut shard_sub = peers[2]
2268 .mailbox
2269 .subscribe_assigned_shard_verified(commitment);
2270
2271 let invalid_bytes = invalid_shard.encode();
2273 peers[0]
2274 .sender
2275 .send(Recipients::One(receiver_pk.clone()), invalid_bytes, true);
2276
2277 context.sleep(config.link.latency * 2).await;
2278
2279 assert!(
2280 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
2281 "subscription should not resolve from invalid shard"
2282 );
2283 assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;
2284
2285 let valid_bytes = valid_shard.encode();
2287 peers[1]
2288 .sender
2289 .send(Recipients::One(receiver_pk), valid_bytes, true);
2290 context.sleep(config.link.latency * 2).await;
2291
2292 select! {
2294 _ = shard_sub => {},
2295 _ = context.sleep(Duration::from_secs(5)) => {
2296 panic!("subscription did not complete after valid shard arrival");
2297 },
2298 };
2299 },
2300 );
2301 }
2302
2303 #[test_traced]
2304 fn test_durable_prunes_reconstructed_blocks() {
2305 let fixture = Fixture::<C>::default();
2306 fixture.start(|_, context, _, mut peers, _, coding_config| async move {
2307 let block1 = CodedBlock::<B, C, H>::new(
2309 B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
2310 coding_config,
2311 &STRATEGY,
2312 );
2313 let block2 = CodedBlock::<B, C, H>::new(
2314 B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 100),
2315 coding_config,
2316 &STRATEGY,
2317 );
2318 let block3 = CodedBlock::<B, C, H>::new(
2319 B::new::<H>((), Sha256Digest::EMPTY, Height::new(3), 100),
2320 coding_config,
2321 &STRATEGY,
2322 );
2323 let commitment1 = block1.commitment();
2324 let commitment2 = block2.commitment();
2325 let commitment3 = block3.commitment();
2326
2327 let peer = &mut peers[0];
2329 let round = Round::new(Epoch::zero(), View::new(1));
2330 peer.mailbox.proposed(round, block1);
2331 peer.mailbox.proposed(round, block2);
2332 peer.mailbox.proposed(round, block3);
2333 context.sleep(Duration::from_millis(10)).await;
2334
2335 assert!(
2337 peer.mailbox.get(commitment1).await.is_some(),
2338 "block1 should be cached"
2339 );
2340 assert!(
2341 peer.mailbox.get(commitment2).await.is_some(),
2342 "block2 should be cached"
2343 );
2344 assert!(
2345 peer.mailbox.get(commitment3).await.is_some(),
2346 "block3 should be cached"
2347 );
2348
2349 peer.mailbox.prune(commitment2);
2351 context.sleep(Duration::from_millis(10)).await;
2352
2353 assert!(
2355 peer.mailbox.get(commitment1).await.is_none(),
2356 "block1 should be pruned"
2357 );
2358 assert!(
2359 peer.mailbox.get(commitment2).await.is_none(),
2360 "block2 should be pruned"
2361 );
2362
2363 assert!(
2365 peer.mailbox.get(commitment3).await.is_some(),
2366 "block3 should still be cached"
2367 );
2368 });
2369 }
2370
2371 #[test_traced]
2372 fn test_duplicate_leader_shard_ignored() {
2373 let fixture = Fixture::<C>::default();
2374 fixture.start(
2375 |config, context, oracle, mut peers, _, coding_config| async move {
2376 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2377 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2378 let commitment = coded_block.commitment();
2379
2380 let peer2_index = peers[2].index.get() as u16;
2382 let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2383 let shard_bytes = peer2_shard.encode();
2384
2385 let peer2_pk = peers[2].public_key.clone();
2386 let leader = peers[0].public_key.clone();
2387
2388 peers[2].mailbox.discovered(
2390 commitment,
2391 leader,
2392 Round::new(Epoch::zero(), View::new(1)),
2393 );
2394
2395 peers[0]
2397 .sender
2398 .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true);
2399 context.sleep(config.link.latency * 2).await;
2400
2401 peers[0]
2403 .sender
2404 .send(Recipients::One(peer2_pk), shard_bytes, true);
2405 context.sleep(config.link.latency * 2).await;
2406
2407 let blocked_peers = oracle.blocked().await.unwrap();
2409 let is_blocked = blocked_peers
2410 .iter()
2411 .any(|(a, b)| a == &peers[2].public_key && b == &peers[0].public_key);
2412 assert!(
2413 !is_blocked,
2414 "leader should not be blocked for duplicate shard"
2415 );
2416 },
2417 );
2418 }
2419
2420 #[test_traced]
2421 fn test_equivocating_leader_shard_blocks_peer() {
2422 let fixture = Fixture::<C>::default();
2423 fixture.start(
2424 |config, context, oracle, mut peers, _, coding_config| async move {
2425 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2426 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
2427 let commitment = coded_block1.commitment();
2428
2429 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 200);
2431 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
2432
2433 let peer2_index = peers[2].index.get() as u16;
2435 let shard_bytes1 = coded_block1
2436 .shard(peer2_index)
2437 .expect("missing shard")
2438 .encode();
2439 let mut equivocating_shard =
2440 coded_block2.shard(peer2_index).expect("missing shard");
2441 equivocating_shard.commitment = commitment;
2443 let shard_bytes2 = equivocating_shard.encode();
2444
2445 let peer2_pk = peers[2].public_key.clone();
2446 let leader = peers[0].public_key.clone();
2447
2448 peers[2].mailbox.discovered(
2450 commitment,
2451 leader,
2452 Round::new(Epoch::zero(), View::new(1)),
2453 );
2454
2455 peers[0]
2457 .sender
2458 .send(Recipients::One(peer2_pk.clone()), shard_bytes1, true);
2459 context.sleep(config.link.latency * 2).await;
2460
2461 peers[0]
2463 .sender
2464 .send(Recipients::One(peer2_pk), shard_bytes2, true);
2465 context.sleep(config.link.latency * 2).await;
2466
2467 assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;
2469 },
2470 );
2471 }
2472
2473 #[test_traced]
2474 fn test_non_leader_wrong_index_shard_blocked() {
2475 let fixture = Fixture::<C>::default();
2478 fixture.start(
2479 |config, context, oracle, mut peers, _, coding_config| async move {
2480 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2481 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2482 let commitment = coded_block.commitment();
2483
2484 let peer2_index = peers[2].index.get() as u16;
2486 let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2487 let shard_bytes = peer2_shard.encode();
2488
2489 let peer2_pk = peers[2].public_key.clone();
2490 let leader = peers[0].public_key.clone();
2491
2492 peers[2].mailbox.discovered(
2494 commitment,
2495 leader,
2496 Round::new(Epoch::zero(), View::new(1)),
2497 );
2498
2499 peers[1]
2502 .sender
2503 .send(Recipients::One(peer2_pk), shard_bytes, true);
2504 context.sleep(config.link.latency * 2).await;
2505
2506 assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2508 },
2509 );
2510 }
2511
2512 #[test_traced]
2513 fn test_buffered_wrong_index_shard_blocked_on_leader_arrival() {
2514 let fixture = Fixture::<C>::default();
2517 fixture.start(
2518 |config, context, oracle, mut peers, _, coding_config| async move {
2519 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2520 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2521 let commitment = coded_block.commitment();
2522
2523 let peer2_index = peers[2].index.get() as u16;
2525 let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2526 let shard_bytes = peer2_shard.encode();
2527
2528 let peer2_pk = peers[2].public_key.clone();
2529
2530 peers[1]
2533 .sender
2534 .send(Recipients::One(peer2_pk), shard_bytes, true);
2535 context.sleep(config.link.latency * 2).await;
2536
2537 let blocked = oracle.blocked().await.unwrap();
2539 assert!(
2540 blocked.is_empty(),
2541 "no peers should be blocked while leader is unknown"
2542 );
2543
2544 let leader = peers[0].public_key.clone();
2548 peers[2].mailbox.discovered(
2549 commitment,
2550 leader,
2551 Round::new(Epoch::zero(), View::new(1)),
2552 );
2553 context.sleep(Duration::from_millis(10)).await;
2554
2555 assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2556 },
2557 );
2558 }
2559
2560 #[test_traced]
2561 fn test_conflicting_external_proposed_ignored() {
2562 let fixture = Fixture::<C>::default();
2563 fixture.start(
2564 |config, context, oracle, mut peers, _, coding_config| async move {
2565 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2566 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2567 let commitment = coded_block.commitment();
2568
2569 let peer2_index = peers[2].index.get() as u16;
2571 let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2572 let shard_bytes = peer2_shard.encode();
2573
2574 let peer2_pk = peers[2].public_key.clone();
2575 let leader_a = peers[0].public_key.clone();
2576 let leader_b = peers[1].public_key.clone();
2577
2578 let shard_sub = peers[2]
2580 .mailbox
2581 .subscribe_assigned_shard_verified(commitment);
2582
2583 peers[2].mailbox.discovered(
2585 commitment,
2586 leader_a.clone(),
2587 Round::new(Epoch::zero(), View::new(1)),
2588 );
2589
2590 peers[2].mailbox.discovered(
2592 commitment,
2593 leader_b,
2594 Round::new(Epoch::zero(), View::new(1)),
2595 );
2596
2597 peers[0]
2599 .sender
2600 .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true);
2601 context.sleep(config.link.latency * 2).await;
2602
2603 select! {
2605 _ = shard_sub => {},
2606 _ = context.sleep(Duration::from_secs(5)) => {
2607 panic!("subscription did not complete after shard from original leader");
2608 },
2609 };
2610
2611 peers[1]
2613 .sender
2614 .send(Recipients::One(peer2_pk), shard_bytes, true);
2615 context.sleep(config.link.latency * 2).await;
2616
2617 assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2618
2619 let blocked_peers = oracle.blocked().await.unwrap();
2621 let leader_a_blocked = blocked_peers
2622 .iter()
2623 .any(|(a, b)| a == &peers[2].public_key && b == &leader_a);
2624 assert!(
2625 !leader_a_blocked,
2626 "original leader should not be blocked after conflicting leader update"
2627 );
2628 },
2629 );
2630 }
2631
2632 #[test_traced]
2633 fn test_non_participant_external_proposed_ignored() {
2634 let fixture = Fixture::<C>::default();
2635 fixture.start(
2636 |config, context, oracle, mut peers, _, coding_config| async move {
2637 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2638 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2639 let commitment = coded_block.commitment();
2640
2641 let peer2_index = peers[2].index.get() as u16;
2643 let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2644 let shard_bytes = peer2_shard.encode();
2645
2646 let peer2_pk = peers[2].public_key.clone();
2647 let leader = peers[0].public_key.clone();
2648 let non_participant_leader = PrivateKey::from_seed(10_000).public_key();
2649
2650 let shard_sub = peers[2]
2652 .mailbox
2653 .subscribe_assigned_shard_verified(commitment);
2654
2655 peers[2].mailbox.discovered(
2657 commitment,
2658 non_participant_leader,
2659 Round::new(Epoch::zero(), View::new(1)),
2660 );
2661
2662 peers[0]
2664 .sender
2665 .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true);
2666 context.sleep(config.link.latency * 2).await;
2667
2668 let blocked = oracle.blocked().await.unwrap();
2669 let leader_blocked = blocked
2670 .iter()
2671 .any(|(a, b)| a == &peers[2].public_key && b == &leader);
2672 assert!(
2673 !leader_blocked,
2674 "leader should not be blocked when non-participant update is ignored"
2675 );
2676
2677 peers[2].mailbox.discovered(
2679 commitment,
2680 leader,
2681 Round::new(Epoch::zero(), View::new(1)),
2682 );
2683 context.sleep(config.link.latency * 2).await;
2684
2685 select! {
2686 _ = shard_sub => {},
2687 _ = context.sleep(Duration::from_secs(5)) => {
2688 panic!("subscription did not complete after valid leader update");
2689 },
2690 };
2691 },
2692 );
2693 }
2694
2695 #[test_traced]
2696 fn test_shard_from_non_participant_blocks_peer() {
2697 let fixture = Fixture::<C>::default();
2698 fixture.start(
2699 |config, context, oracle, peers, _, coding_config| async move {
2700 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2701 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2702 let commitment = coded_block.commitment();
2703
2704 let leader = peers[0].public_key.clone();
2705 let receiver_pk = peers[2].public_key.clone();
2706
2707 let non_participant_key = PrivateKey::from_seed(10_000);
2708 let non_participant_pk = non_participant_key.public_key();
2709
2710 let non_participant_control = oracle.control(non_participant_pk.clone());
2711 let (mut non_participant_sender, _non_participant_receiver) =
2712 non_participant_control
2713 .register(0, TEST_QUOTA)
2714 .await
2715 .expect("registration should succeed");
2716 oracle
2717 .add_link(
2718 non_participant_pk.clone(),
2719 receiver_pk.clone(),
2720 DEFAULT_LINK,
2721 )
2722 .await
2723 .expect("link should be added");
2724 oracle.manager().track(
2725 2,
2726 TrackedPeers::new(
2727 Set::from_iter_dedup(peers.iter().map(|peer| peer.public_key.clone())),
2728 Set::from_iter_dedup([non_participant_pk.clone()]),
2729 ),
2730 );
2731 context.sleep(Duration::from_millis(10)).await;
2732
2733 peers[2].mailbox.discovered(
2734 commitment,
2735 leader,
2736 Round::new(Epoch::zero(), View::new(1)),
2737 );
2738
2739 let peer2_index = peers[2].index.get() as u16;
2740 let shard = coded_block.shard(peer2_index).expect("missing shard");
2741 let shard_bytes = shard.encode();
2742
2743 non_participant_sender.send(Recipients::One(receiver_pk), shard_bytes, true);
2744 context.sleep(config.link.latency * 2).await;
2745
2746 assert_blocked(&oracle, &peers[2].public_key, &non_participant_pk).await;
2747 },
2748 );
2749 }
2750
2751 #[test_traced]
2752 fn test_preleader_shard_from_non_participant_is_not_buffered() {
2753 let fixture = Fixture::<C>::default();
2754 fixture.start(
2755 |config, context, oracle, peers, _, coding_config| async move {
2756 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2757 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2758 let commitment = coded_block.commitment();
2759
2760 let leader = peers[0].public_key.clone();
2761 let receiver_pk = peers[2].public_key.clone();
2762
2763 let non_participant_key = PrivateKey::from_seed(10_000);
2764 let non_participant_pk = non_participant_key.public_key();
2765
2766 let non_participant_control = oracle.control(non_participant_pk.clone());
2767 let (mut non_participant_sender, _non_participant_receiver) =
2768 non_participant_control
2769 .register(0, TEST_QUOTA)
2770 .await
2771 .expect("registration should succeed");
2772 oracle
2773 .add_link(
2774 non_participant_pk.clone(),
2775 receiver_pk.clone(),
2776 DEFAULT_LINK,
2777 )
2778 .await
2779 .expect("link should be added");
2780 oracle.manager().track(
2781 2,
2782 TrackedPeers::new(
2783 Set::from_iter_dedup(peers.iter().map(|peer| peer.public_key.clone())),
2784 Set::from_iter_dedup([non_participant_pk.clone()]),
2785 ),
2786 );
2787 context.sleep(Duration::from_millis(10)).await;
2788
2789 let peer2_index = peers[2].index.get() as u16;
2790 let shard = coded_block.shard(peer2_index).expect("missing shard");
2791 let shard_bytes = shard.encode();
2792 let mut shard_sub = peers[2]
2793 .mailbox
2794 .subscribe_assigned_shard_verified(commitment);
2795
2796 non_participant_sender.send(Recipients::One(receiver_pk), shard_bytes, true);
2797 context.sleep(config.link.latency * 2).await;
2798
2799 peers[2].mailbox.discovered(
2800 commitment,
2801 leader,
2802 Round::new(Epoch::zero(), View::new(1)),
2803 );
2804 context.sleep(config.link.latency * 2).await;
2805
2806 let blocked = oracle.blocked().await.unwrap();
2807 let non_participant_blocked = blocked
2808 .iter()
2809 .any(|(a, b)| a == &peers[2].public_key && b == &non_participant_pk);
2810 assert!(
2811 !non_participant_blocked,
2812 "non-participant should not be blocked when its pre-leader shard is ignored"
2813 );
2814 assert!(
2815 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
2816 "pre-leader shard from non-participant should not be buffered"
2817 );
2818 },
2819 );
2820 }
2821
2822 #[test_traced]
2823 fn test_duplicate_shard_ignored() {
2824 let fixture: Fixture<C> = Fixture {
2826 num_peers: 10,
2827 ..Default::default()
2828 };
2829
2830 fixture.start(
2831 |config, context, oracle, mut peers, _, coding_config| async move {
2832 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2833 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2834
2835 let peer2_index = peers[2].index.get() as u16;
2837 let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2838
2839 let peer1_index = peers[1].index.get() as u16;
2841 let peer1_shard = coded_block.shard(peer1_index).expect("missing shard");
2842
2843 let peer2_pk = peers[2].public_key.clone();
2844 let leader = peers[0].public_key.clone();
2845
2846 peers[2].mailbox.discovered(
2848 coded_block.commitment(),
2849 leader,
2850 Round::new(Epoch::zero(), View::new(1)),
2851 );
2852
2853 let leader_shard_bytes = peer2_shard.encode();
2855 peers[0]
2856 .sender
2857 .send(Recipients::One(peer2_pk.clone()), leader_shard_bytes, true);
2858 context.sleep(config.link.latency * 2).await;
2859
2860 let peer1_shard_bytes = peer1_shard.encode();
2862 peers[1].sender.send(
2863 Recipients::One(peer2_pk.clone()),
2864 peer1_shard_bytes.clone(),
2865 true,
2866 );
2867 context.sleep(config.link.latency * 2).await;
2868
2869 peers[1]
2872 .sender
2873 .send(Recipients::One(peer2_pk), peer1_shard_bytes, true);
2874 context.sleep(config.link.latency * 2).await;
2875
2876 let blocked_peers = oracle.blocked().await.unwrap();
2878 let is_blocked = blocked_peers
2879 .iter()
2880 .any(|(a, b)| a == &peers[2].public_key && b == &peers[1].public_key);
2881 assert!(
2882 !is_blocked,
2883 "peer should not be blocked for exact duplicate shard"
2884 );
2885 },
2886 );
2887 }
2888
2889 #[test_traced]
2890 fn test_equivocating_shard_blocks_peer() {
2891 let fixture: Fixture<C> = Fixture {
2893 num_peers: 10,
2894 ..Default::default()
2895 };
2896
2897 fixture.start(
2898 |config, context, oracle, mut peers, _, coding_config| async move {
2899 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2900 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
2901
2902 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 200);
2904 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
2905
2906 let peer1_index = peers[1].index.get() as u16;
2908 let peer1_shard = coded_block1.shard(peer1_index).expect("missing shard");
2909
2910 let mut peer1_equivocating_shard =
2912 coded_block2.shard(peer1_index).expect("missing shard");
2913 peer1_equivocating_shard.commitment = coded_block1.commitment();
2916
2917 let peer2_pk = peers[2].public_key.clone();
2918 let leader = peers[0].public_key.clone();
2919
2920 peers[2].mailbox.discovered(
2922 coded_block1.commitment(),
2923 leader,
2924 Round::new(Epoch::zero(), View::new(1)),
2925 );
2926
2927 let peer2_index = peers[2].index.get() as u16;
2929 let leader_shard = coded_block1.shard(peer2_index).expect("missing shard");
2930 let leader_shard_bytes = leader_shard.encode();
2931 peers[0]
2932 .sender
2933 .send(Recipients::One(peer2_pk.clone()), leader_shard_bytes, true);
2934 context.sleep(config.link.latency * 2).await;
2935
2936 let shard_bytes = peer1_shard.encode();
2938 peers[1]
2939 .sender
2940 .send(Recipients::One(peer2_pk.clone()), shard_bytes, true);
2941 context.sleep(config.link.latency * 2).await;
2942
2943 let equivocating_bytes = peer1_equivocating_shard.encode();
2945 peers[1]
2946 .sender
2947 .send(Recipients::One(peer2_pk), equivocating_bytes, true);
2948 context.sleep(config.link.latency * 2).await;
2949
2950 assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2952 },
2953 );
2954 }
2955
2956 #[test_traced]
2957 fn test_reconstruction_states_pruned_at_or_below_reconstructed_view() {
2958 let fixture: Fixture<C> = Fixture {
2960 num_peers: 10,
2961 ..Default::default()
2962 };
2963
2964 fixture.start(
2965 |config, context, oracle, mut peers, _, coding_config| async move {
2966 let block_a = CodedBlock::<B, C, H>::new(
2968 B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
2969 coding_config,
2970 &STRATEGY,
2971 );
2972 let commitment_a = block_a.commitment();
2973
2974 let block_b = CodedBlock::<B, C, H>::new(
2976 B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200),
2977 coding_config,
2978 &STRATEGY,
2979 );
2980 let commitment_b = block_b.commitment();
2981
2982 let peer2_pk = peers[2].public_key.clone();
2983 let leader = peers[0].public_key.clone();
2984
2985 peers[2].mailbox.discovered(
2987 commitment_a,
2988 leader.clone(),
2989 Round::new(Epoch::zero(), View::new(1)),
2990 );
2991 let shard_a = block_a
2992 .shard(peers[1].index.get() as u16)
2993 .expect("missing shard")
2994 .encode();
2995 peers[1]
2996 .sender
2997 .send(Recipients::One(peer2_pk.clone()), shard_a.clone(), true);
2998 context.sleep(config.link.latency * 2).await;
2999
3000 peers[2].mailbox.discovered(
3002 commitment_b,
3003 leader,
3004 Round::new(Epoch::zero(), View::new(2)),
3005 );
3006 let leader_shard_b = block_b
3008 .shard(peers[2].index.get() as u16)
3009 .expect("missing shard")
3010 .encode();
3011 peers[0]
3012 .sender
3013 .send(Recipients::One(peer2_pk.clone()), leader_shard_b, true);
3014
3015 for i in [1usize, 3usize, 4usize] {
3017 let shard = block_b
3018 .shard(peers[i].index.get() as u16)
3019 .expect("missing shard")
3020 .encode();
3021 peers[i]
3022 .sender
3023 .send(Recipients::One(peer2_pk.clone()), shard, true);
3024 }
3025 context.sleep(config.link.latency * 4).await;
3026
3027 let reconstructed = peers[2]
3029 .mailbox
3030 .get(commitment_b)
3031 .await
3032 .expect("block B should reconstruct");
3033 assert_eq!(reconstructed.commitment(), commitment_b);
3034
3035 peers[1]
3038 .sender
3039 .send(Recipients::One(peer2_pk), shard_a, true);
3040 context.sleep(config.link.latency * 2).await;
3041
3042 let blocked = oracle.blocked().await.unwrap();
3043 let blocked_peer1 = blocked
3044 .iter()
3045 .any(|(a, b)| a == &peers[2].public_key && b == &peers[1].public_key);
3046 assert!(
3047 !blocked_peer1,
3048 "peer1 should not be blocked after lower-view state was pruned"
3049 );
3050 },
3051 );
3052 }
3053
3054 #[test_traced]
3055 fn test_local_proposal_prune_clears_older_reconstruction_state() {
3056 let fixture: Fixture<C> = Fixture {
3057 num_peers: 10,
3058 ..Default::default()
3059 };
3060
3061 fixture.start(
3062 |config, context, oracle, mut peers, _, coding_config| async move {
3063 let block_a = CodedBlock::<B, C, H>::new(
3064 B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
3065 coding_config,
3066 &STRATEGY,
3067 );
3068 let commitment_a = block_a.commitment();
3069
3070 let block_b = CodedBlock::<B, C, H>::new(
3071 B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200),
3072 coding_config,
3073 &STRATEGY,
3074 );
3075 let commitment_b = block_b.commitment();
3076
3077 let peer2_pk = peers[2].public_key.clone();
3078 let leader = peers[0].public_key.clone();
3079 let round_a = Round::new(Epoch::zero(), View::new(1));
3080 let round_b = Round::new(Epoch::zero(), View::new(2));
3081
3082 peers[2].mailbox.discovered(commitment_a, leader, round_a);
3083 let block_sub = peers[2].mailbox.subscribe(commitment_a);
3084
3085 let peer1_index = peers[1].index.get() as u16;
3086 let shard_a = block_a.shard(peer1_index).expect("missing shard");
3087
3088 let block_a_equivocating = CodedBlock::<B, C, H>::new(
3089 B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 300),
3090 coding_config,
3091 &STRATEGY,
3092 );
3093 let mut equivocating_shard = block_a_equivocating
3094 .shard(peer1_index)
3095 .expect("missing shard");
3096 equivocating_shard.commitment = commitment_a;
3097
3098 peers[1]
3099 .sender
3100 .send(Recipients::One(peer2_pk.clone()), shard_a.encode(), true);
3101 context.sleep(config.link.latency * 2).await;
3102
3103 peers[2].mailbox.proposed(round_b, block_b);
3104 assert!(
3105 peers[2].mailbox.get(commitment_b).await.is_some(),
3106 "local proposal should be cached before pruning"
3107 );
3108 peers[2].mailbox.prune(commitment_b);
3109
3110 select! {
3111 result = block_sub => {
3112 assert!(
3113 result.is_err(),
3114 "older block subscription should close after local-proposal prune"
3115 );
3116 },
3117 _ = context.sleep(Duration::from_secs(5)) => {
3118 panic!("older block subscription remained open after local-proposal prune");
3119 },
3120 }
3121
3122 peers[1]
3123 .sender
3124 .send(Recipients::One(peer2_pk), equivocating_shard.encode(), true);
3125 context.sleep(config.link.latency * 2).await;
3126
3127 let blocked = oracle.blocked().await.unwrap();
3128 let blocked_peer1 = blocked
3129 .iter()
3130 .any(|(a, b)| a == &peers[2].public_key && b == &peers[1].public_key);
3131 assert!(
3132 !blocked_peer1,
3133 "peer1 should not be blocked after older state was pruned"
3134 );
3135 },
3136 );
3137 }
3138
3139 #[test_traced]
3140 fn test_pending_shards_batch_validated_at_quorum() {
3141 let fixture: Fixture<C> = Fixture {
3150 num_peers: 10,
3151 ..Default::default()
3152 };
3153
3154 fixture.start(
3155 |config, context, oracle, mut peers, _, coding_config| async move {
3156 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3157 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3158 let commitment = coded_block.commitment();
3159
3160 let peer3_pk = peers[3].public_key.clone();
3161 let leader = peers[0].public_key.clone();
3162
3163 peers[3].mailbox.discovered(
3165 commitment,
3166 leader,
3167 Round::new(Epoch::zero(), View::new(1)),
3168 );
3169
3170 for &sender_idx in &[1, 2, 4] {
3173 let shard = coded_block
3174 .shard(peers[sender_idx].index.get() as u16)
3175 .expect("missing shard");
3176 let shard_bytes = shard.encode();
3177 peers[sender_idx].sender.send(
3178 Recipients::One(peer3_pk.clone()),
3179 shard_bytes,
3180 true,
3181 );
3182 }
3183
3184 context.sleep(config.link.latency * 2).await;
3185
3186 let block = peers[3].mailbox.get(commitment).await;
3188 assert!(block.is_none(), "block should not be reconstructed yet");
3189
3190 let peer3_index = peers[3].index.get() as u16;
3194 let leader_shard = coded_block.shard(peer3_index).expect("missing shard");
3195 let leader_shard_bytes = leader_shard.encode();
3196 peers[0]
3197 .sender
3198 .send(Recipients::One(peer3_pk), leader_shard_bytes, true);
3199
3200 context.sleep(config.link.latency * 2).await;
3201
3202 let blocked = oracle.blocked().await.unwrap();
3204 assert!(
3205 blocked.is_empty(),
3206 "no peers should be blocked for valid pending shards"
3207 );
3208
3209 let block = peers[3].mailbox.get(commitment).await;
3211 assert!(
3212 block.is_some(),
3213 "block should be reconstructed after batch validation"
3214 );
3215
3216 let reconstructed = block.unwrap();
3218 assert_eq!(
3219 reconstructed.commitment(),
3220 commitment,
3221 "reconstructed block should have correct commitment"
3222 );
3223 },
3224 );
3225 }
3226
3227 #[test_traced]
3228 fn test_peer_shards_buffered_until_external_proposed() {
3229 let fixture: Fixture<C> = Fixture {
3232 num_peers: 10,
3233 ..Default::default()
3234 };
3235
3236 fixture.start(
3237 |config, context, oracle, mut peers, _, coding_config| async move {
3238 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3239 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3240 let commitment = coded_block.commitment();
3241
3242 let receiver_idx = 3usize;
3243 let receiver_pk = peers[receiver_idx].public_key.clone();
3244 let leader = peers[0].public_key.clone();
3245
3246 let mut shard_sub = peers[receiver_idx]
3248 .mailbox
3249 .subscribe_assigned_shard_verified(commitment);
3250
3251 let leader_shard = coded_block
3254 .shard(peers[receiver_idx].index.get() as u16)
3255 .expect("missing shard")
3256 .encode();
3257 peers[0]
3258 .sender
3259 .send(Recipients::One(receiver_pk.clone()), leader_shard, true);
3260
3261 for i in [1usize, 2usize, 4usize] {
3262 let shard = coded_block
3263 .shard(peers[i].index.get() as u16)
3264 .expect("missing shard")
3265 .encode();
3266 peers[i]
3267 .sender
3268 .send(Recipients::One(receiver_pk.clone()), shard, true);
3269 }
3270
3271 context.sleep(config.link.latency * 2).await;
3272
3273 assert!(
3275 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
3276 "shard subscription should not resolve before leader announcement"
3277 );
3278 assert!(
3279 peers[receiver_idx].mailbox.get(commitment).await.is_none(),
3280 "block should not reconstruct before leader announcement"
3281 );
3282
3283 peers[receiver_idx].mailbox.discovered(
3285 commitment,
3286 leader,
3287 Round::new(Epoch::zero(), View::new(1)),
3288 );
3289
3290 select! {
3291 _ = shard_sub => {},
3292 _ = context.sleep(Duration::from_secs(5)) => {
3293 panic!("shard subscription did not resolve after leader announcement");
3294 },
3295 }
3296
3297 context.sleep(config.link.latency * 2).await;
3298 assert!(
3299 peers[receiver_idx].mailbox.get(commitment).await.is_some(),
3300 "block should reconstruct after buffered shards are ingested"
3301 );
3302
3303 assert!(
3305 oracle.blocked().await.unwrap().is_empty(),
3306 "no peers should be blocked for valid buffered shards"
3307 );
3308 },
3309 );
3310 }
3311
3312 #[test_traced]
3313 fn test_notarized_commitment_reconstructs_from_buffered_peer_shards_without_leader() {
3314 let fixture: Fixture<C> = Fixture {
3315 num_peers: 10,
3316 ..Default::default()
3317 };
3318
3319 fixture.start(
3320 |config, context, oracle, mut peers, _, coding_config| async move {
3321 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3322 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3323 let commitment = coded_block.commitment();
3324 let round = Round::new(Epoch::zero(), View::new(1));
3325
3326 let receiver_idx = 3usize;
3327 let receiver = peers[receiver_idx].public_key.clone();
3328
3329 let block_sub = peers[receiver_idx].mailbox.subscribe(commitment);
3330
3331 for sender_idx in [1usize, 2, 4, 5] {
3334 let shard = coded_block
3335 .shard(peers[sender_idx].index.get() as u16)
3336 .expect("missing shard")
3337 .encode();
3338 peers[sender_idx].sender.send(
3339 Recipients::One(receiver.clone()),
3340 shard,
3341 true,
3342 );
3343 }
3344 context.sleep(config.link.latency * 2).await;
3345
3346 assert!(
3347 peers[receiver_idx].mailbox.get(commitment).await.is_none(),
3348 "block should not reconstruct before the commitment is notarized"
3349 );
3350
3351 peers[receiver_idx].mailbox.notarized(commitment, round);
3352
3353 select! {
3354 _ = block_sub => {},
3355 _ = context.sleep(Duration::from_secs(5)) => {
3356 panic!("block subscription did not resolve after notarized reconstruction interest");
3357 },
3358 }
3359
3360 let reconstructed = peers[receiver_idx]
3361 .mailbox
3362 .get(commitment)
3363 .await
3364 .expect("block should reconstruct from buffered peer shards");
3365 assert_eq!(reconstructed.commitment(), commitment);
3366
3367 let mut assigned = peers[receiver_idx]
3368 .mailbox
3369 .subscribe_assigned_shard_verified(commitment);
3370 assert!(
3371 matches!(assigned.try_recv(), Err(TryRecvError::Empty)),
3372 "leaderless reconstruction must not satisfy assigned shard readiness"
3373 );
3374
3375 let leader = peers[0].public_key.clone();
3376 peers[receiver_idx]
3377 .mailbox
3378 .discovered(commitment, leader, round);
3379 let leader_shard = coded_block
3380 .shard(peers[receiver_idx].index.get() as u16)
3381 .expect("missing leader shard")
3382 .encode();
3383 peers[0].sender.send(
3384 Recipients::One(receiver),
3385 leader_shard,
3386 true,
3387 );
3388
3389 select! {
3390 _ = assigned => {},
3391 _ = context.sleep(Duration::from_secs(5)) => {
3392 panic!("assigned shard subscription did not resolve after leader discovery");
3393 },
3394 }
3395
3396 assert!(
3397 oracle.blocked().await.unwrap().is_empty(),
3398 "valid sender-indexed shards should not block peers"
3399 );
3400 },
3401 );
3402 }
3403
3404 #[test_traced]
3405 fn test_leader_shard_after_notarized_is_buffered_until_discovered() {
3406 let fixture: Fixture<C> = Fixture {
3407 num_peers: 10,
3408 ..Default::default()
3409 };
3410
3411 fixture.start(
3412 |config, context, oracle, mut peers, _, coding_config| async move {
3413 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3414 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3415 let commitment = coded_block.commitment();
3416 let round = Round::new(Epoch::zero(), View::new(1));
3417
3418 let leader_idx = 0usize;
3419 let receiver_idx = 3usize;
3420 let leader = peers[leader_idx].public_key.clone();
3421 let receiver = peers[receiver_idx].public_key.clone();
3422
3423 peers[receiver_idx].mailbox.notarized(commitment, round);
3424 let assigned = peers[receiver_idx]
3425 .mailbox
3426 .subscribe_assigned_shard_verified(commitment);
3427
3428 let leader_shard = coded_block
3429 .shard(peers[receiver_idx].index.get() as u16)
3430 .expect("missing receiver shard")
3431 .encode();
3432 peers[leader_idx]
3433 .sender
3434 .send(Recipients::One(receiver), leader_shard, true);
3435
3436 context.sleep(config.link.latency * 2).await;
3437 peers[receiver_idx]
3438 .mailbox
3439 .discovered(commitment, leader, round);
3440
3441 assigned
3442 .await
3443 .expect("assigned shard should resolve after leader discovery");
3444 assert!(
3445 oracle.blocked().await.unwrap().is_empty(),
3446 "valid leader shard should not block peers"
3447 );
3448 },
3449 );
3450 }
3451
3452 #[test_traced]
3453 fn test_post_leader_shards_processed_immediately() {
3454 let fixture: Fixture<C> = Fixture {
3457 num_peers: 10,
3458 ..Default::default()
3459 };
3460
3461 fixture.start(
3462 |config, context, oracle, mut peers, _, coding_config| async move {
3463 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3464 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3465 let commitment = coded_block.commitment();
3466
3467 let receiver_idx = 3usize;
3468 let receiver_pk = peers[receiver_idx].public_key.clone();
3469 let leader = peers[0].public_key.clone();
3470
3471 let shard_sub = peers[receiver_idx]
3472 .mailbox
3473 .subscribe_assigned_shard_verified(commitment);
3474 peers[receiver_idx].mailbox.discovered(
3475 commitment,
3476 leader.clone(),
3477 Round::new(Epoch::zero(), View::new(1)),
3478 );
3479
3480 let leader_shard = coded_block
3482 .shard(peers[receiver_idx].index.get() as u16)
3483 .expect("missing shard")
3484 .encode();
3485 peers[0]
3486 .sender
3487 .send(Recipients::One(receiver_pk.clone()), leader_shard, true);
3488
3489 select! {
3491 _ = shard_sub => {},
3492 _ = context.sleep(Duration::from_secs(5)) => {
3493 panic!("shard subscription did not resolve after post-leader shard");
3494 },
3495 }
3496
3497 for i in [1usize, 2usize, 4usize] {
3499 let shard = coded_block
3500 .shard(peers[i].index.get() as u16)
3501 .expect("missing shard")
3502 .encode();
3503 peers[i]
3504 .sender
3505 .send(Recipients::One(receiver_pk.clone()), shard, true);
3506 }
3507
3508 context.sleep(config.link.latency * 2).await;
3509 let reconstructed = peers[receiver_idx]
3510 .mailbox
3511 .get(commitment)
3512 .await
3513 .expect("block should reconstruct from post-leader shards");
3514 assert_eq!(reconstructed.commitment(), commitment);
3515
3516 assert!(
3517 oracle.blocked().await.unwrap().is_empty(),
3518 "no peers should be blocked for valid post-leader shards"
3519 );
3520 },
3521 );
3522 }
3523
3524 #[test_traced]
3525 fn test_invalid_shard_codec_blocks_peer() {
3526 let fixture: Fixture<C> = Fixture {
3528 num_peers: 4,
3529 ..Default::default()
3530 };
3531
3532 fixture.start(
3533 |config, context, oracle, mut peers, _, _coding_config| async move {
3534 let peer0_pk = peers[0].public_key.clone();
3535 let peer1_pk = peers[1].public_key.clone();
3536
3537 let garbage = Bytes::from(vec![0xFF, 0xFE, 0xFD, 0xFC, 0xFB]);
3539 peers[1]
3540 .sender
3541 .send(Recipients::One(peer0_pk.clone()), garbage, true);
3542
3543 context.sleep(config.link.latency * 2).await;
3544
3545 assert_blocked(&oracle, &peer0_pk, &peer1_pk).await;
3547 },
3548 );
3549 }
3550
3551 #[test_traced]
3552 fn test_duplicate_buffered_shard_does_not_block_before_leader() {
3553 let fixture: Fixture<C> = Fixture {
3556 ..Default::default()
3557 };
3558
3559 fixture.start(
3560 |config, context, oracle, mut peers, _, coding_config| async move {
3561 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3562 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3563
3564 let peer2_index = peers[2].index.get() as u16;
3566 let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
3567 let shard_bytes = peer2_shard.encode();
3568
3569 let peer2_pk = peers[2].public_key.clone();
3570
3571 peers[1]
3575 .sender
3576 .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true);
3577 context.sleep(config.link.latency * 2).await;
3578
3579 let blocked = oracle.blocked().await.unwrap();
3581 assert!(blocked.is_empty(), "no peers should be blocked yet");
3582
3583 peers[1]
3585 .sender
3586 .send(Recipients::One(peer2_pk), shard_bytes, true);
3587 context.sleep(config.link.latency * 2).await;
3588
3589 let blocked = oracle.blocked().await.unwrap();
3591 assert!(
3592 blocked.is_empty(),
3593 "no peers should be blocked before leader"
3594 );
3595 },
3596 );
3597 }
3598
3599 #[test_traced]
3600 fn test_invalid_leader_shard_crypto_blocks_leader() {
3601 let fixture: Fixture<C> = Fixture {
3604 ..Default::default()
3605 };
3606
3607 fixture.start(
3608 |config, context, oracle, mut peers, _, coding_config| async move {
3609 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3612 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3613 let commitment1 = coded_block1.commitment();
3614
3615 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3616 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3617
3618 let peer2_index = peers[2].index.get() as u16;
3621 let mut wrong_shard = coded_block2.shard(peer2_index).expect("missing shard");
3622 wrong_shard.commitment = commitment1;
3623 let wrong_bytes = wrong_shard.encode();
3624
3625 let peer2_pk = peers[2].public_key.clone();
3626 let leader = peers[0].public_key.clone();
3627
3628 peers[2].mailbox.discovered(
3630 commitment1,
3631 leader,
3632 Round::new(Epoch::zero(), View::new(1)),
3633 );
3634
3635 peers[0]
3637 .sender
3638 .send(Recipients::One(peer2_pk), wrong_bytes, true);
3639 context.sleep(config.link.latency * 2).await;
3640
3641 assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;
3643 },
3644 );
3645 }
3646
3647 #[test_traced]
3648 fn test_shard_index_mismatch_blocks_peer() {
3649 let fixture: Fixture<C> = Fixture {
3652 num_peers: 10,
3653 ..Default::default()
3654 };
3655
3656 fixture.start(
3657 |config, context, oracle, mut peers, _, coding_config| async move {
3658 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3659 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3660 let commitment = coded_block.commitment();
3661
3662 let peer3_index = peers[3].index.get() as u16;
3664 let leader_shard = coded_block.shard(peer3_index).expect("missing shard");
3665
3666 let peer1_index = peers[1].index.get() as u16;
3668 let mut wrong_index_shard = coded_block.shard(peer1_index).expect("missing shard");
3669 wrong_index_shard.index = peers[4].index.get() as u16;
3671 let wrong_bytes = wrong_index_shard.encode();
3672
3673 let peer3_pk = peers[3].public_key.clone();
3674 let leader = peers[0].public_key.clone();
3675
3676 peers[3].mailbox.discovered(
3678 commitment,
3679 leader,
3680 Round::new(Epoch::zero(), View::new(1)),
3681 );
3682 let shard_bytes = leader_shard.encode();
3683 peers[0]
3684 .sender
3685 .send(Recipients::One(peer3_pk.clone()), shard_bytes, true);
3686 context.sleep(config.link.latency * 2).await;
3687
3688 peers[1]
3690 .sender
3691 .send(Recipients::One(peer3_pk), wrong_bytes, true);
3692 context.sleep(config.link.latency * 2).await;
3693
3694 assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
3696 },
3697 );
3698 }
3699
3700 #[test_traced]
3701 fn test_invalid_shard_crypto_blocks_peer() {
3702 let fixture: Fixture<C> = Fixture {
3705 num_peers: 10,
3706 ..Default::default()
3707 };
3708
3709 fixture.start(
3710 |config, context, oracle, mut peers, _, coding_config| async move {
3711 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3713 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3714 let commitment1 = coded_block1.commitment();
3715
3716 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3717 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3718
3719 let peer3_index = peers[3].index.get() as u16;
3721 let leader_shard = coded_block1.shard(peer3_index).expect("missing shard");
3722
3723 let peer1_index = peers[1].index.get() as u16;
3726 let mut wrong_shard = coded_block2.shard(peer1_index).expect("missing shard");
3727 wrong_shard.commitment = commitment1;
3728 let wrong_bytes = wrong_shard.encode();
3729
3730 let peer3_pk = peers[3].public_key.clone();
3731 let leader = peers[0].public_key.clone();
3732
3733 peers[3].mailbox.discovered(
3735 commitment1,
3736 leader,
3737 Round::new(Epoch::zero(), View::new(1)),
3738 );
3739 let shard_bytes = leader_shard.encode();
3740 peers[0]
3741 .sender
3742 .send(Recipients::One(peer3_pk.clone()), shard_bytes, true);
3743 context.sleep(config.link.latency * 2).await;
3744
3745 peers[1]
3747 .sender
3748 .send(Recipients::One(peer3_pk.clone()), wrong_bytes, true);
3749 context.sleep(config.link.latency * 2).await;
3750
3751 for &idx in &[2, 4] {
3755 let peer_index = peers[idx].index.get() as u16;
3756 let shard = coded_block1.shard(peer_index).expect("missing shard");
3757 let bytes = shard.encode();
3758 peers[idx]
3759 .sender
3760 .send(Recipients::One(peer3_pk.clone()), bytes, true);
3761 }
3762 context.sleep(config.link.latency * 2).await;
3763
3764 assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
3766 },
3767 );
3768 }
3769
3770 #[test_traced]
3771 fn test_reconstruction_recovers_after_quorum_with_one_invalid_shard() {
3772 let fixture: Fixture<C> = Fixture {
3777 num_peers: 10,
3778 ..Default::default()
3779 };
3780
3781 fixture.start(
3782 |config, context, oracle, mut peers, _, coding_config| async move {
3783 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3784 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3785 let commitment1 = coded_block1.commitment();
3786
3787 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3788 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3789
3790 let receiver_idx = 3usize;
3791 let receiver_pk = peers[receiver_idx].public_key.clone();
3792
3793 let peer1_index = peers[1].index.get() as u16;
3795 let mut invalid_shard = coded_block2.shard(peer1_index).expect("missing shard");
3796 invalid_shard.commitment = commitment1;
3797
3798 let leader = peers[0].public_key.clone();
3800 peers[receiver_idx].mailbox.discovered(
3801 commitment1,
3802 leader,
3803 Round::new(Epoch::zero(), View::new(1)),
3804 );
3805 let leader_shard = coded_block1
3806 .shard(peers[receiver_idx].index.get() as u16)
3807 .expect("missing shard")
3808 .encode();
3809 peers[0]
3810 .sender
3811 .send(Recipients::One(receiver_pk.clone()), leader_shard, true);
3812
3813 peers[1].sender.send(
3818 Recipients::One(receiver_pk.clone()),
3819 invalid_shard.encode(),
3820 true,
3821 );
3822 for idx in [2usize, 4usize] {
3823 let shard = coded_block1
3824 .shard(peers[idx].index.get() as u16)
3825 .expect("missing shard")
3826 .encode();
3827 peers[idx]
3828 .sender
3829 .send(Recipients::One(receiver_pk.clone()), shard, true);
3830 }
3831
3832 context.sleep(config.link.latency * 2).await;
3833
3834 assert_blocked(
3836 &oracle,
3837 &peers[receiver_idx].public_key,
3838 &peers[1].public_key,
3839 )
3840 .await;
3841 assert!(
3842 peers[receiver_idx].mailbox.get(commitment1).await.is_none(),
3843 "block should not reconstruct with only 3 checked shards"
3844 );
3845
3846 let extra_shard = coded_block1
3848 .shard(peers[5].index.get() as u16)
3849 .expect("missing shard")
3850 .encode();
3851 peers[5]
3852 .sender
3853 .send(Recipients::One(receiver_pk), extra_shard, true);
3854
3855 context.sleep(config.link.latency * 2).await;
3856
3857 let reconstructed = peers[receiver_idx]
3858 .mailbox
3859 .get(commitment1)
3860 .await
3861 .expect("block should reconstruct after additional valid shard");
3862 assert_eq!(reconstructed.commitment(), commitment1);
3863 },
3864 );
3865 }
3866
3867 #[test_traced]
3868 fn test_invalid_pending_shard_blocked_on_drain() {
3869 let fixture: Fixture<C> = Fixture {
3872 num_peers: 10,
3873 ..Default::default()
3874 };
3875
3876 fixture.start(
3877 |config, context, oracle, mut peers, _, coding_config| async move {
3878 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3880 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3881 let commitment1 = coded_block1.commitment();
3882
3883 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3884 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3885
3886 let peer1_index = peers[1].index.get() as u16;
3888 let mut wrong_shard = coded_block2.shard(peer1_index).expect("missing shard");
3889 wrong_shard.commitment = commitment1;
3890 let wrong_bytes = wrong_shard.encode();
3891
3892 let peer3_pk = peers[3].public_key.clone();
3893
3894 peers[1]
3897 .sender
3898 .send(Recipients::One(peer3_pk.clone()), wrong_bytes, true);
3899 context.sleep(config.link.latency * 2).await;
3900
3901 let blocked = oracle.blocked().await.unwrap();
3903 assert!(blocked.is_empty(), "no peers should be blocked yet");
3904
3905 for &idx in &[2, 4] {
3909 let peer_index = peers[idx].index.get() as u16;
3910 let shard = coded_block1.shard(peer_index).expect("missing shard");
3911 let bytes = shard.encode();
3912 peers[idx]
3913 .sender
3914 .send(Recipients::One(peer3_pk.clone()), bytes, true);
3915 }
3916 context.sleep(config.link.latency * 2).await;
3917
3918 let blocked = oracle.blocked().await.unwrap();
3920 assert!(blocked.is_empty(), "no peers should be blocked yet");
3921
3922 let leader = peers[0].public_key.clone();
3924 peers[3].mailbox.discovered(
3925 commitment1,
3926 leader,
3927 Round::new(Epoch::zero(), View::new(1)),
3928 );
3929 let peer3_index = peers[3].index.get() as u16;
3930 let leader_shard = coded_block1.shard(peer3_index).expect("missing shard");
3931 let shard_bytes = leader_shard.encode();
3932 peers[0]
3933 .sender
3934 .send(Recipients::One(peer3_pk), shard_bytes, true);
3935 context.sleep(config.link.latency * 2).await;
3936
3937 assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
3940 },
3941 );
3942 }
3943
3944 #[test_traced]
3945 fn test_cross_epoch_buffered_shard_not_blocked() {
3946 let executor = deterministic::Runner::default();
3947 executor.start(|context| async move {
3948 let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
3949 context.child("network"),
3950 simulated::Config {
3951 max_size: MAX_SHARD_SIZE as u32,
3952 disconnect_on_block: true,
3953 tracked_peer_sets: NZUsize!(1),
3954 },
3955 );
3956 network.start();
3957
3958 let mut epoch0_keys: Vec<PrivateKey> = (0..4).map(PrivateKey::from_seed).collect();
3961 epoch0_keys.sort_by_key(|s| s.public_key());
3962 let epoch0_pks: Vec<P> = epoch0_keys.iter().map(|c| c.public_key()).collect();
3963 let epoch0_set: Set<P> = Set::from_iter_dedup(epoch0_pks.clone());
3964
3965 let future_peer_key = PrivateKey::from_seed(4);
3966 let future_peer_pk = future_peer_key.public_key();
3967 let mut epoch1_pks: Vec<P> = epoch0_pks[..3]
3968 .iter()
3969 .cloned()
3970 .chain(std::iter::once(future_peer_pk.clone()))
3971 .collect();
3972 epoch1_pks.sort();
3973 let epoch1_set: Set<P> = Set::from_iter_dedup(epoch1_pks);
3974
3975 let receiver_idx_in_epoch0 = epoch0_set
3976 .index(&epoch0_pks[0])
3977 .expect("receiver must be in epoch 0")
3978 .get() as usize;
3979 let receiver_key = epoch0_keys[receiver_idx_in_epoch0].clone();
3980 let receiver_pk = receiver_key.public_key();
3981
3982 let receiver_control = oracle.control(receiver_pk.clone());
3983 let (sender_handle, receiver_handle) = receiver_control
3984 .register(0, TEST_QUOTA)
3985 .await
3986 .expect("registration should succeed");
3987
3988 let future_peer_control = oracle.control(future_peer_pk.clone());
3989 let (mut future_peer_sender, _future_peer_receiver) = future_peer_control
3990 .register(0, TEST_QUOTA)
3991 .await
3992 .expect("registration should succeed");
3993 oracle
3994 .add_link(future_peer_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
3995 .await
3996 .expect("link should be added");
3997 oracle.manager().track(
3998 0,
3999 Set::from_iter_dedup([receiver_pk.clone(), future_peer_pk.clone()]),
4000 );
4001 context.sleep(Duration::from_millis(10)).await;
4002
4003 let scheme_epoch0 =
4005 Scheme::signer(SCHEME_NAMESPACE, epoch0_set.clone(), receiver_key.clone())
4006 .expect("signer scheme should be created");
4007 let scheme_epoch1 =
4008 Scheme::signer(SCHEME_NAMESPACE, epoch1_set.clone(), receiver_key.clone())
4009 .expect("signer scheme should be created");
4010 let scheme_provider =
4011 MultiEpochProvider::single(scheme_epoch0).with_epoch(Epoch::new(1), scheme_epoch1);
4012
4013 let config: Config<_, _, _, _, C, _, _, _> = Config {
4014 scheme_provider,
4015 blocker: receiver_control.clone(),
4016 shard_codec_cfg: CodecConfig {
4017 maximum_shard_size: MAX_SHARD_SIZE,
4018 },
4019 block_codec_cfg: (),
4020 strategy: STRATEGY,
4021 mailbox_size: NZUsize!(1024),
4022 peer_buffer_size: NZUsize!(64),
4023 background_channel_capacity: NZUsize!(1024),
4024 peer_provider: oracle.manager(),
4025 };
4026
4027 let (engine, mailbox) = ShardEngine::new(context.child("receiver"), config);
4028 engine.start((sender_handle, receiver_handle));
4029
4030 let coding_config = coding_config_for_participants(epoch1_set.len() as u16);
4032 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4033 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4034 let commitment = coded_block.commitment();
4035
4036 let future_peer_index = epoch1_set
4038 .index(&future_peer_pk)
4039 .expect("future peer must be in epoch 1");
4040 let future_shard = coded_block
4041 .shard(future_peer_index.get() as u16)
4042 .expect("missing shard");
4043 let shard_bytes = future_shard.encode();
4044
4045 future_peer_sender.send(Recipients::One(receiver_pk.clone()), shard_bytes, true);
4047 context.sleep(DEFAULT_LINK.latency * 2).await;
4048
4049 let blocked = oracle.blocked().await.unwrap();
4051 assert!(
4052 blocked.is_empty(),
4053 "no peers should be blocked while shard is buffered"
4054 );
4055
4056 let leader = epoch0_pks[1].clone();
4058 mailbox.discovered(commitment, leader, Round::new(Epoch::new(1), View::new(1)));
4059 context.sleep(DEFAULT_LINK.latency * 2).await;
4060
4061 let blocked = oracle.blocked().await.unwrap();
4064 assert!(
4065 blocked.is_empty(),
4066 "future-epoch participant should not be blocked: {blocked:?}"
4067 );
4068 });
4069 }
4070
4071 #[test_traced]
4072 fn test_shard_broadcast_survives_provider_churn() {
4073 let executor = deterministic::Runner::default();
4074 executor.start(|context| async move {
4075 let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
4076 context.child("network"),
4077 simulated::Config {
4078 max_size: MAX_SHARD_SIZE as u32,
4079 disconnect_on_block: true,
4080 tracked_peer_sets: NZUsize!(1),
4081 },
4082 );
4083 network.start();
4084
4085 let mut private_keys: Vec<PrivateKey> = (0..4).map(PrivateKey::from_seed).collect();
4086 private_keys.sort_by_key(|s| s.public_key());
4087 let peer_keys: Vec<P> = private_keys.iter().map(|k| k.public_key()).collect();
4088 let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());
4089
4090 let leader_idx = 0usize;
4091 let broadcaster_idx = 1usize;
4092 let receiver_idx = 2usize;
4093
4094 let leader_pk = peer_keys[leader_idx].clone();
4095 let broadcaster_pk = peer_keys[broadcaster_idx].clone();
4096 let receiver_pk = peer_keys[receiver_idx].clone();
4097
4098 let mut registrations = BTreeMap::new();
4099 for key in &peer_keys {
4100 let control = oracle.control(key.clone());
4101 let (sender, receiver) = control
4102 .register(0, TEST_QUOTA)
4103 .await
4104 .expect("registration should succeed");
4105 registrations.insert(key.clone(), (control, sender, receiver));
4106 }
4107
4108 for src in &peer_keys {
4109 for dst in &peer_keys {
4110 if src == dst {
4111 continue;
4112 }
4113 oracle
4114 .add_link(src.clone(), dst.clone(), DEFAULT_LINK)
4115 .await
4116 .expect("link should be added");
4117 }
4118 }
4119 oracle.manager().track(0, participants.clone());
4120 context.sleep(Duration::from_millis(10)).await;
4121
4122 let (_leader_control, mut leader_sender, _leader_receiver) = registrations
4123 .remove(&leader_pk)
4124 .expect("leader should be registered");
4125 let (broadcaster_control, broadcaster_sender, broadcaster_receiver) = registrations
4126 .remove(&broadcaster_pk)
4127 .expect("broadcaster should be registered");
4128 let (receiver_control, receiver_sender, receiver_receiver) = registrations
4129 .remove(&receiver_pk)
4130 .expect("receiver should be registered");
4131
4132 let broadcaster_scheme = Scheme::signer(
4133 SCHEME_NAMESPACE,
4134 participants.clone(),
4135 private_keys[broadcaster_idx].clone(),
4136 )
4137 .expect("signer scheme should be created");
4138 let broadcaster_provider = ChurningProvider::new(broadcaster_scheme, 3);
4142 let broadcaster_config: Config<_, _, _, _, C, _, _, _> = Config {
4143 scheme_provider: broadcaster_provider,
4144 blocker: broadcaster_control.clone(),
4145 shard_codec_cfg: CodecConfig {
4146 maximum_shard_size: MAX_SHARD_SIZE,
4147 },
4148 block_codec_cfg: (),
4149 strategy: STRATEGY,
4150 mailbox_size: NZUsize!(1024),
4151 peer_buffer_size: NZUsize!(64),
4152 background_channel_capacity: NZUsize!(1024),
4153 peer_provider: oracle.manager(),
4154 };
4155 let (broadcaster_engine, broadcaster_mailbox) =
4156 ChurningShardEngine::new(context.child("broadcaster"), broadcaster_config);
4157 broadcaster_engine.start((broadcaster_sender, broadcaster_receiver));
4158
4159 let receiver_scheme = Scheme::signer(
4160 SCHEME_NAMESPACE,
4161 participants.clone(),
4162 private_keys[receiver_idx].clone(),
4163 )
4164 .expect("signer scheme should be created");
4165 let receiver_config: Config<_, _, _, _, C, _, _, _> = Config {
4166 scheme_provider: MultiEpochProvider::single(receiver_scheme),
4167 blocker: receiver_control.clone(),
4168 shard_codec_cfg: CodecConfig {
4169 maximum_shard_size: MAX_SHARD_SIZE,
4170 },
4171 block_codec_cfg: (),
4172 strategy: STRATEGY,
4173 mailbox_size: NZUsize!(1024),
4174 peer_buffer_size: NZUsize!(64),
4175 background_channel_capacity: NZUsize!(1024),
4176 peer_provider: oracle.manager(),
4177 };
4178 let (receiver_engine, receiver_mailbox) =
4179 ShardEngine::new(context.child("receiver"), receiver_config);
4180 receiver_engine.start((receiver_sender, receiver_receiver));
4181
4182 let coding_config = coding_config_for_participants(peer_keys.len() as u16);
4183 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4184 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4185 let commitment = coded_block.commitment();
4186 let round = Round::new(Epoch::zero(), View::new(1));
4187
4188 broadcaster_mailbox.discovered(commitment, leader_pk.clone(), round);
4189 receiver_mailbox.discovered(commitment, leader_pk.clone(), round);
4190 context.sleep(DEFAULT_LINK.latency).await;
4191
4192 let broadcaster_index = participants
4193 .index(&broadcaster_pk)
4194 .expect("broadcaster must be a participant")
4195 .get() as u16;
4196 let broadcaster_shard = coded_block
4197 .shard(broadcaster_index)
4198 .expect("missing shard")
4199 .encode();
4200 leader_sender.send(Recipients::One(broadcaster_pk), broadcaster_shard, true);
4201
4202 let receiver_index = participants
4203 .index(&receiver_pk)
4204 .expect("receiver must be a participant")
4205 .get() as u16;
4206 let receiver_shard = coded_block
4207 .shard(receiver_index)
4208 .expect("missing shard")
4209 .encode();
4210 leader_sender.send(Recipients::One(receiver_pk.clone()), receiver_shard, true);
4211
4212 context.sleep(DEFAULT_LINK.latency * 3).await;
4213
4214 let reconstructed = receiver_mailbox.get(commitment).await;
4215 assert!(
4216 reconstructed.is_some(),
4217 "receiver should reconstruct after broadcaster validates and broadcasts shard"
4218 );
4219 });
4220 }
4221
4222 #[test_traced]
4223 fn test_failed_reconstruction_digest_mismatch_then_recovery() {
4224 let fixture: Fixture<C> = Fixture {
4231 num_peers: 10,
4232 ..Default::default()
4233 };
4234
4235 fixture.start(
4236 |config, context, _oracle, mut peers, _, coding_config| async move {
4237 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4239 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
4240
4241 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
4243 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
4244 let real_commitment2 = coded_block2.commitment();
4245
4246 let fake_commitment = Commitment::from((
4250 coded_block1.digest(),
4251 real_commitment2.root::<Sha256Digest>(),
4252 real_commitment2.context::<Sha256Digest>(),
4253 coding_config,
4254 ));
4255
4256 let receiver_idx = 3usize;
4257 let receiver_pk = peers[receiver_idx].public_key.clone();
4258 let leader = peers[0].public_key.clone();
4259 let round = Round::new(Epoch::zero(), View::new(1));
4260
4261 peers[receiver_idx]
4263 .mailbox
4264 .discovered(fake_commitment, leader.clone(), round);
4265
4266 let mut block_sub = peers[receiver_idx].mailbox.subscribe(fake_commitment);
4268 let mut digest_sub = peers[receiver_idx]
4269 .mailbox
4270 .subscribe_by_digest(coded_block1.digest());
4271
4272 let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
4274 let mut leader_shard = coded_block2
4275 .shard(receiver_shard_idx)
4276 .expect("missing shard");
4277 leader_shard.commitment = fake_commitment;
4278 peers[0].sender.send(
4279 Recipients::One(receiver_pk.clone()),
4280 leader_shard.encode(),
4281 true,
4282 );
4283
4284 for &idx in &[1usize, 2, 4] {
4287 let peer_shard_idx = peers[idx].index.get() as u16;
4288 let mut shard = coded_block2.shard(peer_shard_idx).expect("missing shard");
4289 shard.commitment = fake_commitment;
4290 peers[idx].sender.send(
4291 Recipients::One(receiver_pk.clone()),
4292 shard.encode(),
4293 true,
4294 );
4295 }
4296
4297 context.sleep(config.link.latency * 2).await;
4298
4299 assert!(
4302 peers[receiver_idx]
4303 .mailbox
4304 .get(fake_commitment)
4305 .await
4306 .is_none(),
4307 "block should not be available after DigestMismatch"
4308 );
4309
4310 assert!(
4312 matches!(block_sub.try_recv(), Err(TryRecvError::Closed)),
4313 "subscription should close for failed reconstruction"
4314 );
4315 assert!(
4316 matches!(digest_sub.try_recv(), Err(TryRecvError::Closed)),
4317 "digest subscription should close after failed reconstruction"
4318 );
4319
4320 let real_commitment1 = coded_block1.commitment();
4323 let round2 = Round::new(Epoch::zero(), View::new(2));
4324 peers[receiver_idx]
4325 .mailbox
4326 .discovered(real_commitment1, leader.clone(), round2);
4327
4328 let leader_shard1 = coded_block1
4329 .shard(receiver_shard_idx)
4330 .expect("missing shard");
4331 peers[0].sender.send(
4332 Recipients::One(receiver_pk.clone()),
4333 leader_shard1.encode(),
4334 true,
4335 );
4336
4337 for &idx in &[1usize, 2, 4] {
4338 let peer_shard_idx = peers[idx].index.get() as u16;
4339 let shard = coded_block1.shard(peer_shard_idx).expect("missing shard");
4340 peers[idx].sender.send(
4341 Recipients::One(receiver_pk.clone()),
4342 shard.encode(),
4343 true,
4344 );
4345 }
4346
4347 context.sleep(config.link.latency * 2).await;
4348
4349 let reconstructed = peers[receiver_idx]
4350 .mailbox
4351 .get(real_commitment1)
4352 .await
4353 .expect("valid block should reconstruct after prior failure");
4354 assert_eq!(reconstructed.commitment(), real_commitment1);
4355 },
4356 );
4357 }
4358
4359 #[test_traced]
4360 fn test_failed_reconstruction_context_mismatch_then_recovery() {
4361 let fixture: Fixture<C> = Fixture {
4365 num_peers: 10,
4366 ..Default::default()
4367 };
4368
4369 fixture.start(
4370 |config, context, _oracle, mut peers, _, coding_config| async move {
4371 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4372 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4373 let real_commitment = coded_block.commitment();
4374
4375 let wrong_context_digest = Sha256::hash(b"wrong_context");
4376 assert_ne!(
4377 real_commitment.context::<Sha256Digest>(),
4378 wrong_context_digest,
4379 "test requires a distinct context digest"
4380 );
4381 let fake_commitment = Commitment::from((
4382 coded_block.digest(),
4383 real_commitment.root::<Sha256Digest>(),
4384 wrong_context_digest,
4385 coding_config,
4386 ));
4387
4388 let receiver_idx = 3usize;
4389 let receiver_pk = peers[receiver_idx].public_key.clone();
4390 let leader = peers[0].public_key.clone();
4391 let round = Round::new(Epoch::zero(), View::new(1));
4392
4393 peers[receiver_idx]
4394 .mailbox
4395 .discovered(fake_commitment, leader.clone(), round);
4396 let mut block_sub = peers[receiver_idx].mailbox.subscribe(fake_commitment);
4397
4398 let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
4399 let mut leader_shard = coded_block
4400 .shard(receiver_shard_idx)
4401 .expect("missing shard");
4402 leader_shard.commitment = fake_commitment;
4403 peers[0].sender.send(
4404 Recipients::One(receiver_pk.clone()),
4405 leader_shard.encode(),
4406 true,
4407 );
4408
4409 for &idx in &[1usize, 2, 4] {
4410 let peer_shard_idx = peers[idx].index.get() as u16;
4411 let mut shard = coded_block.shard(peer_shard_idx).expect("missing shard");
4412 shard.commitment = fake_commitment;
4413 peers[idx].sender.send(
4414 Recipients::One(receiver_pk.clone()),
4415 shard.encode(),
4416 true,
4417 );
4418 }
4419
4420 context.sleep(config.link.latency * 2).await;
4421
4422 assert!(
4423 peers[receiver_idx]
4424 .mailbox
4425 .get(fake_commitment)
4426 .await
4427 .is_none(),
4428 "block should not be available after ContextMismatch"
4429 );
4430 assert!(
4431 matches!(block_sub.try_recv(), Err(TryRecvError::Closed)),
4432 "subscription should close for context-mismatched commitment"
4433 );
4434
4435 let round2 = Round::new(Epoch::zero(), View::new(2));
4437 peers[receiver_idx]
4438 .mailbox
4439 .discovered(real_commitment, leader.clone(), round2);
4440
4441 let real_leader_shard = coded_block
4442 .shard(receiver_shard_idx)
4443 .expect("missing shard");
4444 peers[0].sender.send(
4445 Recipients::One(receiver_pk.clone()),
4446 real_leader_shard.encode(),
4447 true,
4448 );
4449
4450 for &idx in &[1usize, 2, 4] {
4451 let peer_shard_idx = peers[idx].index.get() as u16;
4452 let shard = coded_block.shard(peer_shard_idx).expect("missing shard");
4453 peers[idx].sender.send(
4454 Recipients::One(receiver_pk.clone()),
4455 shard.encode(),
4456 true,
4457 );
4458 }
4459
4460 context.sleep(config.link.latency * 2).await;
4461
4462 let reconstructed = peers[receiver_idx]
4463 .mailbox
4464 .get(real_commitment)
4465 .await
4466 .expect("valid block should reconstruct after prior context mismatch");
4467 assert_eq!(reconstructed.commitment(), real_commitment);
4468 },
4469 );
4470 }
4471
4472 #[test_traced]
4473 fn test_same_round_equivocation_preserves_certifiable_recovery() {
4474 let fixture: Fixture<C> = Fixture {
4480 num_peers: 10,
4481 ..Default::default()
4482 };
4483
4484 fixture.start(
4485 |config, context, _oracle, mut peers, _, coding_config| async move {
4486 let receiver_idx = 3usize;
4487 let receiver_pk = peers[receiver_idx].public_key.clone();
4488 let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
4489
4490 let leader = peers[0].public_key.clone();
4491 let round = Round::new(Epoch::zero(), View::new(7));
4492
4493 let block_a = CodedBlock::<B, C, H>::new(
4495 B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 111),
4496 coding_config,
4497 &STRATEGY,
4498 );
4499 let commitment_a = block_a.commitment();
4500 let block_b = CodedBlock::<B, C, H>::new(
4501 B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 222),
4502 coding_config,
4503 &STRATEGY,
4504 );
4505 let commitment_b = block_b.commitment();
4506
4507 peers[receiver_idx]
4509 .mailbox
4510 .discovered(commitment_a, leader.clone(), round);
4511 peers[receiver_idx]
4512 .mailbox
4513 .discovered(commitment_b, leader.clone(), round);
4514
4515 let certifiable_sub = peers[receiver_idx].mailbox.subscribe(commitment_b);
4517
4518 let shard_b = block_b
4520 .shard(receiver_shard_idx)
4521 .expect("missing shard")
4522 .encode();
4523 peers[0]
4524 .sender
4525 .send(Recipients::One(receiver_pk.clone()), shard_b, true);
4526
4527 let shard_a = block_a
4529 .shard(receiver_shard_idx)
4530 .expect("missing shard")
4531 .encode();
4532 peers[0]
4533 .sender
4534 .send(Recipients::One(receiver_pk.clone()), shard_a, true);
4535 for i in [1usize, 2usize, 4usize] {
4536 let shard_a = block_a
4537 .shard(peers[i].index.get() as u16)
4538 .expect("missing shard")
4539 .encode();
4540 peers[i]
4541 .sender
4542 .send(Recipients::One(receiver_pk.clone()), shard_a, true);
4543 }
4544 context.sleep(config.link.latency * 4).await;
4545 let reconstructed_a = peers[receiver_idx]
4546 .mailbox
4547 .get(commitment_a)
4548 .await
4549 .expect("conflicting commitment should reconstruct first");
4550 assert_eq!(reconstructed_a.commitment(), commitment_a);
4551
4552 for i in [1usize, 2usize, 4usize] {
4554 let shard_b = block_b
4555 .shard(peers[i].index.get() as u16)
4556 .expect("missing shard")
4557 .encode();
4558 peers[i]
4559 .sender
4560 .send(Recipients::One(receiver_pk.clone()), shard_b, true);
4561 }
4562
4563 select! {
4564 result = certifiable_sub => {
4565 let reconstructed_b =
4566 result.expect("certifiable commitment should remain recoverable");
4567 assert_eq!(reconstructed_b.commitment(), commitment_b);
4568 },
4569 _ = context.sleep(Duration::from_secs(5)) => {
4570 panic!("certifiable commitment was not recoverable after same-round equivocation");
4571 },
4572 }
4573 },
4574 );
4575 }
4576
4577 #[test_traced]
4578 fn test_leader_unrelated_shard_blocks_peer() {
4579 let fixture: Fixture<C> = Fixture {
4583 num_peers: 10,
4584 ..Default::default()
4585 };
4586
4587 fixture.start(
4588 |config, context, oracle, mut peers, _, coding_config| async move {
4589 let tracked_block = CodedBlock::<B, C, H>::new(
4591 B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
4592 coding_config,
4593 &STRATEGY,
4594 );
4595 let tracked_commitment = tracked_block.commitment();
4596
4597 let unrelated_block = CodedBlock::<B, C, H>::new(
4599 B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200),
4600 coding_config,
4601 &STRATEGY,
4602 );
4603
4604 let receiver_idx = 3usize;
4605 let receiver_pk = peers[receiver_idx].public_key.clone();
4606 let leader_idx = 0usize;
4607 let leader_pk = peers[leader_idx].public_key.clone();
4608
4609 peers[receiver_idx].mailbox.discovered(
4611 tracked_commitment,
4612 leader_pk.clone(),
4613 Round::new(Epoch::zero(), View::new(1)),
4614 );
4615
4616 let mut unrelated_shard = unrelated_block
4619 .shard(peers[1].index.get() as u16)
4620 .expect("missing shard");
4621 unrelated_shard.commitment = tracked_commitment;
4622
4623 peers[leader_idx].sender.send(
4627 Recipients::One(receiver_pk),
4628 unrelated_shard.encode(),
4629 true,
4630 );
4631 context.sleep(config.link.latency * 2).await;
4632
4633 assert_blocked(&oracle, &peers[receiver_idx].public_key, &leader_pk).await;
4634 },
4635 );
4636 }
4637
4638 #[test_traced]
4639 fn test_withholding_leader_victim_reconstructs_via_gossip() {
4640 let fixture = Fixture {
4645 num_peers: 10,
4646 ..Default::default()
4647 };
4648
4649 fixture.start(
4650 |config, context, oracle, mut peers, _, coding_config| async move {
4651 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4652 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4653 let commitment = coded_block.commitment();
4654 let round = Round::new(Epoch::zero(), View::new(1));
4655
4656 let leader = peers[0].public_key.clone();
4657 let victim = peers[1].public_key.clone();
4658
4659 oracle
4662 .remove_link(leader.clone(), victim.clone())
4663 .await
4664 .expect("remove_link should succeed");
4665
4666 peers[0].mailbox.proposed(round, coded_block.clone());
4669
4670 for peer in peers[1..].iter_mut() {
4673 peer.mailbox.discovered(commitment, leader.clone(), round);
4674 }
4675 context.sleep(config.link.latency * 2).await;
4676
4677 let block_sub = peers[1].mailbox.subscribe(commitment);
4680 select! {
4681 result = block_sub => {
4682 let reconstructed = result.expect("block subscription should resolve");
4683 assert_eq!(reconstructed.commitment(), commitment);
4684 assert_eq!(reconstructed.height(), coded_block.height());
4685 },
4686 _ = context.sleep(Duration::from_secs(5)) => {
4687 panic!("victim did not reconstruct block despite withholding leader");
4688 },
4689 }
4690
4691 for peer in peers[2..].iter_mut() {
4693 let reconstructed = peer
4694 .mailbox
4695 .get(commitment)
4696 .await
4697 .expect("block should be reconstructed");
4698 assert_eq!(reconstructed.commitment(), commitment);
4699 }
4700
4701 let blocked = oracle.blocked().await.unwrap();
4703 assert!(
4704 blocked.is_empty(),
4705 "no peer should be blocked in withholding leader test"
4706 );
4707 },
4708 );
4709 }
4710
4711 #[test_traced]
4717 fn test_shard_subscription_pending_after_reconstruction_without_leader_shard() {
4718 let fixture = Fixture {
4719 num_peers: 10,
4720 ..Default::default()
4721 };
4722
4723 fixture.start(
4724 |config, context, oracle, mut peers, _, coding_config| async move {
4725 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4726 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4727 let commitment = coded_block.commitment();
4728 let round = Round::new(Epoch::zero(), View::new(1));
4729
4730 let leader = peers[0].public_key.clone();
4731 let victim = peers[1].public_key.clone();
4732
4733 oracle
4736 .remove_link(leader.clone(), victim.clone())
4737 .await
4738 .expect("remove_link should succeed");
4739
4740 let mut shard_sub = peers[1]
4742 .mailbox
4743 .subscribe_assigned_shard_verified(commitment);
4744 let block_sub = peers[1].mailbox.subscribe(commitment);
4745
4746 peers[0].mailbox.proposed(round, coded_block.clone());
4748
4749 for peer in peers[1..].iter_mut() {
4751 peer.mailbox.discovered(commitment, leader.clone(), round);
4752 }
4753
4754 context.sleep(config.link.latency * 4).await;
4756
4757 let reconstructed = block_sub.await.expect("block subscription should resolve");
4760 assert_eq!(reconstructed.commitment(), commitment);
4761
4762 assert!(
4765 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
4766 "shard subscription must not resolve without own shard verification"
4767 );
4768 },
4769 );
4770 }
4771
4772 #[test_traced]
4773 fn test_broadcast_routes_participant_and_non_participant_shards() {
4774 let fixture = Fixture {
4775 num_non_participants: 1,
4776 ..Default::default()
4777 };
4778
4779 fixture.start(
4780 |config, context, oracle, mut peers, non_participants, coding_config| async move {
4781 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4782 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4783 let commitment = coded_block.commitment();
4784
4785 let leader = peers[0].public_key.clone();
4786 let round = Round::new(Epoch::zero(), View::new(1));
4787 peers[0].mailbox.proposed(round, coded_block.clone());
4788
4789 for peer in peers[1..].iter_mut() {
4790 peer.mailbox.discovered(commitment, leader.clone(), round);
4791 }
4792 for np in non_participants.iter() {
4793 np.mailbox.discovered(commitment, leader.clone(), round);
4794 }
4795 context.sleep(config.link.latency * 2).await;
4796
4797 for peer in peers.iter_mut() {
4799 peer.mailbox
4800 .subscribe_assigned_shard_verified(commitment)
4801 .await
4802 .expect("participant shard subscription should complete");
4803 }
4804
4805 for np in non_participants.iter() {
4807 np.mailbox
4808 .subscribe_assigned_shard_verified(commitment)
4809 .await
4810 .expect("non-participant shard subscription should complete");
4811 }
4812 context.sleep(config.link.latency).await;
4813
4814 for np in non_participants.iter() {
4816 let reconstructed = np
4817 .mailbox
4818 .get(commitment)
4819 .await
4820 .expect("non-participant should reconstruct block");
4821 assert_eq!(reconstructed.commitment(), commitment);
4822 }
4823
4824 let blocked = oracle.blocked().await.unwrap();
4825 assert!(
4826 blocked.is_empty(),
4827 "no peer should be blocked in participant/non-participant shard routing test"
4828 );
4829 },
4830 );
4831 }
4832
4833 #[test_traced]
4834 fn test_non_participant_reconstructs_after_discovered() {
4835 let fixture = Fixture {
4836 num_non_participants: 1,
4837 ..Default::default()
4838 };
4839
4840 fixture.start(
4841 |config, context, oracle, mut peers, non_participants, coding_config| async move {
4842 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4843 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4844 let commitment = coded_block.commitment();
4845 let round = Round::new(Epoch::zero(), View::new(1));
4846
4847 let leader = peers[0].public_key.clone();
4848 peers[0].mailbox.proposed(round, coded_block.clone());
4849
4850 for peer in peers[1..].iter_mut() {
4853 peer.mailbox.discovered(commitment, leader.clone(), round);
4854 }
4855 context.sleep(config.link.latency).await;
4856
4857 let np = &non_participants[0];
4860 let block_sub = np.mailbox.subscribe(commitment);
4861 np.mailbox.discovered(commitment, leader.clone(), round);
4862
4863 select! {
4866 result = block_sub => {
4867 let reconstructed = result.expect("block subscription should resolve");
4868 assert_eq!(reconstructed.commitment(), commitment);
4869 assert_eq!(reconstructed.height(), coded_block.height());
4870 },
4871 _ = context.sleep(Duration::from_secs(5)) => {
4872 panic!("non-participant block subscription did not resolve");
4873 },
4874 }
4875
4876 let blocked = oracle.blocked().await.unwrap();
4877 assert!(
4878 blocked.is_empty(),
4879 "no peer should be blocked in non-participant reconstruction test"
4880 );
4881 },
4882 );
4883 }
4884
4885 #[test_traced]
4886 fn test_peer_set_update_evicts_peer_buffers() {
4887 let executor = deterministic::Runner::default();
4892 executor.start(|context| async move {
4893 let num_peers = 10usize;
4894 let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
4895 context.child("network"),
4896 simulated::Config {
4897 max_size: MAX_SHARD_SIZE as u32,
4898 disconnect_on_block: true,
4899 tracked_peer_sets: NZUsize!(2),
4900 },
4901 );
4902 network.start();
4903
4904 let mut private_keys = (0..num_peers)
4905 .map(|i| PrivateKey::from_seed(i as u64))
4906 .collect::<Vec<_>>();
4907 private_keys.sort_by_key(|s| s.public_key());
4908 let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
4909 let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());
4910
4911 let receiver_idx = 3usize;
4913 let receiver_pk = peer_keys[receiver_idx].clone();
4914 let leader_pk = peer_keys[0].clone();
4915
4916 let receiver_control = oracle.control(receiver_pk.clone());
4917 let (sender_handle, receiver_handle) = receiver_control
4918 .register(0, TEST_QUOTA)
4919 .await
4920 .expect("registration should succeed");
4921
4922 let leader_control = oracle.control(leader_pk.clone());
4924 let (mut leader_sender, _leader_receiver) = leader_control
4925 .register(0, TEST_QUOTA)
4926 .await
4927 .expect("registration should succeed");
4928 oracle
4929 .add_link(leader_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
4930 .await
4931 .expect("link should be added");
4932
4933 oracle.manager().track(0, participants.clone());
4935 context.sleep(Duration::from_millis(10)).await;
4936
4937 let scheme = Scheme::signer(
4938 SCHEME_NAMESPACE,
4939 participants.clone(),
4940 private_keys[receiver_idx].clone(),
4941 )
4942 .expect("signer scheme should be created");
4943
4944 let config: Config<_, _, _, _, C, _, _, _> = Config {
4945 scheme_provider: MultiEpochProvider::single(scheme),
4946 blocker: receiver_control.clone(),
4947 shard_codec_cfg: CodecConfig {
4948 maximum_shard_size: MAX_SHARD_SIZE,
4949 },
4950 block_codec_cfg: (),
4951 strategy: STRATEGY,
4952 mailbox_size: NZUsize!(1024),
4953 peer_buffer_size: NZUsize!(64),
4954 background_channel_capacity: NZUsize!(1024),
4955 peer_provider: oracle.manager(),
4956 };
4957
4958 let (engine, mailbox) = ShardEngine::new(context.child("receiver"), config);
4959 engine.start((sender_handle, receiver_handle));
4960
4961 let coding_config = coding_config_for_participants(num_peers as u16);
4963 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4964 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4965 let commitment = coded_block.commitment();
4966
4967 let receiver_participant = participants
4968 .index(&receiver_pk)
4969 .expect("receiver must be a participant");
4970 let leader_shard = coded_block
4971 .shard(receiver_participant.get() as u16)
4972 .expect("missing shard");
4973 let shard_bytes = leader_shard.encode();
4974
4975 leader_sender.send(
4977 Recipients::One(receiver_pk.clone()),
4978 shard_bytes.clone(),
4979 true,
4980 );
4981 context.sleep(DEFAULT_LINK.latency * 2).await;
4982
4983 let remaining: Set<P> =
4985 Set::from_iter_dedup(peer_keys.iter().filter(|pk| **pk != leader_pk).cloned());
4986 oracle.manager().track(1, remaining);
4987 context.sleep(Duration::from_millis(10)).await;
4988
4989 leader_sender.send(Recipients::One(receiver_pk.clone()), shard_bytes, true);
4992 context.sleep(DEFAULT_LINK.latency * 2).await;
4993
4994 let mut shard_sub = mailbox.subscribe_assigned_shard_verified(commitment);
4997 mailbox.discovered(
4998 commitment,
4999 leader_pk.clone(),
5000 Round::new(Epoch::zero(), View::new(1)),
5001 );
5002 context.sleep(DEFAULT_LINK.latency * 2).await;
5003
5004 assert!(
5006 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
5007 "shard subscription should not resolve after evicted leader's buffer"
5008 );
5009 assert!(
5010 mailbox.get(commitment).await.is_none(),
5011 "block should not reconstruct from evicted buffers"
5012 );
5013 });
5014 }
5015
5016 #[test_traced]
5017 fn test_peer_buffer_lifetime_tracks_latest_primary() {
5018 let executor = deterministic::Runner::default();
5019 executor.start(|context| async move {
5020 let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
5021 context.child("network"),
5022 simulated::Config {
5023 max_size: MAX_SHARD_SIZE as u32,
5024 disconnect_on_block: true,
5025 tracked_peer_sets: NZUsize!(1),
5026 },
5027 );
5028 network.start();
5029
5030 let mut private_keys = (0..4)
5031 .map(|i| PrivateKey::from_seed(i as u64))
5032 .collect::<Vec<_>>();
5033 private_keys.sort_by_key(|s| s.public_key());
5034 let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
5035 let receiver_pk = peer_keys[0].clone();
5036 let sender_pk = peer_keys[1].clone();
5037 let participants: Set<P> = Set::from_iter_dedup(peer_keys);
5038
5039 let receiver_control = oracle.control(receiver_pk);
5040 let scheme = Scheme::signer(
5041 SCHEME_NAMESPACE,
5042 participants.clone(),
5043 private_keys[0].clone(),
5044 )
5045 .expect("signer scheme should be created");
5046
5047 let config: Config<_, _, _, _, C, _, _, _> = Config {
5048 scheme_provider: MultiEpochProvider::single(scheme),
5049 blocker: receiver_control,
5050 shard_codec_cfg: CodecConfig {
5051 maximum_shard_size: MAX_SHARD_SIZE,
5052 },
5053 block_codec_cfg: (),
5054 strategy: STRATEGY,
5055 mailbox_size: NZUsize!(16),
5056 peer_buffer_size: NZUsize!(4),
5057 background_channel_capacity: NZUsize!(16),
5058 peer_provider: oracle.manager(),
5059 };
5060
5061 let (mut engine, _mailbox) = ShardEngine::new(context.child("engine"), config);
5062
5063 engine.update_latest_primary_peers(Set::from_iter_dedup([sender_pk.clone()]));
5066
5067 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
5068 let coded_block = CodedBlock::<B, C, H>::new(
5069 inner,
5070 coding_config_for_participants(participants.len() as u16),
5071 &STRATEGY,
5072 );
5073 let shard = coded_block.shard(0).expect("missing shard");
5074
5075 engine.buffer_peer_shard(sender_pk.clone(), shard);
5077 assert_eq!(
5078 engine.peer_buffers.get(&sender_pk).map(VecDeque::len),
5079 Some(1),
5080 "peer buffer should contain the buffered shard"
5081 );
5082
5083 engine.update_latest_primary_peers(Set::default());
5086 assert!(
5087 !engine.peer_buffers.contains_key(&sender_pk),
5088 "peer buffer should be evicted once sender leaves latest.primary"
5089 );
5090 });
5091 }
5092
5093 #[test_traced]
5094 fn test_old_epoch_buffered_shards_are_dropped_after_cutover() {
5095 let executor = deterministic::Runner::default();
5096 executor.start(|context| async move {
5097 let num_peers = 6usize;
5098 let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
5099 context.child("network"),
5100 simulated::Config {
5101 max_size: MAX_SHARD_SIZE as u32,
5102 disconnect_on_block: true,
5103 tracked_peer_sets: NZUsize!(2),
5104 },
5105 );
5106 network.start();
5107
5108 let mut private_keys = (0..num_peers)
5109 .map(|i| PrivateKey::from_seed(i as u64))
5110 .collect::<Vec<_>>();
5111 private_keys.sort_by_key(|s| s.public_key());
5112 let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
5113
5114 let epoch0_set: Set<P> = Set::from_iter_dedup(peer_keys[..5].iter().cloned());
5117 let epoch1_set: Set<P> = Set::from_iter_dedup([
5118 peer_keys[1].clone(),
5119 peer_keys[2].clone(),
5120 peer_keys[3].clone(),
5121 peer_keys[4].clone(),
5122 peer_keys[5].clone(),
5123 ]);
5124
5125 let receiver_idx = 3usize;
5126 let receiver_pk = peer_keys[receiver_idx].clone();
5127 let receiver_key = private_keys[receiver_idx].clone();
5128 let leader_pk = peer_keys[0].clone();
5129
5130 let receiver_control = oracle.control(receiver_pk.clone());
5131 let (sender_handle, receiver_handle) = receiver_control
5132 .register(0, TEST_QUOTA)
5133 .await
5134 .expect("registration should succeed");
5135
5136 let leader_control = oracle.control(leader_pk.clone());
5137 let (mut leader_sender, _leader_receiver) = leader_control
5138 .register(0, TEST_QUOTA)
5139 .await
5140 .expect("registration should succeed");
5141 oracle
5142 .add_link(leader_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
5143 .await
5144 .expect("link should be added");
5145
5146 oracle.manager().track(0, epoch0_set.clone());
5148 context.sleep(Duration::from_millis(10)).await;
5149
5150 let scheme_epoch0 =
5151 Scheme::signer(SCHEME_NAMESPACE, epoch0_set.clone(), receiver_key.clone())
5152 .expect("epoch 0 signer scheme should be created");
5153 let scheme_epoch1 =
5154 Scheme::signer(SCHEME_NAMESPACE, epoch1_set.clone(), receiver_key.clone())
5155 .expect("epoch 1 signer scheme should be created");
5156
5157 let config: Config<_, _, _, _, C, _, _, _> = Config {
5158 scheme_provider: MultiEpochProvider::single(scheme_epoch0)
5159 .with_epoch(Epoch::new(1), scheme_epoch1),
5160 blocker: receiver_control.clone(),
5161 shard_codec_cfg: CodecConfig {
5162 maximum_shard_size: MAX_SHARD_SIZE,
5163 },
5164 block_codec_cfg: (),
5165 strategy: STRATEGY,
5166 mailbox_size: NZUsize!(1024),
5167 peer_buffer_size: NZUsize!(64),
5168 background_channel_capacity: NZUsize!(1024),
5169 peer_provider: oracle.manager(),
5170 };
5171
5172 let (engine, mailbox) = ShardEngine::new(context.child("receiver"), config);
5174 engine.start((sender_handle, receiver_handle));
5175
5176 let coding_config = coding_config_for_participants(epoch0_set.len() as u16);
5177 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
5178 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
5179 let commitment = coded_block.commitment();
5180
5181 let receiver_participant = epoch0_set
5182 .index(&receiver_pk)
5183 .expect("receiver must be an epoch 0 participant");
5184 let leader_shard = coded_block
5185 .shard(receiver_participant.get() as u16)
5186 .expect("missing shard");
5187
5188 leader_sender.send(
5190 Recipients::One(receiver_pk.clone()),
5191 leader_shard.encode(),
5192 true,
5193 );
5194 context.sleep(DEFAULT_LINK.latency * 2).await;
5195
5196 oracle.manager().track(1, epoch1_set);
5200 context.sleep(Duration::from_millis(10)).await;
5201
5202 let mut shard_sub = mailbox.subscribe_assigned_shard_verified(commitment);
5205 mailbox.discovered(
5206 commitment,
5207 leader_pk,
5208 Round::new(Epoch::zero(), View::new(1)),
5209 );
5210 context.sleep(DEFAULT_LINK.latency * 2).await;
5211
5212 assert!(
5213 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
5214 "old-epoch shard subscription should stay pending after cutover"
5215 );
5216 assert!(
5217 mailbox.get(commitment).await.is_none(),
5218 "old-epoch commitment should not reconstruct from overlap-only buffered shards"
5219 );
5220 });
5221 }
5222
5223 #[test_traced]
5232 fn test_evicted_node_still_reconstructs_from_buffered_peer_shards() {
5233 let executor = deterministic::Runner::default();
5234 executor.start(|context| async move {
5235 let num_peers = 10usize;
5236 let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
5237 context.child("network"),
5238 simulated::Config {
5239 max_size: MAX_SHARD_SIZE as u32,
5240 disconnect_on_block: true,
5241 tracked_peer_sets: NZUsize!(2),
5242 },
5243 );
5244 network.start();
5245
5246 let mut private_keys = (0..num_peers)
5247 .map(|i| PrivateKey::from_seed(i as u64))
5248 .collect::<Vec<_>>();
5249 private_keys.sort_by_key(|s| s.public_key());
5250 let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
5251 let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());
5252
5253 let receiver_idx = 1usize;
5257 let receiver_pk = peer_keys[receiver_idx].clone();
5258 let leader_pk = peer_keys[0].clone();
5259 let peer2_pk = peer_keys[2].clone();
5260 let peer4_pk = peer_keys[4].clone();
5261 let peer5_pk = peer_keys[5].clone();
5262 let peer6_pk = peer_keys[6].clone();
5263
5264 let receiver_control = oracle.control(receiver_pk.clone());
5265 let (evicted_sender, evicted_receiver) = receiver_control
5266 .register(0, TEST_QUOTA)
5267 .await
5268 .expect("registration should succeed");
5269
5270 let peer2_control = oracle.control(peer2_pk.clone());
5271 let (mut peer2_sender, _peer2_receiver) = peer2_control
5272 .register(0, TEST_QUOTA)
5273 .await
5274 .expect("registration should succeed");
5275
5276 let peer4_control = oracle.control(peer4_pk.clone());
5277 let (mut peer4_sender, _peer4_receiver) = peer4_control
5278 .register(0, TEST_QUOTA)
5279 .await
5280 .expect("registration should succeed");
5281
5282 let peer5_control = oracle.control(peer5_pk.clone());
5283 let (mut peer5_sender, _peer5_receiver) = peer5_control
5284 .register(0, TEST_QUOTA)
5285 .await
5286 .expect("registration should succeed");
5287
5288 let peer6_control = oracle.control(peer6_pk.clone());
5289 let (mut peer6_sender, _peer6_receiver) = peer6_control
5290 .register(0, TEST_QUOTA)
5291 .await
5292 .expect("registration should succeed");
5293
5294 for sender in [&peer2_pk, &peer4_pk, &peer5_pk, &peer6_pk] {
5296 oracle
5297 .add_link(sender.clone(), receiver_pk.clone(), DEFAULT_LINK)
5298 .await
5299 .expect("link should be added");
5300 }
5301
5302 oracle.manager().track(0, participants.clone());
5304 context.sleep(Duration::from_millis(10)).await;
5305
5306 let scheme = Scheme::signer(
5307 SCHEME_NAMESPACE,
5308 participants.clone(),
5309 private_keys[receiver_idx].clone(),
5310 )
5311 .expect("signer scheme should be created");
5312
5313 let config: Config<_, _, _, _, C, _, _, _> = Config {
5314 scheme_provider: MultiEpochProvider::single(scheme),
5315 blocker: receiver_control.clone(),
5316 shard_codec_cfg: CodecConfig {
5317 maximum_shard_size: MAX_SHARD_SIZE,
5318 },
5319 block_codec_cfg: (),
5320 strategy: STRATEGY,
5321 mailbox_size: NZUsize!(1024),
5322 peer_buffer_size: NZUsize!(64),
5323 background_channel_capacity: NZUsize!(1024),
5324 peer_provider: oracle.manager(),
5325 };
5326
5327 let (engine, mailbox) = ShardEngine::new(context.child("evicted"), config);
5328 engine.start((evicted_sender, evicted_receiver));
5329
5330 let coding_config = coding_config_for_participants(num_peers as u16);
5331 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
5332 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
5333 let commitment = coded_block.commitment();
5334
5335 let peer2_shard = coded_block.shard(2).expect("missing shard 2").encode();
5336 let peer4_shard = coded_block.shard(4).expect("missing shard 4").encode();
5337 let peer5_shard = coded_block.shard(5).expect("missing shard 5").encode();
5338 let peer6_shard = coded_block.shard(6).expect("missing shard 6").encode();
5339
5340 let block_sub = mailbox.subscribe(commitment);
5341
5342 peer2_sender
5345 .send(
5346 Recipients::One(receiver_pk.clone()),
5347 peer2_shard,
5348 true,
5349 );
5350 peer4_sender
5351 .send(
5352 Recipients::One(receiver_pk.clone()),
5353 peer4_shard,
5354 true,
5355 );
5356 peer5_sender
5357 .send(
5358 Recipients::One(receiver_pk.clone()),
5359 peer5_shard,
5360 true,
5361 );
5362 peer6_sender
5363 .send(
5364 Recipients::One(receiver_pk.clone()),
5365 peer6_shard,
5366 true,
5367 );
5368 context.sleep(DEFAULT_LINK.latency * 2).await;
5369
5370 let latest_primary: Set<P> = Set::from_iter_dedup(
5373 peer_keys
5374 .iter()
5375 .filter(|pk| **pk != receiver_pk)
5376 .cloned(),
5377 );
5378 oracle.manager().track(1, latest_primary);
5379 context.sleep(Duration::from_millis(10)).await;
5380
5381 mailbox
5384 .discovered(
5385 commitment,
5386 leader_pk.clone(),
5387 Round::new(Epoch::zero(), View::new(1)),
5388 );
5389
5390 select! {
5391 _ = block_sub => {},
5392 _ = context.sleep(Duration::from_secs(5)) => {
5393 panic!("block subscription did not resolve after leader discovery");
5394 },
5395 }
5396
5397 context.sleep(DEFAULT_LINK.latency * 2).await;
5398 let block = mailbox.get(commitment).await;
5399 assert!(
5400 block.is_some(),
5401 "evicted node should reconstruct from buffered shards sent by remaining latest.primary peers"
5402 );
5403 assert_eq!(block.unwrap().commitment(), commitment);
5404
5405 assert!(
5406 oracle.blocked().await.unwrap().is_empty(),
5407 "no peer should be blocked when overlapping shards are valid"
5408 );
5409 });
5410 }
5411
5412 #[test_traced]
5417 fn test_late_leader_shard_accepted_after_quorum_transition() {
5418 let fixture = Fixture {
5419 num_peers: 10,
5420 ..Default::default()
5421 };
5422
5423 fixture.start(
5424 |config, context, oracle, mut peers, _, coding_config| async move {
5425 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
5426 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
5427 let commitment = coded_block.commitment();
5428 let round = Round::new(Epoch::zero(), View::new(1));
5429
5430 let leader_idx = 0usize;
5431 let victim_idx = 1usize;
5432 let leader = peers[leader_idx].public_key.clone();
5433 let victim = peers[victim_idx].public_key.clone();
5434
5435 oracle
5438 .remove_link(leader.clone(), victim.clone())
5439 .await
5440 .expect("remove_link should succeed");
5441
5442 peers[leader_idx]
5445 .mailbox
5446 .proposed(round, coded_block.clone());
5447
5448 for peer in peers[1..].iter_mut() {
5450 peer.mailbox.discovered(commitment, leader.clone(), round);
5451 }
5452
5453 context.sleep(config.link.latency * 4).await;
5457
5458 let block_sub = peers[victim_idx].mailbox.subscribe(commitment);
5459 select! {
5460 result = block_sub => {
5461 let reconstructed = result.expect("block subscription should resolve");
5462 assert_eq!(reconstructed.commitment(), commitment);
5463 },
5464 _ = context.sleep(Duration::from_secs(5)) => {
5465 panic!("victim did not reconstruct block from gossip");
5466 },
5467 }
5468
5469 let mut shard_sub = peers[victim_idx]
5472 .mailbox
5473 .subscribe_assigned_shard_verified(commitment);
5474 assert!(
5475 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
5476 "shard subscription must not resolve before own shard is verified"
5477 );
5478
5479 oracle
5481 .add_link(leader.clone(), victim.clone(), DEFAULT_LINK)
5482 .await
5483 .expect("add_link should succeed");
5484
5485 let leader_shard = coded_block
5489 .shard(peers[victim_idx].index.get() as u16)
5490 .expect("missing victim shard");
5491 peers[leader_idx].sender.send(
5492 Recipients::One(victim.clone()),
5493 leader_shard.encode(),
5494 true,
5495 );
5496 context.sleep(config.link.latency * 2).await;
5497
5498 select! {
5501 _ = shard_sub => {},
5502 _ = context.sleep(Duration::from_secs(5)) => {
5503 panic!("shard subscription did not resolve after late leader shard");
5504 },
5505 }
5506
5507 let blocked = oracle.blocked().await.unwrap();
5509 assert!(
5510 blocked.is_empty(),
5511 "no peer should be blocked in late leader shard test"
5512 );
5513
5514 let extra_sender_idx = 2usize;
5517 let extra_shard = coded_block
5518 .shard(peers[extra_sender_idx].index.get() as u16)
5519 .expect("missing shard");
5520 peers[extra_sender_idx].sender.send(
5521 Recipients::One(victim.clone()),
5522 extra_shard.encode(),
5523 true,
5524 );
5525 context.sleep(config.link.latency * 2).await;
5526
5527 let blocked = oracle.blocked().await.unwrap();
5529 assert!(
5530 blocked.is_empty(),
5531 "gossip shard after full reconstruction should be silently ignored"
5532 );
5533 },
5534 );
5535 }
5536}