1use super::{
164 mailbox::{Mailbox, Message},
165 metrics::{Peer, ShardMetrics},
166};
167use crate::{
168 marshal::coding::{
169 types::{CodedBlock, DistributionShard, Shard},
170 validation::{validate_reconstruction, ReconstructionError as InvariantError},
171 },
172 types::{coding::Commitment, Epoch, Round},
173 Block, CertifiableBlock, Heightable,
174};
175use commonware_codec::{Decode, Error as CodecError, Read};
176use commonware_coding::{Config as CodingConfig, Scheme as CodingScheme};
177use commonware_cryptography::{
178 certificate::{Provider, Scheme as CertificateScheme},
179 Committable, Digestible, Hasher, PublicKey,
180};
181use commonware_macros::select_loop;
182use commonware_p2p::{
183 utils::codec::{WrappedBackgroundReceiver, WrappedSender},
184 Blocker, Provider as PeerProvider, Receiver, Recipients, Sender,
185};
186use commonware_parallel::Strategy;
187use commonware_runtime::{
188 spawn_cell,
189 telemetry::metrics::{histogram::HistogramExt, status::GaugeExt},
190 BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
191};
192use commonware_utils::{
193 bitmap::BitMap,
194 channel::{fallible::OneshotExt, mpsc, oneshot},
195 ordered::{Quorum, Set},
196 Participant,
197};
198use rand::Rng;
199use std::{
200 collections::{BTreeMap, VecDeque},
201 num::NonZeroUsize,
202 sync::Arc,
203};
204use thiserror::Error;
205use tracing::{debug, warn};
206
/// Errors that can surface while reconstructing a block from checked shards.
#[derive(Debug, Error)]
pub enum Error<C: CodingScheme> {
    /// The coding scheme `C` reported an error (e.g. while decoding shards).
    #[error(transparent)]
    Coding(C::Error),

    /// The reconstructed bytes could not be decoded by the codec.
    #[error(transparent)]
    Codec(#[from] CodecError),

    /// The reconstructed block's digest does not match the commitment.
    #[error("block digest mismatch: reconstructed block does not match commitment digest")]
    DigestMismatch,

    /// The coding config embedded with the reconstructed block does not match
    /// the config carried by the commitment.
    #[error("block config mismatch: reconstructed config does not match commitment config")]
    ConfigMismatch,

    /// The reconstructed block's context does not match the commitment.
    #[error("block context mismatch: reconstructed context does not match commitment context")]
    ContextMismatch,
}
230
/// Key under which block subscribers are registered: either the full coding
/// commitment or only the block digest.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum BlockSubscriptionKey<D> {
    /// Subscriber is waiting for the block with this commitment.
    Commitment(Commitment),
    /// Subscriber is waiting for any cached block with this digest.
    Digest(D),
}
236
/// Configuration consumed by [`Engine::new`].
pub struct Config<P, S, X, D, C, H, B, T>
where
    P: PublicKey,
    S: Provider<Scope = Epoch>,
    X: Blocker<PublicKey = P>,
    D: PeerProvider<PublicKey = P>,
    C: CodingScheme,
    H: Hasher,
    B: CertifiableBlock,
    T: Strategy,
{
    /// Supplies the certificate scheme (and thus the participant set) for an epoch.
    pub scheme_provider: S,

    /// Used to block peers that send invalid or conflicting shards.
    pub blocker: X,

    /// Codec configuration used to decode shards arriving from the network.
    pub shard_codec_cfg: <Shard<C, H> as Read>::Cfg,

    /// Codec configuration used to decode a reconstructed block.
    pub block_codec_cfg: B::Cfg,

    /// Execution strategy handed to the coding scheme and the shard receiver.
    pub strategy: T,

    /// Capacity of the engine's mailbox channel.
    pub mailbox_size: usize,

    /// Maximum number of shards buffered per peer for commitments that have no
    /// reconstruction state yet. When the buffer is full the oldest buffered
    /// shard is dropped to make room.
    pub peer_buffer_size: NonZeroUsize,

    /// Capacity passed to the background shard receiver.
    pub background_channel_capacity: usize,

    /// Source of peer-set updates; the engine subscribes to it when it runs.
    pub peer_provider: D,
}
289
/// Actor that distributes, collects, and validates shards and reconstructs
/// coded blocks from them.
///
/// Created with [`Engine::new`] and driven by [`Engine::start`].
pub struct Engine<E, S, X, D, C, H, B, P, T>
where
    E: BufferPooler + Rng + Spawner + Metrics + Clock,
    S: Provider<Scope = Epoch>,
    S::Scheme: CertificateScheme<PublicKey = P>,
    X: Blocker,
    D: PeerProvider<PublicKey = P>,
    C: CodingScheme,
    H: Hasher,
    B: CertifiableBlock,
    P: PublicKey,
    T: Strategy,
{
    /// Runtime context the engine runs on.
    context: ContextCell<E>,

    /// Receiving end of the engine's mailbox.
    mailbox: mpsc::Receiver<Message<B, C, H, P>>,

    /// Supplies the certificate scheme for an epoch.
    scheme_provider: S,

    /// Used to block misbehaving peers.
    blocker: X,

    /// Codec configuration for decoding network shards.
    shard_codec_cfg: <Shard<C, H> as Read>::Cfg,

    /// Codec configuration for decoding reconstructed blocks.
    block_codec_cfg: B::Cfg,

    /// Execution strategy handed to the coding scheme.
    strategy: T,

    /// Reconstruction state per commitment that has been discovered.
    state: BTreeMap<Commitment, ReconstructionState<P, C, H>>,

    /// Per-peer FIFO of shards received for commitments that have no entry in
    /// `state` yet.
    peer_buffers: BTreeMap<P, VecDeque<Shard<C, H>>>,

    /// Maximum length of each queue in `peer_buffers`.
    peer_buffer_size: NonZeroUsize,

    /// Source of peer-set updates.
    peer_provider: D,

    /// Most recent peer set received from `peer_provider`.
    tracked_peers: Set<P>,

    /// Capacity passed to the background shard receiver.
    background_channel_capacity: usize,

    /// Cache of blocks that were proposed locally or reconstructed from shards.
    reconstructed_blocks: BTreeMap<Commitment, Arc<CodedBlock<B, C, H>>>,

    /// Waiters notified once a shard for the commitment has been validated.
    shard_subscriptions: BTreeMap<Commitment, Vec<oneshot::Sender<()>>>,

    /// Waiters notified once the block for the key is cached.
    #[allow(clippy::type_complexity)]
    block_subscriptions:
        BTreeMap<BlockSubscriptionKey<B::Digest>, Vec<oneshot::Sender<Arc<CodedBlock<B, C, H>>>>>,

    /// Metrics recorded by the engine.
    metrics: ShardMetrics,
}
365
366impl<E, S, X, D, C, H, B, P, T> Engine<E, S, X, D, C, H, B, P, T>
367where
368 E: BufferPooler + Rng + Spawner + Metrics + Clock,
369 S: Provider<Scope = Epoch>,
370 S::Scheme: CertificateScheme<PublicKey = P>,
371 X: Blocker<PublicKey = P>,
372 D: PeerProvider<PublicKey = P>,
373 C: CodingScheme,
374 H: Hasher,
375 B: CertifiableBlock,
376 P: PublicKey,
377 T: Strategy,
378{
379 pub fn new(context: E, config: Config<P, S, X, D, C, H, B, T>) -> (Self, Mailbox<B, C, H, P>) {
381 let metrics = ShardMetrics::new(&context);
382 let (sender, mailbox) = mpsc::channel(config.mailbox_size);
383 (
384 Self {
385 context: ContextCell::new(context),
386 mailbox,
387 scheme_provider: config.scheme_provider,
388 blocker: config.blocker,
389 shard_codec_cfg: config.shard_codec_cfg,
390 block_codec_cfg: config.block_codec_cfg,
391 strategy: config.strategy,
392 state: BTreeMap::new(),
393 peer_buffers: BTreeMap::new(),
394 peer_buffer_size: config.peer_buffer_size,
395 peer_provider: config.peer_provider,
396 tracked_peers: Set::default(),
397 background_channel_capacity: config.background_channel_capacity,
398 reconstructed_blocks: BTreeMap::new(),
399 shard_subscriptions: BTreeMap::new(),
400 block_subscriptions: BTreeMap::new(),
401 metrics,
402 },
403 Mailbox::new(sender),
404 )
405 }
406
    /// Consumes the engine and spawns its run loop on the engine's context.
    ///
    /// `network` is the (sender, receiver) pair used to exchange shards with
    /// peers. Returns the handle of the spawned task.
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }
414
    /// Main event loop of the engine.
    ///
    /// Multiplexes three inputs until shutdown or until one of them closes:
    /// peer-set updates, mailbox messages, and shards arriving from the network.
    async fn run(
        mut self,
        (sender, receiver): (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) {
        // Wrap the raw sender so that `Shard` values are encoded on send.
        let mut sender = WrappedSender::<_, Shard<C, H>>::new(
            self.context.network_buffer_pool().clone(),
            sender,
        );
        // Incoming shards are decoded by a separate service and delivered over
        // a channel as `(peer, shard)` pairs.
        let (receiver_service, mut receiver): (_, mpsc::Receiver<(P, Shard<C, H>)>) =
            WrappedBackgroundReceiver::new(
                self.context.with_label("shard_ingress"),
                receiver,
                self.shard_codec_cfg.clone(),
                self.blocker.clone(),
                self.background_channel_capacity,
                &self.strategy,
            );
        // Keep the handle alive for the lifetime of the loop.
        let _receiver_handle = receiver_service.start();
        let mut peer_set_subscription = self.peer_provider.subscribe().await;

        select_loop! {
            self.context,
            // NOTE(review): `on_start` presumably runs at the top of every loop
            // iteration; confirm against the `select_loop!` documentation.
            on_start => {
                // Refresh gauges with the current map sizes.
                let _ = self
                    .metrics
                    .reconstruction_states_count
                    .try_set(self.state.len());
                let _ = self
                    .metrics
                    .reconstructed_blocks_cache_count
                    .try_set(self.reconstructed_blocks.len());

                // Drop subscribers whose receiving side is gone, and remove
                // keys that no longer have any subscriber.
                self.block_subscriptions.retain(|_, subscribers| {
                    subscribers.retain(|tx| !tx.is_closed());
                    !subscribers.is_empty()
                });
                self.shard_subscriptions.retain(|_, subscribers| {
                    subscribers.retain(|tx| !tx.is_closed());
                    !subscribers.is_empty()
                });
            },
            on_stopped => {
                debug!("received shutdown signal, stopping shard engine");
            },
            Some((_, _, tracked_peers)) = peer_set_subscription.recv() else {
                debug!("peer set subscription closed");
                return;
            } => {
                // Discard buffered shards from peers that are no longer tracked.
                self.peer_buffers
                    .retain(|peer, _| tracked_peers.as_ref().contains(peer));
                self.tracked_peers = tracked_peers;
            },
            Some(message) = self.mailbox.recv() else {
                debug!("shard mailbox closed, stopping shard engine");
                return;
            } => match message {
                // A locally proposed block: send its shards out and cache it.
                Message::Proposed { block, round } => {
                    self.broadcast_shards(&mut sender, round, block).await;
                }
                // A commitment proposed by another leader: start tracking it.
                Message::Discovered {
                    commitment,
                    leader,
                    round,
                } => {
                    self.handle_external_proposal(&mut sender, commitment, leader, round)
                        .await;
                }
                Message::GetByCommitment {
                    commitment,
                    response,
                } => {
                    let block = self.reconstructed_blocks.get(&commitment).cloned();
                    response.send_lossy(block);
                }
                Message::GetByDigest { digest, response } => {
                    // The cache is keyed by commitment, so digest lookups scan it.
                    let block = self
                        .reconstructed_blocks
                        .iter()
                        .find_map(|(_, b)| (b.digest() == digest).then_some(b))
                        .cloned();
                    response.send_lossy(block);
                }
                Message::SubscribeShard {
                    commitment,
                    response,
                } => {
                    self.handle_shard_subscription(commitment, response);
                }
                Message::SubscribeByCommitment {
                    commitment,
                    response,
                } => {
                    self.handle_block_subscription(
                        BlockSubscriptionKey::Commitment(commitment),
                        response,
                    );
                }
                Message::SubscribeByDigest { digest, response } => {
                    self.handle_block_subscription(BlockSubscriptionKey::Digest(digest), response);
                }
                Message::Prune { through } => {
                    self.prune(through);
                }
            },
            Some((peer, shard)) = receiver.recv() else {
                debug!("receiver closed, stopping shard engine");
                return;
            } => {
                self.metrics
                    .shards_received
                    .get_or_create(&Peer::new(&peer))
                    .inc();

                // Shards for a block that is already cached are ignored.
                let commitment = shard.commitment();
                if self.reconstructed_blocks.contains_key(&commitment) {
                    continue;
                }

                if let Some(state) = self.state.get_mut(&commitment) {
                    let round = state.round();
                    let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
                        warn!(%commitment, "no scheme for epoch, ignoring shard");
                        continue;
                    };
                    let progressed = state
                        .on_network_shard(
                            peer,
                            shard,
                            InsertCtx::new(scheme.as_ref(), &self.strategy),
                            &mut self.blocker,
                        )
                        .await;
                    if progressed {
                        self.try_advance(&mut sender, commitment).await;
                    }
                } else {
                    // No state for this commitment yet: hold the shard until
                    // the commitment is discovered.
                    self.buffer_peer_shard(peer, shard);
                }
            },
        }
    }
560
561 #[allow(clippy::type_complexity)]
569 fn try_reconstruct(
570 &mut self,
571 commitment: Commitment,
572 ) -> Result<Option<Arc<CodedBlock<B, C, H>>>, Error<C>> {
573 if let Some(block) = self.reconstructed_blocks.get(&commitment) {
574 return Ok(Some(Arc::clone(block)));
575 }
576 let Some(state) = self.state.get(&commitment) else {
577 return Ok(None);
578 };
579 if state.checked_shards().len() < usize::from(commitment.config().minimum_shards.get()) {
580 debug!(%commitment, "not enough checked shards to reconstruct block");
581 return Ok(None);
582 }
583 let checking_data = state
584 .checking_data()
585 .expect("checking data must be present");
586
587 let start = self.context.current();
589 let blob = C::decode(
590 &commitment.config(),
591 &commitment.root(),
592 checking_data.clone(),
593 state.checked_shards(),
594 &self.strategy,
595 )
596 .map_err(Error::Coding)?;
597 self.metrics
598 .erasure_decode_duration
599 .observe_between(start, self.context.current());
600
601 let (inner, config): (B, CodingConfig) =
603 Decode::decode_cfg(&mut blob.as_slice(), &(self.block_codec_cfg.clone(), ()))?;
604
605 match validate_reconstruction::<H, _>(&inner, config, commitment) {
606 Ok(()) => {}
607 Err(InvariantError::BlockDigest) => {
608 return Err(Error::DigestMismatch);
609 }
610 Err(InvariantError::CodingConfig) => {
611 warn!(
612 %commitment,
613 expected_config = ?commitment.config(),
614 actual_config = ?config,
615 "reconstructed block config does not match commitment config, but digest matches"
616 );
617 return Err(Error::ConfigMismatch);
618 }
619 Err(InvariantError::ContextDigest(expected, actual)) => {
620 warn!(
621 %commitment,
622 expected_context_digest = ?expected,
623 actual_context_digest = ?actual,
624 "reconstructed block context digest does not match commitment context digest"
625 );
626 return Err(Error::ContextMismatch);
627 }
628 }
629
630 let block = Arc::new(CodedBlock::new_trusted(inner, commitment));
633 self.cache_block(Arc::clone(&block));
634 self.metrics.blocks_reconstructed_total.inc();
635 Ok(Some(block))
636 }
637
638 async fn handle_external_proposal<Sr: Sender<PublicKey = P>>(
640 &mut self,
641 sender: &mut WrappedSender<Sr, Shard<C, H>>,
642 commitment: Commitment,
643 leader: P,
644 round: Round,
645 ) {
646 if self.reconstructed_blocks.contains_key(&commitment) {
647 return;
648 }
649 let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
650 warn!(%commitment, "no scheme for epoch, ignoring external proposal");
651 return;
652 };
653 let participants = scheme.participants();
654 if participants.index(&leader).is_none() {
655 warn!(?leader, %commitment, "leader update for non-participant, ignoring");
656 return;
657 }
658 if let Some(state) = self.state.get(&commitment) {
659 if state.leader() != &leader {
660 warn!(
661 existing = ?state.leader(),
662 ?leader,
663 %commitment,
664 "conflicting leader update, ignoring"
665 );
666 }
667 return;
668 }
669
670 let participants_len =
671 u64::try_from(participants.len()).expect("participant count impossibly out of bounds");
672 self.state.insert(
673 commitment,
674 ReconstructionState::new(leader, round, participants_len),
675 );
676 let buffered_progress = self.ingest_buffered_shards(commitment).await;
677 if buffered_progress {
678 self.try_advance(sender, commitment).await;
679 }
680 }
681
682 fn buffer_peer_shard(&mut self, peer: P, shard: Shard<C, H>) {
684 let queue = self.peer_buffers.entry(peer).or_default();
685 if queue.len() >= self.peer_buffer_size.get() {
686 let _ = queue.pop_front();
687 }
688 queue.push_back(shard);
689 }
690
691 async fn ingest_buffered_shards(&mut self, commitment: Commitment) -> bool {
693 let mut buffered_weak = Vec::new();
694 let mut buffered_strong = Vec::new();
695 for (peer, queue) in self.peer_buffers.iter_mut() {
696 let mut i = 0;
697 while i < queue.len() {
698 if queue[i].commitment() != commitment {
699 i += 1;
700 continue;
701 }
702 let shard = queue.swap_remove_back(i).expect("index is valid");
703 if shard.is_strong() {
704 buffered_strong.push((peer.clone(), shard));
705 } else {
706 buffered_weak.push((peer.clone(), shard));
707 }
708 }
709 }
710 self.peer_buffers.retain(|_, queue| !queue.is_empty());
711
712 let Some(state) = self.state.get_mut(&commitment) else {
713 return false;
714 };
715 let round = state.round();
716 let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
717 warn!(%commitment, "no scheme for epoch, dropping buffered shards");
718 return false;
719 };
720
721 let mut progressed = false;
724 let ctx = InsertCtx::new(scheme.as_ref(), &self.strategy);
725 for (peer, shard) in buffered_weak.into_iter().chain(buffered_strong) {
726 progressed |= state
727 .on_network_shard(peer, shard, ctx, &mut self.blocker)
728 .await;
729 }
730 progressed
731 }
732
733 fn cache_block(&mut self, block: Arc<CodedBlock<B, C, H>>) {
735 let commitment = block.commitment();
736 self.reconstructed_blocks
737 .insert(commitment, Arc::clone(&block));
738 self.notify_block_subscribers(block);
739 }
740
    /// Distributes the shards of a locally proposed `block` and caches the block.
    ///
    /// Each participant other than the local node is sent the shard at its own
    /// participant index. Tracked peers that are not participants are sent the
    /// proposer's own shard. Nothing is sent or cached if no scheme exists for
    /// the round's epoch, if the local node is not a participant, or if the
    /// number of shards differs from the number of participants.
    ///
    /// # Panics
    ///
    /// Panics if the block has no shard at the proposer's index.
    async fn broadcast_shards<Sr: Sender<PublicKey = P>>(
        &mut self,
        sender: &mut WrappedSender<Sr, Shard<C, H>>,
        round: Round,
        mut block: CodedBlock<B, C, H>,
    ) {
        let commitment = block.commitment();

        let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
            warn!(%commitment, "no scheme available, cannot broadcast shards");
            return;
        };
        let participants = scheme.participants();
        let Some(me) = scheme.me() else {
            warn!(
                %commitment,
                "cannot broadcast shards: local proposer is not a participant"
            );
            return;
        };

        // There must be exactly one shard per participant.
        let shard_count = block.shards(&self.strategy).len();
        if shard_count != participants.len() {
            warn!(
                %commitment,
                shard_count,
                participants = participants.len(),
                "cannot broadcast shards: participant/shard count mismatch"
            );
            return;
        }

        // The proposer's own shard is what non-participants receive below.
        let my_index = me.get() as usize;
        let leader_shard = block
            .shard(my_index as u16)
            .expect("proposer's shard must exist");

        for (index, peer) in participants.iter().enumerate() {
            // The proposer does not send a shard to itself.
            if index == my_index {
                continue;
            }

            // NOTE(review): returning here leaves earlier participants with
            // their shards while the block stays uncached — confirm intended.
            let Some(shard) = block.shard(index as u16) else {
                warn!(
                    %commitment,
                    index,
                    "cannot broadcast shards: missing shard for participant index"
                );
                return;
            };
            // Send failures are ignored (best effort).
            // NOTE(review): the trailing `true` is presumably the priority flag — confirm.
            let _ = sender
                .send(Recipients::One(peer.clone()), shard, true)
                .await;
        }

        // Tracked peers outside the participant set get the proposer's shard.
        let non_participants: Vec<P> = self
            .tracked_peers
            .iter()
            .filter(|peer| participants.index(peer).is_none())
            .cloned()
            .collect();
        if !non_participants.is_empty() {
            let _ = sender
                .send(Recipients::Some(non_participants), leader_shard, true)
                .await;
        }

        // Cache the full block locally; this also notifies block subscribers.
        let block = Arc::new(block);
        self.cache_block(block);

        self.notify_shard_subscribers(commitment);

        debug!(?commitment, "broadcasted shards");
    }
