1use super::{
140 mailbox::{Mailbox, Message},
141 metrics::{Peer, ShardMetrics},
142};
143use crate::{
144 marshal::coding::{
145 types::{CodedBlock, Shard},
146 validation::{validate_reconstruction, ReconstructionError as InvariantError},
147 },
148 types::{coding::Commitment, Epoch, Round},
149 Block, CertifiableBlock, Heightable,
150};
151use commonware_codec::{Decode, Error as CodecError, Read};
152use commonware_coding::{Config as CodingConfig, Scheme as CodingScheme};
153use commonware_cryptography::{
154 certificate::{Provider, Scheme as CertificateScheme},
155 Committable, Digestible, Hasher, PublicKey,
156};
157use commonware_macros::select_loop;
158use commonware_p2p::{
159 utils::codec::{WrappedBackgroundReceiver, WrappedSender},
160 Blocker, Provider as PeerProvider, Receiver, Recipients, Sender,
161};
162use commonware_parallel::Strategy;
163use commonware_runtime::{
164 spawn_cell,
165 telemetry::metrics::{histogram::HistogramExt, status::GaugeExt},
166 BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
167};
168use commonware_utils::{
169 bitmap::BitMap,
170 channel::{fallible::OneshotExt, mpsc, oneshot},
171 ordered::{Quorum, Set},
172};
173use rand::Rng;
174use std::{
175 collections::{BTreeMap, VecDeque},
176 num::NonZeroUsize,
177 sync::Arc,
178};
179use thiserror::Error;
180use tracing::{debug, warn};
181
/// Failures that can occur while rebuilding a block from checked shards.
#[derive(Debug, Error)]
pub enum Error<C: CodingScheme> {
    /// The coding scheme rejected the shards during decoding.
    #[error(transparent)]
    Coding(C::Error),