824
825 async fn broadcast_weak_shard<Sr: Sender<PublicKey = P>>(
827 &mut self,
828 sender: &mut WrappedSender<Sr, Shard<C, H>>,
829 shard: Shard<C, H>,
830 ) {
831 let commitment = shard.commitment();
832 if let Ok(peers) = sender.send(Recipients::All, shard, true).await {
833 debug!(
834 ?commitment,
835 peers = peers.len(),
836 "broadcasted shard to all participants"
837 );
838 }
839 }
840
841 async fn try_advance<Sr: Sender<PublicKey = P>>(
845 &mut self,
846 sender: &mut WrappedSender<Sr, Shard<C, H>>,
847 commitment: Commitment,
848 ) {
849 if let Some(state) = self.state.get_mut(&commitment) {
850 match state.take_pending_action() {
851 Some(ValidatedShardAction::Broadcast(shard)) => {
852 self.broadcast_weak_shard(sender, shard).await;
853 self.notify_shard_subscribers(commitment);
854 }
855 Some(ValidatedShardAction::NotifyOnly) => {
856 self.notify_shard_subscribers(commitment);
857 }
858 None => {}
859 }
860 }
861
862 match self.try_reconstruct(commitment) {
863 Ok(Some(block)) => {
864 debug!(
870 %commitment,
871 parent = %block.parent(),
872 height = %block.height(),
873 "successfully reconstructed block from shards"
874 );
875 }
876 Ok(None) => {
877 debug!(%commitment, "not enough checked shards to reconstruct block");
878 }
879 Err(err) => {
880 warn!(%commitment, ?err, "failed to reconstruct block from checked shards");
881 self.state.remove(&commitment);
882 self.drop_subscriptions(commitment);
883 self.metrics.reconstruction_failures_total.inc();
884 }
885 }
886 }
887
888 fn handle_shard_subscription(&mut self, commitment: Commitment, response: oneshot::Sender<()>) {
890 let has_shard = self
893 .state
894 .get(&commitment)
895 .is_some_and(|state| state.checking_data().is_some());
896 let block_reconstructed = self.reconstructed_blocks.contains_key(&commitment);
897 if has_shard || block_reconstructed {
898 response.send_lossy(());
899 return;
900 }
901
902 self.shard_subscriptions
903 .entry(commitment)
904 .or_default()
905 .push(response);
906 }
907
908 fn handle_block_subscription(
910 &mut self,
911 key: BlockSubscriptionKey<B::Digest>,
912 response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
913 ) {
914 let block = match key {
915 BlockSubscriptionKey::Commitment(commitment) => {
916 self.reconstructed_blocks.get(&commitment)
917 }
918 BlockSubscriptionKey::Digest(digest) => self
919 .reconstructed_blocks
920 .iter()
921 .find_map(|(_, block)| (block.digest() == digest).then_some(block)),
922 };
923
924 if let Some(block) = block {
926 response.send_lossy(Arc::clone(block));
927 return;
928 }
929
930 self.block_subscriptions
931 .entry(key)
932 .or_default()
933 .push(response);
934 }
935
936 fn notify_shard_subscribers(&mut self, commitment: Commitment) {
938 if let Some(mut subscribers) = self.shard_subscriptions.remove(&commitment) {
939 for subscriber in subscribers.drain(..) {
940 subscriber.send_lossy(());
941 }
942 }
943 }
944
945 fn notify_block_subscribers(&mut self, block: Arc<CodedBlock<B, C, H>>) {
947 let commitment = block.commitment();
948 let digest = block.digest();
949
950 if let Some(mut subscribers) = self
952 .block_subscriptions
953 .remove(&BlockSubscriptionKey::Commitment(commitment))
954 {
955 for subscriber in subscribers.drain(..) {
956 subscriber.send_lossy(Arc::clone(&block));
957 }
958 }
959
960 if let Some(mut subscribers) = self
962 .block_subscriptions
963 .remove(&BlockSubscriptionKey::Digest(digest))
964 {
965 for subscriber in subscribers.drain(..) {
966 subscriber.send_lossy(Arc::clone(&block));
967 }
968 }
969 }
970
971 fn drop_subscriptions(&mut self, commitment: Commitment) {
976 self.shard_subscriptions.remove(&commitment);
977 self.block_subscriptions
978 .remove(&BlockSubscriptionKey::Commitment(commitment));
979 self.block_subscriptions
980 .remove(&BlockSubscriptionKey::Digest(
981 commitment.block::<B::Digest>(),
982 ));
983 }
984
985 fn prune(&mut self, through: Commitment) {
995 if let Some(height) = self.reconstructed_blocks.get(&through).map(|b| b.height()) {
996 self.reconstructed_blocks
997 .retain(|_, block| block.height() > height);
998 }
999
1000 self.drop_subscriptions(through);
1004 let Some(round) = self.state.remove(&through).map(|state| state.round()) else {
1005 return;
1006 };
1007
1008 let mut pruned_commitments = Vec::new();
1009 self.state.retain(|c, s| {
1010 let keep = s.round() > round;
1011 if !keep {
1012 pruned_commitments.push(*c);
1013 }
1014 keep
1015 });
1016 for pruned in pruned_commitments {
1017 self.drop_subscriptions(pruned);
1018 }
1019 }
1020}
1021
/// Per-commitment shard collection state.
enum ReconstructionState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Still collecting: weak shards may be pending verification and checking
    /// data may not be available yet.
    AwaitingQuorum(AwaitingQuorumState<P, C, H>),
    /// Checking data is available and at least the minimum number of shards
    /// has been checked.
    Ready(ReadyState<P, C, H>),
}
1037
/// Follow-up work recorded after a strong shard has been validated.
enum ValidatedShardAction<C: CodingScheme, H: Hasher> {
    /// Broadcast the contained shard to all peers, then notify shard subscribers.
    Broadcast(Shard<C, H>),
    /// Only notify shard subscribers.
    NotifyOnly,
}
1048
/// State shared by both phases of [`ReconstructionState`].
struct CommonState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Leader recorded for this commitment; strong shards are only accepted from it.
    leader: P,
    /// Action to perform the next time the engine advances this commitment.
    pending_action: Option<ValidatedShardAction<C, H>>,
    /// Shards that passed verification and can be used for decoding.
    checked_shards: Vec<C::CheckedShard>,
    /// One bit per participant index, set once a shard for that index was accepted.
    contributed: BitMap,
    /// Round in which the commitment was proposed.
    round: Round,
    /// Copy of the accepted strong shard, kept to detect leader equivocation.
    received_strong: Option<C::StrongShard>,
}
1069
/// Collection phase: weak shards are held unverified until checking data is
/// known and enough shards have arrived to attempt a transition.
struct AwaitingQuorumState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// State shared with the ready phase.
    common: CommonState<P, C, H>,
    /// Unverified weak shards, keyed by the peer that sent them.
    pending_weak_shards: BTreeMap<P, WeakShard<C>>,
    /// Checking data derived from the strong shard, once one has been verified.
    checking_data: Option<C::CheckingData>,
}
1086
/// Phase entered once checking data exists and enough shards have been checked.
struct ReadyState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// State carried over from the collection phase.
    common: CommonState<P, C, H>,
    /// Checking data used when decoding the block.
    checking_data: C::CheckingData,
}
1100
/// A strong shard taken apart from its network envelope.
struct StrongShard<C>
where
    C: CodingScheme,
{
    /// Commitment the shard belongs to.
    commitment: Commitment,
    /// Index of the shard.
    index: u16,
    /// Scheme-specific strong shard payload.
    data: C::StrongShard,
}
1110
/// A weak shard taken apart from its network envelope.
struct WeakShard<C>
where
    C: CodingScheme,
{
    /// Index of the shard.
    index: u16,
    /// Scheme-specific weak shard payload.
    data: C::WeakShard,
}
1119
1120impl<P, C, H> CommonState<P, C, H>
1121where
1122 P: PublicKey,
1123 C: CodingScheme,
1124 H: Hasher,
1125{
1126 fn new(leader: P, round: Round, participants_len: u64) -> Self {
1128 Self {
1129 leader,
1130 pending_action: None,
1131 checked_shards: Vec::new(),
1132 contributed: BitMap::zeroes(participants_len),
1133 round,
1134 received_strong: None,
1135 }
1136 }
1137}
1138
1139impl<P, C, H> AwaitingQuorumState<P, C, H>
1140where
1141 P: PublicKey,
1142 C: CodingScheme,
1143 H: Hasher,
1144{
1145 async fn verify_strong_shard(
1155 &mut self,
1156 sender: P,
1157 shard: StrongShard<C>,
1158 is_participant: bool,
1159 blocker: &mut impl Blocker<PublicKey = P>,
1160 ) -> bool {
1161 let StrongShard {
1162 commitment,
1163 index,
1164 data,
1165 } = shard;
1166 let received_strong = data.clone();
1167 let Ok((checking_data, checked, weak_shard_data)) =
1168 C::weaken(&commitment.config(), &commitment.root(), index, data)
1169 else {
1170 commonware_p2p::block!(blocker, sender, "invalid strong shard received");
1171 return false;
1172 };
1173
1174 self.common.received_strong = Some(received_strong);
1177 self.common.contributed.set(u64::from(index), true);
1178 self.common.checked_shards.push(checked);
1179 self.common.pending_action = Some(if is_participant {
1180 ValidatedShardAction::Broadcast(Shard::new(
1181 commitment,
1182 index,
1183 DistributionShard::Weak(weak_shard_data),
1184 ))
1185 } else {
1186 ValidatedShardAction::NotifyOnly
1187 });
1188 self.checking_data = Some(checking_data);
1189 true
1190 }
1191
    /// Tries to move from the collection phase to the ready phase.
    ///
    /// Returns `None` when checking data is not yet available, or when checked
    /// plus pending shards cannot reach the commitment's minimum. Otherwise all
    /// pending weak shards are verified; peers whose shard fails verification
    /// are blocked. If enough shards are checked afterwards, the accumulated
    /// state is moved into a [`ReadyState`], leaving `self.common` reset to a
    /// fresh value.
    async fn try_transition(
        &mut self,
        commitment: Commitment,
        participants_len: u64,
        strategy: &impl Strategy,
        blocker: &mut impl Blocker<PublicKey = P>,
    ) -> Option<ReadyState<P, C, H>> {
        // Weak shards cannot be checked without checking data.
        self.checking_data.as_ref()?;
        let minimum = usize::from(commitment.config().minimum_shards.get());
        // Defer verification until success is at least possible.
        if self.common.checked_shards.len() + self.pending_weak_shards.len() < minimum {
            return None;
        }

        // Pending shards are consumed here whether or not they verify.
        let pending = std::mem::take(&mut self.pending_weak_shards);
        // Cannot fail: presence was established by the `?` above.
        let checking_data = self.checking_data.as_ref().unwrap();
        // NOTE(review): presumably `Some` results land in the first vector and
        // the peers of `None` results in the second — confirm against the
        // `Strategy::map_partition_collect_vec` documentation.
        let (new_checked, to_block) =
            strategy.map_partition_collect_vec(pending, |(peer, shard)| {
                let checked = C::check(
                    &commitment.config(),
                    &commitment.root(),
                    checking_data,
                    shard.index,
                    shard.data,
                );
                (peer, checked.ok())
            });

        for peer in to_block {
            commonware_p2p::block!(blocker, peer, "invalid shard received");
        }
        self.common.checked_shards.extend(new_checked);

        // Some pending shards may have been invalid; re-check the threshold.
        if self.common.checked_shards.len() < minimum {
            return None;
        }

        // Move the accumulated state out, leaving a fresh placeholder behind.
        let checking_data = self.checking_data.take().unwrap();
        let round = self.common.round;
        let leader = self.common.leader.clone();
        let common = std::mem::replace(
            &mut self.common,
            CommonState::new(leader, round, participants_len),
        );
        Some(ReadyState {
            common,
            checking_data,
        })
    }
1245}
1246
/// Borrowed context needed to insert a network shard into a reconstruction state.
struct InsertCtx<'a, Sch, S>
where
    Sch: CertificateScheme,
    S: Strategy,
{
    /// Certificate scheme for the state's epoch (source of the participant set).
    scheme: &'a Sch,
    /// Execution strategy used when verifying pending shards.
    strategy: &'a S,
    /// Number of participants in `scheme`, computed once at construction.
    participants_len: u64,
}
1257
// Implemented by hand rather than derived: every field is a reference or an
// integer, so no `Clone` bound on `Sch` or `S` is needed.
impl<Sch: CertificateScheme, S: Strategy> Clone for InsertCtx<'_, Sch, S> {
    fn clone(&self) -> Self {
        *self
    }
}
1263
// The context only holds references and an integer, so it is freely copyable.
impl<Sch: CertificateScheme, S: Strategy> Copy for InsertCtx<'_, Sch, S> {}
1265
1266impl<'a, Sch: CertificateScheme, S: Strategy> InsertCtx<'a, Sch, S> {
1267 fn new(scheme: &'a Sch, strategy: &'a S) -> Self {
1268 let participants_len = u64::try_from(scheme.participants().len())
1269 .expect("participant count impossibly out of bounds");
1270 Self {
1271 scheme,
1272 strategy,
1273 participants_len,
1274 }
1275 }
1276}
1277
1278impl<P, C, H> ReconstructionState<P, C, H>
1279where
1280 P: PublicKey,
1281 C: CodingScheme,
1282 H: Hasher,
1283{
1284 fn new(leader: P, round: Round, participants_len: u64) -> Self {
1286 Self::AwaitingQuorum(AwaitingQuorumState {
1287 common: CommonState::new(leader, round, participants_len),
1288 pending_weak_shards: BTreeMap::new(),
1289 checking_data: None,
1290 })
1291 }
1292
1293 const fn common(&self) -> &CommonState<P, C, H> {
1295 match self {
1296 Self::AwaitingQuorum(state) => &state.common,
1297 Self::Ready(state) => &state.common,
1298 }
1299 }
1300
1301 const fn common_mut(&mut self) -> &mut CommonState<P, C, H> {
1303 match self {
1304 Self::AwaitingQuorum(state) => &mut state.common,
1305 Self::Ready(state) => &mut state.common,
1306 }
1307 }
1308
1309 const fn leader(&self) -> &P {
1311 &self.common().leader
1312 }
1313
1314 const fn checking_data(&self) -> Option<&C::CheckingData> {
1319 match self {
1320 Self::AwaitingQuorum(state) => state.checking_data.as_ref(),
1321 Self::Ready(state) => Some(&state.checking_data),
1322 }
1323 }
1324
1325 const fn round(&self) -> Round {
1327 self.common().round
1328 }
1329
1330 const fn checked_shards(&self) -> &[C::CheckedShard] {
1334 self.common().checked_shards.as_slice()
1335 }
1336
1337 const fn take_pending_action(&mut self) -> Option<ValidatedShardAction<C, H>> {
1341 self.common_mut().pending_action.take()
1342 }
1343
1344 async fn on_network_shard<Sch, S, X>(
1382 &mut self,
1383 sender: P,
1384 shard: Shard<C, H>,
1385 ctx: InsertCtx<'_, Sch, S>,
1386 blocker: &mut X,
1387 ) -> bool
1388 where
1389 Sch: CertificateScheme<PublicKey = P>,
1390 S: Strategy,
1391 X: Blocker<PublicKey = P>,
1392 {
1393 let Some(sender_index) = ctx.scheme.participants().index(&sender) else {
1394 commonware_p2p::block!(blocker, sender, "shard sent by non-participant");
1395 return false;
1396 };
1397 let commitment = shard.commitment();
1398 let index = shard.index();
1399
1400 let progressed = match shard.into_inner() {
1401 DistributionShard::Strong(data) => {
1402 let strong = StrongShard {
1403 commitment,
1404 index,
1405 data,
1406 };
1407 self.insert_strong_shard(
1408 ctx.scheme.me().as_ref(),
1409 (sender, sender_index),
1410 strong,
1411 blocker,
1412 )
1413 .await
1414 }
1415 DistributionShard::Weak(data) => {
1416 let weak = WeakShard { index, data };
1417 self.insert_weak_shard((sender, sender_index), weak, blocker)
1418 .await
1419 }
1420 };
1421
1422 if progressed {
1423 if let Self::AwaitingQuorum(state) = self {
1424 if let Some(ready) = state
1425 .try_transition(commitment, ctx.participants_len, ctx.strategy, blocker)
1426 .await
1427 {
1428 *self = Self::Ready(ready);
1429 }
1430 }
1431 }
1432
1433 progressed
1434 }
1435
    /// Validates a strong shard's origin and hands it to verification.
    ///
    /// `me` is the local participant index, if the local node is a participant.
    /// The sender is blocked when the shard index is not the expected one, when
    /// the sender is not the recorded leader, or when the leader sends a strong
    /// shard that differs from the one already accepted. An identical repeat of
    /// the accepted shard is ignored without blocking.
    ///
    /// Returns `true` only if the shard was verified and recorded.
    ///
    /// # Panics
    ///
    /// Panics if the expected participant index does not fit in a `u16`.
    async fn insert_strong_shard(
        &mut self,
        me: Option<&Participant>,
        (sender, sender_index): (P, Participant),
        shard: StrongShard<C>,
        blocker: &mut impl Blocker<PublicKey = P>,
    ) -> bool {
        // Expected index: the local index when participating, otherwise the
        // sender's own index.
        let expected = me.copied().unwrap_or(sender_index);
        let expected_index: u16 = expected
            .get()
            .try_into()
            .expect("participant index impossibly out of bounds");
        if shard.index != expected_index {
            commonware_p2p::block!(
                blocker,
                sender,
                shard_index = shard.index,
                expected_index = expected.get() as usize,
                "strong shard index does not match expected index"
            );
            return false;
        }

        // Only the recorded leader may send strong shards.
        let common = self.common();
        if sender != common.leader {
            commonware_p2p::block!(
                blocker,
                sender,
                leader = ?common.leader,
                "strong shard from non-leader"
            );
            return false;
        }
        // A strong shard was already accepted: block only if this one differs.
        if let Some(received_strong) = common.received_strong.as_ref() {
            if received_strong != &shard.data {
                commonware_p2p::block!(blocker, sender, "strong shard equivocation from leader");
            }
            return false;
        }

        match self {
            Self::AwaitingQuorum(state) => {
                state
                    .verify_strong_shard(sender, shard, me.is_some(), blocker)
                    .await
            }
            // The ready phase does not take a strong shard.
            Self::Ready(_) => false,
        }
    }
1488
    /// Validates a weak shard's origin and queues it for later verification.
    ///
    /// The sender is blocked when the shard index is not the sender's own
    /// participant index, or when the sender already has a pending weak shard
    /// with different data. Any other shard for an index that has already
    /// contributed is ignored.
    ///
    /// Returns `true` only if the shard was queued in the collection phase.
    ///
    /// # Panics
    ///
    /// Panics if the sender's participant index does not fit in a `u16`.
    async fn insert_weak_shard(
        &mut self,
        (sender, sender_index): (P, Participant),
        shard: WeakShard<C>,
        blocker: &mut impl Blocker<PublicKey = P>,
    ) -> bool {
        // A participant may only send the weak shard at its own index.
        let expected_index: u16 = sender_index
            .get()
            .try_into()
            .expect("participant index impossibly out of bounds");
        if shard.index != expected_index {
            commonware_p2p::block!(
                blocker,
                sender,
                shard_index = shard.index,
                expected_index = sender_index.get() as usize,
                "weak shard index does not match participant index"
            );
            return false;
        }

        if self.common().contributed.get(u64::from(sender_index.get())) {
            // Equivocation is only detectable while the earlier shard is still
            // pending in the collection phase.
            let equivocated = matches!(
                self,
                Self::AwaitingQuorum(state)
                    if state.pending_weak_shards.get(&sender).is_some_and(|existing| existing.data != shard.data)
            );
            if equivocated {
                commonware_p2p::block!(blocker, sender, "duplicate weak shard with different data");
            }
            return false;
        }
        // Mark the index as contributed in either phase.
        self.common_mut()
            .contributed
            .set(u64::from(sender_index.get()), true);

        match self {
            Self::AwaitingQuorum(state) => {
                state.pending_weak_shards.insert(sender, shard);
                true
            }
            // In the ready phase the shard is dropped.
            Self::Ready(_) => false,
        }
    }
1536}
1537
1538#[cfg(test)]
1539mod tests {
1540 use super::*;
1541 use crate::{
1542 marshal::{
1543 coding::types::coding_config_for_participants, mocks::block::Block as MockBlock,
1544 },
1545 types::{Height, View},
1546 };
1547 use bytes::Bytes;
1548 use commonware_codec::Encode;
1549 use commonware_coding::{CodecConfig, Config as CodingConfig, ReedSolomon, Zoda};
1550 use commonware_cryptography::{
1551 certificate::Subject,
1552 ed25519::{PrivateKey, PublicKey},
1553 impl_certificate_ed25519,
1554 sha256::Digest as Sha256Digest,
1555 Committable, Digest, Sha256, Signer,
1556 };
1557 use commonware_macros::{select, test_traced};
1558 use commonware_p2p::{
1559 simulated::{self, Control, Link, Oracle},
1560 Manager as _,
1561 };
1562 use commonware_parallel::Sequential;
1563 use commonware_runtime::{deterministic, Quota, Runner};
1564 use commonware_utils::{
1565 channel::oneshot::error::TryRecvError, ordered::Set, NZUsize, Participant,
1566 };
1567 use std::{
1568 future::Future,
1569 marker::PhantomData,
1570 num::NonZeroU32,
1571 sync::atomic::{AtomicIsize, Ordering},
1572 time::Duration,
1573 };
1574
/// Test-only certificate subject that carries an opaque message payload.
#[derive(Clone, Debug)]
pub struct TestSubject {
    /// Bytes returned verbatim by the `Subject::message` implementation.
    pub message: Bytes,
}
1579
1580 impl Subject for TestSubject {
1581 type Namespace = Vec<u8>;
1582
1583 fn namespace<'a>(&self, derived: &'a Self::Namespace) -> &'a [u8] {
1584 derived
1585 }
1586
1587 fn message(&self) -> Bytes {
1588 self.message.clone()
1589 }
1590 }
1591
// Generates the ed25519 certificate implementation for `TestSubject` with a
// `Vec<u8>` namespace.
// NOTE(review): presumably this is what defines the `Scheme` type used by the
// providers and fixture below; confirm against the macro definition.
impl_certificate_ed25519!(TestSubject, Vec<u8>);

/// Namespace bytes handed to `Scheme::signer` / `Scheme::verifier` by the fixture.
const SCHEME_NAMESPACE: &[u8] = b"_COMMONWARE_SHARD_ENGINE_TEST";

/// Size limit (1 MiB) used both as the simulated network's `max_size` and as
/// the shard codec's `maximum_shard_size`.
const MAX_SHARD_SIZE: usize = 1024 * 1024;

/// Link installed between peers by default: 50ms latency, zero jitter, and a
/// success rate of `1.0`.
const DEFAULT_LINK: Link = Link {
    latency: Duration::from_millis(50),
    jitter: Duration::ZERO,
    success_rate: 1.0,
};

/// Quota passed to every `register` call: `NonZeroU32::MAX` per second.
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);

/// Strategy used when coding blocks in tests and passed to each engine's config.
const STRATEGY: Sequential = Sequential;
1611
/// Scheme provider backed by a per-epoch lookup table.
#[derive(Clone)]
struct MultiEpochProvider {
    /// Certificate scheme registered for each epoch.
    schemes: BTreeMap<Epoch, Arc<Scheme>>,
}
1622 impl MultiEpochProvider {
1623 fn single(scheme: Scheme) -> Self {
1624 let mut schemes = BTreeMap::new();
1625 schemes.insert(Epoch::zero(), Arc::new(scheme));
1626 Self { schemes }
1627 }
1628
1629 fn with_epoch(mut self, epoch: Epoch, scheme: Scheme) -> Self {
1630 self.schemes.insert(epoch, Arc::new(scheme));
1631 self
1632 }
1633 }
1634
1635 impl Provider for MultiEpochProvider {
1636 type Scope = Epoch;
1637 type Scheme = Scheme;
1638
1639 fn scoped(&self, scope: Epoch) -> Option<Arc<Scheme>> {
1640 self.schemes.get(&scope).cloned()
1641 }
1642 }
1643
/// Scheme provider that stops returning its scheme once a shared counter of
/// remaining successful lookups is used up.
#[derive(Clone)]
struct ChurningProvider {
    /// Scheme returned while lookups are still allowed to succeed.
    scheme: Arc<Scheme>,
    /// Number of lookups that may still succeed; shared between clones.
    remaining_successes: Arc<AtomicIsize>,
}
1651
1652 impl ChurningProvider {
1653 fn new(scheme: Scheme, successes: isize) -> Self {
1654 Self {
1655 scheme: Arc::new(scheme),
1656 remaining_successes: Arc::new(AtomicIsize::new(successes)),
1657 }
1658 }
1659 }
1660
1661 impl Provider for ChurningProvider {
1662 type Scope = Epoch;
1663 type Scheme = Scheme;
1664
1665 fn scoped(&self, scope: Epoch) -> Option<Arc<Scheme>> {
1666 if scope != Epoch::zero() {
1667 return None;
1668 }
1669 if self.remaining_successes.fetch_sub(1, Ordering::AcqRel) <= 0 {
1670 return None;
1671 }
1672 Some(Arc::clone(&self.scheme))
1673 }
1674 }
1675
/// Block type used by the tests.
type B = MockBlock<Sha256Digest, ()>;
/// Hasher used by the tests.
type H = Sha256;
/// Public key type identifying peers.
type P = PublicKey;
/// Default coding scheme (Reed-Solomon over `H`).
type C = ReedSolomon<H>;
/// Simulated network control handle; the fixture passes it as each engine's `blocker`.
type X = Control<P, deterministic::Context>;
/// Simulated network oracle.
type O = Oracle<P, deterministic::Context>;
/// Scheme provider used by the engines started by the fixture.
type Prov = MultiEpochProvider;
/// Sender half of a simulated network registration.
type NetworkSender = simulated::Sender<P, deterministic::Context>;
/// Simulated peer manager; the fixture passes it as each engine's `peer_provider`.
type D = simulated::Manager<P, deterministic::Context>;
/// Engine under test, generic over the coding scheme `S`.
type ShardEngine<S> = Engine<deterministic::Context, Prov, X, D, S, H, B, P, Sequential>;
/// Same engine, but wired to a [`ChurningProvider`] instead of `Prov`.
type ChurningShardEngine<S> =
    Engine<deterministic::Context, ChurningProvider, X, D, S, H, B, P, Sequential>;