    /// The decoded bytes could not be parsed with the configured codec.
    #[error(transparent)]
    Codec(#[from] CodecError),

    /// The reconstructed block's digest differs from the digest in the commitment.
    #[error("block digest mismatch: reconstructed block does not match commitment digest")]
    DigestMismatch,

    /// The coding config decoded alongside the block differs from the commitment's config.
    #[error("block config mismatch: reconstructed config does not match commitment config")]
    ConfigMismatch,

    /// The reconstructed block's context digest differs from the commitment's context digest.
    #[error("block context mismatch: reconstructed context does not match commitment context")]
    ContextMismatch,
}
205
/// Key under which block subscribers are registered: either the full
/// commitment or only the block digest.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum BlockSubscriptionKey<D> {
    /// Subscriber is waiting for the block with this exact commitment.
    Commitment(Commitment),
    /// Subscriber is waiting for any block with this digest.
    Digest(D),
}
211
/// Construction-time configuration for the shard [`Engine`].
pub struct Config<P, S, X, D, C, H, B, T>
where
    P: PublicKey,
    S: Provider<Scope = Epoch>,
    X: Blocker<PublicKey = P>,
    D: PeerProvider<PublicKey = P>,
    C: CodingScheme,
    H: Hasher,
    B: CertifiableBlock,
    T: Strategy,
{
    /// Supplies the certificate scheme for a given epoch.
    pub scheme_provider: S,

    /// Used to block peers that send invalid or unexpected shards.
    pub blocker: X,

    /// Codec configuration used to decode shards arriving from the network.
    pub shard_codec_cfg: <Shard<C, H> as Read>::Cfg,

    /// Codec configuration used to decode a reconstructed block.
    pub block_codec_cfg: B::Cfg,

    /// Strategy handed to the coding scheme and to batch shard validation.
    pub strategy: T,

    /// Capacity of the engine's mailbox channel.
    pub mailbox_size: usize,

    /// Maximum number of shards buffered per peer while no reconstruction
    /// state exists for their commitment; on overflow the front entry is dropped.
    pub peer_buffer_size: NonZeroUsize,

    /// Capacity passed to the background receiver that decodes incoming shards.
    pub background_channel_capacity: usize,

    /// Source of peer-set updates the engine subscribes to.
    pub peer_provider: D,
}
267
/// Actor that distributes shards of locally proposed blocks, collects and
/// validates shards received from peers, and reconstructs blocks from them.
pub struct Engine<E, S, X, D, C, H, B, P, T>
where
    E: BufferPooler + Rng + Spawner + Metrics + Clock,
    S: Provider<Scope = Epoch>,
    S::Scheme: CertificateScheme<PublicKey = P>,
    X: Blocker,
    D: PeerProvider<PublicKey = P>,
    C: CodingScheme,
    H: Hasher,
    B: CertifiableBlock,
    P: PublicKey,
    T: Strategy,
{
    /// Runtime context (clock, spawner, metrics, buffer pool).
    context: ContextCell<E>,

    /// Receiving half of the mailbox through which other actors send requests.
    mailbox: mpsc::Receiver<Message<B, C, H, P>>,

    /// Supplies the certificate scheme for a given epoch.
    scheme_provider: S,

    /// Used to block misbehaving peers.
    blocker: X,

    /// Codec configuration for decoding network shards.
    shard_codec_cfg: <Shard<C, H> as Read>::Cfg,

    /// Codec configuration for decoding reconstructed blocks.
    block_codec_cfg: B::Cfg,

    /// Strategy handed to the coding scheme and to batch validation.
    strategy: T,

    /// In-progress reconstruction state, keyed by commitment.
    state: BTreeMap<Commitment, ReconstructionState<P, C, H>>,

    /// Shards that arrived before reconstruction state existed for their
    /// commitment, queued per sending peer.
    peer_buffers: BTreeMap<P, VecDeque<Shard<C, H>>>,

    /// Maximum length of each queue in `peer_buffers`.
    peer_buffer_size: NonZeroUsize,

    /// Source of peer-set updates.
    peer_provider: D,

    /// Union of all peer sets from the most recent peer-set update.
    aggregate_peers: Set<P>,

    /// Primary peers of the latest peer set; only these peers may have
    /// shards buffered in `peer_buffers`.
    latest_primary_peers: Set<P>,

    /// Capacity of the background shard-ingress channel.
    background_channel_capacity: usize,

    /// Cache of blocks that were proposed locally or reconstructed from shards.
    reconstructed_blocks: BTreeMap<Commitment, Arc<CodedBlock<B, C, H>>>,

    /// Subscribers waiting for the locally assigned shard of a commitment to be verified.
    assigned_shard_verified_subscriptions: BTreeMap<Commitment, Vec<oneshot::Sender<()>>>,

    /// Subscribers waiting for a block, keyed by commitment or by digest.
    #[allow(clippy::type_complexity)]
    block_subscriptions:
        BTreeMap<BlockSubscriptionKey<B::Digest>, Vec<oneshot::Sender<Arc<CodedBlock<B, C, H>>>>>,

    /// Metrics recorded by the engine.
    metrics: ShardMetrics,
}
358
359impl<E, S, X, D, C, H, B, P, T> Engine<E, S, X, D, C, H, B, P, T>
360where
361 E: BufferPooler + Rng + Spawner + Metrics + Clock,
362 S: Provider<Scope = Epoch>,
363 S::Scheme: CertificateScheme<PublicKey = P>,
364 X: Blocker<PublicKey = P>,
365 D: PeerProvider<PublicKey = P>,
366 C: CodingScheme,
367 H: Hasher,
368 B: CertifiableBlock,
369 P: PublicKey,
370 T: Strategy,
371{
372 pub fn new(context: E, config: Config<P, S, X, D, C, H, B, T>) -> (Self, Mailbox<B, C, H, P>) {
374 let metrics = ShardMetrics::new(&context);
375 let (sender, mailbox) = mpsc::channel(config.mailbox_size);
376 (
377 Self {
378 context: ContextCell::new(context),
379 mailbox,
380 scheme_provider: config.scheme_provider,
381 blocker: config.blocker,
382 shard_codec_cfg: config.shard_codec_cfg,
383 block_codec_cfg: config.block_codec_cfg,
384 strategy: config.strategy,
385 state: BTreeMap::new(),
386 peer_buffers: BTreeMap::new(),
387 peer_buffer_size: config.peer_buffer_size,
388 peer_provider: config.peer_provider,
389 aggregate_peers: Set::default(),
390 latest_primary_peers: Set::default(),
391 background_channel_capacity: config.background_channel_capacity,
392 reconstructed_blocks: BTreeMap::new(),
393 assigned_shard_verified_subscriptions: BTreeMap::new(),
394 block_subscriptions: BTreeMap::new(),
395 metrics,
396 },
397 Mailbox::new(sender),
398 )
399 }
400
/// Consumes the engine and spawns its event loop on the engine's context.
///
/// `network` is the (sender, receiver) pair used to exchange shards with
/// peers. Returns the [`Handle`] of the spawned task.
pub fn start(
    mut self,
    network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
) -> Handle<()> {
    spawn_cell!(self.context, self.run(network).await)
}
408
/// Event loop of the engine.
///
/// Wraps the network halves so shards are encoded/decoded automatically,
/// subscribes to peer-set updates, and then services three sources until
/// shutdown or until one of them closes: peer-set updates, mailbox
/// messages, and shards received from the network.
async fn run(
    mut self,
    (sender, receiver): (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
) {
    // Outgoing shards are encoded using the context's network buffer pool.
    let mut sender = WrappedSender::<_, Shard<C, H>>::new(
        self.context.network_buffer_pool().clone(),
        sender,
    );
    // Incoming shards are decoded by a separate service and handed over
    // through a channel of capacity `background_channel_capacity`.
    let (receiver_service, mut receiver): (_, mpsc::Receiver<(P, Shard<C, H>)>) =
        WrappedBackgroundReceiver::new(
            self.context.with_label("shard_ingress"),
            receiver,
            self.shard_codec_cfg.clone(),
            self.blocker.clone(),
            self.background_channel_capacity,
            &self.strategy,
        );
    // Keep the handle bound for the lifetime of the loop.
    let _receiver_handle = receiver_service.start();
    let mut peer_set_subscription = self.peer_provider.subscribe().await;

    select_loop! {
        self.context,
        // NOTE(review): presumably executed at the top of every loop
        // iteration — confirm against `select_loop!`.
        on_start => {
            // Publish current sizes of the state map and the block cache.
            let _ = self
                .metrics
                .reconstruction_states_count
                .try_set(self.state.len());
            let _ = self
                .metrics
                .reconstructed_blocks_cache_count
                .try_set(self.reconstructed_blocks.len());

            // Drop subscribers whose receiving side has gone away, and
            // remove keys that are left without subscribers.
            self.block_subscriptions.retain(|_, subscribers| {
                subscribers.retain(|tx| !tx.is_closed());
                !subscribers.is_empty()
            });
            self.assigned_shard_verified_subscriptions
                .retain(|_, subscribers| {
                    subscribers.retain(|tx| !tx.is_closed());
                    !subscribers.is_empty()
                });
        },
        on_stopped => {
            debug!("received shutdown signal, stopping shard engine");
        },
        Some(update) = peer_set_subscription.recv() else {
            debug!("peer set subscription closed");
            return;
        } => {
            // Compute the union before `update.latest.primary` is moved out.
            let all_peers = update.all.union();
            self.update_latest_primary_peers(update.latest.primary);
            self.aggregate_peers = all_peers;
        },
        Some(message) = self.mailbox.recv() else {
            debug!("shard mailbox closed, stopping shard engine");
            return;
        } => match message {
            // A locally proposed block: distribute its shards.
            Message::Proposed { block, round } => {
                self.broadcast_shards(&mut sender, round, block).await;
            }
            // A proposal from another leader was discovered: start tracking it.
            Message::Discovered {
                commitment,
                leader,
                round,
            } => {
                self.handle_external_proposal(&mut sender, commitment, leader, round)
                    .await;
            }
            // Cache lookup by commitment; replies `None` on a miss.
            Message::GetByCommitment {
                commitment,
                response,
            } => {
                let block = self.reconstructed_blocks.get(&commitment).cloned();
                response.send_lossy(block);
            }
            // Cache lookup by digest via a linear scan of the cache.
            Message::GetByDigest { digest, response } => {
                let block = self
                    .reconstructed_blocks
                    .iter()
                    .find_map(|(_, b)| (b.digest() == digest).then_some(b))
                    .cloned();
                response.send_lossy(block);
            }
            Message::SubscribeAssignedShardVerified {
                commitment,
                response,
            } => {
                self.handle_assigned_shard_verified_subscription(commitment, response);
            }
            Message::SubscribeByCommitment {
                commitment,
                response,
            } => {
                self.handle_block_subscription(
                    BlockSubscriptionKey::Commitment(commitment),
                    response,
                );
            }
            Message::SubscribeByDigest { digest, response } => {
                self.handle_block_subscription(BlockSubscriptionKey::Digest(digest), response);
            }
            Message::Prune { through } => {
                self.prune(through);
            }
        },
        Some((peer, shard)) = receiver.recv() else {
            debug!("receiver closed, stopping shard engine");
            return;
        } => {
            // Count every received shard per peer, even ones ignored below.
            self.metrics
                .shards_received
                .get_or_create(&Peer::new(&peer))
                .inc();

            let commitment = shard.commitment();
            if !self.should_handle_network_shard(commitment) {
                continue;
            }

            if let Some(state) = self.state.get_mut(&commitment) {
                // The scheme is looked up by the epoch of the tracked round.
                let round = state.round();
                let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
                    warn!(%commitment, "no scheme for epoch, ignoring shard");
                    continue;
                };
                let progressed = state
                    .on_network_shard(
                        peer,
                        shard,
                        InsertCtx::new(scheme.as_ref(), &self.strategy),
                        &mut self.blocker,
                    )
                    .await;
                if progressed {
                    self.try_advance(&mut sender, commitment).await;
                }
            } else {
                // No state for this commitment yet: hold on to the shard.
                self.buffer_peer_shard(peer, shard);
            }
        },
    }
}
555
556 fn should_handle_network_shard(&self, commitment: Commitment) -> bool {
563 if self.reconstructed_blocks.contains_key(&commitment) {
564 return self
565 .state
566 .get(&commitment)
567 .is_some_and(|s| !s.is_assigned_shard_verified());
568 }
569 true
570 }
571
572 #[allow(clippy::type_complexity)]
580 fn try_reconstruct(
581 &mut self,
582 commitment: Commitment,
583 ) -> Result<Option<Arc<CodedBlock<B, C, H>>>, Error<C>> {
584 if let Some(block) = self.reconstructed_blocks.get(&commitment) {
585 return Ok(Some(Arc::clone(block)));
586 }
587 let Some(state) = self.state.get_mut(&commitment) else {
588 return Ok(None);
589 };
590 if state.checked_shards().len() < usize::from(commitment.config().minimum_shards.get()) {
591 debug!(%commitment, "not enough checked shards to reconstruct block");
592 return Ok(None);
593 }
594 let start = self.context.current();
596 let blob = C::decode(
597 &commitment.config(),
598 &commitment.root(),
599 state.checked_shards().iter(),
600 &self.strategy,
601 )
602 .map_err(Error::Coding)?;
603 self.metrics
604 .erasure_decode_duration
605 .observe_between(start, self.context.current());
606
607 let (inner, config): (B, CodingConfig) =
609 Decode::decode_cfg(&mut blob.as_slice(), &(self.block_codec_cfg.clone(), ()))?;
610
611 match validate_reconstruction::<H, _>(&inner, config, commitment) {
612 Ok(()) => {}
613 Err(InvariantError::BlockDigest) => {
614 return Err(Error::DigestMismatch);
615 }
616 Err(InvariantError::CodingConfig) => {
617 warn!(
618 %commitment,
619 expected_config = ?commitment.config(),
620 actual_config = ?config,
621 "reconstructed block config does not match commitment config, but digest matches"
622 );
623 return Err(Error::ConfigMismatch);
624 }
625 Err(InvariantError::ContextDigest(expected, actual)) => {
626 warn!(
627 %commitment,
628 expected_context_digest = ?expected,
629 actual_context_digest = ?actual,
630 "reconstructed block context digest does not match commitment context digest"
631 );
632 return Err(Error::ContextMismatch);
633 }
634 }
635
636 let block = Arc::new(CodedBlock::new_trusted(inner, commitment));
639 self.cache_block(Arc::clone(&block));
640 self.metrics.blocks_reconstructed_total.inc();
641 Ok(Some(block))
642 }
643
644 async fn handle_external_proposal<Sr: Sender<PublicKey = P>>(
646 &mut self,
647 sender: &mut WrappedSender<Sr, Shard<C, H>>,
648 commitment: Commitment,
649 leader: P,
650 round: Round,
651 ) {
652 if self.reconstructed_blocks.contains_key(&commitment) {
653 return;
654 }
655 let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
656 warn!(%commitment, "no scheme for epoch, ignoring external proposal");
657 return;
658 };
659 let participants = scheme.participants();
660 if participants.index(&leader).is_none() {
661 warn!(?leader, %commitment, "leader update for non-participant, ignoring");
662 return;
663 }
664 if let Some(state) = self.state.get(&commitment) {
665 if state.leader() != &leader {
666 warn!(
667 existing = ?state.leader(),
668 ?leader,
669 %commitment,
670 "conflicting leader update, ignoring"
671 );
672 }
673 return;
674 }
675
676 let participants_len =
677 u64::try_from(participants.len()).expect("participant count impossibly out of bounds");
678 self.state.insert(
679 commitment,
680 ReconstructionState::new(leader, round, participants_len),
681 );
682 let buffered_progress = self.ingest_buffered_shards(commitment).await;
683 if buffered_progress {
684 self.try_advance(sender, commitment).await;
685 }
686 }
687
688 fn buffer_peer_shard(&mut self, peer: P, shard: Shard<C, H>) {
690 if self.latest_primary_peers.position(&peer).is_none() {
691 debug!(
692 ?peer,
693 "pre-leader shard from peer outside latest.primary not buffered"
694 );
695 return;
696 }
697 let queue = self.peer_buffers.entry(peer).or_default();
698 if queue.len() >= self.peer_buffer_size.get() {
699 let _ = queue.pop_front();
700 }
701 queue.push_back(shard);
702 }
703
704 fn update_latest_primary_peers(&mut self, peers: Set<P>) {
705 self.peer_buffers
706 .retain(|peer, _| peers.position(peer).is_some());
707 self.latest_primary_peers = peers;
708 }
709
710 async fn ingest_buffered_shards(&mut self, commitment: Commitment) -> bool {
712 let mut buffered = Vec::new();
713 for (peer, queue) in self.peer_buffers.iter_mut() {
714 let mut i = 0;
715 while i < queue.len() {
716 if queue[i].commitment() != commitment {
717 i += 1;
718 continue;
719 }
720 let shard = queue.swap_remove_back(i).expect("index is valid");
721 buffered.push((peer.clone(), shard));
722 }
723 }
724
725 let Some(state) = self.state.get_mut(&commitment) else {
726 return false;
727 };
728 let round = state.round();
729 let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
730 warn!(%commitment, "no scheme for epoch, dropping buffered shards");
731 return false;
732 };
733
734 let mut progressed = false;
737 let ctx = InsertCtx::new(scheme.as_ref(), &self.strategy);
738 for (peer, shard) in buffered {
739 progressed |= state
740 .on_network_shard(peer, shard, ctx, &mut self.blocker)
741 .await;
742 }
743 progressed
744 }
745
746 fn cache_block(&mut self, block: Arc<CodedBlock<B, C, H>>) {
748 let commitment = block.commitment();
749 self.reconstructed_blocks
750 .insert(commitment, Arc::clone(&block));
751 self.notify_block_subscribers(block);
752 }
753
754 async fn broadcast_shards<Sr: Sender<PublicKey = P>>(
759 &mut self,
760 sender: &mut WrappedSender<Sr, Shard<C, H>>,
761 round: Round,
762 mut block: CodedBlock<B, C, H>,
763 ) {
764 let commitment = block.commitment();
765
766 let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
767 warn!(%commitment, "no scheme available, cannot broadcast shards");
768 return;
769 };
770 let participants = scheme.participants();
771 let Some(me) = scheme.me() else {
772 warn!(
773 %commitment,
774 "cannot broadcast shards: local proposer is not a participant"
775 );
776 return;
777 };
778
779 let shard_count = block.shards(&self.strategy).len();
780 if shard_count != participants.len() {
781 warn!(
782 %commitment,
783 shard_count,
784 participants = participants.len(),
785 "cannot broadcast shards: participant/shard count mismatch"
786 );
787 return;
788 }
789
790 let my_index = me.get() as usize;
791 let leader_shard = block
792 .shard(my_index as u16)
793 .expect("proposer's shard must exist");
794
795 for (index, peer) in participants.iter().enumerate() {
797 if index == my_index {
798 continue;
799 }
800
801 let Some(shard) = block.shard(index as u16) else {
802 warn!(
803 %commitment,
804 index,
805 "cannot broadcast shards: missing shard for participant index"
806 );
807 return;
808 };
809 let _ = sender
810 .send(Recipients::One(peer.clone()), shard, true)
811 .await;
812 }
813
814 let non_participants: Vec<P> = self
816 .aggregate_peers
817 .iter()
818 .filter(|peer| participants.index(peer).is_none())
819 .cloned()
820 .collect();
821 if !non_participants.is_empty() {
822 let _ = sender
823 .send(Recipients::Some(non_participants), leader_shard, true)
824 .await;
825 }
826
827 let block = Arc::new(block);
829 self.cache_block(block);
830
831 self.notify_assigned_shard_verified_subscribers(commitment);
834
835 debug!(?commitment, "broadcasted shards");
836 }
837
838 async fn broadcast_shard<Sr: Sender<PublicKey = P>>(
840 &mut self,
841 sender: &mut WrappedSender<Sr, Shard<C, H>>,
842 shard: Shard<C, H>,
843 ) {
844 let commitment = shard.commitment();
845 if let Ok(peers) = sender.send(Recipients::All, shard, true).await {
846 debug!(
847 ?commitment,
848 peers = peers.len(),
849 "broadcasted shard to all peers"
850 );
851 }
852 }
853
854 async fn try_advance<Sr: Sender<PublicKey = P>>(
858 &mut self,
859 sender: &mut WrappedSender<Sr, Shard<C, H>>,
860 commitment: Commitment,
861 ) {
862 if let Some(state) = self.state.get_mut(&commitment) {
863 match state.take_pending_action() {
864 Some(AssignedShardVerifiedAction::Broadcast(shard)) => {
865 self.broadcast_shard(sender, shard).await;
866 self.notify_assigned_shard_verified_subscribers(commitment);
867 }
868 Some(AssignedShardVerifiedAction::NotifyOnly) => {
869 self.notify_assigned_shard_verified_subscribers(commitment);
870 }
871 None => {}
872 }
873 }
874
875 match self.try_reconstruct(commitment) {
876 Ok(Some(block)) => {
877 debug!(
883 %commitment,
884 parent = %block.parent(),
885 height = %block.height(),
886 "successfully reconstructed block from shards"
887 );
888 }
889 Ok(None) => {
890 debug!(%commitment, "not enough checked shards to reconstruct block");
891 }
892 Err(err) => {
893 warn!(%commitment, ?err, "failed to reconstruct block from checked shards");
894 self.state.remove(&commitment);
895 self.drop_subscriptions(commitment);
896 self.metrics.reconstruction_failures_total.inc();
897 }
898 }
899 }
900
901 fn handle_assigned_shard_verified_subscription(
906 &mut self,
907 commitment: Commitment,
908 response: oneshot::Sender<()>,
909 ) {
910 let has_shard = self
912 .state
913 .get(&commitment)
914 .is_some_and(|state| state.is_assigned_shard_verified());
915 if has_shard {
916 response.send_lossy(());
917 return;
918 }
919
920 if !self.state.contains_key(&commitment)
924 && self.reconstructed_blocks.contains_key(&commitment)
925 {
926 response.send_lossy(());
927 return;
928 }
929
930 self.assigned_shard_verified_subscriptions
931 .entry(commitment)
932 .or_default()
933 .push(response);
934 }
935
936 fn handle_block_subscription(
938 &mut self,
939 key: BlockSubscriptionKey<B::Digest>,
940 response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
941 ) {
942 let block = match key {
943 BlockSubscriptionKey::Commitment(commitment) => {
944 self.reconstructed_blocks.get(&commitment)
945 }
946 BlockSubscriptionKey::Digest(digest) => self
947 .reconstructed_blocks
948 .iter()
949 .find_map(|(_, block)| (block.digest() == digest).then_some(block)),
950 };
951
952 if let Some(block) = block {
954 response.send_lossy(Arc::clone(block));
955 return;
956 }
957
958 self.block_subscriptions
959 .entry(key)
960 .or_default()
961 .push(response);
962 }
963
964 fn notify_assigned_shard_verified_subscribers(&mut self, commitment: Commitment) {
967 if let Some(mut subscribers) = self
968 .assigned_shard_verified_subscriptions
969 .remove(&commitment)
970 {
971 for subscriber in subscribers.drain(..) {
972 subscriber.send_lossy(());
973 }
974 }
975 }
976
977 fn notify_block_subscribers(&mut self, block: Arc<CodedBlock<B, C, H>>) {
979 let commitment = block.commitment();
980 let digest = block.digest();
981
982 if let Some(mut subscribers) = self
984 .block_subscriptions
985 .remove(&BlockSubscriptionKey::Commitment(commitment))
986 {
987 for subscriber in subscribers.drain(..) {
988 subscriber.send_lossy(Arc::clone(&block));
989 }
990 }
991
992 if let Some(mut subscribers) = self
994 .block_subscriptions
995 .remove(&BlockSubscriptionKey::Digest(digest))
996 {
997 for subscriber in subscribers.drain(..) {
998 subscriber.send_lossy(Arc::clone(&block));
999 }
1000 }
1001 }
1002
1003 fn drop_subscriptions(&mut self, commitment: Commitment) {
1008 self.assigned_shard_verified_subscriptions
1009 .remove(&commitment);
1010 self.block_subscriptions
1011 .remove(&BlockSubscriptionKey::Commitment(commitment));
1012 self.block_subscriptions
1013 .remove(&BlockSubscriptionKey::Digest(
1014 commitment.block::<B::Digest>(),
1015 ));
1016 }
1017
1018 fn prune(&mut self, through: Commitment) {
1028 if let Some(height) = self.reconstructed_blocks.get(&through).map(|b| b.height()) {
1029 self.reconstructed_blocks
1030 .retain(|_, block| block.height() > height);
1031 }
1032
1033 self.drop_subscriptions(through);
1037 let Some(round) = self.state.remove(&through).map(|state| state.round()) else {
1038 return;
1039 };
1040
1041 let mut pruned_commitments = Vec::new();
1042 self.state.retain(|c, s| {
1043 let keep = s.round() > round;
1044 if !keep {
1045 pruned_commitments.push(*c);
1046 }
1047 keep
1048 });
1049 for pruned in pruned_commitments {
1050 self.drop_subscriptions(pruned);
1051 }
1052 }
1053}
1054
/// Per-commitment reconstruction progress.
///
/// State starts in `AwaitingQuorum` and moves to `Ready` once a batch
/// validation leaves at least the minimum number of checked shards.
enum ReconstructionState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Still collecting shards; shards from non-leaders are held unvalidated.
    AwaitingQuorum(AwaitingQuorumState<P, C, H>),
    /// Enough checked shards are available; further non-leader shards are ignored.
    Ready(ReadyState<P, C, H>),
}
1070
/// Follow-up work recorded when the locally assigned shard has been verified.
enum AssignedShardVerifiedAction<C: CodingScheme, H: Hasher> {
    /// Broadcast the verified shard to all peers, then notify subscribers
    /// (recorded when the local node is a participant).
    Broadcast(Shard<C, H>),
    /// Only notify subscribers (recorded when the local node is not a participant).
    NotifyOnly,
}
1081
/// Shard payload together with its index, detached from its commitment.
struct IndexedShard<C: CodingScheme> {
    /// Index of the shard as carried by the received message.
    index: u16,
    /// Raw shard data that has not been checked yet.
    data: C::Shard,
}
1087
/// Fields shared by both phases of [`ReconstructionState`].
struct CommonState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Leader recorded for this commitment.
    leader: P,
    /// Action to perform after the assigned shard was verified, if any.
    pending_action: Option<AssignedShardVerifiedAction<C, H>>,
    /// Shards that passed validation and can be used for decoding.
    checked_shards: Vec<C::CheckedShard>,
    /// One bit per participant index, set once a shard for that index was accepted.
    contributed: BitMap,
    /// Round the commitment was proposed in.
    round: Round,
    /// Raw shard data by index, kept to detect duplicates and equivocation.
    received_shards: BTreeMap<u16, C::Shard>,
    /// Whether the shard sent by the leader has been verified.
    assigned_shard_verified: bool,
}
1111
/// Phase in which shards are still being collected.
struct AwaitingQuorumState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Shared bookkeeping.
    common: CommonState<P, C, H>,
    /// Unvalidated shards from non-leader peers, keyed by sender; they are
    /// validated as a batch once enough shards could reach the minimum.
    pending_shards: BTreeMap<P, IndexedShard<C>>,
}
1127
/// Phase entered once at least the minimum number of shards has been checked.
struct ReadyState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Shared bookkeeping carried over from the awaiting phase.
    common: CommonState<P, C, H>,
}
1140
1141impl<P, C, H> CommonState<P, C, H>
1142where
1143 P: PublicKey,
1144 C: CodingScheme,
1145 H: Hasher,
1146{
1147 fn new(leader: P, round: Round, participants_len: u64) -> Self {
1149 Self {
1150 leader,
1151 pending_action: None,
1152 checked_shards: Vec::new(),
1153 contributed: BitMap::zeroes(participants_len),
1154 round,
1155 received_shards: BTreeMap::new(),
1156 assigned_shard_verified: false,
1157 }
1158 }
1159}
1160
impl<P, C, H> CommonState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Validates the shard sent by the leader and records the outcome.
    ///
    /// On success the shard is added to the checked shards, its index is
    /// marked as contributed, the assigned shard is flagged as verified and
    /// a pending action is stored (`Broadcast` if `is_participant`,
    /// otherwise `NotifyOnly`). On failure the shard is forgotten and
    /// `sender` is blocked.
    ///
    /// Returns `true` if the shard was valid.
    async fn verify_assigned_shard(
        &mut self,
        sender: P,
        commitment: Commitment,
        shard: IndexedShard<C>,
        is_participant: bool,
        blocker: &mut impl Blocker<PublicKey = P>,
    ) -> bool {
        // Store first and validate the stored copy, so the data only needs
        // to be cloned if it is later broadcast.
        self.received_shards.insert(shard.index, shard.data);
        let data = self.received_shards.get(&shard.index).unwrap();
        let Ok(checked) = C::check(&commitment.config(), &commitment.root(), shard.index, data)
        else {
            // Undo the insert so the invalid data is not kept for this index.
            self.received_shards.remove(&shard.index);
            commonware_p2p::block!(blocker, sender, "invalid shard received from leader");
            return false;
        };

        self.contributed.set(u64::from(shard.index), true);
        self.checked_shards.push(checked);
        self.assigned_shard_verified = true;
        self.pending_action = Some(if is_participant {
            AssignedShardVerifiedAction::Broadcast(Shard::new(
                commitment,
                shard.index,
                data.clone(),
            ))
        } else {
            AssignedShardVerifiedAction::NotifyOnly
        });
        true
    }
}
1210
impl<P, C, H> AwaitingQuorumState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Validates all pending shards once enough shards could reach the
    /// minimum, and returns the [`ReadyState`] if enough of them are valid.
    ///
    /// Returns `None` without validating anything while checked plus
    /// pending shards are below the minimum. Otherwise every pending shard
    /// is checked via `strategy`; senders of invalid shards are blocked and
    /// valid shards are added to the checked set. If the checked set is
    /// still below the minimum afterwards, `None` is returned and the
    /// pending set stays empty.
    async fn try_transition(
        &mut self,
        commitment: Commitment,
        participants_len: u64,
        strategy: &impl Strategy,
        blocker: &mut impl Blocker<PublicKey = P>,
    ) -> Option<ReadyState<P, C, H>> {
        let minimum = usize::from(commitment.config().minimum_shards.get());
        if self.common.checked_shards.len() + self.pending_shards.len() < minimum {
            return None;
        }

        // Take ownership of the pending shards; they are consumed either way.
        let pending = std::mem::take(&mut self.pending_shards);
        // NOTE(review): presumably entries mapped to `Some` are collected as
        // checked shards and peers mapped to `None` as peers to block —
        // confirm against `Strategy::map_partition_collect_vec`.
        let (new_checked, to_block) =
            strategy.map_partition_collect_vec(pending, |(peer, shard)| {
                let checked = C::check(
                    &commitment.config(),
                    &commitment.root(),
                    shard.index,
                    &shard.data,
                );
                (peer, checked.ok())
            });

        for peer in to_block {
            commonware_p2p::block!(blocker, peer, "invalid shard received");
        }
        for checked in new_checked {
            self.common.checked_shards.push(checked);
        }

        // Some pending shards may have been invalid; re-check the threshold.
        if self.common.checked_shards.len() < minimum {
            return None;
        }

        // Move the accumulated state into the ready phase, leaving a fresh
        // placeholder behind in `self`.
        let round = self.common.round;
        let leader = self.common.leader.clone();
        let common = std::mem::replace(
            &mut self.common,
            CommonState::new(leader, round, participants_len),
        );
        Some(ReadyState { common })
    }
}
1266
/// Borrowed context needed to process a shard received from the network.
struct InsertCtx<'a, Sch, S>
where
    Sch: CertificateScheme,
    S: Strategy,
{
    /// Certificate scheme of the epoch the shard belongs to.
    scheme: &'a Sch,
    /// Strategy used for batch validation.
    strategy: &'a S,
    /// Number of participants in `scheme`, computed once at construction.
    participants_len: u64,
}
1277
// `Clone` and `Copy` are implemented by hand so they hold for every `Sch`
// and `S`; the struct only stores references and a `u64`.
impl<Sch: CertificateScheme, S: Strategy> Clone for InsertCtx<'_, Sch, S> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<Sch: CertificateScheme, S: Strategy> Copy for InsertCtx<'_, Sch, S> {}
1285
1286impl<'a, Sch: CertificateScheme, S: Strategy> InsertCtx<'a, Sch, S> {
1287 fn new(scheme: &'a Sch, strategy: &'a S) -> Self {
1288 let participants_len = u64::try_from(scheme.participants().len())
1289 .expect("participant count impossibly out of bounds");
1290 Self {
1291 scheme,
1292 strategy,
1293 participants_len,
1294 }
1295 }
1296}
1297
1298impl<P, C, H> ReconstructionState<P, C, H>
1299where
1300 P: PublicKey,
1301 C: CodingScheme,
1302 H: Hasher,
1303{
1304 fn new(leader: P, round: Round, participants_len: u64) -> Self {
1306 Self::AwaitingQuorum(AwaitingQuorumState {
1307 common: CommonState::new(leader, round, participants_len),
1308 pending_shards: BTreeMap::new(),
1309 })
1310 }
1311
1312 const fn common(&self) -> &CommonState<P, C, H> {
1314 match self {
1315 Self::AwaitingQuorum(state) => &state.common,
1316 Self::Ready(state) => &state.common,
1317 }
1318 }
1319
1320 const fn common_mut(&mut self) -> &mut CommonState<P, C, H> {
1322 match self {
1323 Self::AwaitingQuorum(state) => &mut state.common,
1324 Self::Ready(state) => &mut state.common,
1325 }
1326 }
1327
/// Leader recorded for this commitment.
const fn leader(&self) -> &P {
    &self.common().leader
}

/// Whether the shard sent by the leader has been verified.
const fn is_assigned_shard_verified(&self) -> bool {
    self.common().assigned_shard_verified
}

/// Round the commitment was proposed in.
const fn round(&self) -> Round {
    self.common().round
}

/// Shards that have passed validation so far.
const fn checked_shards(&self) -> &[C::CheckedShard] {
    self.common().checked_shards.as_slice()
}

/// Removes and returns the action left pending by assigned-shard
/// verification, so it is handed out at most once.
const fn take_pending_action(&mut self) -> Option<AssignedShardVerifiedAction<C, H>> {
    self.common_mut().pending_action.take()
}
1354
/// Processes a shard received from `sender` and reports whether the state
/// made progress.
///
/// Checks, in order:
/// 1. `sender` must be a participant of the scheme, otherwise it is blocked.
/// 2. The shard index must be the expected one: for the leader, the local
///    participant's index (or the leader's own index when the local node
///    is not a participant); for anyone else, the sender's own index. A
///    mismatch blocks the sender.
/// 3. An index that already has data is rejected; if the data differs the
///    sender is blocked for equivocation.
/// 4. An index already marked as contributed is ignored.
///
/// A leader shard is validated immediately while the assigned shard is not
/// yet verified. Any other shard is only accepted in the awaiting phase,
/// where it is stored unvalidated for batch validation. After either path
/// a transition to the ready phase is attempted.
async fn on_network_shard<Sch, S, X>(
    &mut self,
    sender: P,
    shard: Shard<C, H>,
    ctx: InsertCtx<'_, Sch, S>,
    blocker: &mut X,
) -> bool
where
    Sch: CertificateScheme<PublicKey = P>,
    S: Strategy,
    X: Blocker<PublicKey = P>,
{
    let Some(sender_index) = ctx.scheme.participants().index(&sender) else {
        commonware_p2p::block!(blocker, sender, "shard sent by non-participant");
        return false;
    };
    let commitment = shard.commitment();
    let indexed = IndexedShard {
        index: shard.index(),
        data: shard.into_inner(),
    };

    let is_from_leader = sender == self.common().leader;
    // Determine which index this sender is expected to deliver.
    let expected_participant = if is_from_leader {
        ctx.scheme.me().unwrap_or(sender_index)
    } else {
        sender_index
    };
    let expected_index: u16 = expected_participant
        .get()
        .try_into()
        .expect("participant index impossibly out of bounds");
    if indexed.index != expected_index {
        commonware_p2p::block!(
            blocker,
            sender,
            shard_index = indexed.index,
            expected_index,
            "shard index does not match expected index"
        );
        return false;
    }

    if let Some(existing) = self.common().received_shards.get(&indexed.index) {
        // Identical data is a harmless duplicate; different data is not.
        if existing != &indexed.data {
            commonware_p2p::block!(blocker, sender, "shard equivocation");
        }
        return false;
    }

    if self.common().contributed.get(u64::from(indexed.index)) {
        return false;
    }

    if is_from_leader && !self.common().assigned_shard_verified {
        // Leader path: validate right away instead of batching.
        let progressed = self
            .common_mut()
            .verify_assigned_shard(
                sender,
                commitment,
                indexed,
                ctx.scheme.me().is_some(),
                blocker,
            )
            .await;

        if progressed {
            if let Self::AwaitingQuorum(state) = self {
                if let Some(ready) = state
                    .try_transition(commitment, ctx.participants_len, ctx.strategy, blocker)
                    .await
                {
                    *self = Self::Ready(ready);
                }
            }
        }
        return progressed;
    }

    // Beyond this point only the awaiting phase accepts further shards.
    let Self::AwaitingQuorum(state) = self else {
        return false;
    };

    // Record the shard before validation so duplicates and equivocation
    // are caught by the checks above.
    state
        .common
        .received_shards
        .insert(indexed.index, indexed.data.clone());
    state.common.contributed.set(u64::from(indexed.index), true);
    state.pending_shards.insert(sender, indexed);
    if let Some(ready) = state
        .try_transition(commitment, ctx.participants_len, ctx.strategy, blocker)
        .await
    {
        *self = Self::Ready(ready);
    }

    true
}
1507}
1508
1509#[cfg(test)]
1510mod tests {
1511 use super::*;
1512 use crate::{
1513 marshal::{
1514 coding::types::coding_config_for_participants, mocks::block::Block as MockBlock,
1515 },
1516 types::{Epoch, Height, View},
1517 };
1518 use bytes::Bytes;
1519 use commonware_codec::Encode;
1520 use commonware_coding::{
1521 CodecConfig, Config as CodingConfig, PhasedAsScheme, ReedSolomon, Zoda,
1522 };
1523 use commonware_cryptography::{
1524 certificate::Subject,
1525 ed25519::{PrivateKey, PublicKey},
1526 impl_certificate_ed25519,
1527 sha256::Digest as Sha256Digest,
1528 Committable, Digest, Sha256, Signer,
1529 };
1530 use commonware_macros::{select, test_traced};
1531 use commonware_p2p::{
1532 simulated::{self, Control, Link, Oracle},
1533 Manager as _, TrackedPeers,
1534 };
1535 use commonware_parallel::Sequential;
1536 use commonware_runtime::{deterministic, Quota, Runner};
1537 use commonware_utils::{
1538 channel::oneshot::error::TryRecvError, ordered::Set, NZUsize, Participant,
1539 };
1540 use std::{
1541 future::Future,
1542 marker::PhantomData,
1543 num::NonZeroU32,
1544 sync::atomic::{AtomicIsize, Ordering},
1545 time::Duration,
1546 };
1547
/// Minimal certificate subject for tests: it carries only the message bytes.
#[derive(Clone, Debug)]
pub struct TestSubject {
    /// Message bytes returned by [`Subject::message`].
    pub message: Bytes,
}
1552
impl Subject for TestSubject {
    type Namespace = Vec<u8>;

    /// Uses the derived namespace bytes unchanged.
    fn namespace<'a>(&self, derived: &'a Self::Namespace) -> &'a [u8] {
        derived
    }

    /// Returns a copy of the stored message.
    fn message(&self) -> Bytes {
        self.message.clone()
    }
}
1564
1565 impl_certificate_ed25519!(TestSubject, Vec<u8>);
1566
1567 const SCHEME_NAMESPACE: &[u8] = b"_COMMONWARE_SHARD_ENGINE_TEST";
1568
1569 const MAX_SHARD_SIZE: usize = 1024 * 1024; const DEFAULT_LINK: Link = Link {
1574 latency: Duration::from_millis(50),
1575 jitter: Duration::ZERO,
1576 success_rate: 1.0,
1577 };
1578
1579 const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
1581
/// Sequential strategy shared by the engine configs and by block encoding in the tests.
const STRATEGY: Sequential = Sequential;
1584
/// Scheme provider backed by an explicit epoch-to-scheme map.
#[derive(Clone)]
struct MultiEpochProvider {
    /// Schemes keyed by the epoch they are registered for.
    schemes: BTreeMap<Epoch, Arc<Scheme>>,
}
1594
1595 impl MultiEpochProvider {
1596 fn single(scheme: Scheme) -> Self {
1597 let mut schemes = BTreeMap::new();
1598 schemes.insert(Epoch::zero(), Arc::new(scheme));
1599 Self { schemes }
1600 }
1601
1602 fn with_epoch(mut self, epoch: Epoch, scheme: Scheme) -> Self {
1603 self.schemes.insert(epoch, Arc::new(scheme));
1604 self
1605 }
1606 }
1607
1608 impl Provider for MultiEpochProvider {
1609 type Scope = Epoch;
1610 type Scheme = Scheme;
1611
1612 fn scoped(&self, scope: Epoch) -> Option<Arc<Scheme>> {
1613 self.schemes.get(&scope).cloned()
1614 }
1615 }
1616
/// Scheme provider that stops returning its scheme after a fixed number of successful lookups.
#[derive(Clone)]
struct ChurningProvider {
    /// The single scheme served for `Epoch::zero()`.
    scheme: Arc<Scheme>,
    /// Lookup budget, shared between clones; decremented on every epoch-zero lookup.
    remaining_successes: Arc<AtomicIsize>,
}
1624
1625 impl ChurningProvider {
1626 fn new(scheme: Scheme, successes: isize) -> Self {
1627 Self {
1628 scheme: Arc::new(scheme),
1629 remaining_successes: Arc::new(AtomicIsize::new(successes)),
1630 }
1631 }
1632 }
1633
1634 impl Provider for ChurningProvider {
1635 type Scope = Epoch;
1636 type Scheme = Scheme;
1637
1638 fn scoped(&self, scope: Epoch) -> Option<Arc<Scheme>> {
1639 if scope != Epoch::zero() {
1640 return None;
1641 }
1642 if self.remaining_successes.fetch_sub(1, Ordering::AcqRel) <= 0 {
1643 return None;
1644 }
1645 Some(Arc::clone(&self.scheme))
1646 }
1647 }
1648
/// Block type under test: the mock block parameterised with `Sha256Digest` and `()`.
type B = MockBlock<Sha256Digest, ()>;
/// Hasher used throughout the tests.
type H = Sha256;
/// Peer identity type (ed25519 public key).
type P = PublicKey;
/// Default coding scheme for fixtures and tests that do not pick another one.
type C = ReedSolomon<H>;
/// Per-peer control handle of the simulated network; used as the engine's blocker.
type X = Control<P, deterministic::Context>;
/// Oracle of the simulated network.
type O = Oracle<P, deterministic::Context>;
/// Scheme provider used by the default engine alias.
type Prov = MultiEpochProvider;
/// Sender half of a simulated network channel.
type NetworkSender = simulated::Sender<P, deterministic::Context>;
/// Peer manager of the simulated network; used as the engine's peer provider.
type D = simulated::Manager<P, deterministic::Context>;
/// Engine under test, generic over the coding scheme `S`, using `MultiEpochProvider`.
type ShardEngine<S> = Engine<deterministic::Context, Prov, X, D, S, H, B, P, Sequential>;
/// Same engine, but wired to a `ChurningProvider`.
type ChurningShardEngine<S> =
    Engine<deterministic::Context, ChurningProvider, X, D, S, H, B, P, Sequential>;