1689
1690 async fn assert_blocked(oracle: &O, blocker: &P, blocked: &P) {
1691 let blocked_peers = oracle.blocked().await.unwrap();
1692 let is_blocked = blocked_peers
1693 .iter()
1694 .any(|(a, b)| a == blocker && b == blocked);
1695 assert!(is_blocked, "expected {blocker} to have blocked {blocked}");
1696 }
1697
/// Handles for a participant whose engine was started by the fixture.
struct Peer<S: CodingScheme = C> {
    /// The participant's public key.
    public_key: PublicKey,
    /// The participant's index, i.e. its position in the sorted participant key list.
    index: Participant,
    /// Mailbox of the engine started for this participant.
    mailbox: Mailbox<B, S, H, P>,
    /// Clone of the network sender given to the engine, letting tests send raw
    /// bytes as this participant.
    sender: NetworkSender,
}
1709
/// Handles for a networked peer that is not in the participant set; its engine
/// is started with a verifier-only scheme.
// NOTE(review): `dead_code` is allowed presumably because not every field is
// read by the tests; confirm before removing.
#[allow(dead_code)]
struct NonParticipant<S: CodingScheme = C> {
    /// The non-participant's public key.
    public_key: PublicKey,
    /// Mailbox of the engine started for this non-participant.
    mailbox: Mailbox<B, S, H, P>,
    /// Clone of the network sender given to the engine.
    sender: NetworkSender,
}
1720
/// Test fixture describing the simulated network to build, generic over the
/// coding scheme `S` used by the engines.
struct Fixture<S: CodingScheme = C> {
    /// Number of participants (each gets a signer scheme and an engine).
    num_peers: usize,
    /// Number of additional peers outside the participant set.
    num_non_participants: usize,
    /// Link installed between every pair of distinct keys.
    link: Link,
    /// Ties the fixture to the coding scheme without storing a value of it.
    _marker: PhantomData<S>,
}
1732
1733 impl<S: CodingScheme> Default for Fixture<S> {
1734 fn default() -> Self {
1735 Self {
1736 num_peers: 4,
1737 num_non_participants: 0,
1738 link: DEFAULT_LINK,
1739 _marker: PhantomData,
1740 }
1741 }
1742 }
1743
impl<S: CodingScheme> Fixture<S> {
    /// Boots a simulated network with one shard engine per key, then runs `f`.
    ///
    /// Everything executes inside a default `deterministic::Runner`:
    ///
    /// 1. A simulated network is created and started.
    /// 2. `num_peers` participant keys and `num_non_participants` extra keys
    ///    are derived from consecutive seeds; each group is sorted by public key.
    /// 3. Every key is registered with the network and linked to every other
    ///    key using `self.link`.
    /// 4. Each participant gets an engine backed by a signer scheme; each
    ///    non-participant gets an engine backed by a verifier-only scheme.
    /// 5. When there are non-participants, all keys are tracked as peer set `1`.
    ///
    /// `f` receives the fixture itself, the runtime context, the network
    /// oracle, the participant handles, the non-participant handles, and the
    /// coding configuration derived from `num_peers`.
    ///
    /// # Panics
    ///
    /// Panics if registration or link creation fails, if a signer scheme cannot
    /// be created, or if `num_peers` does not fit in a `u16`.
    pub fn start<F: Future<Output = ()>>(
        self,
        f: impl FnOnce(
            Self,
            deterministic::Context,
            O,
            Vec<Peer<S>>,
            Vec<NonParticipant<S>>,
            CodingConfig,
        ) -> F,
    ) {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Peer-set tracking is only configured when non-participants exist.
            let tracked_peer_sets = if self.num_non_participants > 0 {
                Some(1)
            } else {
                None
            };
            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets,
                },
            );
            network.start();

            // Participant keys come from seeds `0..num_peers`, sorted by public
            // key so that a key's position doubles as its participant index.
            let mut private_keys = (0..self.num_peers)
                .map(|i| PrivateKey::from_seed(i as u64))
                .collect::<Vec<_>>();
            private_keys.sort_by_key(|s| s.public_key());
            let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();

            let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());

            // Non-participant keys continue the seed sequence after the participants.
            let mut np_private_keys = (0..self.num_non_participants)
                .map(|i| PrivateKey::from_seed((self.num_peers + i) as u64))
                .collect::<Vec<_>>();
            np_private_keys.sort_by_key(|s| s.public_key());
            let np_keys: Vec<P> = np_private_keys.iter().map(|k| k.public_key()).collect();

            let all_keys: Vec<P> = peer_keys.iter().chain(np_keys.iter()).cloned().collect();

            // Register every key, keeping its control handle and channel halves
            // until the matching engine is started below.
            let mut registrations = BTreeMap::new();
            for key in all_keys.iter() {
                let control = oracle.control(key.clone());
                let (sender, receiver) = control
                    .register(0, TEST_QUOTA)
                    .await
                    .expect("registration should succeed");
                registrations.insert(key.clone(), (control, sender, receiver));
            }
            // Link every ordered pair of distinct keys.
            for p1 in all_keys.iter() {
                for p2 in all_keys.iter() {
                    if p2 == p1 {
                        continue;
                    }
                    oracle
                        .add_link(p1.clone(), p2.clone(), self.link.clone())
                        .await
                        .expect("link should be added");
                }
            }

            let coding_config =
                coding_config_for_participants(u16::try_from(self.num_peers).unwrap());

            // Start one engine per participant, each with its own signer scheme.
            let mut peers = Vec::with_capacity(self.num_peers);
            for (idx, peer_key) in peer_keys.iter().enumerate() {
                let (control, sender, receiver) = registrations
                    .remove(peer_key)
                    .expect("peer should be registered");

                let participant = Participant::new(idx as u32);
                let engine_context = context.with_label(&format!("peer_{}", idx));

                let scheme = Scheme::signer(
                    SCHEME_NAMESPACE,
                    participants.clone(),
                    private_keys[idx].clone(),
                )
                .expect("signer scheme should be created");
                let scheme_provider: Prov = MultiEpochProvider::single(scheme);

                let config = Config {
                    scheme_provider,
                    blocker: control.clone(),
                    shard_codec_cfg: CodecConfig {
                        maximum_shard_size: MAX_SHARD_SIZE,
                    },
                    block_codec_cfg: (),
                    strategy: STRATEGY,
                    mailbox_size: 1024,
                    peer_buffer_size: NZUsize!(64),
                    background_channel_capacity: 1024,
                    peer_provider: oracle.manager(),
                };

                let (engine, mailbox) = ShardEngine::new(engine_context, config);
                // Keep a sender clone so tests can inject raw messages as this peer.
                let sender_clone = sender.clone();
                engine.start((sender, receiver));

                peers.push(Peer {
                    public_key: peer_key.clone(),
                    index: participant,
                    mailbox,
                    sender: sender_clone,
                });
            }

            // Start one engine per non-participant, using a verifier-only scheme.
            let mut non_participants = Vec::with_capacity(self.num_non_participants);
            for (idx, np_key) in np_keys.iter().enumerate() {
                let (control, sender, receiver) = registrations
                    .remove(np_key)
                    .expect("non-participant should be registered");

                let engine_context = context.with_label(&format!("non_participant_{}", idx));

                let scheme = Scheme::verifier(SCHEME_NAMESPACE, participants.clone());
                let scheme_provider: Prov = MultiEpochProvider::single(scheme);

                let config = Config {
                    scheme_provider,
                    blocker: control.clone(),
                    shard_codec_cfg: CodecConfig {
                        maximum_shard_size: MAX_SHARD_SIZE,
                    },
                    block_codec_cfg: (),
                    strategy: STRATEGY,
                    mailbox_size: 1024,
                    peer_buffer_size: NZUsize!(64),
                    background_channel_capacity: 1024,
                    peer_provider: oracle.manager(),
                };

                let (engine, mailbox) = ShardEngine::new(engine_context, config);
                let sender_clone = sender.clone();
                engine.start((sender, receiver));

                non_participants.push(NonParticipant {
                    public_key: np_key.clone(),
                    mailbox,
                    sender: sender_clone,
                });
            }

            // Track participants and non-participants together as peer set 1,
            // then pause briefly before handing control to the test body.
            // NOTE(review): the 10ms sleep presumably lets the engines observe
            // the tracked set; confirm.
            if self.num_non_participants > 0 {
                let all_tracked: Set<P> = Set::from_iter_dedup(all_keys);
                oracle.manager().track(1, all_tracked).await;
                context.sleep(Duration::from_millis(10)).await;
            }

            f(
                self,
                context,
                oracle,
                peers,
                non_participants,
                coding_config,
            )
            .await;
        });
    }
}
1910
1911 #[test_traced]
1912 fn test_e2e_broadcast_and_reconstruction() {
1913 let fixture = Fixture {
1914 num_peers: 10,
1915 ..Default::default()
1916 };
1917
1918 fixture.start(
1919 |config, context, _, mut peers, _, coding_config| async move {
1920 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
1921 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
1922 let commitment = coded_block.commitment();
1923
1924 let leader = peers[0].public_key.clone();
1925 let round = Round::new(Epoch::zero(), View::new(1));
1926 peers[0].mailbox.proposed(round, coded_block.clone()).await;
1927
1928 for peer in peers[1..].iter_mut() {
1930 peer.mailbox
1931 .discovered(commitment, leader.clone(), round)
1932 .await;
1933 }
1934 context.sleep(config.link.latency).await;
1935
1936 for peer in peers.iter_mut() {
1937 peer.mailbox
1938 .subscribe_shard(commitment)
1939 .await
1940 .await
1941 .expect("shard subscription should complete");
1942 }
1943 context.sleep(config.link.latency).await;
1944
1945 for peer in peers.iter_mut() {
1946 let reconstructed = peer
1947 .mailbox
1948 .get(commitment)
1949 .await
1950 .expect("block should be reconstructed");
1951 assert_eq!(reconstructed.commitment(), commitment);
1952 assert_eq!(reconstructed.height(), coded_block.height());
1953 }
1954 },
1955 );
1956 }
1957
1958 #[test_traced]
1959 fn test_e2e_broadcast_and_reconstruction_zoda() {
1960 let fixture = Fixture {
1961 num_peers: 10,
1962 ..Default::default()
1963 };
1964
1965 fixture.start(
1966 |config, context, _, mut peers, _, coding_config| async move {
1967 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
1968 let coded_block = CodedBlock::<B, Zoda<H>, H>::new(inner, coding_config, &STRATEGY);
1969 let commitment = coded_block.commitment();
1970
1971 let leader = peers[0].public_key.clone();
1972 let round = Round::new(Epoch::zero(), View::new(1));
1973 peers[0].mailbox.proposed(round, coded_block.clone()).await;
1974
1975 for peer in peers[1..].iter_mut() {
1977 peer.mailbox
1978 .discovered(commitment, leader.clone(), round)
1979 .await;
1980 }
1981 context.sleep(config.link.latency).await;
1982
1983 for peer in peers.iter_mut() {
1984 peer.mailbox
1985 .subscribe_shard(commitment)
1986 .await
1987 .await
1988 .expect("shard subscription should complete");
1989 }
1990 context.sleep(config.link.latency).await;
1991
1992 for peer in peers.iter_mut() {
1993 let reconstructed = peer
1994 .mailbox
1995 .get(commitment)
1996 .await
1997 .expect("block should be reconstructed");
1998 assert_eq!(reconstructed.commitment(), commitment);
1999 assert_eq!(reconstructed.height(), coded_block.height());
2000 }
2001 },
2002 );
2003 }
2004
2005 #[test_traced]
2006 fn test_block_subscriptions() {
2007 let fixture = Fixture {
2008 num_peers: 10,
2009 ..Default::default()
2010 };
2011
2012 fixture.start(
2013 |config, context, _, mut peers, _, coding_config| async move {
2014 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2015 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2016 let commitment = coded_block.commitment();
2017 let digest = coded_block.digest();
2018
2019 let leader = peers[0].public_key.clone();
2020 let round = Round::new(Epoch::zero(), View::new(1));
2021
2022 let commitment_sub = peers[1].mailbox.subscribe(commitment).await;
2024 let digest_sub = peers[2].mailbox.subscribe_by_digest(digest).await;
2025
2026 peers[0].mailbox.proposed(round, coded_block.clone()).await;
2027
2028 for peer in peers[1..].iter_mut() {
2030 peer.mailbox
2031 .discovered(commitment, leader.clone(), round)
2032 .await;
2033 }
2034 context.sleep(config.link.latency * 2).await;
2035
2036 for peer in peers.iter_mut() {
2037 peer.mailbox
2038 .subscribe_shard(commitment)
2039 .await
2040 .await
2041 .expect("shard subscription should complete");
2042 }
2043 context.sleep(config.link.latency).await;
2044
2045 let block_by_commitment =
2046 commitment_sub.await.expect("subscription should resolve");
2047 assert_eq!(block_by_commitment.commitment(), commitment);
2048 assert_eq!(block_by_commitment.height(), coded_block.height());
2049
2050 let block_by_digest = digest_sub.await.expect("subscription should resolve");
2051 assert_eq!(block_by_digest.commitment(), commitment);
2052 assert_eq!(block_by_digest.height(), coded_block.height());
2053 },
2054 );
2055 }
2056
2057 #[test_traced]
2058 fn test_proposer_preproposal_subscriptions_resolve_after_local_cache() {
2059 let fixture = Fixture {
2060 num_peers: 10,
2061 ..Default::default()
2062 };
2063
2064 fixture.start(|config, context, _, peers, _, coding_config| async move {
2065 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2066 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2067 let commitment = coded_block.commitment();
2068 let digest = coded_block.digest();
2069 let round = Round::new(Epoch::zero(), View::new(1));
2070
2071 let shard_sub = peers[0].mailbox.subscribe_shard(commitment).await;
2073 let commitment_sub = peers[0].mailbox.subscribe(commitment).await;
2074 let digest_sub = peers[0].mailbox.subscribe_by_digest(digest).await;
2075
2076 peers[0].mailbox.proposed(round, coded_block.clone()).await;
2077 context.sleep(config.link.latency).await;
2078
2079 select! {
2080 result = shard_sub => {
2081 result.expect("shard subscription should resolve");
2082 },
2083 _ = context.sleep(Duration::from_secs(5)) => {
2084 panic!("shard subscription did not resolve after local proposal cache");
2085 }
2086 }
2087
2088 let block_by_commitment = select! {
2089 result = commitment_sub => {
2090 result.expect("block subscription by commitment should resolve")
2091 },
2092 _ = context.sleep(Duration::from_secs(5)) => {
2093 panic!("block subscription by commitment did not resolve after local proposal cache");
2094 }
2095 };
2096 assert_eq!(block_by_commitment.commitment(), commitment);
2097 assert_eq!(block_by_commitment.height(), coded_block.height());
2098
2099 let block_by_digest = select! {
2100 result = digest_sub => {
2101 result.expect("block subscription by digest should resolve")
2102 },
2103 _ = context.sleep(Duration::from_secs(5)) => {
2104 panic!("block subscription by digest did not resolve after local proposal cache");
2105 }
2106 };
2107 assert_eq!(block_by_digest.commitment(), commitment);
2108 assert_eq!(block_by_digest.height(), coded_block.height());
2109 });
2110 }
2111
/// A shard subscription must not resolve from a tampered shard, and the sender
/// of that shard must be blocked; a later valid shard from the leader must
/// still complete the subscription.
#[test_traced]
fn test_shard_subscription_rejects_invalid_shard() {
    let fixture = Fixture::<C>::default();
    fixture.start(
        |config, context, oracle, mut peers, _, coding_config| async move {
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();
            let receiver_index = peers[2].index.get() as u16;

            // The shard addressed to the receiver (peers[2]).
            let valid_shard = coded_block.shard(receiver_index).expect("missing shard");

            // Tamper with a copy by overwriting its index with 0.
            let mut invalid_shard = valid_shard.clone();
            invalid_shard.index = 0;

            // The receiver is told that peers[1] leads the round, then subscribes.
            let receiver_pk = peers[2].public_key.clone();
            let leader = peers[1].public_key.clone();
            peers[2]
                .mailbox
                .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                .await;
            let mut shard_sub = peers[2].mailbox.subscribe_shard(commitment).await;

            // peers[0] delivers the tampered shard.
            let invalid_bytes = invalid_shard.encode();
            peers[0]
                .sender
                .send(Recipients::One(receiver_pk.clone()), invalid_bytes, true)
                .await
                .expect("send failed");

            context.sleep(config.link.latency * 2).await;

            // The subscription is still pending and the sender has been blocked.
            assert!(
                matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
                "subscription should not resolve from invalid shard"
            );
            assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;

            // The leader (peers[1]) delivers the untouched shard.
            let valid_bytes = valid_shard.encode();
            peers[1]
                .sender
                .send(Recipients::One(receiver_pk), valid_bytes, true)
                .await
                .expect("send failed");
            context.sleep(config.link.latency * 2).await;

            // The subscription must now complete within five seconds.
            select! {
                _ = shard_sub => {},
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("subscription did not complete after valid shard arrival");
                },
            };
        },
    );
}
2176
2177 #[test_traced]
2178 fn test_durable_prunes_reconstructed_blocks() {
2179 let fixture = Fixture::<C>::default();
2180 fixture.start(|_, context, _, mut peers, _, coding_config| async move {
2181 let block1 = CodedBlock::<B, C, H>::new(
2183 B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
2184 coding_config,
2185 &STRATEGY,
2186 );
2187 let block2 = CodedBlock::<B, C, H>::new(
2188 B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 100),
2189 coding_config,
2190 &STRATEGY,
2191 );
2192 let block3 = CodedBlock::<B, C, H>::new(
2193 B::new::<H>((), Sha256Digest::EMPTY, Height::new(3), 100),
2194 coding_config,
2195 &STRATEGY,
2196 );
2197 let commitment1 = block1.commitment();
2198 let commitment2 = block2.commitment();
2199 let commitment3 = block3.commitment();
2200
2201 let peer = &mut peers[0];
2203 let round = Round::new(Epoch::zero(), View::new(1));
2204 peer.mailbox.proposed(round, block1).await;
2205 peer.mailbox.proposed(round, block2).await;
2206 peer.mailbox.proposed(round, block3).await;
2207 context.sleep(Duration::from_millis(10)).await;
2208
2209 assert!(
2211 peer.mailbox.get(commitment1).await.is_some(),
2212 "block1 should be cached"
2213 );
2214 assert!(
2215 peer.mailbox.get(commitment2).await.is_some(),
2216 "block2 should be cached"
2217 );
2218 assert!(
2219 peer.mailbox.get(commitment3).await.is_some(),
2220 "block3 should be cached"
2221 );
2222
2223 peer.mailbox.prune(commitment2).await;
2225 context.sleep(Duration::from_millis(10)).await;
2226
2227 assert!(
2229 peer.mailbox.get(commitment1).await.is_none(),
2230 "block1 should be pruned"
2231 );
2232 assert!(
2233 peer.mailbox.get(commitment2).await.is_none(),
2234 "block2 should be pruned"
2235 );
2236
2237 assert!(
2239 peer.mailbox.get(commitment3).await.is_some(),
2240 "block3 should still be cached"
2241 );
2242 });
2243 }
2244
2245 #[test_traced]
2246 fn test_duplicate_leader_strong_shard_ignored() {
2247 let fixture = Fixture::<C>::default();
2248 fixture.start(
2249 |config, context, oracle, mut peers, _, coding_config| async move {
2250 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2251 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2252 let commitment = coded_block.commitment();
2253
2254 let peer2_index = peers[2].index.get() as u16;
2256 let peer2_strong_shard = coded_block.shard(peer2_index).expect("missing shard");
2257 let strong_bytes = peer2_strong_shard.encode();
2258
2259 let peer2_pk = peers[2].public_key.clone();
2260 let leader = peers[0].public_key.clone();
2261
2262 peers[2]
2264 .mailbox
2265 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2266 .await;
2267
2268 peers[0]
2270 .sender
2271 .send(
2272 Recipients::One(peer2_pk.clone()),
2273 strong_bytes.clone(),
2274 true,
2275 )
2276 .await
2277 .expect("send failed");
2278 context.sleep(config.link.latency * 2).await;
2279
2280 peers[0]
2282 .sender
2283 .send(Recipients::One(peer2_pk), strong_bytes, true)
2284 .await
2285 .expect("send failed");
2286 context.sleep(config.link.latency * 2).await;
2287
2288 let blocked_peers = oracle.blocked().await.unwrap();
2290 let is_blocked = blocked_peers
2291 .iter()
2292 .any(|(a, b)| a == &peers[2].public_key && b == &peers[0].public_key);
2293 assert!(
2294 !is_blocked,
2295 "leader should not be blocked for duplicate strong shard"
2296 );
2297 },
2298 );
2299 }
2300
2301 #[test_traced]
2302 fn test_equivocating_leader_strong_shard_blocks_peer() {
2303 let fixture = Fixture::<C>::default();
2304 fixture.start(
2305 |config, context, oracle, mut peers, _, coding_config| async move {
2306 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2307 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
2308 let commitment = coded_block1.commitment();
2309
2310 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 200);
2312 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
2313
2314 let peer2_index = peers[2].index.get() as u16;
2316 let strong_bytes1 = coded_block1
2317 .shard(peer2_index)
2318 .expect("missing shard")
2319 .encode();
2320 let mut equivocating_shard =
2321 coded_block2.shard(peer2_index).expect("missing shard");
2322 equivocating_shard.commitment = commitment;
2324 let strong_bytes2 = equivocating_shard.encode();
2325
2326 let peer2_pk = peers[2].public_key.clone();
2327 let leader = peers[0].public_key.clone();
2328
2329 peers[2]
2331 .mailbox
2332 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2333 .await;
2334
2335 peers[0]
2337 .sender
2338 .send(Recipients::One(peer2_pk.clone()), strong_bytes1, true)
2339 .await
2340 .expect("send failed");
2341 context.sleep(config.link.latency * 2).await;
2342
2343 peers[0]
2345 .sender
2346 .send(Recipients::One(peer2_pk), strong_bytes2, true)
2347 .await
2348 .expect("send failed");
2349 context.sleep(config.link.latency * 2).await;
2350
2351 assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;
2353 },
2354 );
2355 }
2356
2357 #[test_traced]
2358 fn test_non_leader_strong_shard_blocked() {
2359 let fixture = Fixture::<C>::default();
2361 fixture.start(
2362 |config, context, oracle, mut peers, _, coding_config| async move {
2363 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2364 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2365 let commitment = coded_block.commitment();
2366
2367 let peer2_index = peers[2].index.get() as u16;
2369 let peer2_strong_shard = coded_block.shard(peer2_index).expect("missing shard");
2370 let strong_bytes = peer2_strong_shard.encode();
2371
2372 let peer2_pk = peers[2].public_key.clone();
2373 let leader = peers[0].public_key.clone();
2374
2375 peers[2]
2377 .mailbox
2378 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2379 .await;
2380
2381 peers[1]
2383 .sender
2384 .send(Recipients::One(peer2_pk), strong_bytes, true)
2385 .await
2386 .expect("send failed");
2387 context.sleep(config.link.latency * 2).await;
2388
2389 assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2391 },
2392 );
2393 }
2394
/// A strong shard that arrives from a non-leader before the leader is known
/// must not cause a block immediately, but the sender must be blocked once the
/// receiver learns who the leader actually is.
#[test_traced]
fn test_buffered_non_leader_blocked_on_leader_arrival() {
    let fixture = Fixture::<C>::default();
    fixture.start(
        |config, context, oracle, mut peers, _, coding_config| async move {
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();

            // Encoded shard destined for peers[2].
            let peer2_index = peers[2].index.get() as u16;
            let peer2_strong_shard = coded_block.shard(peer2_index).expect("missing shard");
            let strong_bytes = peer2_strong_shard.encode();

            let peer2_pk = peers[2].public_key.clone();

            // peers[1] sends the shard before peers[2] has been told the leader.
            peers[1]
                .sender
                .send(Recipients::One(peer2_pk), strong_bytes, true)
                .await
                .expect("send failed");
            context.sleep(config.link.latency * 2).await;

            // Nothing may be blocked yet.
            let blocked = oracle.blocked().await.unwrap();
            assert!(
                blocked.is_empty(),
                "no peers should be blocked while leader is unknown"
            );

            // peers[2] now learns that peers[0] is the leader.
            let leader = peers[0].public_key.clone();
            peers[2]
                .mailbox
                .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                .await;
            context.sleep(Duration::from_millis(10)).await;

            // The earlier sender (peers[1]) must now be blocked by peers[2].
            assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
        },
    );
}
2442
/// When a peer is told about the same commitment twice with different leaders,
/// the first leader stays in effect: its strong shard completes the shard
/// subscription, while the same shard from the second "leader" gets that
/// sender blocked and leaves the first leader unblocked.
#[test_traced]
fn test_conflicting_external_proposed_ignored() {
    let fixture = Fixture::<C>::default();
    fixture.start(
        |config, context, oracle, mut peers, _, coding_config| async move {
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();

            // Encoded shard destined for peers[2].
            let peer2_index = peers[2].index.get() as u16;
            let peer2_strong_shard = coded_block.shard(peer2_index).expect("missing shard");
            let strong_bytes = peer2_strong_shard.encode();

            let peer2_pk = peers[2].public_key.clone();
            let leader_a = peers[0].public_key.clone();
            let leader_b = peers[1].public_key.clone();

            // Subscribe before any leader is announced.
            let shard_sub = peers[2].mailbox.subscribe_shard(commitment).await;

            // First announcement names peers[0] as leader.
            peers[2]
                .mailbox
                .discovered(
                    commitment,
                    leader_a.clone(),
                    Round::new(Epoch::zero(), View::new(1)),
                )
                .await;
            // Second announcement for the same commitment and round names peers[1].
            peers[2]
                .mailbox
                .discovered(
                    commitment,
                    leader_b,
                    Round::new(Epoch::zero(), View::new(1)),
                )
                .await;

            // The original leader delivers the strong shard.
            peers[0]
                .sender
                .send(
                    Recipients::One(peer2_pk.clone()),
                    strong_bytes.clone(),
                    true,
                )
                .await
                .expect("send failed");
            context.sleep(config.link.latency * 2).await;

            // The subscription must complete within five seconds.
            select! {
                _ = shard_sub => {},
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!(
                        "subscription did not complete after strong shard from original leader"
                    );
                },
            };

            // The conflicting "leader" sends the same strong shard.
            peers[1]
                .sender
                .send(Recipients::One(peer2_pk), strong_bytes, true)
                .await
                .expect("send failed");
            context.sleep(config.link.latency * 2).await;

            // peers[1] is blocked by peers[2] ...
            assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;

            // ... while the original leader is not.
            let blocked_peers = oracle.blocked().await.unwrap();
            let leader_a_blocked = blocked_peers
                .iter()
                .any(|(a, b)| a == &peers[2].public_key && b == &leader_a);
            assert!(
                !leader_a_blocked,
                "original leader should not be blocked after conflicting leader update"
            );
        },
    );
}
2527
/// Announcing a leader whose key is outside the participant set must have no
/// lasting effect: a strong shard from the real leader does not get it blocked,
/// and once the real leader is announced the shard subscription completes.
#[test_traced]
fn test_non_participant_external_proposed_ignored() {
    let fixture = Fixture::<C>::default();
    fixture.start(
        |config, context, oracle, mut peers, _, coding_config| async move {
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();

            // Encoded shard destined for peers[2].
            let peer2_index = peers[2].index.get() as u16;
            let peer2_strong_shard = coded_block.shard(peer2_index).expect("missing shard");
            let strong_bytes = peer2_strong_shard.encode();

            let peer2_pk = peers[2].public_key.clone();
            let leader = peers[0].public_key.clone();
            // Key from a seed the fixture never uses, so it is not a participant.
            let non_participant_leader = PrivateKey::from_seed(10_000).public_key();

            let shard_sub = peers[2].mailbox.subscribe_shard(commitment).await;

            // Announce the non-participant as leader.
            peers[2]
                .mailbox
                .discovered(
                    commitment,
                    non_participant_leader,
                    Round::new(Epoch::zero(), View::new(1)),
                )
                .await;

            // The real leader delivers the strong shard anyway.
            peers[0]
                .sender
                .send(
                    Recipients::One(peer2_pk.clone()),
                    strong_bytes.clone(),
                    true,
                )
                .await
                .expect("send failed");
            context.sleep(config.link.latency * 2).await;

            // The real leader must not have been blocked by peers[2].
            let blocked = oracle.blocked().await.unwrap();
            let leader_blocked = blocked
                .iter()
                .any(|(a, b)| a == &peers[2].public_key && b == &leader);
            assert!(
                !leader_blocked,
                "leader should not be blocked when non-participant update is ignored"
            );

            // Announce the real leader.
            peers[2]
                .mailbox
                .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                .await;
            context.sleep(config.link.latency * 2).await;

            // The subscription must complete within five seconds.
            select! {
                _ = shard_sub => {},
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("subscription did not complete after valid leader update");
                },
            };
        },
    );
}
2596
2597 #[test_traced]
2598 fn test_shard_from_non_participant_blocks_peer() {
2599 let fixture = Fixture::<C>::default();
2600 fixture.start(
2601 |config, context, oracle, peers, _, coding_config| async move {
2602 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2603 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2604 let commitment = coded_block.commitment();
2605
2606 let leader = peers[0].public_key.clone();
2607 let receiver_pk = peers[2].public_key.clone();
2608
2609 let non_participant_key = PrivateKey::from_seed(10_000);
2610 let non_participant_pk = non_participant_key.public_key();
2611
2612 let non_participant_control = oracle.control(non_participant_pk.clone());
2613 let (mut non_participant_sender, _non_participant_receiver) =
2614 non_participant_control
2615 .register(0, TEST_QUOTA)
2616 .await
2617 .expect("registration should succeed");
2618 oracle
2619 .add_link(
2620 non_participant_pk.clone(),
2621 receiver_pk.clone(),
2622 DEFAULT_LINK,
2623 )
2624 .await
2625 .expect("link should be added");
2626
2627 peers[2]
2628 .mailbox
2629 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2630 .await;
2631
2632 let peer2_index = peers[2].index.get() as u16;
2633 let strong_shard = coded_block.shard(peer2_index).expect("missing shard");
2634 let weak_shard = strong_shard
2635 .verify_into_weak()
2636 .expect("verify_into_weak failed");
2637 let weak_bytes = weak_shard.encode();
2638
2639 non_participant_sender
2640 .send(Recipients::One(receiver_pk), weak_bytes, true)
2641 .await
2642 .expect("send failed");
2643 context.sleep(config.link.latency * 2).await;
2644
2645 assert_blocked(&oracle, &peers[2].public_key, &non_participant_pk).await;
2646 },
2647 );
2648 }
2649
/// A weak shard sent by a key outside the participant set before the receiver
/// knows the leader must still get that sender blocked once the leader has
/// been announced.
#[test_traced]
fn test_buffered_shard_from_non_participant_blocks_peer() {
    let fixture = Fixture::<C>::default();
    fixture.start(
        |config, context, oracle, peers, _, coding_config| async move {
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();

            let leader = peers[0].public_key.clone();
            let receiver_pk = peers[2].public_key.clone();

            // Key from a seed the fixture never uses, so it is not a participant.
            let non_participant_key = PrivateKey::from_seed(10_000);
            let non_participant_pk = non_participant_key.public_key();

            // Register the outsider on the network and link it to the receiver.
            let non_participant_control = oracle.control(non_participant_pk.clone());
            let (mut non_participant_sender, _non_participant_receiver) =
                non_participant_control
                    .register(0, TEST_QUOTA)
                    .await
                    .expect("registration should succeed");
            oracle
                .add_link(
                    non_participant_pk.clone(),
                    receiver_pk.clone(),
                    DEFAULT_LINK,
                )
                .await
                .expect("link should be added");

            // Build a weak shard from the receiver's strong shard.
            let peer2_index = peers[2].index.get() as u16;
            let strong_shard = coded_block.shard(peer2_index).expect("missing shard");
            let weak_shard = strong_shard
                .verify_into_weak()
                .expect("verify_into_weak failed");
            let weak_bytes = weak_shard.encode();

            // The outsider sends it before the receiver has been told the leader.
            non_participant_sender
                .send(Recipients::One(receiver_pk), weak_bytes, true)
                .await
                .expect("send failed");
            context.sleep(config.link.latency * 2).await;

            // Only now does the receiver learn who the leader is.
            peers[2]
                .mailbox
                .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                .await;
            context.sleep(config.link.latency * 2).await;

            // The outsider must have been blocked by the receiver.
            assert_blocked(&oracle, &peers[2].public_key, &non_participant_pk).await;
        },
    );
}
2703
2704 #[test_traced]
2705 fn test_duplicate_weak_shard_ignored() {
2706 let fixture: Fixture<C> = Fixture {
2708 num_peers: 10,
2709 ..Default::default()
2710 };
2711
2712 fixture.start(
2713 |config, context, oracle, mut peers, _, coding_config| async move {
2714 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2715 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2716
2717 let peer2_index = peers[2].index.get() as u16;
2719 let peer2_strong_shard = coded_block.shard(peer2_index).expect("missing shard");
2720
2721 let peer1_index = peers[1].index.get() as u16;
2723 let peer1_strong_shard = coded_block.shard(peer1_index).expect("missing shard");
2724 let peer1_weak_shard = peer1_strong_shard
2725 .verify_into_weak()
2726 .expect("verify_into_weak failed");
2727
2728 let peer2_pk = peers[2].public_key.clone();
2729 let leader = peers[0].public_key.clone();
2730
2731 peers[2]
2733 .mailbox
2734 .discovered(
2735 coded_block.commitment(),
2736 leader,
2737 Round::new(Epoch::zero(), View::new(1)),
2738 )
2739 .await;
2740
2741 let strong_bytes = peer2_strong_shard.encode();
2743 peers[0]
2744 .sender
2745 .send(Recipients::One(peer2_pk.clone()), strong_bytes, true)
2746 .await
2747 .expect("send failed");
2748 context.sleep(config.link.latency * 2).await;
2749
2750 let weak_shard_bytes = peer1_weak_shard.encode();
2752 peers[1]
2753 .sender
2754 .send(
2755 Recipients::One(peer2_pk.clone()),
2756 weak_shard_bytes.clone(),
2757 true,
2758 )
2759 .await
2760 .expect("send failed");
2761 context.sleep(config.link.latency * 2).await;
2762
2763 peers[1]
2766 .sender
2767 .send(Recipients::One(peer2_pk), weak_shard_bytes, true)
2768 .await
2769 .expect("send failed");
2770 context.sleep(config.link.latency * 2).await;
2771
2772 let blocked_peers = oracle.blocked().await.unwrap();
2774 let is_blocked = blocked_peers
2775 .iter()
2776 .any(|(a, b)| a == &peers[2].public_key && b == &peers[1].public_key);
2777 assert!(
2778 !is_blocked,
2779 "peer should not be blocked for exact duplicate weak shard"
2780 );
2781 },
2782 );
2783 }
2784
2785 #[test_traced]
2786 fn test_equivocating_weak_shard_blocks_peer() {
2787 let fixture: Fixture<C> = Fixture {
2789 num_peers: 10,
2790 ..Default::default()
2791 };
2792
2793 fixture.start(
2794 |config, context, oracle, mut peers, _, coding_config| async move {
2795 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2796 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
2797
2798 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 200);
2800 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
2801
2802 let peer2_index = peers[2].index.get() as u16;
2804 let peer2_strong_shard = coded_block1.shard(peer2_index).expect("missing shard");
2805
2806 let peer1_index = peers[1].index.get() as u16;
2808 let peer1_strong_shard = coded_block1.shard(peer1_index).expect("missing shard");
2809 let peer1_weak_shard = peer1_strong_shard
2810 .verify_into_weak()
2811 .expect("verify_into_weak failed");
2812
2813 let peer1_strong_shard2 = coded_block2.shard(peer1_index).expect("missing shard");
2815 let mut peer1_equivocating_shard = peer1_strong_shard2
2816 .verify_into_weak()
2817 .expect("verify_into_weak failed");
2818 peer1_equivocating_shard.commitment = coded_block1.commitment();
2821
2822 let peer2_pk = peers[2].public_key.clone();
2823 let leader = peers[0].public_key.clone();
2824
2825 peers[2]
2827 .mailbox
2828 .discovered(
2829 coded_block1.commitment(),
2830 leader,
2831 Round::new(Epoch::zero(), View::new(1)),
2832 )
2833 .await;
2834
2835 let strong_bytes = peer2_strong_shard.encode();
2837 peers[0]
2838 .sender
2839 .send(Recipients::One(peer2_pk.clone()), strong_bytes, true)
2840 .await
2841 .expect("send failed");
2842 context.sleep(config.link.latency * 2).await;
2843
2844 let weak_shard_bytes = peer1_weak_shard.encode();
2846 peers[1]
2847 .sender
2848 .send(Recipients::One(peer2_pk.clone()), weak_shard_bytes, true)
2849 .await
2850 .expect("send failed");
2851 context.sleep(config.link.latency * 2).await;
2852
2853 let equivocating_bytes = peer1_equivocating_shard.encode();
2855 peers[1]
2856 .sender
2857 .send(Recipients::One(peer2_pk), equivocating_bytes, true)
2858 .await
2859 .expect("send failed");
2860 context.sleep(config.link.latency * 2).await;
2861
2862 assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2864 },
2865 );
2866 }
2867
2868 #[test_traced]
2869 fn test_reconstruction_states_pruned_at_or_below_reconstructed_view() {
2870 let fixture: Fixture<C> = Fixture {
2872 num_peers: 10,
2873 ..Default::default()
2874 };
2875
2876 fixture.start(
2877 |config, context, oracle, mut peers, _, coding_config| async move {
2878 let block_a = CodedBlock::<B, C, H>::new(
2880 B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
2881 coding_config,
2882 &STRATEGY,
2883 );
2884 let commitment_a = block_a.commitment();
2885
2886 let block_b = CodedBlock::<B, C, H>::new(
2888 B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200),
2889 coding_config,
2890 &STRATEGY,
2891 );
2892 let commitment_b = block_b.commitment();
2893
2894 let peer2_pk = peers[2].public_key.clone();
2895 let leader = peers[0].public_key.clone();
2896
2897 peers[2]
2899 .mailbox
2900 .discovered(
2901 commitment_a,
2902 leader.clone(),
2903 Round::new(Epoch::zero(), View::new(1)),
2904 )
2905 .await;
2906 let peer1_strong_a = block_a
2907 .shard(peers[1].index.get() as u16)
2908 .expect("missing shard");
2909 let weak_a = peer1_strong_a
2910 .verify_into_weak()
2911 .expect("verify_into_weak failed")
2912 .encode();
2913 peers[1]
2914 .sender
2915 .send(Recipients::One(peer2_pk.clone()), weak_a.clone(), true)
2916 .await
2917 .expect("send failed");
2918 context.sleep(config.link.latency * 2).await;
2919
2920 peers[2]
2922 .mailbox
2923 .discovered(
2924 commitment_b,
2925 leader,
2926 Round::new(Epoch::zero(), View::new(2)),
2927 )
2928 .await;
2929 let strong_b = block_b
2931 .shard(peers[2].index.get() as u16)
2932 .expect("missing shard")
2933 .encode();
2934 peers[0]
2935 .sender
2936 .send(Recipients::One(peer2_pk.clone()), strong_b, true)
2937 .await
2938 .expect("send failed");
2939
2940 for i in [1usize, 3usize, 4usize] {
2942 let weak = block_b
2943 .shard(peers[i].index.get() as u16)
2944 .expect("missing shard")
2945 .verify_into_weak()
2946 .expect("verify_into_weak failed")
2947 .encode();
2948 peers[i]
2949 .sender
2950 .send(Recipients::One(peer2_pk.clone()), weak, true)
2951 .await
2952 .expect("send failed");
2953 }
2954 context.sleep(config.link.latency * 4).await;
2955
2956 let reconstructed = peers[2]
2958 .mailbox
2959 .get(commitment_b)
2960 .await
2961 .expect("block B should reconstruct");
2962 assert_eq!(reconstructed.commitment(), commitment_b);
2963
2964 peers[1]
2967 .sender
2968 .send(Recipients::One(peer2_pk), weak_a, true)
2969 .await
2970 .expect("send failed");
2971 context.sleep(config.link.latency * 2).await;
2972
2973 let blocked = oracle.blocked().await.unwrap();
2974 let blocked_peer1 = blocked
2975 .iter()
2976 .any(|(a, b)| a == &peers[2].public_key && b == &peers[1].public_key);
2977 assert!(
2978 !blocked_peer1,
2979 "peer1 should not be blocked after lower-view state was pruned"
2980 );
2981 },
2982 );
2983 }
2984
2985 #[test_traced]
2986 fn test_drain_pending_validates_weak_shards_after_strong_shard() {
2987 let fixture: Fixture<C> = Fixture {
2993 num_peers: 10,
2994 ..Default::default()
2995 };
2996
2997 fixture.start(
2998 |config, context, oracle, mut peers, _, coding_config| async move {
2999 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3000 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3001 let commitment = coded_block.commitment();
3002
3003 let peer3_index = peers[3].index.get() as u16;
3005 let peer3_strong_shard = coded_block.shard(peer3_index).expect("missing shard");
3006
3007 let weak_shards: Vec<_> = [0, 1, 2]
3009 .iter()
3010 .map(|&i| {
3011 coded_block
3012 .shard(peers[i].index.get() as u16)
3013 .expect("missing shard")
3014 .verify_into_weak()
3015 .expect("verify_into_weak failed")
3016 })
3017 .collect();
3018
3019 let peer3_pk = peers[3].public_key.clone();
3020
3021 for (i, weak_shard) in weak_shards.iter().enumerate() {
3024 let sender_idx = [0, 1, 2][i];
3025 let weak_shard_bytes = weak_shard.encode();
3026 peers[sender_idx]
3027 .sender
3028 .send(Recipients::One(peer3_pk.clone()), weak_shard_bytes, true)
3029 .await
3030 .expect("send failed");
3031 }
3032
3033 context.sleep(config.link.latency * 2).await;
3034
3035 let block = peers[3].mailbox.get(commitment).await;
3037 assert!(block.is_none(), "block should not be reconstructed yet");
3038
3039 let leader = peers[2].public_key.clone();
3041 peers[3]
3042 .mailbox
3043 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
3044 .await;
3045
3046 let strong_bytes = peer3_strong_shard.encode();
3051 peers[2]
3052 .sender
3053 .send(Recipients::One(peer3_pk), strong_bytes, true)
3054 .await
3055 .expect("send failed");
3056
3057 context.sleep(config.link.latency * 2).await;
3058
3059 let blocked = oracle.blocked().await.unwrap();
3061 assert!(
3062 blocked.is_empty(),
3063 "no peers should be blocked for valid pending weak shards"
3064 );
3065
3066 let block = peers[3].mailbox.get(commitment).await;
3068 assert!(
3069 block.is_some(),
3070 "block should be reconstructed after drain_pending"
3071 );
3072
3073 let reconstructed = block.unwrap();
3075 assert_eq!(
3076 reconstructed.commitment(),
3077 commitment,
3078 "reconstructed block should have correct commitment"
3079 );
3080 },
3081 );
3082 }
3083
3084 #[test_traced]
3085 fn test_peer_shards_buffered_until_external_proposed() {
3086 let fixture: Fixture<C> = Fixture {
3089 num_peers: 10,
3090 ..Default::default()
3091 };
3092
3093 fixture.start(
3094 |config, context, oracle, mut peers, _, coding_config| async move {
3095 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3096 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3097 let commitment = coded_block.commitment();
3098
3099 let receiver_idx = 3usize;
3100 let receiver_pk = peers[receiver_idx].public_key.clone();
3101 let leader = peers[0].public_key.clone();
3102
3103 let mut shard_sub = peers[receiver_idx]
3105 .mailbox
3106 .subscribe_shard(commitment)
3107 .await;
3108
3109 let strong = coded_block
3112 .shard(peers[receiver_idx].index.get() as u16)
3113 .expect("missing shard")
3114 .encode();
3115 peers[0]
3116 .sender
3117 .send(Recipients::One(receiver_pk.clone()), strong, true)
3118 .await
3119 .expect("send failed");
3120
3121 for i in [1usize, 2usize, 4usize] {
3122 let weak = coded_block
3123 .shard(peers[i].index.get() as u16)
3124 .expect("missing shard")
3125 .verify_into_weak()
3126 .expect("verify_into_weak failed")
3127 .encode();
3128 peers[i]
3129 .sender
3130 .send(Recipients::One(receiver_pk.clone()), weak, true)
3131 .await
3132 .expect("send failed");
3133 }
3134
3135 context.sleep(config.link.latency * 2).await;
3136
3137 assert!(
3139 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
3140 "shard subscription should not resolve before leader announcement"
3141 );
3142 assert!(
3143 peers[receiver_idx].mailbox.get(commitment).await.is_none(),
3144 "block should not reconstruct before leader announcement"
3145 );
3146
3147 peers[receiver_idx]
3149 .mailbox
3150 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
3151 .await;
3152
3153 select! {
3154 _ = shard_sub => {},
3155 _ = context.sleep(Duration::from_secs(5)) => {
3156 panic!("shard subscription did not resolve after leader announcement");
3157 },
3158 }
3159
3160 context.sleep(config.link.latency * 2).await;
3161 assert!(
3162 peers[receiver_idx].mailbox.get(commitment).await.is_some(),
3163 "block should reconstruct after buffered shards are ingested"
3164 );
3165
3166 assert!(
3168 oracle.blocked().await.unwrap().is_empty(),
3169 "no peers should be blocked for valid buffered shards"
3170 );
3171 },
3172 );
3173 }
3174
3175 #[test_traced]
3176 fn test_post_leader_shards_processed_immediately() {
3177 let fixture: Fixture<C> = Fixture {
3180 num_peers: 10,
3181 ..Default::default()
3182 };
3183
3184 fixture.start(
3185 |config, context, oracle, mut peers, _, coding_config| async move {
3186 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3187 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3188 let commitment = coded_block.commitment();
3189
3190 let receiver_idx = 3usize;
3191 let receiver_pk = peers[receiver_idx].public_key.clone();
3192 let leader = peers[0].public_key.clone();
3193
3194 let shard_sub = peers[receiver_idx]
3195 .mailbox
3196 .subscribe_shard(commitment)
3197 .await;
3198 peers[receiver_idx]
3199 .mailbox
3200 .discovered(
3201 commitment,
3202 leader.clone(),
3203 Round::new(Epoch::zero(), View::new(1)),
3204 )
3205 .await;
3206
3207 let strong = coded_block
3209 .shard(peers[receiver_idx].index.get() as u16)
3210 .expect("missing shard")
3211 .encode();
3212 peers[0]
3213 .sender
3214 .send(Recipients::One(receiver_pk.clone()), strong, true)
3215 .await
3216 .expect("send failed");
3217
3218 select! {
3220 _ = shard_sub => {},
3221 _ = context.sleep(Duration::from_secs(5)) => {
3222 panic!("shard subscription did not resolve after post-leader strong shard");
3223 },
3224 }
3225
3226 for i in [1usize, 2usize, 4usize] {
3228 let weak = coded_block
3229 .shard(peers[i].index.get() as u16)
3230 .expect("missing shard")
3231 .verify_into_weak()
3232 .expect("verify_into_weak failed")
3233 .encode();
3234 peers[i]
3235 .sender
3236 .send(Recipients::One(receiver_pk.clone()), weak, true)
3237 .await
3238 .expect("send failed");
3239 }
3240
3241 context.sleep(config.link.latency * 2).await;
3242 let reconstructed = peers[receiver_idx]
3243 .mailbox
3244 .get(commitment)
3245 .await
3246 .expect("block should reconstruct from post-leader shards");
3247 assert_eq!(reconstructed.commitment(), commitment);
3248
3249 assert!(
3250 oracle.blocked().await.unwrap().is_empty(),
3251 "no peers should be blocked for valid post-leader shards"
3252 );
3253 },
3254 );
3255 }
3256
3257 #[test_traced]
3258 fn test_invalid_shard_codec_blocks_peer() {
3259 let fixture: Fixture<C> = Fixture {
3261 num_peers: 4,
3262 ..Default::default()
3263 };
3264
3265 fixture.start(
3266 |config, context, oracle, mut peers, _, _coding_config| async move {
3267 let peer0_pk = peers[0].public_key.clone();
3268 let peer1_pk = peers[1].public_key.clone();
3269
3270 let garbage = Bytes::from(vec![0xFF, 0xFE, 0xFD, 0xFC, 0xFB]);
3272 peers[1]
3273 .sender
3274 .send(Recipients::One(peer0_pk.clone()), garbage, true)
3275 .await
3276 .expect("send failed");
3277
3278 context.sleep(config.link.latency * 2).await;
3279
3280 assert_blocked(&oracle, &peer0_pk, &peer1_pk).await;
3282 },
3283 );
3284 }
3285
3286 #[test_traced]
3287 fn test_duplicate_buffered_strong_shard_does_not_block_before_leader() {
3288 let fixture: Fixture<C> = Fixture {
3291 ..Default::default()
3292 };
3293
3294 fixture.start(
3295 |config, context, oracle, mut peers, _, coding_config| async move {
3296 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3297 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3298
3299 let peer2_index = peers[2].index.get() as u16;
3301 let peer2_strong_shard = coded_block.shard(peer2_index).expect("missing shard");
3302 let strong_bytes = peer2_strong_shard.encode();
3303
3304 let peer2_pk = peers[2].public_key.clone();
3305
3306 peers[1]
3310 .sender
3311 .send(
3312 Recipients::One(peer2_pk.clone()),
3313 strong_bytes.clone(),
3314 true,
3315 )
3316 .await
3317 .expect("send failed");
3318 context.sleep(config.link.latency * 2).await;
3319
3320 let blocked = oracle.blocked().await.unwrap();
3322 assert!(blocked.is_empty(), "no peers should be blocked yet");
3323
3324 peers[1]
3326 .sender
3327 .send(Recipients::One(peer2_pk), strong_bytes, true)
3328 .await
3329 .expect("send failed");
3330 context.sleep(config.link.latency * 2).await;
3331
3332 let blocked = oracle.blocked().await.unwrap();
3334 assert!(
3335 blocked.is_empty(),
3336 "no peers should be blocked before leader"
3337 );
3338 },
3339 );
3340 }
3341
3342 #[test_traced]
3343 fn test_invalid_strong_shard_crypto_blocks_leader() {
3344 let fixture: Fixture<C> = Fixture {
3347 ..Default::default()
3348 };
3349
3350 fixture.start(
3351 |config, context, oracle, mut peers, _, coding_config| async move {
3352 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3355 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3356 let commitment1 = coded_block1.commitment();
3357
3358 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3359 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3360
3361 let peer2_index = peers[2].index.get() as u16;
3364 let mut wrong_shard = coded_block2.shard(peer2_index).expect("missing shard");
3365 wrong_shard.commitment = commitment1;
3366 let wrong_bytes = wrong_shard.encode();
3367
3368 let peer2_pk = peers[2].public_key.clone();
3369 let leader = peers[0].public_key.clone();
3370
3371 peers[2]
3373 .mailbox
3374 .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
3375 .await;
3376
3377 peers[0]
3379 .sender
3380 .send(Recipients::One(peer2_pk), wrong_bytes, true)
3381 .await
3382 .expect("send failed");
3383 context.sleep(config.link.latency * 2).await;
3384
3385 assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;
3387 },
3388 );
3389 }
3390
3391 #[test_traced]
3392 fn test_weak_shard_index_mismatch_blocks_peer() {
3393 let fixture: Fixture<C> = Fixture {
3396 num_peers: 10,
3397 ..Default::default()
3398 };
3399
3400 fixture.start(
3401 |config, context, oracle, mut peers, _, coding_config| async move {
3402 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3403 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3404 let commitment = coded_block.commitment();
3405
3406 let peer3_index = peers[3].index.get() as u16;
3408 let peer3_strong_shard = coded_block.shard(peer3_index).expect("missing shard");
3409
3410 let peer1_index = peers[1].index.get() as u16;
3412 let mut wrong_index_weak_shard = coded_block
3413 .shard(peer1_index)
3414 .expect("missing shard")
3415 .verify_into_weak()
3416 .expect("verify_into_weak failed");
3417 wrong_index_weak_shard.index = peers[4].index.get() as u16;
3419 let wrong_bytes = wrong_index_weak_shard.encode();
3420
3421 let peer3_pk = peers[3].public_key.clone();
3422 let leader = peers[0].public_key.clone();
3423
3424 peers[3]
3426 .mailbox
3427 .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
3428 .await;
3429 let strong_bytes = peer3_strong_shard.encode();
3430 peers[0]
3431 .sender
3432 .send(Recipients::One(peer3_pk.clone()), strong_bytes, true)
3433 .await
3434 .expect("send failed");
3435 context.sleep(config.link.latency * 2).await;
3436
3437 peers[1]
3439 .sender
3440 .send(Recipients::One(peer3_pk), wrong_bytes, true)
3441 .await
3442 .expect("send failed");
3443 context.sleep(config.link.latency * 2).await;
3444
3445 assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
3447 },
3448 );
3449 }
3450
3451 #[test_traced]
3452 fn test_invalid_weak_shard_crypto_blocks_peer() {
3453 let fixture: Fixture<C> = Fixture {
3456 num_peers: 10,
3457 ..Default::default()
3458 };
3459
3460 fixture.start(
3461 |config, context, oracle, mut peers, _, coding_config| async move {
3462 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3464 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3465 let commitment1 = coded_block1.commitment();
3466
3467 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3468 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3469
3470 let peer3_index = peers[3].index.get() as u16;
3472 let peer3_strong_shard = coded_block1.shard(peer3_index).expect("missing shard");
3473
3474 let peer1_index = peers[1].index.get() as u16;
3477 let mut wrong_weak_shard = coded_block2
3478 .shard(peer1_index)
3479 .expect("missing shard")
3480 .verify_into_weak()
3481 .expect("verify_into_weak failed");
3482 wrong_weak_shard.commitment = commitment1;
3483 let wrong_bytes = wrong_weak_shard.encode();
3484
3485 let peer3_pk = peers[3].public_key.clone();
3486 let leader = peers[0].public_key.clone();
3487
3488 peers[3]
3490 .mailbox
3491 .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
3492 .await;
3493 let strong_bytes = peer3_strong_shard.encode();
3494 peers[0]
3495 .sender
3496 .send(Recipients::One(peer3_pk.clone()), strong_bytes, true)
3497 .await
3498 .expect("send failed");
3499 context.sleep(config.link.latency * 2).await;
3500
3501 peers[1]
3503 .sender
3504 .send(Recipients::One(peer3_pk.clone()), wrong_bytes, true)
3505 .await
3506 .expect("send failed");
3507 context.sleep(config.link.latency * 2).await;
3508
3509 for &idx in &[2, 4] {
3513 let peer_index = peers[idx].index.get() as u16;
3514 let weak = coded_block1
3515 .shard(peer_index)
3516 .expect("missing shard")
3517 .verify_into_weak()
3518 .expect("verify_into_weak failed");
3519 let bytes = weak.encode();
3520 peers[idx]
3521 .sender
3522 .send(Recipients::One(peer3_pk.clone()), bytes, true)
3523 .await
3524 .expect("send failed");
3525 }
3526 context.sleep(config.link.latency * 2).await;
3527
3528 assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
3530 },
3531 );
3532 }
3533
3534 #[test_traced]
3535 fn test_reconstruction_recovers_after_quorum_with_one_invalid_weak_shard() {
3536 let fixture: Fixture<C> = Fixture {
3541 num_peers: 10,
3542 ..Default::default()
3543 };
3544
3545 fixture.start(
3546 |config, context, oracle, mut peers, _, coding_config| async move {
3547 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3548 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3549 let commitment1 = coded_block1.commitment();
3550
3551 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3552 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3553
3554 let receiver_idx = 3usize;
3555 let receiver_pk = peers[receiver_idx].public_key.clone();
3556
3557 let peer1_index = peers[1].index.get() as u16;
3559 let mut invalid_weak = coded_block2
3560 .shard(peer1_index)
3561 .expect("missing shard")
3562 .verify_into_weak()
3563 .expect("verify_into_weak failed");
3564 invalid_weak.commitment = commitment1;
3565
3566 let leader = peers[0].public_key.clone();
3568 peers[receiver_idx]
3569 .mailbox
3570 .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
3571 .await;
3572 let receiver_strong = coded_block1
3573 .shard(peers[receiver_idx].index.get() as u16)
3574 .expect("missing shard")
3575 .encode();
3576 peers[0]
3577 .sender
3578 .send(Recipients::One(receiver_pk.clone()), receiver_strong, true)
3579 .await
3580 .expect("send failed");
3581
3582 peers[1]
3587 .sender
3588 .send(
3589 Recipients::One(receiver_pk.clone()),
3590 invalid_weak.encode(),
3591 true,
3592 )
3593 .await
3594 .expect("send failed");
3595 for idx in [2usize, 4usize] {
3596 let weak = coded_block1
3597 .shard(peers[idx].index.get() as u16)
3598 .expect("missing shard")
3599 .verify_into_weak()
3600 .expect("verify_into_weak failed")
3601 .encode();
3602 peers[idx]
3603 .sender
3604 .send(Recipients::One(receiver_pk.clone()), weak, true)
3605 .await
3606 .expect("send failed");
3607 }
3608
3609 context.sleep(config.link.latency * 2).await;
3610
3611 assert_blocked(
3613 &oracle,
3614 &peers[receiver_idx].public_key,
3615 &peers[1].public_key,
3616 )
3617 .await;
3618 assert!(
3619 peers[receiver_idx].mailbox.get(commitment1).await.is_none(),
3620 "block should not reconstruct with only 3 checked shards"
3621 );
3622
3623 let extra_valid = coded_block1
3625 .shard(peers[5].index.get() as u16)
3626 .expect("missing shard")
3627 .verify_into_weak()
3628 .expect("verify_into_weak failed")
3629 .encode();
3630 peers[5]
3631 .sender
3632 .send(Recipients::One(receiver_pk), extra_valid, true)
3633 .await
3634 .expect("send failed");
3635
3636 context.sleep(config.link.latency * 2).await;
3637
3638 let reconstructed = peers[receiver_idx]
3639 .mailbox
3640 .get(commitment1)
3641 .await
3642 .expect("block should reconstruct after additional valid weak shard");
3643 assert_eq!(reconstructed.commitment(), commitment1);
3644 },
3645 );
3646 }
3647
3648 #[test_traced]
3649 fn test_invalid_pending_weak_shard_blocked_on_drain() {
3650 let fixture: Fixture<C> = Fixture {
3653 num_peers: 10,
3654 ..Default::default()
3655 };
3656
3657 fixture.start(
3658 |config, context, oracle, mut peers, _, coding_config| async move {
3659 let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3661 let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3662 let commitment1 = coded_block1.commitment();
3663
3664 let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3665 let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3666
3667 let peer1_index = peers[1].index.get() as u16;
3669 let mut wrong_weak_shard = coded_block2
3670 .shard(peer1_index)
3671 .expect("missing shard")
3672 .verify_into_weak()
3673 .expect("verify_into_weak failed");
3674 wrong_weak_shard.commitment = commitment1;
3675 let wrong_bytes = wrong_weak_shard.encode();
3676
3677 let peer3_pk = peers[3].public_key.clone();
3678
3679 peers[1]
3682 .sender
3683 .send(Recipients::One(peer3_pk.clone()), wrong_bytes, true)
3684 .await
3685 .expect("send failed");
3686 context.sleep(config.link.latency * 2).await;
3687
3688 let blocked = oracle.blocked().await.unwrap();
3690 assert!(blocked.is_empty(), "no peers should be blocked yet");
3691
3692 for &idx in &[2, 4] {
3696 let peer_index = peers[idx].index.get() as u16;
3697 let weak = coded_block1
3698 .shard(peer_index)
3699 .expect("missing shard")
3700 .verify_into_weak()
3701 .expect("verify_into_weak failed");
3702 let bytes = weak.encode();
3703 peers[idx]
3704 .sender
3705 .send(Recipients::One(peer3_pk.clone()), bytes, true)
3706 .await
3707 .expect("send failed");
3708 }
3709 context.sleep(config.link.latency * 2).await;
3710
3711 let blocked = oracle.blocked().await.unwrap();
3713 assert!(blocked.is_empty(), "no peers should be blocked yet");
3714
3715 let leader = peers[0].public_key.clone();
3717 peers[3]
3718 .mailbox
3719 .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
3720 .await;
3721 let peer3_index = peers[3].index.get() as u16;
3722 let peer3_strong_shard = coded_block1.shard(peer3_index).expect("missing shard");
3723 let strong_bytes = peer3_strong_shard.encode();
3724 peers[0]
3725 .sender
3726 .send(Recipients::One(peer3_pk), strong_bytes, true)
3727 .await
3728 .expect("send failed");
3729 context.sleep(config.link.latency * 2).await;
3730
3731 assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
3734 },
3735 );
3736 }
3737
3738 #[test_traced]
3739 fn test_cross_epoch_buffered_shard_not_blocked() {
3740 let executor = deterministic::Runner::default();
3741 executor.start(|context| async move {
3742 let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
3743 context.with_label("network"),
3744 simulated::Config {
3745 max_size: MAX_SHARD_SIZE as u32,
3746 disconnect_on_block: true,
3747 tracked_peer_sets: None,
3748 },
3749 );
3750 network.start();
3751
3752 let mut epoch0_keys: Vec<PrivateKey> = (0..4).map(PrivateKey::from_seed).collect();
3755 epoch0_keys.sort_by_key(|s| s.public_key());
3756 let epoch0_pks: Vec<P> = epoch0_keys.iter().map(|c| c.public_key()).collect();
3757 let epoch0_set: Set<P> = Set::from_iter_dedup(epoch0_pks.clone());
3758
3759 let future_peer_key = PrivateKey::from_seed(4);
3760 let future_peer_pk = future_peer_key.public_key();
3761 let mut epoch1_pks: Vec<P> = epoch0_pks[..3]
3762 .iter()
3763 .cloned()
3764 .chain(std::iter::once(future_peer_pk.clone()))
3765 .collect();
3766 epoch1_pks.sort();
3767 let epoch1_set: Set<P> = Set::from_iter_dedup(epoch1_pks);
3768
3769 let receiver_idx_in_epoch0 = epoch0_set
3770 .index(&epoch0_pks[0])
3771 .expect("receiver must be in epoch 0")
3772 .get() as usize;
3773 let receiver_key = epoch0_keys[receiver_idx_in_epoch0].clone();
3774 let receiver_pk = receiver_key.public_key();
3775
3776 let receiver_control = oracle.control(receiver_pk.clone());
3777 let (sender_handle, receiver_handle) = receiver_control
3778 .register(0, TEST_QUOTA)
3779 .await
3780 .expect("registration should succeed");
3781
3782 let future_peer_control = oracle.control(future_peer_pk.clone());
3783 let (mut future_peer_sender, _future_peer_receiver) = future_peer_control
3784 .register(0, TEST_QUOTA)
3785 .await
3786 .expect("registration should succeed");
3787 oracle
3788 .add_link(future_peer_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
3789 .await
3790 .expect("link should be added");
3791
3792 let scheme_epoch0 =
3794 Scheme::signer(SCHEME_NAMESPACE, epoch0_set.clone(), receiver_key.clone())
3795 .expect("signer scheme should be created");
3796 let scheme_epoch1 =
3797 Scheme::signer(SCHEME_NAMESPACE, epoch1_set.clone(), receiver_key.clone())
3798 .expect("signer scheme should be created");
3799 let scheme_provider =
3800 MultiEpochProvider::single(scheme_epoch0).with_epoch(Epoch::new(1), scheme_epoch1);
3801
3802 let config: Config<_, _, _, _, C, _, _, _> = Config {
3803 scheme_provider,
3804 blocker: receiver_control.clone(),
3805 shard_codec_cfg: CodecConfig {
3806 maximum_shard_size: MAX_SHARD_SIZE,
3807 },
3808 block_codec_cfg: (),
3809 strategy: STRATEGY,
3810 mailbox_size: 1024,
3811 peer_buffer_size: NZUsize!(64),
3812 background_channel_capacity: 1024,
3813 peer_provider: oracle.manager(),
3814 };
3815
3816 let (engine, mailbox) = ShardEngine::new(context.with_label("receiver"), config);
3817 engine.start((sender_handle, receiver_handle));
3818
3819 let coding_config = coding_config_for_participants(epoch1_set.len() as u16);
3821 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3822 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3823 let commitment = coded_block.commitment();
3824
3825 let future_peer_index = epoch1_set
3827 .index(&future_peer_pk)
3828 .expect("future peer must be in epoch 1");
3829 let strong_shard = coded_block
3830 .shard(future_peer_index.get() as u16)
3831 .expect("missing shard");
3832 let weak_shard = strong_shard
3833 .verify_into_weak()
3834 .expect("verify_into_weak failed");
3835 let weak_bytes = weak_shard.encode();
3836
3837 future_peer_sender
3839 .send(Recipients::One(receiver_pk.clone()), weak_bytes, true)
3840 .await
3841 .expect("send failed");
3842 context.sleep(DEFAULT_LINK.latency * 2).await;
3843
3844 let blocked = oracle.blocked().await.unwrap();
3846 assert!(
3847 blocked.is_empty(),
3848 "no peers should be blocked while shard is buffered"
3849 );
3850
3851 let leader = epoch0_pks[1].clone();
3853 mailbox
3854 .discovered(commitment, leader, Round::new(Epoch::new(1), View::new(1)))
3855 .await;
3856 context.sleep(DEFAULT_LINK.latency * 2).await;
3857
3858 let blocked = oracle.blocked().await.unwrap();
3861 assert!(
3862 blocked.is_empty(),
3863 "future-epoch participant should not be blocked: {blocked:?}"
3864 );
3865 });
3866 }
3867
    // Exercises weak-shard gossip when the broadcaster's scheme provider is a
    // `ChurningProvider`.
    //
    // Setup: four fully linked peers on a simulated network. The leader (index 0) has no
    // engine and is driven by hand; the broadcaster (index 1) runs a `ChurningShardEngine`;
    // the receiver (index 2) runs a plain `ShardEngine`. The fourth peer is registered and
    // linked but never sends anything.
    //
    // Expectation: after the leader hands the broadcaster and the receiver their strong
    // shards, the receiver is able to return the block for the commitment.
    #[test_traced]
    fn test_weak_shard_broadcast_survives_provider_churn() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets: None,
                },
            );
            network.start();

            // Sort the private keys by public key so that `private_keys[i]` and
            // `peer_keys[i]` always refer to the same identity.
            let mut private_keys: Vec<PrivateKey> = (0..4).map(PrivateKey::from_seed).collect();
            private_keys.sort_by_key(|s| s.public_key());
            let peer_keys: Vec<P> = private_keys.iter().map(|k| k.public_key()).collect();
            let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());

            // Roles are assigned by position in the sorted key list.
            let leader_idx = 0usize;
            let broadcaster_idx = 1usize;
            let receiver_idx = 2usize;

            let leader_pk = peer_keys[leader_idx].clone();
            let broadcaster_pk = peer_keys[broadcaster_idx].clone();
            let receiver_pk = peer_keys[receiver_idx].clone();

            // Register every peer on channel 0 and keep the handles keyed by public key.
            let mut registrations = BTreeMap::new();
            for key in &peer_keys {
                let control = oracle.control(key.clone());
                let (sender, receiver) = control
                    .register(0, TEST_QUOTA)
                    .await
                    .expect("registration should succeed");
                registrations.insert(key.clone(), (control, sender, receiver));
            }

            // Link every ordered pair of distinct peers.
            for src in &peer_keys {
                for dst in &peer_keys {
                    if src == dst {
                        continue;
                    }
                    oracle
                        .add_link(src.clone(), dst.clone(), DEFAULT_LINK)
                        .await
                        .expect("link should be added");
                }
            }

            // Pull out the handles for the three peers that take part in the scenario. The
            // leader's control and receiver halves are bound to `_`-prefixed names so they
            // stay alive for the rest of the test without being used.
            let (_leader_control, mut leader_sender, _leader_receiver) = registrations
                .remove(&leader_pk)
                .expect("leader should be registered");
            let (broadcaster_control, broadcaster_sender, broadcaster_receiver) = registrations
                .remove(&broadcaster_pk)
                .expect("broadcaster should be registered");
            let (receiver_control, receiver_sender, receiver_receiver) = registrations
                .remove(&receiver_pk)
                .expect("receiver should be registered");

            // Broadcaster: its scheme is wrapped in a `ChurningProvider`.
            // NOTE(review): the meaning of the second argument (3) is defined by
            // `ChurningProvider`, which is outside this chunk; confirm against its
            // definition before changing it.
            let broadcaster_scheme = Scheme::signer(
                SCHEME_NAMESPACE,
                participants.clone(),
                private_keys[broadcaster_idx].clone(),
            )
            .expect("signer scheme should be created");
            let broadcaster_provider = ChurningProvider::new(broadcaster_scheme, 3);
            let broadcaster_config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider: broadcaster_provider,
                blocker: broadcaster_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: 1024,
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: 1024,
                peer_provider: oracle.manager(),
            };
            let (broadcaster_engine, broadcaster_mailbox) =
                ChurningShardEngine::new(context.with_label("broadcaster"), broadcaster_config);
            broadcaster_engine.start((broadcaster_sender, broadcaster_receiver));

            // Receiver: same configuration values, but with a single-scheme
            // `MultiEpochProvider` and the regular `ShardEngine`.
            let receiver_scheme = Scheme::signer(
                SCHEME_NAMESPACE,
                participants.clone(),
                private_keys[receiver_idx].clone(),
            )
            .expect("signer scheme should be created");
            let receiver_config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider: MultiEpochProvider::single(receiver_scheme),
                blocker: receiver_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: 1024,
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: 1024,
                peer_provider: oracle.manager(),
            };
            let (receiver_engine, receiver_mailbox) =
                ShardEngine::new(context.with_label("receiver"), receiver_config);
            receiver_engine.start((receiver_sender, receiver_receiver));

            // Encode one block using a coding config derived from the peer count.
            let coding_config = coding_config_for_participants(peer_keys.len() as u16);
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();
            let round = Round::new(Epoch::zero(), View::new(1));

            // Announce the commitment and its leader to both engines before any shard is
            // sent, then give them one link latency to settle.
            broadcaster_mailbox
                .discovered(commitment, leader_pk.clone(), round)
                .await;
            receiver_mailbox
                .discovered(commitment, leader_pk.clone(), round)
                .await;
            context.sleep(DEFAULT_LINK.latency).await;

            // The leader sends the broadcaster the shard at the broadcaster's own
            // participant index.
            let broadcaster_index = participants
                .index(&broadcaster_pk)
                .expect("broadcaster must be a participant")
                .get() as u16;
            let broadcaster_strong = coded_block
                .shard(broadcaster_index)
                .expect("missing shard")
                .encode();
            leader_sender
                .send(Recipients::One(broadcaster_pk), broadcaster_strong, true)
                .await
                .expect("send failed");

            // The leader does the same for the receiver.
            let receiver_index = participants
                .index(&receiver_pk)
                .expect("receiver must be a participant")
                .get() as u16;
            let receiver_strong = coded_block
                .shard(receiver_index)
                .expect("missing shard")
                .encode();
            leader_sender
                .send(Recipients::One(receiver_pk.clone()), receiver_strong, true)
                .await
                .expect("send failed");

            // Wait three link latencies before checking the receiver.
            // NOTE(review): presumably this covers leader -> broadcaster delivery plus the
            // broadcaster -> receiver gossip hop; confirm if link timing changes.
            context.sleep(DEFAULT_LINK.latency * 3).await;

            let reconstructed = receiver_mailbox.get(commitment).await;
            assert!(
                reconstructed.is_some(),
                "receiver should reconstruct after broadcaster validates and gossips weak shard"
            );
        });
    }
4026
    // A reconstruction that fails with a digest mismatch must close its subscriptions and
    // must not stop a later, valid commitment from being reconstructed.
    //
    // Phase 1 feeds the receiver shards of block 2 relabelled with a forged commitment
    // that carries block 1's digest. Phase 2 feeds genuine shards of block 1 under its
    // real commitment and expects the block to come back.
    #[test_traced]
    fn test_failed_reconstruction_digest_mismatch_then_recovery() {
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, _oracle, mut peers, _, coding_config| async move {
                // Two distinct blocks: block 1 supplies the digest, block 2 supplies the
                // shard data.
                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);

                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
                let real_commitment2 = coded_block2.commitment();

                // Forged commitment: block 1's digest combined with block 2's root and
                // context. Block 2's shards are generated against that same root, so only
                // the digest component disagrees with the data being sent.
                let fake_commitment = Commitment::from((
                    coded_block1.digest(),
                    real_commitment2.root::<Sha256Digest>(),
                    real_commitment2.context::<Sha256Digest>(),
                    coding_config,
                ));

                let receiver_idx = 3usize;
                let receiver_pk = peers[receiver_idx].public_key.clone();
                let leader = peers[0].public_key.clone();
                let round = Round::new(Epoch::zero(), View::new(1));

                // Tell the receiver about the forged commitment before any shard arrives.
                peers[receiver_idx]
                    .mailbox
                    .discovered(fake_commitment, leader.clone(), round)
                    .await;

                // Subscribe both by commitment and by digest so that both subscription
                // kinds can be checked after the failure.
                let mut block_sub = peers[receiver_idx].mailbox.subscribe(fake_commitment).await;
                let mut digest_sub = peers[receiver_idx]
                    .mailbox
                    .subscribe_by_digest(coded_block1.digest())
                    .await;

                // The leader sends the receiver its strong shard of block 2, relabelled
                // with the forged commitment.
                let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
                let mut strong_shard = coded_block2
                    .shard(receiver_shard_idx)
                    .expect("missing shard");
                strong_shard.commitment = fake_commitment;
                peers[0]
                    .sender
                    .send(
                        Recipients::One(receiver_pk.clone()),
                        strong_shard.encode(),
                        true,
                    )
                    .await
                    .expect("send failed");

                // Peers 1, 2 and 4 each send their own weak shard of block 2, relabelled
                // the same way.
                // NOTE(review): together with the strong shard this is presumably enough
                // to reach the reconstruction threshold for 10 peers; the threshold comes
                // from `coding_config` and is not visible in this chunk.
                for &idx in &[1usize, 2, 4] {
                    let peer_shard_idx = peers[idx].index.get() as u16;
                    let mut weak = coded_block2
                        .shard(peer_shard_idx)
                        .expect("missing shard")
                        .verify_into_weak()
                        .expect("verify_into_weak failed");
                    weak.commitment = fake_commitment;
                    peers[idx]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), weak.encode(), true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                // The forged commitment must not yield a block.
                assert!(
                    peers[receiver_idx]
                        .mailbox
                        .get(fake_commitment)
                        .await
                        .is_none(),
                    "block should not be available after DigestMismatch"
                );

                // Both subscriptions must be closed rather than left pending.
                assert!(
                    matches!(block_sub.try_recv(), Err(TryRecvError::Closed)),
                    "subscription should close for failed reconstruction"
                );
                assert!(
                    matches!(digest_sub.try_recv(), Err(TryRecvError::Closed)),
                    "digest subscription should close after failed reconstruction"
                );

                // Recovery phase: block 1 under its real commitment, in a later view.
                let real_commitment1 = coded_block1.commitment();
                let round2 = Round::new(Epoch::zero(), View::new(2));
                peers[receiver_idx]
                    .mailbox
                    .discovered(real_commitment1, leader.clone(), round2)
                    .await;

                // Genuine strong shard from the leader.
                let strong1 = coded_block1
                    .shard(receiver_shard_idx)
                    .expect("missing shard");
                peers[0]
                    .sender
                    .send(Recipients::One(receiver_pk.clone()), strong1.encode(), true)
                    .await
                    .expect("send failed");

                // Genuine weak shards from the same three peers as before.
                for &idx in &[1usize, 2, 4] {
                    let peer_shard_idx = peers[idx].index.get() as u16;
                    let weak = coded_block1
                        .shard(peer_shard_idx)
                        .expect("missing shard")
                        .verify_into_weak()
                        .expect("verify_into_weak failed");
                    peers[idx]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), weak.encode(), true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                // The earlier failure must not prevent this reconstruction.
                let reconstructed = peers[receiver_idx]
                    .mailbox
                    .get(real_commitment1)
                    .await
                    .expect("valid block should reconstruct after prior failure");
                assert_eq!(reconstructed.commitment(), real_commitment1);
            },
        );
    }