1662
1663 async fn assert_blocked(oracle: &O, blocker: &P, blocked: &P) {
1664 let blocked_peers = oracle.blocked().await.unwrap();
1665 let is_blocked = blocked_peers
1666 .iter()
1667 .any(|(a, b)| a == blocker && b == blocked);
1668 assert!(is_blocked, "expected {blocker} to have blocked {blocked}");
1669 }
1670
/// A participant whose shard engine has been started by `Fixture::start`.
struct Peer<S: CodingScheme = C> {
    /// The peer's public key.
    public_key: PublicKey,
    /// The peer's participant index (its position in the sorted participant key list).
    index: Participant,
    /// Mailbox returned when the peer's engine was created.
    mailbox: Mailbox<B, S, H, P>,
    /// Clone of the network sender handed to the engine, so tests can send raw messages
    /// as this peer.
    sender: NetworkSender,
}
1682
/// A non-participant whose shard engine has been started by `Fixture::start` with a
/// verifier-only scheme.
// `dead_code` is allowed here; NOTE(review): presumably because not every test reads all
// fields — confirm before removing.
#[allow(dead_code)]
struct NonParticipant<S: CodingScheme = C> {
    /// The non-participant's public key.
    public_key: PublicKey,
    /// Mailbox returned when the non-participant's engine was created.
    mailbox: Mailbox<B, S, H, P>,
    /// Clone of the network sender handed to the engine.
    sender: NetworkSender,
}
1693
/// Test fixture describing the simulated network to build, generic over the coding scheme.
struct Fixture<S: CodingScheme = C> {
    /// Number of participants to create.
    num_peers: usize,
    /// Number of non-participants to create.
    num_non_participants: usize,
    /// Link installed between every ordered pair of distinct nodes.
    link: Link,
    /// Ties the fixture to the coding scheme `S` without storing a value of it.
    _marker: PhantomData<S>,
}
1705
1706 impl<S: CodingScheme> Default for Fixture<S> {
1707 fn default() -> Self {
1708 Self {
1709 num_peers: 4,
1710 num_non_participants: 0,
1711 link: DEFAULT_LINK,
1712 _marker: PhantomData,
1713 }
1714 }
1715 }
1716
1717 impl<S: CodingScheme> Fixture<S> {
1718 pub fn start<F: Future<Output = ()>>(
1719 self,
1720 f: impl FnOnce(
1721 Self,
1722 deterministic::Context,
1723 O,
1724 Vec<Peer<S>>,
1725 Vec<NonParticipant<S>>,
1726 CodingConfig,
1727 ) -> F,
1728 ) {
1729 let executor = deterministic::Runner::default();
1730 executor.start(|context| async move {
1731 let mut private_keys = (0..self.num_peers)
1732 .map(|i| PrivateKey::from_seed(i as u64))
1733 .collect::<Vec<_>>();
1734 private_keys.sort_by_key(|s| s.public_key());
1735 let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
1736
1737 let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());
1738
1739 let mut np_private_keys = (0..self.num_non_participants)
1740 .map(|i| PrivateKey::from_seed((self.num_peers + i) as u64))
1741 .collect::<Vec<_>>();
1742 np_private_keys.sort_by_key(|s| s.public_key());
1743 let np_keys: Vec<P> = np_private_keys.iter().map(|k| k.public_key()).collect();
1744
1745 let (network, oracle) =
1746 simulated::Network::<deterministic::Context, P>::new_with_split_peers(
1747 context.with_label("network"),
1748 simulated::Config {
1749 max_size: MAX_SHARD_SIZE as u32,
1750 disconnect_on_block: true,
1751 tracked_peer_sets: NZUsize!(1),
1752 },
1753 peer_keys.clone(),
1754 np_keys.clone(),
1755 )
1756 .await;
1757 network.start();
1758
1759 let all_keys: Vec<P> = peer_keys.iter().chain(np_keys.iter()).cloned().collect();
1760
1761 let mut registrations = BTreeMap::new();
1762 for key in all_keys.iter() {
1763 let control = oracle.control(key.clone());
1764 let (sender, receiver) = control
1765 .register(0, TEST_QUOTA)
1766 .await
1767 .expect("registration should succeed");
1768 registrations.insert(key.clone(), (control, sender, receiver));
1769 }
1770 for p1 in all_keys.iter() {
1771 for p2 in all_keys.iter() {
1772 if p2 == p1 {
1773 continue;
1774 }
1775 oracle
1776 .add_link(p1.clone(), p2.clone(), self.link.clone())
1777 .await
1778 .expect("link should be added");
1779 }
1780 }
1781
1782 let coding_config =
1783 coding_config_for_participants(u16::try_from(self.num_peers).unwrap());
1784
1785 let mut peers = Vec::with_capacity(self.num_peers);
1786 for (idx, peer_key) in peer_keys.iter().enumerate() {
1787 let (control, sender, receiver) = registrations
1788 .remove(peer_key)
1789 .expect("peer should be registered");
1790
1791 let participant = Participant::new(idx as u32);
1792 let engine_context = context.with_label(&format!("peer_{}", idx));
1793
1794 let scheme = Scheme::signer(
1795 SCHEME_NAMESPACE,
1796 participants.clone(),
1797 private_keys[idx].clone(),
1798 )
1799 .expect("signer scheme should be created");
1800 let scheme_provider: Prov = MultiEpochProvider::single(scheme);
1801
1802 let config = Config {
1803 scheme_provider,
1804 blocker: control.clone(),
1805 shard_codec_cfg: CodecConfig {
1806 maximum_shard_size: MAX_SHARD_SIZE,
1807 },
1808 block_codec_cfg: (),
1809 strategy: STRATEGY,
1810 mailbox_size: 1024,
1811 peer_buffer_size: NZUsize!(64),
1812 background_channel_capacity: 1024,
1813 peer_provider: oracle.manager(),
1814 };
1815
1816 let (engine, mailbox) = ShardEngine::new(engine_context, config);
1817 let sender_clone = sender.clone();
1818 engine.start((sender, receiver));
1819
1820 peers.push(Peer {
1821 public_key: peer_key.clone(),
1822 index: participant,
1823 mailbox,
1824 sender: sender_clone,
1825 });
1826 }
1827
1828 let mut non_participants = Vec::with_capacity(self.num_non_participants);
1829 for (idx, np_key) in np_keys.iter().enumerate() {
1830 let (control, sender, receiver) = registrations
1831 .remove(np_key)
1832 .expect("non-participant should be registered");
1833
1834 let engine_context = context.with_label(&format!("non_participant_{}", idx));
1835
1836 let scheme = Scheme::verifier(SCHEME_NAMESPACE, participants.clone());
1837 let scheme_provider: Prov = MultiEpochProvider::single(scheme);
1838
1839 let config = Config {
1840 scheme_provider,
1841 blocker: control.clone(),
1842 shard_codec_cfg: CodecConfig {
1843 maximum_shard_size: MAX_SHARD_SIZE,
1844 },
1845 block_codec_cfg: (),
1846 strategy: STRATEGY,
1847 mailbox_size: 1024,
1848 peer_buffer_size: NZUsize!(64),
1849 background_channel_capacity: 1024,
1850 peer_provider: oracle.manager(),
1851 };
1852
1853 let (engine, mailbox) = ShardEngine::new(engine_context, config);
1854 let sender_clone = sender.clone();
1855 engine.start((sender, receiver));
1856
1857 non_participants.push(NonParticipant {
1858 public_key: np_key.clone(),
1859 mailbox,
1860 sender: sender_clone,
1861 });
1862 }
1863
1864 f(
1865 self,
1866 context,
1867 oracle,
1868 peers,
1869 non_participants,
1870 coding_config,
1871 )
1872 .await;
1873 });
1874 }
1875 }
1876
1877 #[test_traced]
1878 fn test_e2e_broadcast_and_reconstruction() {
1879 let fixture = Fixture {
1880 num_peers: 10,
1881 ..Default::default()
1882 };
1883
1884 fixture.start(
1885 |config, context, _, mut peers, _, coding_config| async move {
1886 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
1887 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
1888 let commitment = coded_block.commitment();
1889
1890 let leader = peers[0].public_key.clone();
1891 let round = Round::new(Epoch::zero(), View::new(1));
1892 peers[0].mailbox.proposed(round, coded_block.clone()).await;
1893
1894 for peer in peers[1..].iter_mut() {
1896 peer.mailbox
1897 .discovered(commitment, leader.clone(), round)
1898 .await;
1899 }
1900 context.sleep(config.link.latency).await;
1901
1902 for peer in peers.iter_mut() {
1903 peer.mailbox
1904 .subscribe_assigned_shard_verified(commitment)
1905 .await
1906 .await
1907 .expect("shard subscription should complete");
1908 }
1909 context.sleep(config.link.latency).await;
1910
1911 for peer in peers.iter_mut() {
1912 let reconstructed = peer
1913 .mailbox
1914 .get(commitment)
1915 .await
1916 .expect("block should be reconstructed");
1917 assert_eq!(reconstructed.commitment(), commitment);
1918 assert_eq!(reconstructed.height(), coded_block.height());
1919 }
1920 },
1921 );
1922 }
1923
1924 #[test_traced]
1925 fn test_e2e_broadcast_and_reconstruction_zoda() {
1926 let fixture = Fixture {
1927 num_peers: 10,
1928 ..Default::default()
1929 };
1930
1931 fixture.start(
1932 |config, context, _, mut peers, _, coding_config| async move {
1933 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
1934 let coded_block = CodedBlock::<B, PhasedAsScheme<Zoda<H>>, H>::new(
1935 inner,
1936 coding_config,
1937 &STRATEGY,
1938 );
1939 let commitment = coded_block.commitment();
1940
1941 let leader = peers[0].public_key.clone();
1942 let round = Round::new(Epoch::zero(), View::new(1));
1943 peers[0].mailbox.proposed(round, coded_block.clone()).await;
1944
1945 for peer in peers[1..].iter_mut() {
1947 peer.mailbox
1948 .discovered(commitment, leader.clone(), round)
1949 .await;
1950 }
1951 context.sleep(config.link.latency).await;
1952
1953 for peer in peers.iter_mut() {
1954 peer.mailbox
1955 .subscribe_assigned_shard_verified(commitment)
1956 .await
1957 .await
1958 .expect("shard subscription should complete");
1959 }
1960 context.sleep(config.link.latency).await;
1961
1962 for peer in peers.iter_mut() {
1963 let reconstructed = peer
1964 .mailbox
1965 .get(commitment)
1966 .await
1967 .expect("block should be reconstructed");
1968 assert_eq!(reconstructed.commitment(), commitment);
1969 assert_eq!(reconstructed.height(), coded_block.height());
1970 }
1971 },
1972 );
1973 }
1974
1975 #[test_traced]
1976 fn test_block_subscriptions() {
1977 let fixture = Fixture {
1978 num_peers: 10,
1979 ..Default::default()
1980 };
1981
1982 fixture.start(
1983 |config, context, _, mut peers, _, coding_config| async move {
1984 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
1985 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
1986 let commitment = coded_block.commitment();
1987 let digest = coded_block.digest();
1988
1989 let leader = peers[0].public_key.clone();
1990 let round = Round::new(Epoch::zero(), View::new(1));
1991
1992 let commitment_sub = peers[1].mailbox.subscribe(commitment).await;
1994 let digest_sub = peers[2].mailbox.subscribe_by_digest(digest).await;
1995
1996 peers[0].mailbox.proposed(round, coded_block.clone()).await;
1997
1998 for peer in peers[1..].iter_mut() {
2000 peer.mailbox
2001 .discovered(commitment, leader.clone(), round)
2002 .await;
2003 }
2004 context.sleep(config.link.latency * 2).await;
2005
2006 for peer in peers.iter_mut() {
2007 peer.mailbox
2008 .subscribe_assigned_shard_verified(commitment)
2009 .await
2010 .await
2011 .expect("shard subscription should complete");
2012 }
2013 context.sleep(config.link.latency).await;
2014
2015 let block_by_commitment =
2016 commitment_sub.await.expect("subscription should resolve");
2017 assert_eq!(block_by_commitment.commitment(), commitment);
2018 assert_eq!(block_by_commitment.height(), coded_block.height());
2019
2020 let block_by_digest = digest_sub.await.expect("subscription should resolve");
2021 assert_eq!(block_by_digest.commitment(), commitment);
2022 assert_eq!(block_by_digest.height(), coded_block.height());
2023 },
2024 );
2025 }
2026
2027 #[test_traced]
2028 fn test_proposer_preproposal_subscriptions_resolve_after_local_cache() {
2029 let fixture = Fixture {
2030 num_peers: 10,
2031 ..Default::default()
2032 };
2033
2034 fixture.start(|config, context, _, peers, _, coding_config| async move {
2035 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2036 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2037 let commitment = coded_block.commitment();
2038 let digest = coded_block.digest();
2039 let round = Round::new(Epoch::zero(), View::new(1));
2040
2041 let shard_sub = peers[0].mailbox.subscribe_assigned_shard_verified(commitment).await;
2043 let commitment_sub = peers[0].mailbox.subscribe(commitment).await;
2044 let digest_sub = peers[0].mailbox.subscribe_by_digest(digest).await;
2045
2046 peers[0].mailbox.proposed(round, coded_block.clone()).await;
2047 context.sleep(config.link.latency).await;
2048
2049 select! {
2050 result = shard_sub => {
2051 result.expect("shard subscription should resolve");
2052 },
2053 _ = context.sleep(Duration::from_secs(5)) => {
2054 panic!("shard subscription did not resolve after local proposal cache");
2055 }
2056 }
2057
2058 let block_by_commitment = select! {
2059 result = commitment_sub => {
2060 result.expect("block subscription by commitment should resolve")
2061 },
2062 _ = context.sleep(Duration::from_secs(5)) => {
2063 panic!("block subscription by commitment did not resolve after local proposal cache");
2064 }
2065 };
2066 assert_eq!(block_by_commitment.commitment(), commitment);
2067 assert_eq!(block_by_commitment.height(), coded_block.height());
2068
2069 let block_by_digest = select! {
2070 result = digest_sub => {
2071 result.expect("block subscription by digest should resolve")
2072 },
2073 _ = context.sleep(Duration::from_secs(5)) => {
2074 panic!("block subscription by digest did not resolve after local proposal cache");
2075 }
2076 };
2077 assert_eq!(block_by_digest.commitment(), commitment);
2078 assert_eq!(block_by_digest.height(), coded_block.height());
2079 });
2080 }
2081
2082 #[test_traced]
2083 fn test_shard_subscription_rejects_invalid_shard() {
2084 let fixture = Fixture::<C>::default();
2085 fixture.start(
2086 |config, context, oracle, mut peers, _, coding_config| async move {
2087 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2092 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2093 let commitment = coded_block.commitment();
2094 let receiver_index = peers[2].index.get() as u16;
2095
2096 let valid_shard = coded_block.shard(receiver_index).expect("missing shard");
2097
2098 let mut invalid_shard = valid_shard.clone();
2101 invalid_shard.index = peers[3].index.get() as u16;
2102
2103 let receiver_pk = peers[2].public_key.clone();
2105 let leader = peers[1].public_key.clone();
2106 peers[2]
2107 .mailbox
2108 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2109 .await;
2110 let mut shard_sub = peers[2]
2111 .mailbox
2112 .subscribe_assigned_shard_verified(commitment)
2113 .await;
2114
2115 let invalid_bytes = invalid_shard.encode();
2117 peers[0]
2118 .sender
2119 .send(Recipients::One(receiver_pk.clone()), invalid_bytes, true)
2120 .await
2121 .expect("send failed");
2122
2123 context.sleep(config.link.latency * 2).await;
2124
2125 assert!(
2126 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
2127 "subscription should not resolve from invalid shard"
2128 );
2129 assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;
2130
2131 let valid_bytes = valid_shard.encode();
2133 peers[1]
2134 .sender
2135 .send(Recipients::One(receiver_pk), valid_bytes, true)
2136 .await
2137 .expect("send failed");
2138 context.sleep(config.link.latency * 2).await;
2139
2140 select! {
2142 _ = shard_sub => {},
2143 _ = context.sleep(Duration::from_secs(5)) => {
2144 panic!("subscription did not complete after valid shard arrival");
2145 },
2146 };
2147 },
2148 );
2149 }
2150
2151 #[test_traced]
2152 fn test_durable_prunes_reconstructed_blocks() {
2153 let fixture = Fixture::<C>::default();
2154 fixture.start(|_, context, _, mut peers, _, coding_config| async move {
2155 let block1 = CodedBlock::<B, C, H>::new(
2157 B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
2158 coding_config,
2159 &STRATEGY,
2160 );
2161 let block2 = CodedBlock::<B, C, H>::new(
2162 B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 100),
2163 coding_config,
2164 &STRATEGY,
2165 );
2166 let block3 = CodedBlock::<B, C, H>::new(
2167 B::new::<H>((), Sha256Digest::EMPTY, Height::new(3), 100),
2168 coding_config,
2169 &STRATEGY,
2170 );
2171 let commitment1 = block1.commitment();
2172 let commitment2 = block2.commitment();
2173 let commitment3 = block3.commitment();
2174
2175 let peer = &mut peers[0];
2177 let round = Round::new(Epoch::zero(), View::new(1));
2178 peer.mailbox.proposed(round, block1).await;
2179 peer.mailbox.proposed(round, block2).await;
2180 peer.mailbox.proposed(round, block3).await;
2181 context.sleep(Duration::from_millis(10)).await;
2182
2183 assert!(
2185 peer.mailbox.get(commitment1).await.is_some(),
2186 "block1 should be cached"
2187 );
2188 assert!(
2189 peer.mailbox.get(commitment2).await.is_some(),
2190 "block2 should be cached"
2191 );
2192 assert!(
2193 peer.mailbox.get(commitment3).await.is_some(),
2194 "block3 should be cached"
2195 );
2196
2197 peer.mailbox.prune(commitment2).await;
2199 context.sleep(Duration::from_millis(10)).await;
2200
2201 assert!(
2203 peer.mailbox.get(commitment1).await.is_none(),
2204 "block1 should be pruned"
2205 );
2206 assert!(
2207 peer.mailbox.get(commitment2).await.is_none(),
2208 "block2 should be pruned"
2209 );
2210
2211 assert!(
2213 peer.mailbox.get(commitment3).await.is_some(),
2214 "block3 should still be cached"
2215 );
2216 });
2217 }
2218
2219 #[test_traced]
2220 fn test_duplicate_leader_shard_ignored() {
2221 let fixture = Fixture::<C>::default();
2222 fixture.start(
2223 |config, context, oracle, mut peers, _, coding_config| async move {
2224 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2225 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2226 let commitment = coded_block.commitment();
2227
2228 let peer2_index = peers[2].index.get() as u16;
2230 let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2231 let shard_bytes = peer2_shard.encode();
2232
2233 let peer2_pk = peers[2].public_key.clone();
2234 let leader = peers[0].public_key.clone();
2235
2236 peers[2]
2238 .mailbox
2239 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2240 .await;
2241
2242 peers[0]
2244 .sender
2245 .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true)
2246 .await
2247 .expect("send failed");
2248 context.sleep(config.link.latency * 2).await;
2249
2250 peers[0]
2252 .sender
2253 .send(Recipients::One(peer2_pk), shard_bytes, true)
2254 .await
2255 .expect("send failed");
2256 context.sleep(config.link.latency * 2).await;
2257
2258 let blocked_peers = oracle.blocked().await.unwrap();
2260 let is_blocked = blocked_peers
2261 .iter()
2262 .any(|(a, b)| a == &peers[2].public_key && b == &peers[0].public_key);
2263 assert!(
2264 !is_blocked,
2265 "leader should not be blocked for duplicate shard"
2266 );
2267 },
2268 );
2269 }
2270
2271 #[test_traced]
2272 fn test_equivocating_leader_shard_blocks_peer() {
2273 let fixture = Fixture::<C>::default();
2274 fixture.start(
2275 |config, context, oracle, mut peers, _, coding_config| async move {
2276 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2277 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
2278 let commitment = coded_block1.commitment();
2279
2280 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 200);
2282 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
2283
2284 let peer2_index = peers[2].index.get() as u16;
2286 let shard_bytes1 = coded_block1
2287 .shard(peer2_index)
2288 .expect("missing shard")
2289 .encode();
2290 let mut equivocating_shard =
2291 coded_block2.shard(peer2_index).expect("missing shard");
2292 equivocating_shard.commitment = commitment;
2294 let shard_bytes2 = equivocating_shard.encode();
2295
2296 let peer2_pk = peers[2].public_key.clone();
2297 let leader = peers[0].public_key.clone();
2298
2299 peers[2]
2301 .mailbox
2302 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2303 .await;
2304
2305 peers[0]
2307 .sender
2308 .send(Recipients::One(peer2_pk.clone()), shard_bytes1, true)
2309 .await
2310 .expect("send failed");
2311 context.sleep(config.link.latency * 2).await;
2312
2313 peers[0]
2315 .sender
2316 .send(Recipients::One(peer2_pk), shard_bytes2, true)
2317 .await
2318 .expect("send failed");
2319 context.sleep(config.link.latency * 2).await;
2320
2321 assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;
2323 },
2324 );
2325 }
2326
2327 #[test_traced]
2328 fn test_non_leader_wrong_index_shard_blocked() {
2329 let fixture = Fixture::<C>::default();
2332 fixture.start(
2333 |config, context, oracle, mut peers, _, coding_config| async move {
2334 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2335 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2336 let commitment = coded_block.commitment();
2337
2338 let peer2_index = peers[2].index.get() as u16;
2340 let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2341 let shard_bytes = peer2_shard.encode();
2342
2343 let peer2_pk = peers[2].public_key.clone();
2344 let leader = peers[0].public_key.clone();
2345
2346 peers[2]
2348 .mailbox
2349 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2350 .await;
2351
2352 peers[1]
2355 .sender
2356 .send(Recipients::One(peer2_pk), shard_bytes, true)
2357 .await
2358 .expect("send failed");
2359 context.sleep(config.link.latency * 2).await;
2360
2361 assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2363 },
2364 );
2365 }
2366
2367 #[test_traced]
2368 fn test_buffered_wrong_index_shard_blocked_on_leader_arrival() {
2369 let fixture = Fixture::<C>::default();
2372 fixture.start(
2373 |config, context, oracle, mut peers, _, coding_config| async move {
2374 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2375 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2376 let commitment = coded_block.commitment();
2377
2378 let peer2_index = peers[2].index.get() as u16;
2380 let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2381 let shard_bytes = peer2_shard.encode();
2382
2383 let peer2_pk = peers[2].public_key.clone();
2384
2385 peers[1]
2388 .sender
2389 .send(Recipients::One(peer2_pk), shard_bytes, true)
2390 .await
2391 .expect("send failed");
2392 context.sleep(config.link.latency * 2).await;
2393
2394 let blocked = oracle.blocked().await.unwrap();
2396 assert!(
2397 blocked.is_empty(),
2398 "no peers should be blocked while leader is unknown"
2399 );
2400
2401 let leader = peers[0].public_key.clone();
2405 peers[2]
2406 .mailbox
2407 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2408 .await;
2409 context.sleep(Duration::from_millis(10)).await;
2410
2411 assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2412 },
2413 );
2414 }
2415
2416 #[test_traced]
2417 fn test_conflicting_external_proposed_ignored() {
2418 let fixture = Fixture::<C>::default();
2419 fixture.start(
2420 |config, context, oracle, mut peers, _, coding_config| async move {
2421 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2422 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2423 let commitment = coded_block.commitment();
2424
2425 let peer2_index = peers[2].index.get() as u16;
2427 let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2428 let shard_bytes = peer2_shard.encode();
2429
2430 let peer2_pk = peers[2].public_key.clone();
2431 let leader_a = peers[0].public_key.clone();
2432 let leader_b = peers[1].public_key.clone();
2433
2434 let shard_sub = peers[2]
2436 .mailbox
2437 .subscribe_assigned_shard_verified(commitment)
2438 .await;
2439
2440 peers[2]
2442 .mailbox
2443 .discovered(
2444 commitment,
2445 leader_a.clone(),
2446 Round::new(Epoch::zero(), View::new(1)),
2447 )
2448 .await;
2449 peers[2]
2451 .mailbox
2452 .discovered(
2453 commitment,
2454 leader_b,
2455 Round::new(Epoch::zero(), View::new(1)),
2456 )
2457 .await;
2458
2459 peers[0]
2461 .sender
2462 .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true)
2463 .await
2464 .expect("send failed");
2465 context.sleep(config.link.latency * 2).await;
2466
2467 select! {
2469 _ = shard_sub => {},
2470 _ = context.sleep(Duration::from_secs(5)) => {
2471 panic!("subscription did not complete after shard from original leader");
2472 },
2473 };
2474
2475 peers[1]
2477 .sender
2478 .send(Recipients::One(peer2_pk), shard_bytes, true)
2479 .await
2480 .expect("send failed");
2481 context.sleep(config.link.latency * 2).await;
2482
2483 assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2484
2485 let blocked_peers = oracle.blocked().await.unwrap();
2487 let leader_a_blocked = blocked_peers
2488 .iter()
2489 .any(|(a, b)| a == &peers[2].public_key && b == &leader_a);
2490 assert!(
2491 !leader_a_blocked,
2492 "original leader should not be blocked after conflicting leader update"
2493 );
2494 },
2495 );
2496 }
2497
2498 #[test_traced]
2499 fn test_non_participant_external_proposed_ignored() {
2500 let fixture = Fixture::<C>::default();
2501 fixture.start(
2502 |config, context, oracle, mut peers, _, coding_config| async move {
2503 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2504 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2505 let commitment = coded_block.commitment();
2506
2507 let peer2_index = peers[2].index.get() as u16;
2509 let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2510 let shard_bytes = peer2_shard.encode();
2511
2512 let peer2_pk = peers[2].public_key.clone();
2513 let leader = peers[0].public_key.clone();
2514 let non_participant_leader = PrivateKey::from_seed(10_000).public_key();
2515
2516 let shard_sub = peers[2]
2518 .mailbox
2519 .subscribe_assigned_shard_verified(commitment)
2520 .await;
2521
2522 peers[2]
2524 .mailbox
2525 .discovered(
2526 commitment,
2527 non_participant_leader,
2528 Round::new(Epoch::zero(), View::new(1)),
2529 )
2530 .await;
2531
2532 peers[0]
2534 .sender
2535 .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true)
2536 .await
2537 .expect("send failed");
2538 context.sleep(config.link.latency * 2).await;
2539
2540 let blocked = oracle.blocked().await.unwrap();
2541 let leader_blocked = blocked
2542 .iter()
2543 .any(|(a, b)| a == &peers[2].public_key && b == &leader);
2544 assert!(
2545 !leader_blocked,
2546 "leader should not be blocked when non-participant update is ignored"
2547 );
2548
2549 peers[2]
2551 .mailbox
2552 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2553 .await;
2554 context.sleep(config.link.latency * 2).await;
2555
2556 select! {
2557 _ = shard_sub => {},
2558 _ = context.sleep(Duration::from_secs(5)) => {
2559 panic!("subscription did not complete after valid leader update");
2560 },
2561 };
2562 },
2563 );
2564 }
2565
2566 #[test_traced]
2567 fn test_shard_from_non_participant_blocks_peer() {
2568 let fixture = Fixture::<C>::default();
2569 fixture.start(
2570 |config, context, oracle, peers, _, coding_config| async move {
2571 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2572 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2573 let commitment = coded_block.commitment();
2574
2575 let leader = peers[0].public_key.clone();
2576 let receiver_pk = peers[2].public_key.clone();
2577
2578 let non_participant_key = PrivateKey::from_seed(10_000);
2579 let non_participant_pk = non_participant_key.public_key();
2580
2581 let non_participant_control = oracle.control(non_participant_pk.clone());
2582 let (mut non_participant_sender, _non_participant_receiver) =
2583 non_participant_control
2584 .register(0, TEST_QUOTA)
2585 .await
2586 .expect("registration should succeed");
2587 oracle
2588 .add_link(
2589 non_participant_pk.clone(),
2590 receiver_pk.clone(),
2591 DEFAULT_LINK,
2592 )
2593 .await
2594 .expect("link should be added");
2595 oracle
2596 .manager()
2597 .track(
2598 2,
2599 TrackedPeers::new(
2600 Set::from_iter_dedup(peers.iter().map(|peer| peer.public_key.clone())),
2601 Set::from_iter_dedup([non_participant_pk.clone()]),
2602 ),
2603 )
2604 .await;
2605 context.sleep(Duration::from_millis(10)).await;
2606
2607 peers[2]
2608 .mailbox
2609 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2610 .await;
2611
2612 let peer2_index = peers[2].index.get() as u16;
2613 let shard = coded_block.shard(peer2_index).expect("missing shard");
2614 let shard_bytes = shard.encode();
2615
2616 non_participant_sender
2617 .send(Recipients::One(receiver_pk), shard_bytes, true)
2618 .await
2619 .expect("send failed");
2620 context.sleep(config.link.latency * 2).await;
2621
2622 assert_blocked(&oracle, &peers[2].public_key, &non_participant_pk).await;
2623 },
2624 );
2625 }
2626
2627 #[test_traced]
2628 fn test_preleader_shard_from_non_participant_is_not_buffered() {
2629 let fixture = Fixture::<C>::default();
2630 fixture.start(
2631 |config, context, oracle, peers, _, coding_config| async move {
2632 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2633 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2634 let commitment = coded_block.commitment();
2635
2636 let leader = peers[0].public_key.clone();
2637 let receiver_pk = peers[2].public_key.clone();
2638
2639 let non_participant_key = PrivateKey::from_seed(10_000);
2640 let non_participant_pk = non_participant_key.public_key();
2641
2642 let non_participant_control = oracle.control(non_participant_pk.clone());
2643 let (mut non_participant_sender, _non_participant_receiver) =
2644 non_participant_control
2645 .register(0, TEST_QUOTA)
2646 .await
2647 .expect("registration should succeed");
2648 oracle
2649 .add_link(
2650 non_participant_pk.clone(),
2651 receiver_pk.clone(),
2652 DEFAULT_LINK,
2653 )
2654 .await
2655 .expect("link should be added");
2656 oracle
2657 .manager()
2658 .track(
2659 2,
2660 TrackedPeers::new(
2661 Set::from_iter_dedup(peers.iter().map(|peer| peer.public_key.clone())),
2662 Set::from_iter_dedup([non_participant_pk.clone()]),
2663 ),
2664 )
2665 .await;
2666 context.sleep(Duration::from_millis(10)).await;
2667
2668 let peer2_index = peers[2].index.get() as u16;
2669 let shard = coded_block.shard(peer2_index).expect("missing shard");
2670 let shard_bytes = shard.encode();
2671 let mut shard_sub = peers[2]
2672 .mailbox
2673 .subscribe_assigned_shard_verified(commitment)
2674 .await;
2675
2676 non_participant_sender
2677 .send(Recipients::One(receiver_pk), shard_bytes, true)
2678 .await
2679 .expect("send failed");
2680 context.sleep(config.link.latency * 2).await;
2681
2682 peers[2]
2683 .mailbox
2684 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2685 .await;
2686 context.sleep(config.link.latency * 2).await;
2687
2688 let blocked = oracle.blocked().await.unwrap();
2689 let non_participant_blocked = blocked
2690 .iter()
2691 .any(|(a, b)| a == &peers[2].public_key && b == &non_participant_pk);
2692 assert!(
2693 !non_participant_blocked,
2694 "non-participant should not be blocked when its pre-leader shard is ignored"
2695 );
2696 assert!(
2697 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
2698 "pre-leader shard from non-participant should not be buffered"
2699 );
2700 },
2701 );
2702 }
2703
2704 #[test_traced]
2705 fn test_duplicate_shard_ignored() {
2706 let fixture: Fixture<C> = Fixture {
2708 num_peers: 10,
2709 ..Default::default()
2710 };
2711
2712 fixture.start(
2713 |config, context, oracle, mut peers, _, coding_config| async move {
2714 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2715 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2716
2717 let peer2_index = peers[2].index.get() as u16;
2719 let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2720
2721 let peer1_index = peers[1].index.get() as u16;
2723 let peer1_shard = coded_block.shard(peer1_index).expect("missing shard");
2724
2725 let peer2_pk = peers[2].public_key.clone();
2726 let leader = peers[0].public_key.clone();
2727
2728 peers[2]
2730 .mailbox
2731 .discovered(
2732 coded_block.commitment(),
2733 leader,
2734 Round::new(Epoch::zero(), View::new(1)),
2735 )
2736 .await;
2737
2738 let leader_shard_bytes = peer2_shard.encode();
2740 peers[0]
2741 .sender
2742 .send(Recipients::One(peer2_pk.clone()), leader_shard_bytes, true)
2743 .await
2744 .expect("send failed");
2745 context.sleep(config.link.latency * 2).await;
2746
2747 let peer1_shard_bytes = peer1_shard.encode();
2749 peers[1]
2750 .sender
2751 .send(
2752 Recipients::One(peer2_pk.clone()),
2753 peer1_shard_bytes.clone(),
2754 true,
2755 )
2756 .await
2757 .expect("send failed");
2758 context.sleep(config.link.latency * 2).await;
2759
2760 peers[1]
2763 .sender
2764 .send(Recipients::One(peer2_pk), peer1_shard_bytes, true)
2765 .await
2766 .expect("send failed");
2767 context.sleep(config.link.latency * 2).await;
2768
2769 let blocked_peers = oracle.blocked().await.unwrap();
2771 let is_blocked = blocked_peers
2772 .iter()
2773 .any(|(a, b)| a == &peers[2].public_key && b == &peers[1].public_key);
2774 assert!(
2775 !is_blocked,
2776 "peer should not be blocked for exact duplicate shard"
2777 );
2778 },
2779 );
2780 }
2781
2782 #[test_traced]
2783 fn test_equivocating_shard_blocks_peer() {
2784 let fixture: Fixture<C> = Fixture {
2786 num_peers: 10,
2787 ..Default::default()
2788 };
2789
2790 fixture.start(
2791 |config, context, oracle, mut peers, _, coding_config| async move {
2792 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2793 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
2794
2795 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 200);
2797 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
2798
2799 let peer1_index = peers[1].index.get() as u16;
2801 let peer1_shard = coded_block1.shard(peer1_index).expect("missing shard");
2802
2803 let mut peer1_equivocating_shard =
2805 coded_block2.shard(peer1_index).expect("missing shard");
2806 peer1_equivocating_shard.commitment = coded_block1.commitment();
2809
2810 let peer2_pk = peers[2].public_key.clone();
2811 let leader = peers[0].public_key.clone();
2812
2813 peers[2]
2815 .mailbox
2816 .discovered(
2817 coded_block1.commitment(),
2818 leader,
2819 Round::new(Epoch::zero(), View::new(1)),
2820 )
2821 .await;
2822
2823 let peer2_index = peers[2].index.get() as u16;
2825 let leader_shard = coded_block1.shard(peer2_index).expect("missing shard");
2826 let leader_shard_bytes = leader_shard.encode();
2827 peers[0]
2828 .sender
2829 .send(Recipients::One(peer2_pk.clone()), leader_shard_bytes, true)
2830 .await
2831 .expect("send failed");
2832 context.sleep(config.link.latency * 2).await;
2833
2834 let shard_bytes = peer1_shard.encode();
2836 peers[1]
2837 .sender
2838 .send(Recipients::One(peer2_pk.clone()), shard_bytes, true)
2839 .await
2840 .expect("send failed");
2841 context.sleep(config.link.latency * 2).await;
2842
2843 let equivocating_bytes = peer1_equivocating_shard.encode();
2845 peers[1]
2846 .sender
2847 .send(Recipients::One(peer2_pk), equivocating_bytes, true)
2848 .await
2849 .expect("send failed");
2850 context.sleep(config.link.latency * 2).await;
2851
2852 assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2854 },
2855 );
2856 }
2857
2858 #[test_traced]
2859 fn test_reconstruction_states_pruned_at_or_below_reconstructed_view() {
2860 let fixture: Fixture<C> = Fixture {
2862 num_peers: 10,
2863 ..Default::default()
2864 };
2865
2866 fixture.start(
2867 |config, context, oracle, mut peers, _, coding_config| async move {
2868 let block_a = CodedBlock::<B, C, H>::new(
2870 B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
2871 coding_config,
2872 &STRATEGY,
2873 );
2874 let commitment_a = block_a.commitment();
2875
2876 let block_b = CodedBlock::<B, C, H>::new(
2878 B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200),
2879 coding_config,
2880 &STRATEGY,
2881 );
2882 let commitment_b = block_b.commitment();
2883
2884 let peer2_pk = peers[2].public_key.clone();
2885 let leader = peers[0].public_key.clone();
2886
2887 peers[2]
2889 .mailbox
2890 .discovered(
2891 commitment_a,
2892 leader.clone(),
2893 Round::new(Epoch::zero(), View::new(1)),
2894 )
2895 .await;
2896 let shard_a = block_a
2897 .shard(peers[1].index.get() as u16)
2898 .expect("missing shard")
2899 .encode();
2900 peers[1]
2901 .sender
2902 .send(Recipients::One(peer2_pk.clone()), shard_a.clone(), true)
2903 .await
2904 .expect("send failed");
2905 context.sleep(config.link.latency * 2).await;
2906
2907 peers[2]
2909 .mailbox
2910 .discovered(
2911 commitment_b,
2912 leader,
2913 Round::new(Epoch::zero(), View::new(2)),
2914 )
2915 .await;
2916 let leader_shard_b = block_b
2918 .shard(peers[2].index.get() as u16)
2919 .expect("missing shard")
2920 .encode();
2921 peers[0]
2922 .sender
2923 .send(Recipients::One(peer2_pk.clone()), leader_shard_b, true)
2924 .await
2925 .expect("send failed");
2926
2927 for i in [1usize, 3usize, 4usize] {
2929 let shard = block_b
2930 .shard(peers[i].index.get() as u16)
2931 .expect("missing shard")
2932 .encode();
2933 peers[i]
2934 .sender
2935 .send(Recipients::One(peer2_pk.clone()), shard, true)
2936 .await
2937 .expect("send failed");
2938 }
2939 context.sleep(config.link.latency * 4).await;
2940
2941 let reconstructed = peers[2]
2943 .mailbox
2944 .get(commitment_b)
2945 .await
2946 .expect("block B should reconstruct");
2947 assert_eq!(reconstructed.commitment(), commitment_b);
2948
2949 peers[1]
2952 .sender
2953 .send(Recipients::One(peer2_pk), shard_a, true)
2954 .await
2955 .expect("send failed");
2956 context.sleep(config.link.latency * 2).await;
2957
2958 let blocked = oracle.blocked().await.unwrap();
2959 let blocked_peer1 = blocked
2960 .iter()
2961 .any(|(a, b)| a == &peers[2].public_key && b == &peers[1].public_key);
2962 assert!(
2963 !blocked_peer1,
2964 "peer1 should not be blocked after lower-view state was pruned"
2965 );
2966 },
2967 );
2968 }
2969
2970 #[test_traced]
2971 fn test_pending_shards_batch_validated_at_quorum() {
2972 let fixture: Fixture<C> = Fixture {
2981 num_peers: 10,
2982 ..Default::default()
2983 };
2984
2985 fixture.start(
2986 |config, context, oracle, mut peers, _, coding_config| async move {
2987 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2988 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2989 let commitment = coded_block.commitment();
2990
2991 let peer3_pk = peers[3].public_key.clone();
2992 let leader = peers[0].public_key.clone();
2993
2994 peers[3]
2996 .mailbox
2997 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2998 .await;
2999
3000 for &sender_idx in &[1, 2, 4] {
3003 let shard = coded_block
3004 .shard(peers[sender_idx].index.get() as u16)
3005 .expect("missing shard");
3006 let shard_bytes = shard.encode();
3007 peers[sender_idx]
3008 .sender
3009 .send(Recipients::One(peer3_pk.clone()), shard_bytes, true)
3010 .await
3011 .expect("send failed");
3012 }
3013
3014 context.sleep(config.link.latency * 2).await;
3015
3016 let block = peers[3].mailbox.get(commitment).await;
3018 assert!(block.is_none(), "block should not be reconstructed yet");
3019
3020 let peer3_index = peers[3].index.get() as u16;
3024 let leader_shard = coded_block.shard(peer3_index).expect("missing shard");
3025 let leader_shard_bytes = leader_shard.encode();
3026 peers[0]
3027 .sender
3028 .send(Recipients::One(peer3_pk), leader_shard_bytes, true)
3029 .await
3030 .expect("send failed");
3031
3032 context.sleep(config.link.latency * 2).await;
3033
3034 let blocked = oracle.blocked().await.unwrap();
3036 assert!(
3037 blocked.is_empty(),
3038 "no peers should be blocked for valid pending shards"
3039 );
3040
3041 let block = peers[3].mailbox.get(commitment).await;
3043 assert!(
3044 block.is_some(),
3045 "block should be reconstructed after batch validation"
3046 );
3047
3048 let reconstructed = block.unwrap();
3050 assert_eq!(
3051 reconstructed.commitment(),
3052 commitment,
3053 "reconstructed block should have correct commitment"
3054 );
3055 },
3056 );
3057 }
3058
3059 #[test_traced]
3060 fn test_peer_shards_buffered_until_external_proposed() {
3061 let fixture: Fixture<C> = Fixture {
3064 num_peers: 10,
3065 ..Default::default()
3066 };
3067
3068 fixture.start(
3069 |config, context, oracle, mut peers, _, coding_config| async move {
3070 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3071 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3072 let commitment = coded_block.commitment();
3073
3074 let receiver_idx = 3usize;
3075 let receiver_pk = peers[receiver_idx].public_key.clone();
3076 let leader = peers[0].public_key.clone();
3077
3078 let mut shard_sub = peers[receiver_idx]
3080 .mailbox
3081 .subscribe_assigned_shard_verified(commitment)
3082 .await;
3083
3084 let leader_shard = coded_block
3087 .shard(peers[receiver_idx].index.get() as u16)
3088 .expect("missing shard")
3089 .encode();
3090 peers[0]
3091 .sender
3092 .send(Recipients::One(receiver_pk.clone()), leader_shard, true)
3093 .await
3094 .expect("send failed");
3095
3096 for i in [1usize, 2usize, 4usize] {
3097 let shard = coded_block
3098 .shard(peers[i].index.get() as u16)
3099 .expect("missing shard")
3100 .encode();
3101 peers[i]
3102 .sender
3103 .send(Recipients::One(receiver_pk.clone()), shard, true)
3104 .await
3105 .expect("send failed");
3106 }
3107
3108 context.sleep(config.link.latency * 2).await;
3109
3110 assert!(
3112 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
3113 "shard subscription should not resolve before leader announcement"
3114 );
3115 assert!(
3116 peers[receiver_idx].mailbox.get(commitment).await.is_none(),
3117 "block should not reconstruct before leader announcement"
3118 );
3119
3120 peers[receiver_idx]
3122 .mailbox
3123 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
3124 .await;
3125
3126 select! {
3127 _ = shard_sub => {},
3128 _ = context.sleep(Duration::from_secs(5)) => {
3129 panic!("shard subscription did not resolve after leader announcement");
3130 },
3131 }
3132
3133 context.sleep(config.link.latency * 2).await;
3134 assert!(
3135 peers[receiver_idx].mailbox.get(commitment).await.is_some(),
3136 "block should reconstruct after buffered shards are ingested"
3137 );
3138
3139 assert!(
3141 oracle.blocked().await.unwrap().is_empty(),
3142 "no peers should be blocked for valid buffered shards"
3143 );
3144 },
3145 );
3146 }
3147
3148 #[test_traced]
3149 fn test_post_leader_shards_processed_immediately() {
3150 let fixture: Fixture<C> = Fixture {
3153 num_peers: 10,
3154 ..Default::default()
3155 };
3156
3157 fixture.start(
3158 |config, context, oracle, mut peers, _, coding_config| async move {
3159 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3160 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3161 let commitment = coded_block.commitment();
3162
3163 let receiver_idx = 3usize;
3164 let receiver_pk = peers[receiver_idx].public_key.clone();
3165 let leader = peers[0].public_key.clone();
3166
3167 let shard_sub = peers[receiver_idx]
3168 .mailbox
3169 .subscribe_assigned_shard_verified(commitment)
3170 .await;
3171 peers[receiver_idx]
3172 .mailbox
3173 .discovered(
3174 commitment,
3175 leader.clone(),
3176 Round::new(Epoch::zero(), View::new(1)),
3177 )
3178 .await;
3179
3180 let leader_shard = coded_block
3182 .shard(peers[receiver_idx].index.get() as u16)
3183 .expect("missing shard")
3184 .encode();
3185 peers[0]
3186 .sender
3187 .send(Recipients::One(receiver_pk.clone()), leader_shard, true)
3188 .await
3189 .expect("send failed");
3190
3191 select! {
3193 _ = shard_sub => {},
3194 _ = context.sleep(Duration::from_secs(5)) => {
3195 panic!("shard subscription did not resolve after post-leader shard");
3196 },
3197 }
3198
3199 for i in [1usize, 2usize, 4usize] {
3201 let shard = coded_block
3202 .shard(peers[i].index.get() as u16)
3203 .expect("missing shard")
3204 .encode();
3205 peers[i]
3206 .sender
3207 .send(Recipients::One(receiver_pk.clone()), shard, true)
3208 .await
3209 .expect("send failed");
3210 }
3211
3212 context.sleep(config.link.latency * 2).await;
3213 let reconstructed = peers[receiver_idx]
3214 .mailbox
3215 .get(commitment)
3216 .await
3217 .expect("block should reconstruct from post-leader shards");
3218 assert_eq!(reconstructed.commitment(), commitment);
3219
3220 assert!(
3221 oracle.blocked().await.unwrap().is_empty(),
3222 "no peers should be blocked for valid post-leader shards"
3223 );
3224 },
3225 );
3226 }
3227
3228 #[test_traced]
3229 fn test_invalid_shard_codec_blocks_peer() {
3230 let fixture: Fixture<C> = Fixture {
3232 num_peers: 4,
3233 ..Default::default()
3234 };
3235
3236 fixture.start(
3237 |config, context, oracle, mut peers, _, _coding_config| async move {
3238 let peer0_pk = peers[0].public_key.clone();
3239 let peer1_pk = peers[1].public_key.clone();
3240
3241 let garbage = Bytes::from(vec![0xFF, 0xFE, 0xFD, 0xFC, 0xFB]);
3243 peers[1]
3244 .sender
3245 .send(Recipients::One(peer0_pk.clone()), garbage, true)
3246 .await
3247 .expect("send failed");
3248
3249 context.sleep(config.link.latency * 2).await;
3250
3251 assert_blocked(&oracle, &peer0_pk, &peer1_pk).await;
3253 },
3254 );
3255 }
3256
3257 #[test_traced]
3258 fn test_duplicate_buffered_shard_does_not_block_before_leader() {
3259 let fixture: Fixture<C> = Fixture {
3262 ..Default::default()
3263 };
3264
3265 fixture.start(
3266 |config, context, oracle, mut peers, _, coding_config| async move {
3267 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3268 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3269
3270 let peer2_index = peers[2].index.get() as u16;
3272 let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
3273 let shard_bytes = peer2_shard.encode();
3274
3275 let peer2_pk = peers[2].public_key.clone();
3276
3277 peers[1]
3281 .sender
3282 .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true)
3283 .await
3284 .expect("send failed");
3285 context.sleep(config.link.latency * 2).await;
3286
3287 let blocked = oracle.blocked().await.unwrap();
3289 assert!(blocked.is_empty(), "no peers should be blocked yet");
3290
3291 peers[1]
3293 .sender
3294 .send(Recipients::One(peer2_pk), shard_bytes, true)
3295 .await
3296 .expect("send failed");
3297 context.sleep(config.link.latency * 2).await;
3298
3299 let blocked = oracle.blocked().await.unwrap();
3301 assert!(
3302 blocked.is_empty(),
3303 "no peers should be blocked before leader"
3304 );
3305 },
3306 );
3307 }
3308
3309 #[test_traced]
3310 fn test_invalid_leader_shard_crypto_blocks_leader() {
3311 let fixture: Fixture<C> = Fixture {
3314 ..Default::default()
3315 };
3316
3317 fixture.start(
3318 |config, context, oracle, mut peers, _, coding_config| async move {
3319 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3322 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3323 let commitment1 = coded_block1.commitment();
3324
3325 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3326 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3327
3328 let peer2_index = peers[2].index.get() as u16;
3331 let mut wrong_shard = coded_block2.shard(peer2_index).expect("missing shard");
3332 wrong_shard.commitment = commitment1;
3333 let wrong_bytes = wrong_shard.encode();
3334
3335 let peer2_pk = peers[2].public_key.clone();
3336 let leader = peers[0].public_key.clone();
3337
3338 peers[2]
3340 .mailbox
3341 .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
3342 .await;
3343
3344 peers[0]
3346 .sender
3347 .send(Recipients::One(peer2_pk), wrong_bytes, true)
3348 .await
3349 .expect("send failed");
3350 context.sleep(config.link.latency * 2).await;
3351
3352 assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;
3354 },
3355 );
3356 }
3357
3358 #[test_traced]
3359 fn test_shard_index_mismatch_blocks_peer() {
3360 let fixture: Fixture<C> = Fixture {
3363 num_peers: 10,
3364 ..Default::default()
3365 };
3366
3367 fixture.start(
3368 |config, context, oracle, mut peers, _, coding_config| async move {
3369 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3370 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3371 let commitment = coded_block.commitment();
3372
3373 let peer3_index = peers[3].index.get() as u16;
3375 let leader_shard = coded_block.shard(peer3_index).expect("missing shard");
3376
3377 let peer1_index = peers[1].index.get() as u16;
3379 let mut wrong_index_shard = coded_block.shard(peer1_index).expect("missing shard");
3380 wrong_index_shard.index = peers[4].index.get() as u16;
3382 let wrong_bytes = wrong_index_shard.encode();
3383
3384 let peer3_pk = peers[3].public_key.clone();
3385 let leader = peers[0].public_key.clone();
3386
3387 peers[3]
3389 .mailbox
3390 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
3391 .await;
3392 let shard_bytes = leader_shard.encode();
3393 peers[0]
3394 .sender
3395 .send(Recipients::One(peer3_pk.clone()), shard_bytes, true)
3396 .await
3397 .expect("send failed");
3398 context.sleep(config.link.latency * 2).await;
3399
3400 peers[1]
3402 .sender
3403 .send(Recipients::One(peer3_pk), wrong_bytes, true)
3404 .await
3405 .expect("send failed");
3406 context.sleep(config.link.latency * 2).await;
3407
3408 assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
3410 },
3411 );
3412 }
3413
3414 #[test_traced]
3415 fn test_invalid_shard_crypto_blocks_peer() {
3416 let fixture: Fixture<C> = Fixture {
3419 num_peers: 10,
3420 ..Default::default()
3421 };
3422
3423 fixture.start(
3424 |config, context, oracle, mut peers, _, coding_config| async move {
3425 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3427 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3428 let commitment1 = coded_block1.commitment();
3429
3430 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3431 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3432
3433 let peer3_index = peers[3].index.get() as u16;
3435 let leader_shard = coded_block1.shard(peer3_index).expect("missing shard");
3436
3437 let peer1_index = peers[1].index.get() as u16;
3440 let mut wrong_shard = coded_block2.shard(peer1_index).expect("missing shard");
3441 wrong_shard.commitment = commitment1;
3442 let wrong_bytes = wrong_shard.encode();
3443
3444 let peer3_pk = peers[3].public_key.clone();
3445 let leader = peers[0].public_key.clone();
3446
3447 peers[3]
3449 .mailbox
3450 .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
3451 .await;
3452 let shard_bytes = leader_shard.encode();
3453 peers[0]
3454 .sender
3455 .send(Recipients::One(peer3_pk.clone()), shard_bytes, true)
3456 .await
3457 .expect("send failed");
3458 context.sleep(config.link.latency * 2).await;
3459
3460 peers[1]
3462 .sender
3463 .send(Recipients::One(peer3_pk.clone()), wrong_bytes, true)
3464 .await
3465 .expect("send failed");
3466 context.sleep(config.link.latency * 2).await;
3467
3468 for &idx in &[2, 4] {
3472 let peer_index = peers[idx].index.get() as u16;
3473 let shard = coded_block1.shard(peer_index).expect("missing shard");
3474 let bytes = shard.encode();
3475 peers[idx]
3476 .sender
3477 .send(Recipients::One(peer3_pk.clone()), bytes, true)
3478 .await
3479 .expect("send failed");
3480 }
3481 context.sleep(config.link.latency * 2).await;
3482
3483 assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
3485 },
3486 );
3487 }
3488
3489 #[test_traced]
3490 fn test_reconstruction_recovers_after_quorum_with_one_invalid_shard() {
3491 let fixture: Fixture<C> = Fixture {
3496 num_peers: 10,
3497 ..Default::default()
3498 };
3499
3500 fixture.start(
3501 |config, context, oracle, mut peers, _, coding_config| async move {
3502 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3503 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3504 let commitment1 = coded_block1.commitment();
3505
3506 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3507 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3508
3509 let receiver_idx = 3usize;
3510 let receiver_pk = peers[receiver_idx].public_key.clone();
3511
3512 let peer1_index = peers[1].index.get() as u16;
3514 let mut invalid_shard = coded_block2.shard(peer1_index).expect("missing shard");
3515 invalid_shard.commitment = commitment1;
3516
3517 let leader = peers[0].public_key.clone();
3519 peers[receiver_idx]
3520 .mailbox
3521 .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
3522 .await;
3523 let leader_shard = coded_block1
3524 .shard(peers[receiver_idx].index.get() as u16)
3525 .expect("missing shard")
3526 .encode();
3527 peers[0]
3528 .sender
3529 .send(Recipients::One(receiver_pk.clone()), leader_shard, true)
3530 .await
3531 .expect("send failed");
3532
3533 peers[1]
3538 .sender
3539 .send(
3540 Recipients::One(receiver_pk.clone()),
3541 invalid_shard.encode(),
3542 true,
3543 )
3544 .await
3545 .expect("send failed");
3546 for idx in [2usize, 4usize] {
3547 let shard = coded_block1
3548 .shard(peers[idx].index.get() as u16)
3549 .expect("missing shard")
3550 .encode();
3551 peers[idx]
3552 .sender
3553 .send(Recipients::One(receiver_pk.clone()), shard, true)
3554 .await
3555 .expect("send failed");
3556 }
3557
3558 context.sleep(config.link.latency * 2).await;
3559
3560 assert_blocked(
3562 &oracle,
3563 &peers[receiver_idx].public_key,
3564 &peers[1].public_key,
3565 )
3566 .await;
3567 assert!(
3568 peers[receiver_idx].mailbox.get(commitment1).await.is_none(),
3569 "block should not reconstruct with only 3 checked shards"
3570 );
3571
3572 let extra_shard = coded_block1
3574 .shard(peers[5].index.get() as u16)
3575 .expect("missing shard")
3576 .encode();
3577 peers[5]
3578 .sender
3579 .send(Recipients::One(receiver_pk), extra_shard, true)
3580 .await
3581 .expect("send failed");
3582
3583 context.sleep(config.link.latency * 2).await;
3584
3585 let reconstructed = peers[receiver_idx]
3586 .mailbox
3587 .get(commitment1)
3588 .await
3589 .expect("block should reconstruct after additional valid shard");
3590 assert_eq!(reconstructed.commitment(), commitment1);
3591 },
3592 );
3593 }
3594
3595 #[test_traced]
3596 fn test_invalid_pending_shard_blocked_on_drain() {
3597 let fixture: Fixture<C> = Fixture {
3600 num_peers: 10,
3601 ..Default::default()
3602 };
3603
3604 fixture.start(
3605 |config, context, oracle, mut peers, _, coding_config| async move {
3606 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3608 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3609 let commitment1 = coded_block1.commitment();
3610
3611 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3612 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3613
3614 let peer1_index = peers[1].index.get() as u16;
3616 let mut wrong_shard = coded_block2.shard(peer1_index).expect("missing shard");
3617 wrong_shard.commitment = commitment1;
3618 let wrong_bytes = wrong_shard.encode();
3619
3620 let peer3_pk = peers[3].public_key.clone();
3621
3622 peers[1]
3625 .sender
3626 .send(Recipients::One(peer3_pk.clone()), wrong_bytes, true)
3627 .await
3628 .expect("send failed");
3629 context.sleep(config.link.latency * 2).await;
3630
3631 let blocked = oracle.blocked().await.unwrap();
3633 assert!(blocked.is_empty(), "no peers should be blocked yet");
3634
3635 for &idx in &[2, 4] {
3639 let peer_index = peers[idx].index.get() as u16;
3640 let shard = coded_block1.shard(peer_index).expect("missing shard");
3641 let bytes = shard.encode();
3642 peers[idx]
3643 .sender
3644 .send(Recipients::One(peer3_pk.clone()), bytes, true)
3645 .await
3646 .expect("send failed");
3647 }
3648 context.sleep(config.link.latency * 2).await;
3649
3650 let blocked = oracle.blocked().await.unwrap();
3652 assert!(blocked.is_empty(), "no peers should be blocked yet");
3653
3654 let leader = peers[0].public_key.clone();
3656 peers[3]
3657 .mailbox
3658 .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
3659 .await;
3660 let peer3_index = peers[3].index.get() as u16;
3661 let leader_shard = coded_block1.shard(peer3_index).expect("missing shard");
3662 let shard_bytes = leader_shard.encode();
3663 peers[0]
3664 .sender
3665 .send(Recipients::One(peer3_pk), shard_bytes, true)
3666 .await
3667 .expect("send failed");
3668 context.sleep(config.link.latency * 2).await;
3669
3670 assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
3673 },
3674 );
3675 }
3676
    // Checks that a peer which is a participant only in epoch 1 is not blocked when it
    // sends a shard before the receiver has been told about the epoch-1 round, nor after
    // the receiver is told about that round.
    #[test_traced]
    fn test_cross_epoch_buffered_shard_not_blocked() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Simulated network sized for shard messages.
            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(1),
                },
            );
            network.start();

            // Epoch 0 participants: keys from seeds 0..4, ordered by public key.
            let mut epoch0_keys: Vec<PrivateKey> = (0..4).map(PrivateKey::from_seed).collect();
            epoch0_keys.sort_by_key(|s| s.public_key());
            let epoch0_pks: Vec<P> = epoch0_keys.iter().map(|c| c.public_key()).collect();
            let epoch0_set: Set<P> = Set::from_iter_dedup(epoch0_pks.clone());

            // Epoch 1 participants: the first three epoch-0 keys plus a new key (seed 4)
            // that is not part of epoch 0.
            let future_peer_key = PrivateKey::from_seed(4);
            let future_peer_pk = future_peer_key.public_key();
            let mut epoch1_pks: Vec<P> = epoch0_pks[..3]
                .iter()
                .cloned()
                .chain(std::iter::once(future_peer_pk.clone()))
                .collect();
            epoch1_pks.sort();
            let epoch1_set: Set<P> = Set::from_iter_dedup(epoch1_pks);

            // The receiver is the first key in sorted epoch-0 order. Because it is one of
            // `epoch0_pks[..3]`, it is a member of both participant sets.
            // NOTE(review): the index lookup presumably resolves to 0 since the keys are
            // already sorted; confirm `Set` ordering matches.
            let receiver_idx_in_epoch0 = epoch0_set
                .index(&epoch0_pks[0])
                .expect("receiver must be in epoch 0")
                .get() as usize;
            let receiver_key = epoch0_keys[receiver_idx_in_epoch0].clone();
            let receiver_pk = receiver_key.public_key();

            // Register the receiver on channel 0.
            let receiver_control = oracle.control(receiver_pk.clone());
            let (sender_handle, receiver_handle) = receiver_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            // Register the future peer on channel 0, link it towards the receiver, and
            // track both keys as peer set 0.
            let future_peer_control = oracle.control(future_peer_pk.clone());
            let (mut future_peer_sender, _future_peer_receiver) = future_peer_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");
            oracle
                .add_link(future_peer_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
                .await
                .expect("link should be added");
            oracle
                .manager()
                .track(
                    0,
                    Set::from_iter_dedup([receiver_pk.clone(), future_peer_pk.clone()]),
                )
                .await;
            context.sleep(Duration::from_millis(10)).await;

            // One signer scheme per epoch, both signing with the receiver's key.
            // NOTE(review): `single(..)` presumably registers the epoch-0 scheme as the
            // initial entry and `with_epoch` adds the epoch-1 scheme; confirm against
            // `MultiEpochProvider`.
            let scheme_epoch0 =
                Scheme::signer(SCHEME_NAMESPACE, epoch0_set.clone(), receiver_key.clone())
                    .expect("signer scheme should be created");
            let scheme_epoch1 =
                Scheme::signer(SCHEME_NAMESPACE, epoch1_set.clone(), receiver_key.clone())
                    .expect("signer scheme should be created");
            let scheme_provider =
                MultiEpochProvider::single(scheme_epoch0).with_epoch(Epoch::new(1), scheme_epoch1);

            // Engine configuration for the receiver; it blocks peers through its own
            // network control handle.
            let config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider,
                blocker: receiver_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: 1024,
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: 1024,
                peer_provider: oracle.manager(),
            };

            let (engine, mailbox) = ShardEngine::new(context.with_label("receiver"), config);
            engine.start((sender_handle, receiver_handle));

            // Code a block using a coding config derived from the epoch-1 participant count.
            let coding_config = coding_config_for_participants(epoch1_set.len() as u16);
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();

            // The shard at the future peer's position in the epoch-1 set.
            let future_peer_index = epoch1_set
                .index(&future_peer_pk)
                .expect("future peer must be in epoch 1");
            let future_shard = coded_block
                .shard(future_peer_index.get() as u16)
                .expect("missing shard");
            let shard_bytes = future_shard.encode();

            // Send the shard before `discovered` has been called for this commitment.
            future_peer_sender
                .send(Recipients::One(receiver_pk.clone()), shard_bytes, true)
                .await
                .expect("send failed");
            context.sleep(DEFAULT_LINK.latency * 2).await;

            let blocked = oracle.blocked().await.unwrap();
            assert!(
                blocked.is_empty(),
                "no peers should be blocked while shard is buffered"
            );

            // Announce the commitment for epoch 1, view 1, with the second sorted epoch-0
            // key as leader (that key is also part of the epoch-1 set).
            let leader = epoch0_pks[1].clone();
            mailbox
                .discovered(commitment, leader, Round::new(Epoch::new(1), View::new(1)))
                .await;
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // The future peer must still not be blocked after the announcement.
            let blocked = oracle.blocked().await.unwrap();
            assert!(
                blocked.is_empty(),
                "future-epoch participant should not be blocked: {blocked:?}"
            );
        });
    }