4178
    // A reconstruction that fails with a context mismatch must close its subscription and
    // must not stop the same block from being reconstructed later under its real
    // commitment.
    //
    // Phase 1 relabels genuine shards with a commitment whose context digest is wrong.
    // Phase 2 resends the same block's shards under the real commitment.
    #[test_traced]
    fn test_failed_reconstruction_context_mismatch_then_recovery() {
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, _oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let real_commitment = coded_block.commitment();

                // Build a context digest that is guaranteed to differ from the real one;
                // the assertion guards the test's own precondition.
                let wrong_context_digest = Sha256::hash(b"wrong_context");
                assert_ne!(
                    real_commitment.context::<Sha256Digest>(),
                    wrong_context_digest,
                    "test requires a distinct context digest"
                );
                // Forged commitment: real digest and real root, wrong context.
                let fake_commitment = Commitment::from((
                    coded_block.digest(),
                    real_commitment.root::<Sha256Digest>(),
                    wrong_context_digest,
                    coding_config,
                ));

                let receiver_idx = 3usize;
                let receiver_pk = peers[receiver_idx].public_key.clone();
                let leader = peers[0].public_key.clone();
                let round = Round::new(Epoch::zero(), View::new(1));

                // Announce the forged commitment, then subscribe to it.
                peers[receiver_idx]
                    .mailbox
                    .discovered(fake_commitment, leader.clone(), round)
                    .await;
                let mut block_sub = peers[receiver_idx].mailbox.subscribe(fake_commitment).await;

                // The leader sends the receiver's strong shard, relabelled with the forged
                // commitment.
                let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
                let mut strong_shard = coded_block
                    .shard(receiver_shard_idx)
                    .expect("missing shard");
                strong_shard.commitment = fake_commitment;
                peers[0]
                    .sender
                    .send(
                        Recipients::One(receiver_pk.clone()),
                        strong_shard.encode(),
                        true,
                    )
                    .await
                    .expect("send failed");

                // Peers 1, 2 and 4 send their weak shards, relabelled the same way.
                // NOTE(review): presumably strong + 3 weak shards meets the reconstruction
                // threshold for 10 peers; the threshold is set by `coding_config`.
                for &idx in &[1usize, 2, 4] {
                    let peer_shard_idx = peers[idx].index.get() as u16;
                    let mut weak = coded_block
                        .shard(peer_shard_idx)
                        .expect("missing shard")
                        .verify_into_weak()
                        .expect("verify_into_weak failed");
                    weak.commitment = fake_commitment;
                    peers[idx]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), weak.encode(), true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                // The forged commitment must not yield a block, and its subscription must
                // be closed rather than left pending.
                assert!(
                    peers[receiver_idx]
                        .mailbox
                        .get(fake_commitment)
                        .await
                        .is_none(),
                    "block should not be available after ContextMismatch"
                );
                assert!(
                    matches!(block_sub.try_recv(), Err(TryRecvError::Closed)),
                    "subscription should close for context-mismatched commitment"
                );

                // Recovery phase: the same block under its real commitment, in a later view.
                let round2 = Round::new(Epoch::zero(), View::new(2));
                peers[receiver_idx]
                    .mailbox
                    .discovered(real_commitment, leader.clone(), round2)
                    .await;

                // Unmodified strong shard from the leader.
                let strong_real = coded_block
                    .shard(receiver_shard_idx)
                    .expect("missing shard");
                peers[0]
                    .sender
                    .send(
                        Recipients::One(receiver_pk.clone()),
                        strong_real.encode(),
                        true,
                    )
                    .await
                    .expect("send failed");

                // Unmodified weak shards from the same three peers.
                for &idx in &[1usize, 2, 4] {
                    let peer_shard_idx = peers[idx].index.get() as u16;
                    let weak = coded_block
                        .shard(peer_shard_idx)
                        .expect("missing shard")
                        .verify_into_weak()
                        .expect("verify_into_weak failed");
                    peers[idx]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), weak.encode(), true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                // The earlier failure must not prevent this reconstruction.
                let reconstructed = peers[receiver_idx]
                    .mailbox
                    .get(real_commitment)
                    .await
                    .expect("valid block should reconstruct after prior context mismatch");
                assert_eq!(reconstructed.commitment(), real_commitment);
            },
        );
    }
4309
    // When one leader announces two different commitments in the same round,
    // reconstructing one of them must not make the other unrecoverable.
    //
    // The receiver learns of commitments A and B for the same round, reconstructs A
    // first, and must still be able to reconstruct B once B's weak shards arrive.
    #[test_traced]
    fn test_same_round_equivocation_preserves_certifiable_recovery() {
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, _oracle, mut peers, _, coding_config| async move {
                let receiver_idx = 3usize;
                let receiver_pk = peers[receiver_idx].public_key.clone();
                let receiver_shard_idx = peers[receiver_idx].index.get() as u16;

                let leader = peers[0].public_key.clone();
                let round = Round::new(Epoch::zero(), View::new(7));

                // Two blocks at the same height that differ only in the last constructor
                // argument, which gives them different commitments.
                let block_a = CodedBlock::<B, C, H>::new(
                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 111),
                    coding_config,
                    &STRATEGY,
                );
                let commitment_a = block_a.commitment();
                let block_b = CodedBlock::<B, C, H>::new(
                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 222),
                    coding_config,
                    &STRATEGY,
                );
                let commitment_b = block_b.commitment();

                // Announce both commitments for the same leader and the same round.
                peers[receiver_idx]
                    .mailbox
                    .discovered(commitment_a, leader.clone(), round)
                    .await;
                peers[receiver_idx]
                    .mailbox
                    .discovered(commitment_b, leader.clone(), round)
                    .await;

                // Subscribe to B up front; this subscription is what must eventually
                // resolve.
                let certifiable_sub = peers[receiver_idx].mailbox.subscribe(commitment_b).await;

                // The leader sends the receiver's strong shard for B first...
                let strong_b = block_b
                    .shard(receiver_shard_idx)
                    .expect("missing shard")
                    .encode();
                peers[0]
                    .sender
                    .send(Recipients::One(receiver_pk.clone()), strong_b, true)
                    .await
                    .expect("send failed");

                // ...and then the strong shard for A, followed by A's weak shards from
                // peers 1, 2 and 4, so that A completes before B.
                let strong_a = block_a
                    .shard(receiver_shard_idx)
                    .expect("missing shard")
                    .encode();
                peers[0]
                    .sender
                    .send(Recipients::One(receiver_pk.clone()), strong_a, true)
                    .await
                    .expect("send failed");
                for i in [1usize, 2usize, 4usize] {
                    let weak_a = block_a
                        .shard(peers[i].index.get() as u16)
                        .expect("missing shard")
                        .verify_into_weak()
                        .expect("verify_into_weak failed")
                        .encode();
                    peers[i]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), weak_a, true)
                        .await
                        .expect("send failed");
                }
                context.sleep(config.link.latency * 4).await;
                let reconstructed_a = peers[receiver_idx]
                    .mailbox
                    .get(commitment_a)
                    .await
                    .expect("conflicting commitment should reconstruct first");
                assert_eq!(reconstructed_a.commitment(), commitment_a);

                // Only now deliver B's weak shards from the same three peers.
                for i in [1usize, 2usize, 4usize] {
                    let weak_b = block_b
                        .shard(peers[i].index.get() as u16)
                        .expect("missing shard")
                        .verify_into_weak()
                        .expect("verify_into_weak failed")
                        .encode();
                    peers[i]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), weak_b, true)
                        .await
                        .expect("send failed");
                }

                // The subscription for B must resolve with B's block; a 5 second sleep on
                // the test context acts as the failure timeout.
                select! {
                    result = certifiable_sub => {
                        let reconstructed_b =
                            result.expect("certifiable commitment should remain recoverable");
                        assert_eq!(reconstructed_b.commitment(), commitment_b);
                    },
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("certifiable commitment was not recoverable after same-round equivocation");
                    },
                }
            },
        );
    }