3811
3812 #[test_traced]
3813 fn test_shard_broadcast_survives_provider_churn() {
3814 let executor = deterministic::Runner::default();
3815 executor.start(|context| async move {
3816 let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
3817 context.with_label("network"),
3818 simulated::Config {
3819 max_size: MAX_SHARD_SIZE as u32,
3820 disconnect_on_block: true,
3821 tracked_peer_sets: NZUsize!(1),
3822 },
3823 );
3824 network.start();
3825
3826 let mut private_keys: Vec<PrivateKey> = (0..4).map(PrivateKey::from_seed).collect();
3827 private_keys.sort_by_key(|s| s.public_key());
3828 let peer_keys: Vec<P> = private_keys.iter().map(|k| k.public_key()).collect();
3829 let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());
3830
3831 let leader_idx = 0usize;
3832 let broadcaster_idx = 1usize;
3833 let receiver_idx = 2usize;
3834
3835 let leader_pk = peer_keys[leader_idx].clone();
3836 let broadcaster_pk = peer_keys[broadcaster_idx].clone();
3837 let receiver_pk = peer_keys[receiver_idx].clone();
3838
3839 let mut registrations = BTreeMap::new();
3840 for key in &peer_keys {
3841 let control = oracle.control(key.clone());
3842 let (sender, receiver) = control
3843 .register(0, TEST_QUOTA)
3844 .await
3845 .expect("registration should succeed");
3846 registrations.insert(key.clone(), (control, sender, receiver));
3847 }
3848
3849 for src in &peer_keys {
3850 for dst in &peer_keys {
3851 if src == dst {
3852 continue;
3853 }
3854 oracle
3855 .add_link(src.clone(), dst.clone(), DEFAULT_LINK)
3856 .await
3857 .expect("link should be added");
3858 }
3859 }
3860 oracle.manager().track(0, participants.clone()).await;
3861 context.sleep(Duration::from_millis(10)).await;
3862
3863 let (_leader_control, mut leader_sender, _leader_receiver) = registrations
3864 .remove(&leader_pk)
3865 .expect("leader should be registered");
3866 let (broadcaster_control, broadcaster_sender, broadcaster_receiver) = registrations
3867 .remove(&broadcaster_pk)
3868 .expect("broadcaster should be registered");
3869 let (receiver_control, receiver_sender, receiver_receiver) = registrations
3870 .remove(&receiver_pk)
3871 .expect("receiver should be registered");
3872
3873 let broadcaster_scheme = Scheme::signer(
3874 SCHEME_NAMESPACE,
3875 participants.clone(),
3876 private_keys[broadcaster_idx].clone(),
3877 )
3878 .expect("signer scheme should be created");
3879 let broadcaster_provider = ChurningProvider::new(broadcaster_scheme, 3);
3883 let broadcaster_config: Config<_, _, _, _, C, _, _, _> = Config {
3884 scheme_provider: broadcaster_provider,
3885 blocker: broadcaster_control.clone(),
3886 shard_codec_cfg: CodecConfig {
3887 maximum_shard_size: MAX_SHARD_SIZE,
3888 },
3889 block_codec_cfg: (),
3890 strategy: STRATEGY,
3891 mailbox_size: 1024,
3892 peer_buffer_size: NZUsize!(64),
3893 background_channel_capacity: 1024,
3894 peer_provider: oracle.manager(),
3895 };
3896 let (broadcaster_engine, broadcaster_mailbox) =
3897 ChurningShardEngine::new(context.with_label("broadcaster"), broadcaster_config);
3898 broadcaster_engine.start((broadcaster_sender, broadcaster_receiver));
3899
3900 let receiver_scheme = Scheme::signer(
3901 SCHEME_NAMESPACE,
3902 participants.clone(),
3903 private_keys[receiver_idx].clone(),
3904 )
3905 .expect("signer scheme should be created");
3906 let receiver_config: Config<_, _, _, _, C, _, _, _> = Config {
3907 scheme_provider: MultiEpochProvider::single(receiver_scheme),
3908 blocker: receiver_control.clone(),
3909 shard_codec_cfg: CodecConfig {
3910 maximum_shard_size: MAX_SHARD_SIZE,
3911 },
3912 block_codec_cfg: (),
3913 strategy: STRATEGY,
3914 mailbox_size: 1024,
3915 peer_buffer_size: NZUsize!(64),
3916 background_channel_capacity: 1024,
3917 peer_provider: oracle.manager(),
3918 };
3919 let (receiver_engine, receiver_mailbox) =
3920 ShardEngine::new(context.with_label("receiver"), receiver_config);
3921 receiver_engine.start((receiver_sender, receiver_receiver));
3922
3923 let coding_config = coding_config_for_participants(peer_keys.len() as u16);
3924 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3925 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3926 let commitment = coded_block.commitment();
3927 let round = Round::new(Epoch::zero(), View::new(1));
3928
3929 broadcaster_mailbox
3930 .discovered(commitment, leader_pk.clone(), round)
3931 .await;
3932 receiver_mailbox
3933 .discovered(commitment, leader_pk.clone(), round)
3934 .await;
3935 context.sleep(DEFAULT_LINK.latency).await;
3936
3937 let broadcaster_index = participants
3938 .index(&broadcaster_pk)
3939 .expect("broadcaster must be a participant")
3940 .get() as u16;
3941 let broadcaster_shard = coded_block
3942 .shard(broadcaster_index)
3943 .expect("missing shard")
3944 .encode();
3945 leader_sender
3946 .send(Recipients::One(broadcaster_pk), broadcaster_shard, true)
3947 .await
3948 .expect("send failed");
3949
3950 let receiver_index = participants
3951 .index(&receiver_pk)
3952 .expect("receiver must be a participant")
3953 .get() as u16;
3954 let receiver_shard = coded_block
3955 .shard(receiver_index)
3956 .expect("missing shard")
3957 .encode();
3958 leader_sender
3959 .send(Recipients::One(receiver_pk.clone()), receiver_shard, true)
3960 .await
3961 .expect("send failed");
3962
3963 context.sleep(DEFAULT_LINK.latency * 3).await;
3964
3965 let reconstructed = receiver_mailbox.get(commitment).await;
3966 assert!(
3967 reconstructed.is_some(),
3968 "receiver should reconstruct after broadcaster validates and broadcasts shard"
3969 );
3970 });
3971 }
3972
3973 #[test_traced]
3974 fn test_failed_reconstruction_digest_mismatch_then_recovery() {
3975 let fixture: Fixture<C> = Fixture {
3982 num_peers: 10,
3983 ..Default::default()
3984 };
3985
3986 fixture.start(
3987 |config, context, _oracle, mut peers, _, coding_config| async move {
3988 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3990 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3991
3992 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3994 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3995 let real_commitment2 = coded_block2.commitment();
3996
3997 let fake_commitment = Commitment::from((
4001 coded_block1.digest(),
4002 real_commitment2.root::<Sha256Digest>(),
4003 real_commitment2.context::<Sha256Digest>(),
4004 coding_config,
4005 ));
4006
4007 let receiver_idx = 3usize;
4008 let receiver_pk = peers[receiver_idx].public_key.clone();
4009 let leader = peers[0].public_key.clone();
4010 let round = Round::new(Epoch::zero(), View::new(1));
4011
4012 peers[receiver_idx]
4014 .mailbox
4015 .discovered(fake_commitment, leader.clone(), round)
4016 .await;
4017
4018 let mut block_sub = peers[receiver_idx].mailbox.subscribe(fake_commitment).await;
4020 let mut digest_sub = peers[receiver_idx]
4021 .mailbox
4022 .subscribe_by_digest(coded_block1.digest())
4023 .await;
4024
4025 let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
4027 let mut leader_shard = coded_block2
4028 .shard(receiver_shard_idx)
4029 .expect("missing shard");
4030 leader_shard.commitment = fake_commitment;
4031 peers[0]
4032 .sender
4033 .send(
4034 Recipients::One(receiver_pk.clone()),
4035 leader_shard.encode(),
4036 true,
4037 )
4038 .await
4039 .expect("send failed");
4040
4041 for &idx in &[1usize, 2, 4] {
4044 let peer_shard_idx = peers[idx].index.get() as u16;
4045 let mut shard = coded_block2.shard(peer_shard_idx).expect("missing shard");
4046 shard.commitment = fake_commitment;
4047 peers[idx]
4048 .sender
4049 .send(Recipients::One(receiver_pk.clone()), shard.encode(), true)
4050 .await
4051 .expect("send failed");
4052 }
4053
4054 context.sleep(config.link.latency * 2).await;
4055
4056 assert!(
4059 peers[receiver_idx]
4060 .mailbox
4061 .get(fake_commitment)
4062 .await
4063 .is_none(),
4064 "block should not be available after DigestMismatch"
4065 );
4066
4067 assert!(
4069 matches!(block_sub.try_recv(), Err(TryRecvError::Closed)),
4070 "subscription should close for failed reconstruction"
4071 );
4072 assert!(
4073 matches!(digest_sub.try_recv(), Err(TryRecvError::Closed)),
4074 "digest subscription should close after failed reconstruction"
4075 );
4076
4077 let real_commitment1 = coded_block1.commitment();
4080 let round2 = Round::new(Epoch::zero(), View::new(2));
4081 peers[receiver_idx]
4082 .mailbox
4083 .discovered(real_commitment1, leader.clone(), round2)
4084 .await;
4085
4086 let leader_shard1 = coded_block1
4087 .shard(receiver_shard_idx)
4088 .expect("missing shard");
4089 peers[0]
4090 .sender
4091 .send(
4092 Recipients::One(receiver_pk.clone()),
4093 leader_shard1.encode(),
4094 true,
4095 )
4096 .await
4097 .expect("send failed");
4098
4099 for &idx in &[1usize, 2, 4] {
4100 let peer_shard_idx = peers[idx].index.get() as u16;
4101 let shard = coded_block1.shard(peer_shard_idx).expect("missing shard");
4102 peers[idx]
4103 .sender
4104 .send(Recipients::One(receiver_pk.clone()), shard.encode(), true)
4105 .await
4106 .expect("send failed");
4107 }
4108
4109 context.sleep(config.link.latency * 2).await;
4110
4111 let reconstructed = peers[receiver_idx]
4112 .mailbox
4113 .get(real_commitment1)
4114 .await
4115 .expect("valid block should reconstruct after prior failure");
4116 assert_eq!(reconstructed.commitment(), real_commitment1);
4117 },
4118 );
4119 }
4120
4121 #[test_traced]
4122 fn test_failed_reconstruction_context_mismatch_then_recovery() {
4123 let fixture: Fixture<C> = Fixture {
4127 num_peers: 10,
4128 ..Default::default()
4129 };
4130
4131 fixture.start(
4132 |config, context, _oracle, mut peers, _, coding_config| async move {
4133 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4134 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4135 let real_commitment = coded_block.commitment();
4136
4137 let wrong_context_digest = Sha256::hash(b"wrong_context");
4138 assert_ne!(
4139 real_commitment.context::<Sha256Digest>(),
4140 wrong_context_digest,
4141 "test requires a distinct context digest"
4142 );
4143 let fake_commitment = Commitment::from((
4144 coded_block.digest(),
4145 real_commitment.root::<Sha256Digest>(),
4146 wrong_context_digest,
4147 coding_config,
4148 ));
4149
4150 let receiver_idx = 3usize;
4151 let receiver_pk = peers[receiver_idx].public_key.clone();
4152 let leader = peers[0].public_key.clone();
4153 let round = Round::new(Epoch::zero(), View::new(1));
4154
4155 peers[receiver_idx]
4156 .mailbox
4157 .discovered(fake_commitment, leader.clone(), round)
4158 .await;
4159 let mut block_sub = peers[receiver_idx].mailbox.subscribe(fake_commitment).await;
4160
4161 let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
4162 let mut leader_shard = coded_block
4163 .shard(receiver_shard_idx)
4164 .expect("missing shard");
4165 leader_shard.commitment = fake_commitment;
4166 peers[0]
4167 .sender
4168 .send(
4169 Recipients::One(receiver_pk.clone()),
4170 leader_shard.encode(),
4171 true,
4172 )
4173 .await
4174 .expect("send failed");
4175
4176 for &idx in &[1usize, 2, 4] {
4177 let peer_shard_idx = peers[idx].index.get() as u16;
4178 let mut shard = coded_block.shard(peer_shard_idx).expect("missing shard");
4179 shard.commitment = fake_commitment;
4180 peers[idx]
4181 .sender
4182 .send(Recipients::One(receiver_pk.clone()), shard.encode(), true)
4183 .await
4184 .expect("send failed");
4185 }
4186
4187 context.sleep(config.link.latency * 2).await;
4188
4189 assert!(
4190 peers[receiver_idx]
4191 .mailbox
4192 .get(fake_commitment)
4193 .await
4194 .is_none(),
4195 "block should not be available after ContextMismatch"
4196 );
4197 assert!(
4198 matches!(block_sub.try_recv(), Err(TryRecvError::Closed)),
4199 "subscription should close for context-mismatched commitment"
4200 );
4201
4202 let round2 = Round::new(Epoch::zero(), View::new(2));
4204 peers[receiver_idx]
4205 .mailbox
4206 .discovered(real_commitment, leader.clone(), round2)
4207 .await;
4208
4209 let real_leader_shard = coded_block
4210 .shard(receiver_shard_idx)
4211 .expect("missing shard");
4212 peers[0]
4213 .sender
4214 .send(
4215 Recipients::One(receiver_pk.clone()),
4216 real_leader_shard.encode(),
4217 true,
4218 )
4219 .await
4220 .expect("send failed");
4221
4222 for &idx in &[1usize, 2, 4] {
4223 let peer_shard_idx = peers[idx].index.get() as u16;
4224 let shard = coded_block.shard(peer_shard_idx).expect("missing shard");
4225 peers[idx]
4226 .sender
4227 .send(Recipients::One(receiver_pk.clone()), shard.encode(), true)
4228 .await
4229 .expect("send failed");
4230 }
4231
4232 context.sleep(config.link.latency * 2).await;
4233
4234 let reconstructed = peers[receiver_idx]
4235 .mailbox
4236 .get(real_commitment)
4237 .await
4238 .expect("valid block should reconstruct after prior context mismatch");
4239 assert_eq!(reconstructed.commitment(), real_commitment);
4240 },
4241 );
4242 }
4243
4244 #[test_traced]
4245 fn test_same_round_equivocation_preserves_certifiable_recovery() {
4246 let fixture: Fixture<C> = Fixture {
4252 num_peers: 10,
4253 ..Default::default()
4254 };
4255
4256 fixture.start(
4257 |config, context, _oracle, mut peers, _, coding_config| async move {
4258 let receiver_idx = 3usize;
4259 let receiver_pk = peers[receiver_idx].public_key.clone();
4260 let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
4261
4262 let leader = peers[0].public_key.clone();
4263 let round = Round::new(Epoch::zero(), View::new(7));
4264
4265 let block_a = CodedBlock::<B, C, H>::new(
4267 B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 111),
4268 coding_config,
4269 &STRATEGY,
4270 );
4271 let commitment_a = block_a.commitment();
4272 let block_b = CodedBlock::<B, C, H>::new(
4273 B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 222),
4274 coding_config,
4275 &STRATEGY,
4276 );
4277 let commitment_b = block_b.commitment();
4278
4279 peers[receiver_idx]
4281 .mailbox
4282 .discovered(commitment_a, leader.clone(), round)
4283 .await;
4284 peers[receiver_idx]
4285 .mailbox
4286 .discovered(commitment_b, leader.clone(), round)
4287 .await;
4288
4289 let certifiable_sub = peers[receiver_idx].mailbox.subscribe(commitment_b).await;
4291
4292 let shard_b = block_b
4294 .shard(receiver_shard_idx)
4295 .expect("missing shard")
4296 .encode();
4297 peers[0]
4298 .sender
4299 .send(Recipients::One(receiver_pk.clone()), shard_b, true)
4300 .await
4301 .expect("send failed");
4302
4303 let shard_a = block_a
4305 .shard(receiver_shard_idx)
4306 .expect("missing shard")
4307 .encode();
4308 peers[0]
4309 .sender
4310 .send(Recipients::One(receiver_pk.clone()), shard_a, true)
4311 .await
4312 .expect("send failed");
4313 for i in [1usize, 2usize, 4usize] {
4314 let shard_a = block_a
4315 .shard(peers[i].index.get() as u16)
4316 .expect("missing shard")
4317 .encode();
4318 peers[i]
4319 .sender
4320 .send(Recipients::One(receiver_pk.clone()), shard_a, true)
4321 .await
4322 .expect("send failed");
4323 }
4324 context.sleep(config.link.latency * 4).await;
4325 let reconstructed_a = peers[receiver_idx]
4326 .mailbox
4327 .get(commitment_a)
4328 .await
4329 .expect("conflicting commitment should reconstruct first");
4330 assert_eq!(reconstructed_a.commitment(), commitment_a);
4331
4332 for i in [1usize, 2usize, 4usize] {
4334 let shard_b = block_b
4335 .shard(peers[i].index.get() as u16)
4336 .expect("missing shard")
4337 .encode();
4338 peers[i]
4339 .sender
4340 .send(Recipients::One(receiver_pk.clone()), shard_b, true)
4341 .await
4342 .expect("send failed");
4343 }
4344
4345 select! {
4346 result = certifiable_sub => {
4347 let reconstructed_b =
4348 result.expect("certifiable commitment should remain recoverable");
4349 assert_eq!(reconstructed_b.commitment(), commitment_b);
4350 },
4351 _ = context.sleep(Duration::from_secs(5)) => {
4352 panic!("certifiable commitment was not recoverable after same-round equivocation");
4353 },
4354 }
4355 },
4356 );
4357 }
4358
4359 #[test_traced]
4360 fn test_leader_unrelated_shard_blocks_peer() {
4361 let fixture: Fixture<C> = Fixture {
4365 num_peers: 10,
4366 ..Default::default()
4367 };
4368
4369 fixture.start(
4370 |config, context, oracle, mut peers, _, coding_config| async move {
4371 let tracked_block = CodedBlock::<B, C, H>::new(
4373 B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
4374 coding_config,
4375 &STRATEGY,
4376 );
4377 let tracked_commitment = tracked_block.commitment();
4378
4379 let unrelated_block = CodedBlock::<B, C, H>::new(
4381 B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200),
4382 coding_config,
4383 &STRATEGY,
4384 );
4385
4386 let receiver_idx = 3usize;
4387 let receiver_pk = peers[receiver_idx].public_key.clone();
4388 let leader_idx = 0usize;
4389 let leader_pk = peers[leader_idx].public_key.clone();
4390
4391 peers[receiver_idx]
4393 .mailbox
4394 .discovered(
4395 tracked_commitment,
4396 leader_pk.clone(),
4397 Round::new(Epoch::zero(), View::new(1)),
4398 )
4399 .await;
4400
4401 let mut unrelated_shard = unrelated_block
4404 .shard(peers[1].index.get() as u16)
4405 .expect("missing shard");
4406 unrelated_shard.commitment = tracked_commitment;
4407
4408 peers[leader_idx]
4412 .sender
4413 .send(Recipients::One(receiver_pk), unrelated_shard.encode(), true)
4414 .await
4415 .expect("send failed");
4416 context.sleep(config.link.latency * 2).await;
4417
4418 assert_blocked(&oracle, &peers[receiver_idx].public_key, &leader_pk).await;
4419 },
4420 );
4421 }
4422
4423 #[test_traced]
4424 fn test_withholding_leader_victim_reconstructs_via_gossip() {
4425 let fixture = Fixture {
4430 num_peers: 10,
4431 ..Default::default()
4432 };
4433
4434 fixture.start(
4435 |config, context, oracle, mut peers, _, coding_config| async move {
4436 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4437 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4438 let commitment = coded_block.commitment();
4439 let round = Round::new(Epoch::zero(), View::new(1));
4440
4441 let leader = peers[0].public_key.clone();
4442 let victim = peers[1].public_key.clone();
4443
4444 oracle
4447 .remove_link(leader.clone(), victim.clone())
4448 .await
4449 .expect("remove_link should succeed");
4450
4451 peers[0].mailbox.proposed(round, coded_block.clone()).await;
4454
4455 for peer in peers[1..].iter_mut() {
4458 peer.mailbox
4459 .discovered(commitment, leader.clone(), round)
4460 .await;
4461 }
4462 context.sleep(config.link.latency * 2).await;
4463
4464 let block_sub = peers[1].mailbox.subscribe(commitment).await;
4467 select! {
4468 result = block_sub => {
4469 let reconstructed = result.expect("block subscription should resolve");
4470 assert_eq!(reconstructed.commitment(), commitment);
4471 assert_eq!(reconstructed.height(), coded_block.height());
4472 },
4473 _ = context.sleep(Duration::from_secs(5)) => {
4474 panic!("victim did not reconstruct block despite withholding leader");
4475 },
4476 }
4477
4478 for peer in peers[2..].iter_mut() {
4480 let reconstructed = peer
4481 .mailbox
4482 .get(commitment)
4483 .await
4484 .expect("block should be reconstructed");
4485 assert_eq!(reconstructed.commitment(), commitment);
4486 }
4487
4488 let blocked = oracle.blocked().await.unwrap();
4490 assert!(
4491 blocked.is_empty(),
4492 "no peer should be blocked in withholding leader test"
4493 );
4494 },
4495 );
4496 }
4497
4498 #[test_traced]
4504 fn test_shard_subscription_pending_after_reconstruction_without_leader_shard() {
4505 let fixture = Fixture {
4506 num_peers: 10,
4507 ..Default::default()
4508 };
4509
4510 fixture.start(
4511 |config, context, oracle, mut peers, _, coding_config| async move {
4512 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4513 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4514 let commitment = coded_block.commitment();
4515 let round = Round::new(Epoch::zero(), View::new(1));
4516
4517 let leader = peers[0].public_key.clone();
4518 let victim = peers[1].public_key.clone();
4519
4520 oracle
4523 .remove_link(leader.clone(), victim.clone())
4524 .await
4525 .expect("remove_link should succeed");
4526
4527 let mut shard_sub = peers[1]
4529 .mailbox
4530 .subscribe_assigned_shard_verified(commitment)
4531 .await;
4532 let block_sub = peers[1].mailbox.subscribe(commitment).await;
4533
4534 peers[0].mailbox.proposed(round, coded_block.clone()).await;
4536
4537 for peer in peers[1..].iter_mut() {
4539 peer.mailbox
4540 .discovered(commitment, leader.clone(), round)
4541 .await;
4542 }
4543
4544 context.sleep(config.link.latency * 4).await;
4546
4547 let reconstructed = block_sub.await.expect("block subscription should resolve");
4550 assert_eq!(reconstructed.commitment(), commitment);
4551
4552 assert!(
4555 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
4556 "shard subscription must not resolve without own shard verification"
4557 );
4558 },
4559 );
4560 }
4561
4562 #[test_traced]
4563 fn test_broadcast_routes_participant_and_non_participant_shards() {
4564 let fixture = Fixture {
4565 num_non_participants: 1,
4566 ..Default::default()
4567 };
4568
4569 fixture.start(
4570 |config, context, oracle, mut peers, non_participants, coding_config| async move {
4571 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4572 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4573 let commitment = coded_block.commitment();
4574
4575 let leader = peers[0].public_key.clone();
4576 let round = Round::new(Epoch::zero(), View::new(1));
4577 peers[0].mailbox.proposed(round, coded_block.clone()).await;
4578
4579 for peer in peers[1..].iter_mut() {
4580 peer.mailbox
4581 .discovered(commitment, leader.clone(), round)
4582 .await;
4583 }
4584 for np in non_participants.iter() {
4585 np.mailbox
4586 .discovered(commitment, leader.clone(), round)
4587 .await;
4588 }
4589 context.sleep(config.link.latency * 2).await;
4590
4591 for peer in peers.iter_mut() {
4593 peer.mailbox
4594 .subscribe_assigned_shard_verified(commitment)
4595 .await
4596 .await
4597 .expect("participant shard subscription should complete");
4598 }
4599
4600 for np in non_participants.iter() {
4602 np.mailbox
4603 .subscribe_assigned_shard_verified(commitment)
4604 .await
4605 .await
4606 .expect("non-participant shard subscription should complete");
4607 }
4608 context.sleep(config.link.latency).await;
4609
4610 for np in non_participants.iter() {
4612 let reconstructed = np
4613 .mailbox
4614 .get(commitment)
4615 .await
4616 .expect("non-participant should reconstruct block");
4617 assert_eq!(reconstructed.commitment(), commitment);
4618 }
4619
4620 let blocked = oracle.blocked().await.unwrap();
4621 assert!(
4622 blocked.is_empty(),
4623 "no peer should be blocked in participant/non-participant shard routing test"
4624 );
4625 },
4626 );
4627 }
4628
4629 #[test_traced]
4630 fn test_non_participant_reconstructs_after_discovered() {
4631 let fixture = Fixture {
4632 num_non_participants: 1,
4633 ..Default::default()
4634 };
4635
4636 fixture.start(
4637 |config, context, oracle, mut peers, non_participants, coding_config| async move {
4638 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4639 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4640 let commitment = coded_block.commitment();
4641 let round = Round::new(Epoch::zero(), View::new(1));
4642
4643 let leader = peers[0].public_key.clone();
4644 peers[0].mailbox.proposed(round, coded_block.clone()).await;
4645
4646 for peer in peers[1..].iter_mut() {
4649 peer.mailbox
4650 .discovered(commitment, leader.clone(), round)
4651 .await;
4652 }
4653 context.sleep(config.link.latency).await;
4654
4655 let np = &non_participants[0];
4658 let block_sub = np.mailbox.subscribe(commitment).await;
4659 np.mailbox
4660 .discovered(commitment, leader.clone(), round)
4661 .await;
4662
4663 select! {
4666 result = block_sub => {
4667 let reconstructed = result.expect("block subscription should resolve");
4668 assert_eq!(reconstructed.commitment(), commitment);
4669 assert_eq!(reconstructed.height(), coded_block.height());
4670 },
4671 _ = context.sleep(Duration::from_secs(5)) => {
4672 panic!("non-participant block subscription did not resolve");
4673 },
4674 }
4675
4676 let blocked = oracle.blocked().await.unwrap();
4677 assert!(
4678 blocked.is_empty(),
4679 "no peer should be blocked in non-participant reconstruction test"
4680 );
4681 },
4682 );
4683 }
4684
4685 #[test_traced]
4686 fn test_peer_set_update_evicts_peer_buffers() {
4687 let executor = deterministic::Runner::default();
4692 executor.start(|context| async move {
4693 let num_peers = 10usize;
4694 let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
4695 context.with_label("network"),
4696 simulated::Config {
4697 max_size: MAX_SHARD_SIZE as u32,
4698 disconnect_on_block: true,
4699 tracked_peer_sets: NZUsize!(2),
4700 },
4701 );
4702 network.start();
4703
4704 let mut private_keys = (0..num_peers)
4705 .map(|i| PrivateKey::from_seed(i as u64))
4706 .collect::<Vec<_>>();
4707 private_keys.sort_by_key(|s| s.public_key());
4708 let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
4709 let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());
4710
4711 let receiver_idx = 3usize;
4713 let receiver_pk = peer_keys[receiver_idx].clone();
4714 let leader_pk = peer_keys[0].clone();
4715
4716 let receiver_control = oracle.control(receiver_pk.clone());
4717 let (sender_handle, receiver_handle) = receiver_control
4718 .register(0, TEST_QUOTA)
4719 .await
4720 .expect("registration should succeed");
4721
4722 let leader_control = oracle.control(leader_pk.clone());
4724 let (mut leader_sender, _leader_receiver) = leader_control
4725 .register(0, TEST_QUOTA)
4726 .await
4727 .expect("registration should succeed");
4728 oracle
4729 .add_link(leader_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
4730 .await
4731 .expect("link should be added");
4732
4733 oracle.manager().track(0, participants.clone()).await;
4735 context.sleep(Duration::from_millis(10)).await;
4736
4737 let scheme = Scheme::signer(
4738 SCHEME_NAMESPACE,
4739 participants.clone(),
4740 private_keys[receiver_idx].clone(),
4741 )
4742 .expect("signer scheme should be created");
4743
4744 let config: Config<_, _, _, _, C, _, _, _> = Config {
4745 scheme_provider: MultiEpochProvider::single(scheme),
4746 blocker: receiver_control.clone(),
4747 shard_codec_cfg: CodecConfig {
4748 maximum_shard_size: MAX_SHARD_SIZE,
4749 },
4750 block_codec_cfg: (),
4751 strategy: STRATEGY,
4752 mailbox_size: 1024,
4753 peer_buffer_size: NZUsize!(64),
4754 background_channel_capacity: 1024,
4755 peer_provider: oracle.manager(),
4756 };
4757
4758 let (engine, mailbox) = ShardEngine::new(context.with_label("receiver"), config);
4759 engine.start((sender_handle, receiver_handle));
4760
4761 let coding_config = coding_config_for_participants(num_peers as u16);
4763 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4764 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4765 let commitment = coded_block.commitment();
4766
4767 let receiver_participant = participants
4768 .index(&receiver_pk)
4769 .expect("receiver must be a participant");
4770 let leader_shard = coded_block
4771 .shard(receiver_participant.get() as u16)
4772 .expect("missing shard");
4773 let shard_bytes = leader_shard.encode();
4774
4775 leader_sender
4777 .send(
4778 Recipients::One(receiver_pk.clone()),
4779 shard_bytes.clone(),
4780 true,
4781 )
4782 .await
4783 .expect("send failed");
4784 context.sleep(DEFAULT_LINK.latency * 2).await;
4785
4786 let remaining: Set<P> =
4788 Set::from_iter_dedup(peer_keys.iter().filter(|pk| **pk != leader_pk).cloned());
4789 oracle.manager().track(1, remaining).await;
4790 context.sleep(Duration::from_millis(10)).await;
4791
4792 leader_sender
4795 .send(Recipients::One(receiver_pk.clone()), shard_bytes, true)
4796 .await
4797 .expect("send failed");
4798 context.sleep(DEFAULT_LINK.latency * 2).await;
4799
4800 let mut shard_sub = mailbox.subscribe_assigned_shard_verified(commitment).await;
4803 mailbox
4804 .discovered(
4805 commitment,
4806 leader_pk.clone(),
4807 Round::new(Epoch::zero(), View::new(1)),
4808 )
4809 .await;
4810 context.sleep(DEFAULT_LINK.latency * 2).await;
4811
4812 assert!(
4814 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
4815 "shard subscription should not resolve after evicted leader's buffer"
4816 );
4817 assert!(
4818 mailbox.get(commitment).await.is_none(),
4819 "block should not reconstruct from evicted buffers"
4820 );
4821 });
4822 }
4823
4824 #[test_traced]
4825 fn test_empty_peer_buffer_is_retained_until_peer_leaves_latest_primary() {
4826 let executor = deterministic::Runner::default();
4827 executor.start(|context| async move {
4828 let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
4829 context.with_label("network"),
4830 simulated::Config {
4831 max_size: MAX_SHARD_SIZE as u32,
4832 disconnect_on_block: true,
4833 tracked_peer_sets: NZUsize!(1),
4834 },
4835 );
4836 network.start();
4837
4838 let mut private_keys = (0..4)
4839 .map(|i| PrivateKey::from_seed(i as u64))
4840 .collect::<Vec<_>>();
4841 private_keys.sort_by_key(|s| s.public_key());
4842 let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
4843 let receiver_pk = peer_keys[0].clone();
4844 let sender_pk = peer_keys[1].clone();
4845 let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());
4846
4847 let receiver_control = oracle.control(receiver_pk.clone());
4848 let scheme = Scheme::signer(
4849 SCHEME_NAMESPACE,
4850 participants.clone(),
4851 private_keys[0].clone(),
4852 )
4853 .expect("signer scheme should be created");
4854
4855 let config: Config<_, _, _, _, C, _, _, _> = Config {
4856 scheme_provider: MultiEpochProvider::single(scheme),
4857 blocker: receiver_control,
4858 shard_codec_cfg: CodecConfig {
4859 maximum_shard_size: MAX_SHARD_SIZE,
4860 },
4861 block_codec_cfg: (),
4862 strategy: STRATEGY,
4863 mailbox_size: 16,
4864 peer_buffer_size: NZUsize!(4),
4865 background_channel_capacity: 16,
4866 peer_provider: oracle.manager(),
4867 };
4868
4869 let (mut engine, _mailbox) = ShardEngine::new(context.with_label("engine"), config);
4870
4871 engine.update_latest_primary_peers(Set::from_iter_dedup([sender_pk.clone()]));
4874
4875 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4876 let coded_block = CodedBlock::<B, C, H>::new(
4877 inner,
4878 coding_config_for_participants(participants.len() as u16),
4879 &STRATEGY,
4880 );
4881 let commitment = coded_block.commitment();
4882 let shard = coded_block.shard(0).expect("missing shard");
4883
4884 engine.buffer_peer_shard(sender_pk.clone(), shard);
4887 assert_eq!(
4888 engine.peer_buffers.get(&sender_pk).map(VecDeque::len),
4889 Some(1),
4890 "peer buffer should contain the buffered shard"
4891 );
4892
4893 let progressed = engine.ingest_buffered_shards(commitment).await;
4897 assert!(
4898 !progressed,
4899 "ingest should not progress without reconstruction state"
4900 );
4901 assert!(
4902 engine.peer_buffers.contains_key(&sender_pk),
4903 "empty peer buffer should be retained while sender remains in latest.primary"
4904 );
4905 assert!(
4906 engine
4907 .peer_buffers
4908 .get(&sender_pk)
4909 .is_some_and(VecDeque::is_empty),
4910 "retained peer buffer should now be empty"
4911 );
4912
4913 engine.update_latest_primary_peers(Set::default());
4916 assert!(
4917 !engine.peer_buffers.contains_key(&sender_pk),
4918 "peer buffer should be evicted once sender leaves latest.primary"
4919 );
4920 });
4921 }
4922
4923 #[test_traced]
4924 fn test_old_epoch_buffered_shards_are_dropped_after_cutover() {
4925 let executor = deterministic::Runner::default();
4926 executor.start(|context| async move {
4927 let num_peers = 6usize;
4928 let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
4929 context.with_label("network"),
4930 simulated::Config {
4931 max_size: MAX_SHARD_SIZE as u32,
4932 disconnect_on_block: true,
4933 tracked_peer_sets: NZUsize!(2),
4934 },
4935 );
4936 network.start();
4937
4938 let mut private_keys = (0..num_peers)
4939 .map(|i| PrivateKey::from_seed(i as u64))
4940 .collect::<Vec<_>>();
4941 private_keys.sort_by_key(|s| s.public_key());
4942 let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
4943
4944 let epoch0_set: Set<P> = Set::from_iter_dedup(peer_keys[..5].iter().cloned());
4947 let epoch1_set: Set<P> = Set::from_iter_dedup([
4948 peer_keys[1].clone(),
4949 peer_keys[2].clone(),
4950 peer_keys[3].clone(),
4951 peer_keys[4].clone(),
4952 peer_keys[5].clone(),
4953 ]);
4954
4955 let receiver_idx = 3usize;
4956 let receiver_pk = peer_keys[receiver_idx].clone();
4957 let receiver_key = private_keys[receiver_idx].clone();
4958 let leader_pk = peer_keys[0].clone();
4959
4960 let receiver_control = oracle.control(receiver_pk.clone());
4961 let (sender_handle, receiver_handle) = receiver_control
4962 .register(0, TEST_QUOTA)
4963 .await
4964 .expect("registration should succeed");
4965
4966 let leader_control = oracle.control(leader_pk.clone());
4967 let (mut leader_sender, _leader_receiver) = leader_control
4968 .register(0, TEST_QUOTA)
4969 .await
4970 .expect("registration should succeed");
4971 oracle
4972 .add_link(leader_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
4973 .await
4974 .expect("link should be added");
4975
4976 oracle.manager().track(0, epoch0_set.clone()).await;
4978 context.sleep(Duration::from_millis(10)).await;
4979
4980 let scheme_epoch0 =
4981 Scheme::signer(SCHEME_NAMESPACE, epoch0_set.clone(), receiver_key.clone())
4982 .expect("epoch 0 signer scheme should be created");
4983 let scheme_epoch1 =
4984 Scheme::signer(SCHEME_NAMESPACE, epoch1_set.clone(), receiver_key.clone())
4985 .expect("epoch 1 signer scheme should be created");
4986
4987 let config: Config<_, _, _, _, C, _, _, _> = Config {
4988 scheme_provider: MultiEpochProvider::single(scheme_epoch0)
4989 .with_epoch(Epoch::new(1), scheme_epoch1),
4990 blocker: receiver_control.clone(),
4991 shard_codec_cfg: CodecConfig {
4992 maximum_shard_size: MAX_SHARD_SIZE,
4993 },
4994 block_codec_cfg: (),
4995 strategy: STRATEGY,
4996 mailbox_size: 1024,
4997 peer_buffer_size: NZUsize!(64),
4998 background_channel_capacity: 1024,
4999 peer_provider: oracle.manager(),
5000 };
5001
5002 let (engine, mailbox) = ShardEngine::new(context.with_label("receiver"), config);
5004 engine.start((sender_handle, receiver_handle));
5005
5006 let coding_config = coding_config_for_participants(epoch0_set.len() as u16);
5007 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
5008 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
5009 let commitment = coded_block.commitment();
5010
5011 let receiver_participant = epoch0_set
5012 .index(&receiver_pk)
5013 .expect("receiver must be an epoch 0 participant");
5014 let leader_shard = coded_block
5015 .shard(receiver_participant.get() as u16)
5016 .expect("missing shard");
5017
5018 leader_sender
5020 .send(
5021 Recipients::One(receiver_pk.clone()),
5022 leader_shard.encode(),
5023 true,
5024 )
5025 .await
5026 .expect("send failed");
5027 context.sleep(DEFAULT_LINK.latency * 2).await;
5028
5029 oracle.manager().track(1, epoch1_set).await;
5033 context.sleep(Duration::from_millis(10)).await;
5034
5035 let mut shard_sub = mailbox.subscribe_assigned_shard_verified(commitment).await;
5038 mailbox
5039 .discovered(
5040 commitment,
5041 leader_pk,
5042 Round::new(Epoch::zero(), View::new(1)),
5043 )
5044 .await;
5045 context.sleep(DEFAULT_LINK.latency * 2).await;
5046
5047 assert!(
5048 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
5049 "old-epoch shard subscription should stay pending after cutover"
5050 );
5051 assert!(
5052 mailbox.get(commitment).await.is_none(),
5053 "old-epoch commitment should not reconstruct from overlap-only buffered shards"
5054 );
5055 });
5056 }
5057
5058 #[test_traced]
5067 fn test_evicted_node_still_reconstructs_from_buffered_peer_shards() {
5068 let executor = deterministic::Runner::default();
5069 executor.start(|context| async move {
5070 let num_peers = 10usize;
5071 let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
5072 context.with_label("network"),
5073 simulated::Config {
5074 max_size: MAX_SHARD_SIZE as u32,
5075 disconnect_on_block: true,
5076 tracked_peer_sets: NZUsize!(2),
5077 },
5078 );
5079 network.start();
5080
5081 let mut private_keys = (0..num_peers)
5082 .map(|i| PrivateKey::from_seed(i as u64))
5083 .collect::<Vec<_>>();
5084 private_keys.sort_by_key(|s| s.public_key());
5085 let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
5086 let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());
5087
5088 let receiver_idx = 1usize;
5092 let receiver_pk = peer_keys[receiver_idx].clone();
5093 let leader_pk = peer_keys[0].clone();
5094 let peer2_pk = peer_keys[2].clone();
5095 let peer4_pk = peer_keys[4].clone();
5096 let peer5_pk = peer_keys[5].clone();
5097 let peer6_pk = peer_keys[6].clone();
5098
5099 let receiver_control = oracle.control(receiver_pk.clone());
5100 let (evicted_sender, evicted_receiver) = receiver_control
5101 .register(0, TEST_QUOTA)
5102 .await
5103 .expect("registration should succeed");
5104
5105 let peer2_control = oracle.control(peer2_pk.clone());
5106 let (mut peer2_sender, _peer2_receiver) = peer2_control
5107 .register(0, TEST_QUOTA)
5108 .await
5109 .expect("registration should succeed");
5110
5111 let peer4_control = oracle.control(peer4_pk.clone());
5112 let (mut peer4_sender, _peer4_receiver) = peer4_control
5113 .register(0, TEST_QUOTA)
5114 .await
5115 .expect("registration should succeed");
5116
5117 let peer5_control = oracle.control(peer5_pk.clone());
5118 let (mut peer5_sender, _peer5_receiver) = peer5_control
5119 .register(0, TEST_QUOTA)
5120 .await
5121 .expect("registration should succeed");
5122
5123 let peer6_control = oracle.control(peer6_pk.clone());
5124 let (mut peer6_sender, _peer6_receiver) = peer6_control
5125 .register(0, TEST_QUOTA)
5126 .await
5127 .expect("registration should succeed");
5128
5129 for sender in [&peer2_pk, &peer4_pk, &peer5_pk, &peer6_pk] {
5131 oracle
5132 .add_link(sender.clone(), receiver_pk.clone(), DEFAULT_LINK)
5133 .await
5134 .expect("link should be added");
5135 }
5136
5137 oracle.manager().track(0, participants.clone()).await;
5139 context.sleep(Duration::from_millis(10)).await;
5140
5141 let scheme = Scheme::signer(
5142 SCHEME_NAMESPACE,
5143 participants.clone(),
5144 private_keys[receiver_idx].clone(),
5145 )
5146 .expect("signer scheme should be created");
5147
5148 let config: Config<_, _, _, _, C, _, _, _> = Config {
5149 scheme_provider: MultiEpochProvider::single(scheme),
5150 blocker: receiver_control.clone(),
5151 shard_codec_cfg: CodecConfig {
5152 maximum_shard_size: MAX_SHARD_SIZE,
5153 },
5154 block_codec_cfg: (),
5155 strategy: STRATEGY,
5156 mailbox_size: 1024,
5157 peer_buffer_size: NZUsize!(64),
5158 background_channel_capacity: 1024,
5159 peer_provider: oracle.manager(),
5160 };
5161
5162 let (engine, mailbox) = ShardEngine::new(context.with_label("evicted"), config);
5163 engine.start((evicted_sender, evicted_receiver));
5164
5165 let coding_config = coding_config_for_participants(num_peers as u16);
5166 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
5167 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
5168 let commitment = coded_block.commitment();
5169
5170 let peer2_shard = coded_block.shard(2).expect("missing shard 2").encode();
5171 let peer4_shard = coded_block.shard(4).expect("missing shard 4").encode();
5172 let peer5_shard = coded_block.shard(5).expect("missing shard 5").encode();
5173 let peer6_shard = coded_block.shard(6).expect("missing shard 6").encode();
5174
5175 let block_sub = mailbox.subscribe(commitment).await;
5176
5177 peer2_sender
5180 .send(
5181 Recipients::One(receiver_pk.clone()),
5182 peer2_shard,
5183 true,
5184 )
5185 .await
5186 .expect("send failed");
5187 peer4_sender
5188 .send(
5189 Recipients::One(receiver_pk.clone()),
5190 peer4_shard,
5191 true,
5192 )
5193 .await
5194 .expect("send failed");
5195 peer5_sender
5196 .send(
5197 Recipients::One(receiver_pk.clone()),
5198 peer5_shard,
5199 true,
5200 )
5201 .await
5202 .expect("send failed");
5203 peer6_sender
5204 .send(
5205 Recipients::One(receiver_pk.clone()),
5206 peer6_shard,
5207 true,
5208 )
5209 .await
5210 .expect("send failed");
5211 context.sleep(DEFAULT_LINK.latency * 2).await;
5212
5213 let latest_primary: Set<P> = Set::from_iter_dedup(
5216 peer_keys
5217 .iter()
5218 .filter(|pk| **pk != receiver_pk)
5219 .cloned(),
5220 );
5221 oracle.manager().track(1, latest_primary).await;
5222 context.sleep(Duration::from_millis(10)).await;
5223
5224 mailbox
5227 .discovered(
5228 commitment,
5229 leader_pk.clone(),
5230 Round::new(Epoch::zero(), View::new(1)),
5231 )
5232 .await;
5233
5234 select! {
5235 _ = block_sub => {},
5236 _ = context.sleep(Duration::from_secs(5)) => {
5237 panic!("block subscription did not resolve after leader discovery");
5238 },
5239 }
5240
5241 context.sleep(DEFAULT_LINK.latency * 2).await;
5242 let block = mailbox.get(commitment).await;
5243 assert!(
5244 block.is_some(),
5245 "evicted node should reconstruct from buffered shards sent by remaining latest.primary peers"
5246 );
5247 assert_eq!(block.unwrap().commitment(), commitment);
5248
5249 assert!(
5250 oracle.blocked().await.unwrap().is_empty(),
5251 "no peer should be blocked when overlapping shards are valid"
5252 );
5253 });
5254 }
5255
5256 #[test_traced]
5261 fn test_late_leader_shard_accepted_after_quorum_transition() {
5262 let fixture = Fixture {
5263 num_peers: 10,
5264 ..Default::default()
5265 };
5266
5267 fixture.start(
5268 |config, context, oracle, mut peers, _, coding_config| async move {
5269 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
5270 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
5271 let commitment = coded_block.commitment();
5272 let round = Round::new(Epoch::zero(), View::new(1));
5273
5274 let leader_idx = 0usize;
5275 let victim_idx = 1usize;
5276 let leader = peers[leader_idx].public_key.clone();
5277 let victim = peers[victim_idx].public_key.clone();
5278
5279 oracle
5282 .remove_link(leader.clone(), victim.clone())
5283 .await
5284 .expect("remove_link should succeed");
5285
5286 peers[leader_idx]
5289 .mailbox
5290 .proposed(round, coded_block.clone())
5291 .await;
5292
5293 for peer in peers[1..].iter_mut() {
5295 peer.mailbox
5296 .discovered(commitment, leader.clone(), round)
5297 .await;
5298 }
5299
5300 context.sleep(config.link.latency * 4).await;
5304
5305 let block_sub = peers[victim_idx].mailbox.subscribe(commitment).await;
5306 select! {
5307 result = block_sub => {
5308 let reconstructed = result.expect("block subscription should resolve");
5309 assert_eq!(reconstructed.commitment(), commitment);
5310 },
5311 _ = context.sleep(Duration::from_secs(5)) => {
5312 panic!("victim did not reconstruct block from gossip");
5313 },
5314 }
5315
5316 let mut shard_sub = peers[victim_idx]
5319 .mailbox
5320 .subscribe_assigned_shard_verified(commitment)
5321 .await;
5322 assert!(
5323 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
5324 "shard subscription must not resolve before own shard is verified"
5325 );
5326
5327 oracle
5329 .add_link(leader.clone(), victim.clone(), DEFAULT_LINK)
5330 .await
5331 .expect("add_link should succeed");
5332
5333 let leader_shard = coded_block
5337 .shard(peers[victim_idx].index.get() as u16)
5338 .expect("missing victim shard");
5339 peers[leader_idx]
5340 .sender
5341 .send(Recipients::One(victim.clone()), leader_shard.encode(), true)
5342 .await
5343 .expect("send failed");
5344 context.sleep(config.link.latency * 2).await;
5345
5346 select! {
5349 _ = shard_sub => {},
5350 _ = context.sleep(Duration::from_secs(5)) => {
5351 panic!("shard subscription did not resolve after late leader shard");
5352 },
5353 }
5354
5355 let blocked = oracle.blocked().await.unwrap();
5357 assert!(
5358 blocked.is_empty(),
5359 "no peer should be blocked in late leader shard test"
5360 );
5361
5362 let extra_sender_idx = 2usize;
5365 let extra_shard = coded_block
5366 .shard(peers[extra_sender_idx].index.get() as u16)
5367 .expect("missing shard");
5368 peers[extra_sender_idx]
5369 .sender
5370 .send(Recipients::One(victim.clone()), extra_shard.encode(), true)
5371 .await
5372 .expect("send failed");
5373 context.sleep(config.link.latency * 2).await;
5374
5375 let blocked = oracle.blocked().await.unwrap();
5377 assert!(
5378 blocked.is_empty(),
5379 "gossip shard after full reconstruction should be silently ignored"
5380 );
5381 },
5382 );
5383 }
5384}