4428
4429 #[test_traced]
4430 fn test_leader_unrelated_weak_shard_blocks_peer() {
4431 let fixture: Fixture<C> = Fixture {
4435 num_peers: 10,
4436 ..Default::default()
4437 };
4438
4439 fixture.start(
4440 |config, context, oracle, mut peers, _, coding_config| async move {
4441 let tracked_block = CodedBlock::<B, C, H>::new(
4443 B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
4444 coding_config,
4445 &STRATEGY,
4446 );
4447 let tracked_commitment = tracked_block.commitment();
4448
4449 let unrelated_block = CodedBlock::<B, C, H>::new(
4451 B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200),
4452 coding_config,
4453 &STRATEGY,
4454 );
4455
4456 let receiver_idx = 3usize;
4457 let receiver_pk = peers[receiver_idx].public_key.clone();
4458 let leader_idx = 0usize;
4459 let leader_pk = peers[leader_idx].public_key.clone();
4460
4461 peers[receiver_idx]
4463 .mailbox
4464 .discovered(
4465 tracked_commitment,
4466 leader_pk.clone(),
4467 Round::new(Epoch::zero(), View::new(1)),
4468 )
4469 .await;
4470
4471 let mut unrelated_weak = unrelated_block
4474 .shard(peers[1].index.get() as u16)
4475 .expect("missing shard")
4476 .verify_into_weak()
4477 .expect("verify_into_weak failed");
4478 unrelated_weak.commitment = tracked_commitment;
4479
4480 peers[leader_idx]
4484 .sender
4485 .send(Recipients::One(receiver_pk), unrelated_weak.encode(), true)
4486 .await
4487 .expect("send failed");
4488 context.sleep(config.link.latency * 2).await;
4489
4490 assert_blocked(&oracle, &peers[receiver_idx].public_key, &leader_pk).await;
4491 },
4492 );
4493 }
4494
4495 #[test_traced]
4496 fn test_broadcast_routes_participant_and_non_participant_shards() {
4497 let fixture = Fixture {
4498 num_non_participants: 1,
4499 ..Default::default()
4500 };
4501
4502 fixture.start(
4503 |config, context, oracle, mut peers, non_participants, coding_config| async move {
4504 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4505 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4506 let commitment = coded_block.commitment();
4507
4508 let leader = peers[0].public_key.clone();
4509 let round = Round::new(Epoch::zero(), View::new(1));
4510 peers[0].mailbox.proposed(round, coded_block.clone()).await;
4511
4512 for peer in peers[1..].iter_mut() {
4513 peer.mailbox
4514 .discovered(commitment, leader.clone(), round)
4515 .await;
4516 }
4517 for np in non_participants.iter() {
4518 np.mailbox
4519 .discovered(commitment, leader.clone(), round)
4520 .await;
4521 }
4522 context.sleep(config.link.latency * 2).await;
4523
4524 for peer in peers.iter_mut() {
4526 peer.mailbox
4527 .subscribe_shard(commitment)
4528 .await
4529 .await
4530 .expect("participant shard subscription should complete");
4531 }
4532
4533 for np in non_participants.iter() {
4535 np.mailbox
4536 .subscribe_shard(commitment)
4537 .await
4538 .await
4539 .expect("non-participant shard subscription should complete");
4540 }
4541 context.sleep(config.link.latency).await;
4542
4543 for np in non_participants.iter() {
4545 let reconstructed = np
4546 .mailbox
4547 .get(commitment)
4548 .await
4549 .expect("non-participant should reconstruct block");
4550 assert_eq!(reconstructed.commitment(), commitment);
4551 }
4552
4553 let blocked = oracle.blocked().await.unwrap();
4554 assert!(
4555 blocked.is_empty(),
4556 "no peer should be blocked in participant/non-participant shard routing test"
4557 );
4558 },
4559 );
4560 }
4561
4562 #[test_traced]
4563 fn test_non_participant_reconstructs_after_discovered() {
4564 let fixture = Fixture {
4565 num_non_participants: 1,
4566 ..Default::default()
4567 };
4568
4569 fixture.start(
4570 |config, context, oracle, mut peers, non_participants, coding_config| async move {
4571 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4572 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4573 let commitment = coded_block.commitment();
4574 let round = Round::new(Epoch::zero(), View::new(1));
4575
4576 let leader = peers[0].public_key.clone();
4577 peers[0].mailbox.proposed(round, coded_block.clone()).await;
4578
4579 for peer in peers[1..].iter_mut() {
4582 peer.mailbox
4583 .discovered(commitment, leader.clone(), round)
4584 .await;
4585 }
4586 context.sleep(config.link.latency).await;
4587
4588 let np = &non_participants[0];
4591 let block_sub = np.mailbox.subscribe(commitment).await;
4592 np.mailbox
4593 .discovered(commitment, leader.clone(), round)
4594 .await;
4595
4596 select! {
4599 result = block_sub => {
4600 let reconstructed = result.expect("block subscription should resolve");
4601 assert_eq!(reconstructed.commitment(), commitment);
4602 assert_eq!(reconstructed.height(), coded_block.height());
4603 },
4604 _ = context.sleep(Duration::from_secs(5)) => {
4605 panic!("non-participant block subscription did not resolve");
4606 },
4607 }
4608
4609 let blocked = oracle.blocked().await.unwrap();
4610 assert!(
4611 blocked.is_empty(),
4612 "no peer should be blocked in non-participant reconstruction test"
4613 );
4614 },
4615 );
4616 }
4617
4618 #[test_traced]
4619 fn test_peer_set_update_evicts_peer_buffers() {
4620 let executor = deterministic::Runner::default();
4625 executor.start(|context| async move {
4626 let num_peers = 10usize;
4627 let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
4628 context.with_label("network"),
4629 simulated::Config {
4630 max_size: MAX_SHARD_SIZE as u32,
4631 disconnect_on_block: true,
4632 tracked_peer_sets: Some(1),
4633 },
4634 );
4635 network.start();
4636
4637 let mut private_keys = (0..num_peers)
4638 .map(|i| PrivateKey::from_seed(i as u64))
4639 .collect::<Vec<_>>();
4640 private_keys.sort_by_key(|s| s.public_key());
4641 let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
4642 let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());
4643
4644 let receiver_idx = 3usize;
4646 let receiver_pk = peer_keys[receiver_idx].clone();
4647 let leader_pk = peer_keys[0].clone();
4648
4649 let receiver_control = oracle.control(receiver_pk.clone());
4650 let (sender_handle, receiver_handle) = receiver_control
4651 .register(0, TEST_QUOTA)
4652 .await
4653 .expect("registration should succeed");
4654
4655 let leader_control = oracle.control(leader_pk.clone());
4657 let (mut leader_sender, _leader_receiver) = leader_control
4658 .register(0, TEST_QUOTA)
4659 .await
4660 .expect("registration should succeed");
4661 oracle
4662 .add_link(leader_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
4663 .await
4664 .expect("link should be added");
4665
4666 oracle.manager().track(0, participants.clone()).await;
4668 context.sleep(Duration::from_millis(10)).await;
4669
4670 let scheme = Scheme::signer(
4671 SCHEME_NAMESPACE,
4672 participants.clone(),
4673 private_keys[receiver_idx].clone(),
4674 )
4675 .expect("signer scheme should be created");
4676
4677 let config: Config<_, _, _, _, C, _, _, _> = Config {
4678 scheme_provider: MultiEpochProvider::single(scheme),
4679 blocker: receiver_control.clone(),
4680 shard_codec_cfg: CodecConfig {
4681 maximum_shard_size: MAX_SHARD_SIZE,
4682 },
4683 block_codec_cfg: (),
4684 strategy: STRATEGY,
4685 mailbox_size: 1024,
4686 peer_buffer_size: NZUsize!(64),
4687 background_channel_capacity: 1024,
4688 peer_provider: oracle.manager(),
4689 };
4690
4691 let (engine, mailbox) = ShardEngine::new(context.with_label("receiver"), config);
4692 engine.start((sender_handle, receiver_handle));
4693
4694 let coding_config = coding_config_for_participants(num_peers as u16);
4696 let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4697 let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4698 let commitment = coded_block.commitment();
4699
4700 let receiver_participant = participants
4701 .index(&receiver_pk)
4702 .expect("receiver must be a participant");
4703 let strong_shard = coded_block
4704 .shard(receiver_participant.get() as u16)
4705 .expect("missing shard");
4706 let strong_bytes = strong_shard.encode();
4707
4708 leader_sender
4710 .send(
4711 Recipients::One(receiver_pk.clone()),
4712 strong_bytes.clone(),
4713 true,
4714 )
4715 .await
4716 .expect("send failed");
4717 context.sleep(DEFAULT_LINK.latency * 2).await;
4718
4719 let remaining: Set<P> =
4721 Set::from_iter_dedup(peer_keys.iter().filter(|pk| **pk != leader_pk).cloned());
4722 oracle.manager().track(1, remaining).await;
4723 context.sleep(Duration::from_millis(10)).await;
4724
4725 let mut shard_sub = mailbox.subscribe_shard(commitment).await;
4728 mailbox
4729 .discovered(
4730 commitment,
4731 leader_pk.clone(),
4732 Round::new(Epoch::zero(), View::new(1)),
4733 )
4734 .await;
4735 context.sleep(DEFAULT_LINK.latency * 2).await;
4736
4737 assert!(
4739 matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
4740 "shard subscription should not resolve after evicted leader's buffer"
4741 );
4742 assert!(
4743 mailbox.get(commitment).await.is_none(),
4744 "block should not reconstruct from evicted buffers"
4745 );
4746 });
4747 }
4748}