commonware_consensus/marshal/coding/shards/engine.rs

//! Shard engine for erasure-coded block distribution and reconstruction.
//!
//! This module implements the core logic for distributing blocks as erasure-coded
//! shards and reconstructing blocks from received shards.
//!
//! # Overview
//!
//! The shard engine serves two primary functions:
//! 1. Broadcast: When a node proposes a block, the engine broadcasts
//!    erasure-coded shards to all participants and to non-participants in
//!    aggregate membership (peers in [`commonware_p2p::PeerSetUpdate::all`]
//!    but not in the epoch participant list).
//!    The leader sends each participant their indexed shard.
//! 2. Block Reconstruction: When a node receives shards from peers, the engine
//!    validates them and reconstructs the original block once enough valid
//!    shards are available. Both participants and non-participants can
//!    reconstruct blocks: participants receive their own indexed shard from
//!    the leader, while non-participants reconstruct from shards gossiped
//!    by participants. All participants gossip their validated shard to peers.
//!
//! # Message Flow
//!
//! ```text
//!                           PROPOSER
//!                              |
//!                              | Proposed(block)
//!                              v
//!                    +------------------+
//!                    |   Shard Engine   |
//!                    +------------------+
//!                              |
//!            broadcast_shards (each participant's indexed shard)
//!                              |
//!         +--------------------+--------------------+
//!         |                    |                    |
//!         v                    v                    v
//!    Participant 0        Participant 1        Participant N
//!         |                    |                    |
//!         | (receive shard     | (receive shard     |
//!         |  for own index)    |  for own index)    |
//!         v                    v                    v
//!    +----------+         +----------+         +----------+
//!    | Validate |         | Validate |         | Validate |
//!    | (check)  |         | (check)  |         | (check)  |
//!    +----------+         +----------+         +----------+
//!         |                    |                    |
//!         +--------------------+--------------------+
//!                              |
//!                    (gossip validated shards)
//!                              |
//!         +--------------------+--------------------+
//!         |                    |                    |
//!         v                    v                    v
//!    Accumulate checked shards until minimum_shards reached
//!         |                    |                    |
//!         v                    v                    v
//!            Batch verify pending shards at quorum
//!         |                    |                    |
//!         v                    v                    v
//!    +-------------+      +-------------+      +-------------+
//!    | Reconstruct |      | Reconstruct |      | Reconstruct |
//!    |    Block    |      |    Block    |      |    Block    |
//!    +-------------+      +-------------+      +-------------+
//! ```
//!
//! # Reconstruction State Machine
//!
//! For each [`Commitment`] with a known leader, nodes (both participants
//! and non-participants) maintain a [`ReconstructionState`]. Before leader
//! announcement, shards are buffered in bounded per-peer queues:
//!
//! ```text
//!    +----------------------+
//!    | AwaitingQuorum       |
//!    | - leader known       |
//!    | - leader's shard     |  <--- verified immediately on receipt
//!    |   verified eagerly   |
//!    | - other shards       |  <--- buffered in pending_shards
//!    |   buffered           |
//!    +----------------------+
//!               |
//!               | quorum met + batch validation passes
//!               v
//!    +----------------------+
//!    | Ready                |
//!    | - checked shards     |
//!    | (frozen; no new      |
//!    |  shards accepted)    |
//!    +----------------------+
//!               |
//!               | checked_shards.len() >= minimum_shards
//!               v
//!    +----------------------+
//!    | Reconstruction       |
//!    | Attempt              |
//!    +----------------------+
//!               |
//!          +----+----+
//!          |         |
//!          v         v
//!       Success    Failure
//!          |         |
//!          v         v
//!       Cache      Remove
//!       Block      State
//! ```
//!
//! _Per-peer buffers are only kept for peers in `latest.primary`, matching [`commonware_broadcast::buffered`].
//! When a peer is no longer in `latest.primary`, all its buffered shards are evicted._
//!
//! # Peer Validation and Blocking Rules
//!
//! The engine enforces strict validation to prevent Byzantine attacks:
//!
//! - All shards MUST be sent by participants in the current epoch.
//! - If the sender is the leader: the shard index MUST match the recipient's
//!   participant index (for participants) or the leader's index (for
//!   non-participants).
//! - If the sender is not the leader: the shard index MUST match the sender's
//!   participant index (each participant can only gossip their own shard).
//! - All shards MUST pass cryptographic verification against the commitment.
//! - Each shard index may only contribute ONE shard per commitment.
//! - Sending a second shard for the same index with different data
//!   (equivocation) results in blocking. Exact duplicates are silently
//!   ignored.
//!
//! Peers violating these rules are blocked via the [`Blocker`] trait.
//! Validation and blocking rules are applied while a commitment is actively
//! tracked in reconstruction state. Once a block is already reconstructed and
//! cached, additional shards for that commitment are ignored.
//!
//! _If the leader is not yet known, shards are buffered in fixed-size per-peer
//! queues until consensus signals the leader via [`Discovered`]. Once the leader
//! is known, buffered shards for that commitment are ingested into the active
//! state machine._
//!
//! [`Discovered`]: super::Message::Discovered

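The AwaitingQuorum → Ready transition sketched in the diagram above can be modeled as a small standalone state machine. This is a hedged simplification, not the crate's `ReconstructionState`: shards are stubbed as byte vectors, batch validation is assumed to pass, and `minimum_shards` is a plain `usize`.

```rust
// Simplified sketch of the per-commitment reconstruction state machine
// described in the module docs. Real shards carry indices and are verified
// against the erasure-coding commitment; here they are opaque bytes.
#[derive(Debug)]
enum SketchState {
    /// Leader known; shards accumulate until quorum, then batch-verify.
    AwaitingQuorum { pending: Vec<Vec<u8>> },
    /// Quorum met and batch validation passed; frozen (no new shards).
    Ready { checked: Vec<Vec<u8>> },
}

impl SketchState {
    fn new() -> Self {
        Self::AwaitingQuorum { pending: Vec::new() }
    }

    /// Returns true when the state progressed to `Ready`.
    fn on_shard(&mut self, shard: Vec<u8>, minimum_shards: usize) -> bool {
        match self {
            Self::AwaitingQuorum { pending } => {
                pending.push(shard);
                if pending.len() >= minimum_shards {
                    // Batch validation would run here; assumed to pass.
                    let checked = std::mem::take(pending);
                    *self = Self::Ready { checked };
                    true
                } else {
                    false
                }
            }
            // Ready is frozen: late shards are ignored.
            Self::Ready { .. } => false,
        }
    }
}
```

Freezing `Ready` mirrors the diagram's note that no new shards are accepted after batch validation.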
use super::{
    mailbox::{Mailbox, Message},
    metrics::{Peer, ShardMetrics},
};
use crate::{
    marshal::coding::{
        types::{CodedBlock, Shard},
        validation::{validate_reconstruction, ReconstructionError as InvariantError},
    },
    types::{coding::Commitment, Epoch, Round},
    Block, CertifiableBlock, Heightable,
};
use commonware_codec::{Decode, Error as CodecError, Read};
use commonware_coding::{Config as CodingConfig, Scheme as CodingScheme};
use commonware_cryptography::{
    certificate::{Provider, Scheme as CertificateScheme},
    Committable, Digestible, Hasher, PublicKey,
};
use commonware_macros::select_loop;
use commonware_p2p::{
    utils::codec::{WrappedBackgroundReceiver, WrappedSender},
    Blocker, Provider as PeerProvider, Receiver, Recipients, Sender,
};
use commonware_parallel::Strategy;
use commonware_runtime::{
    spawn_cell,
    telemetry::metrics::{histogram::HistogramExt, status::GaugeExt},
    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
};
use commonware_utils::{
    bitmap::BitMap,
    channel::{fallible::OneshotExt, mpsc, oneshot},
    ordered::{Quorum, Set},
};
use rand::Rng;
use std::{
    collections::{BTreeMap, VecDeque},
    num::NonZeroUsize,
    sync::Arc,
};
use thiserror::Error;
use tracing::{debug, warn};

/// An error that can occur during reconstruction of a [`CodedBlock`] from [`Shard`]s
#[derive(Debug, Error)]
pub enum Error<C: CodingScheme> {
    /// An error occurred while recovering the encoded blob from the [`Shard`]s
    #[error(transparent)]
    Coding(C::Error),

    /// An error occurred while decoding the reconstructed blob into a [`CodedBlock`]
    #[error(transparent)]
    Codec(#[from] CodecError),

    /// The reconstructed block's digest does not match the commitment's block digest
    #[error("block digest mismatch: reconstructed block does not match commitment digest")]
    DigestMismatch,

    /// The reconstructed block's config does not match the commitment's coding config
    #[error("block config mismatch: reconstructed config does not match commitment config")]
    ConfigMismatch,

    /// The reconstructed block's embedded context does not match the commitment context digest
    #[error("block context mismatch: reconstructed context does not match commitment context")]
    ContextMismatch,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum BlockSubscriptionKey<D> {
    Commitment(Commitment),
    Digest(D),
}

/// Configuration for the [`Engine`].
pub struct Config<P, S, X, D, C, H, B, T>
where
    P: PublicKey,
    S: Provider<Scope = Epoch>,
    X: Blocker<PublicKey = P>,
    D: PeerProvider<PublicKey = P>,
    C: CodingScheme,
    H: Hasher,
    B: CertifiableBlock,
    T: Strategy,
{
    /// The scheme provider.
    pub scheme_provider: S,

    /// The peer blocker.
    pub blocker: X,

    /// [`Read`] configuration for decoding [`Shard`]s.
    pub shard_codec_cfg: <Shard<C, H> as Read>::Cfg,

    /// [`commonware_codec::Read`] configuration for decoding blocks.
    pub block_codec_cfg: B::Cfg,

    /// The strategy used for parallel computation.
    pub strategy: T,

    /// The size of the mailbox buffer.
    pub mailbox_size: usize,

    /// Number of shards to buffer per peer.
    ///
    /// Shards for commitments without a reconstruction state are buffered per
    /// peer in a fixed-size ring to bound memory under Byzantine spam. These
    /// shards are only ingested when consensus provides a leader via
    /// [`Discovered`](super::Message::Discovered).
    ///
    /// The worst-case total memory usage for the set of shard buffers is
    /// `num_participants * peer_buffer_size * max_shard_size`.
    pub peer_buffer_size: NonZeroUsize,

    /// Capacity of the channel between the background receiver and the engine.
    ///
    /// The background receiver decodes incoming network messages in a separate
    /// task and forwards them to the engine over an `mpsc` channel with this
    /// capacity.
    pub background_channel_capacity: usize,

    /// Provider for peer set information. Pre-leader shards are buffered per
    /// peer only while that peer appears in the
    /// [`commonware_p2p::PeerSetUpdate::latest`] primary set, matching
    /// [`commonware_broadcast::buffered::Engine`]. Broadcast delivery uses the
    /// aggregate [`commonware_p2p::PeerSetUpdate::all`] union.
    pub peer_provider: D,
}

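The worst-case memory bound quoted for `peer_buffer_size` is a simple product. As a hedged sanity check (the numbers below are illustrative, not defaults):

```rust
// Worst-case bytes held by pre-leader shard buffers, per the bound in the
// `peer_buffer_size` docs: every active peer fills its fixed-size ring with
// maximum-size shards.
fn worst_case_buffer_bytes(
    num_participants: usize,
    peer_buffer_size: usize,
    max_shard_size: usize,
) -> usize {
    num_participants * peer_buffer_size * max_shard_size
}
```

For example, 100 participants each buffering up to 8 shards of at most 64 KiB bounds the buffers at 50 MiB.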
/// A network layer for broadcasting and receiving [`CodedBlock`]s as [`Shard`]s.
///
/// When enough [`Shard`]s are present in the mailbox, the [`Engine`] may facilitate
/// reconstruction of the original [`CodedBlock`] and notify any subscribers waiting for it.
pub struct Engine<E, S, X, D, C, H, B, P, T>
where
    E: BufferPooler + Rng + Spawner + Metrics + Clock,
    S: Provider<Scope = Epoch>,
    S::Scheme: CertificateScheme<PublicKey = P>,
    X: Blocker,
    D: PeerProvider<PublicKey = P>,
    C: CodingScheme,
    H: Hasher,
    B: CertifiableBlock,
    P: PublicKey,
    T: Strategy,
{
    /// Context held by the actor.
    context: ContextCell<E>,

    /// Receiver for incoming messages to the actor.
    mailbox: mpsc::Receiver<Message<B, C, H, P>>,

    /// The scheme provider.
    scheme_provider: S,

    /// The peer blocker.
    blocker: X,

    /// [`Read`] configuration for decoding [`Shard`]s.
    shard_codec_cfg: <Shard<C, H> as Read>::Cfg,

    /// [`Read`] configuration for decoding [`CodedBlock`]s.
    block_codec_cfg: B::Cfg,

    /// The strategy used for parallel shard verification.
    strategy: T,

    /// A map of [`Commitment`]s to [`ReconstructionState`]s.
    state: BTreeMap<Commitment, ReconstructionState<P, C, H>>,

    /// Per-peer ring buffers for shards received before leader announcement.
    ///
    /// Empty buffers are retained for active peers and only evicted when the
    /// peer leaves `latest.primary`.
    peer_buffers: BTreeMap<P, VecDeque<Shard<C, H>>>,

    /// Maximum buffered pre-leader shards per peer.
    peer_buffer_size: NonZeroUsize,

    /// Provider for peer set information.
    peer_provider: D,

    /// Latest union of peer membership from the peer set subscription
    /// ([`commonware_p2p::PeerSetUpdate::all`]).
    aggregate_peers: Set<P>,

    /// Latest primary peers allowed to retain pre-leader shard buffers.
    latest_primary_peers: Set<P>,

    /// Capacity of the background receiver channel.
    background_channel_capacity: usize,

    /// An ephemeral cache of reconstructed blocks, keyed by commitment.
    ///
    /// These blocks are evicted after a durability signal from the marshal.
    /// Wrapped in [`Arc`] to enable cheap cloning when serving multiple subscribers.
    reconstructed_blocks: BTreeMap<Commitment, Arc<CodedBlock<B, C, H>>>,

    /// Open subscriptions for assigned shard verification for the keyed
    /// [`Commitment`].
    ///
    /// For participants, readiness is satisfied once the leader-delivered
    /// shard for the local participant index has been verified. Block
    /// reconstruction from peer gossip is tracked separately and does not
    /// satisfy this readiness condition.
    ///
    /// Proposers are a special case: they satisfy readiness once their local
    /// proposal is cached because they already hold all shards.
    assigned_shard_verified_subscriptions: BTreeMap<Commitment, Vec<oneshot::Sender<()>>>,

    /// Open subscriptions for the reconstruction of a [`CodedBlock`] with
    /// the keyed [`Commitment`].
    #[allow(clippy::type_complexity)]
    block_subscriptions:
        BTreeMap<BlockSubscriptionKey<B::Digest>, Vec<oneshot::Sender<Arc<CodedBlock<B, C, H>>>>>,

    /// Metrics for the shard engine.
    metrics: ShardMetrics,
}

impl<E, S, X, D, C, H, B, P, T> Engine<E, S, X, D, C, H, B, P, T>
where
    E: BufferPooler + Rng + Spawner + Metrics + Clock,
    S: Provider<Scope = Epoch>,
    S::Scheme: CertificateScheme<PublicKey = P>,
    X: Blocker<PublicKey = P>,
    D: PeerProvider<PublicKey = P>,
    C: CodingScheme,
    H: Hasher,
    B: CertifiableBlock,
    P: PublicKey,
    T: Strategy,
{
    /// Create a new [`Engine`] with the given configuration.
    pub fn new(context: E, config: Config<P, S, X, D, C, H, B, T>) -> (Self, Mailbox<B, C, H, P>) {
        let metrics = ShardMetrics::new(&context);
        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
        (
            Self {
                context: ContextCell::new(context),
                mailbox,
                scheme_provider: config.scheme_provider,
                blocker: config.blocker,
                shard_codec_cfg: config.shard_codec_cfg,
                block_codec_cfg: config.block_codec_cfg,
                strategy: config.strategy,
                state: BTreeMap::new(),
                peer_buffers: BTreeMap::new(),
                peer_buffer_size: config.peer_buffer_size,
                peer_provider: config.peer_provider,
                aggregate_peers: Set::default(),
                latest_primary_peers: Set::default(),
                background_channel_capacity: config.background_channel_capacity,
                reconstructed_blocks: BTreeMap::new(),
                assigned_shard_verified_subscriptions: BTreeMap::new(),
                block_subscriptions: BTreeMap::new(),
                metrics,
            },
            Mailbox::new(sender),
        )
    }

    /// Start the engine.
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

    /// Run the shard engine's event loop.
    async fn run(
        mut self,
        (sender, receiver): (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) {
        let mut sender = WrappedSender::<_, Shard<C, H>>::new(
            self.context.network_buffer_pool().clone(),
            sender,
        );
        let (receiver_service, mut receiver): (_, mpsc::Receiver<(P, Shard<C, H>)>) =
            WrappedBackgroundReceiver::new(
                self.context.with_label("shard_ingress"),
                receiver,
                self.shard_codec_cfg.clone(),
                self.blocker.clone(),
                self.background_channel_capacity,
                &self.strategy,
            );
        // Keep the handle alive to prevent the background receiver from being aborted.
        let _receiver_handle = receiver_service.start();
        let mut peer_set_subscription = self.peer_provider.subscribe().await;

        select_loop! {
            self.context,
            on_start => {
                let _ = self
                    .metrics
                    .reconstruction_states_count
                    .try_set(self.state.len());
                let _ = self
                    .metrics
                    .reconstructed_blocks_cache_count
                    .try_set(self.reconstructed_blocks.len());

                // Clean up closed subscriptions.
                self.block_subscriptions.retain(|_, subscribers| {
                    subscribers.retain(|tx| !tx.is_closed());
                    !subscribers.is_empty()
                });
                self.assigned_shard_verified_subscriptions
                    .retain(|_, subscribers| {
                        subscribers.retain(|tx| !tx.is_closed());
                        !subscribers.is_empty()
                    });
            },
            on_stopped => {
                debug!("received shutdown signal, stopping shard engine");
            },
            Some(update) = peer_set_subscription.recv() else {
                debug!("peer set subscription closed");
                return;
            } => {
                let all_peers = update.all.union();
                self.update_latest_primary_peers(update.latest.primary);
                self.aggregate_peers = all_peers;
            },
            Some(message) = self.mailbox.recv() else {
                debug!("shard mailbox closed, stopping shard engine");
                return;
            } => match message {
                Message::Proposed { block, round } => {
                    self.broadcast_shards(&mut sender, round, block).await;
                }
                Message::Discovered {
                    commitment,
                    leader,
                    round,
                } => {
                    self.handle_external_proposal(&mut sender, commitment, leader, round)
                        .await;
                }
                Message::GetByCommitment {
                    commitment,
                    response,
                } => {
                    let block = self.reconstructed_blocks.get(&commitment).cloned();
                    response.send_lossy(block);
                }
                Message::GetByDigest { digest, response } => {
                    let block = self
                        .reconstructed_blocks
                        .iter()
                        .find_map(|(_, b)| (b.digest() == digest).then_some(b))
                        .cloned();
                    response.send_lossy(block);
                }
                Message::SubscribeAssignedShardVerified {
                    commitment,
                    response,
                } => {
                    self.handle_assigned_shard_verified_subscription(commitment, response);
                }
                Message::SubscribeByCommitment {
                    commitment,
                    response,
                } => {
                    self.handle_block_subscription(
                        BlockSubscriptionKey::Commitment(commitment),
                        response,
                    );
                }
                Message::SubscribeByDigest { digest, response } => {
                    self.handle_block_subscription(BlockSubscriptionKey::Digest(digest), response);
                }
                Message::Prune { through } => {
                    self.prune(through);
                }
            },
            Some((peer, shard)) = receiver.recv() else {
                debug!("receiver closed, stopping shard engine");
                return;
            } => {
                // Track shard receipt per peer.
                self.metrics
                    .shards_received
                    .get_or_create(&Peer::new(&peer))
                    .inc();

                let commitment = shard.commitment();
                if !self.should_handle_network_shard(commitment) {
                    continue;
                }

                if let Some(state) = self.state.get_mut(&commitment) {
                    let round = state.round();
                    let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
                        warn!(%commitment, "no scheme for epoch, ignoring shard");
                        continue;
                    };
                    let progressed = state
                        .on_network_shard(
                            peer,
                            shard,
                            InsertCtx::new(scheme.as_ref(), &self.strategy),
                            &mut self.blocker,
                        )
                        .await;
                    if progressed {
                        self.try_advance(&mut sender, commitment).await;
                    }
                } else {
                    self.buffer_peer_shard(peer, shard);
                }
            },
        }
    }

    /// Returns whether an incoming network shard should still be processed.
    ///
    /// Shards for reconstructed commitments are normally ignored. The only
    /// exception is the late leader-delivered shard for the assigned index,
    /// which we still accept so we can notify readiness and gossip it to
    /// slower peers.
    fn should_handle_network_shard(&self, commitment: Commitment) -> bool {
        if self.reconstructed_blocks.contains_key(&commitment) {
            return self
                .state
                .get(&commitment)
                .is_some_and(|s| !s.is_assigned_shard_verified());
        }
        true
    }

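The acceptance rule above reduces to a decision over three booleans. A standalone sketch of that table (a hypothetical helper mirroring `should_handle_network_shard`, with state lookups flattened into flags):

```rust
// Once a block is reconstructed, a shard is only still processed when the
// commitment is still tracked AND its assigned (leader-delivered) shard has
// not yet been verified; before reconstruction, everything is processed.
fn should_handle(reconstructed: bool, tracked: bool, assigned_shard_verified: bool) -> bool {
    if reconstructed {
        tracked && !assigned_shard_verified
    } else {
        true
    }
}
```

This is why a late leader shard can still satisfy the readiness subscription even after the block was rebuilt from gossip.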
    /// Attempts to reconstruct a [`CodedBlock`] from the checked [`Shard`]s present in the
    /// [`ReconstructionState`].
    ///
    /// # Returns
    /// - `Ok(Some(block))` if reconstruction was successful or the block was already reconstructed.
    /// - `Ok(None)` if reconstruction could not be attempted due to insufficient checked shards.
    /// - `Err(_)` if reconstruction was attempted but failed.
    #[allow(clippy::type_complexity)]
    fn try_reconstruct(
        &mut self,
        commitment: Commitment,
    ) -> Result<Option<Arc<CodedBlock<B, C, H>>>, Error<C>> {
        if let Some(block) = self.reconstructed_blocks.get(&commitment) {
            return Ok(Some(Arc::clone(block)));
        }
        let Some(state) = self.state.get_mut(&commitment) else {
            return Ok(None);
        };
        if state.checked_shards().len() < usize::from(commitment.config().minimum_shards.get()) {
            debug!(%commitment, "not enough checked shards to reconstruct block");
            return Ok(None);
        }
        // Attempt to reconstruct the encoded blob
        let start = self.context.current();
        let blob = C::decode(
            &commitment.config(),
            &commitment.root(),
            state.checked_shards().iter(),
            &self.strategy,
        )
        .map_err(Error::Coding)?;
        self.metrics
            .erasure_decode_duration
            .observe_between(start, self.context.current());

        // Attempt to decode the block from the encoded blob
        let (inner, config): (B, CodingConfig) =
            Decode::decode_cfg(&mut blob.as_slice(), &(self.block_codec_cfg.clone(), ()))?;

        match validate_reconstruction::<H, _>(&inner, config, commitment) {
            Ok(()) => {}
            Err(InvariantError::BlockDigest) => {
                return Err(Error::DigestMismatch);
            }
            Err(InvariantError::CodingConfig) => {
                warn!(
                    %commitment,
                    expected_config = ?commitment.config(),
                    actual_config = ?config,
                    "reconstructed block config does not match commitment config, but digest matches"
                );
                return Err(Error::ConfigMismatch);
            }
            Err(InvariantError::ContextDigest(expected, actual)) => {
                warn!(
                    %commitment,
                    expected_context_digest = ?expected,
                    actual_context_digest = ?actual,
                    "reconstructed block context digest does not match commitment context digest"
                );
                return Err(Error::ContextMismatch);
            }
        }

        // Construct a coded block with a _trusted_ commitment. `C::decode` verified the blob's
        // integrity against the commitment, so shards can be lazily reconstructed if need be.
        let block = Arc::new(CodedBlock::new_trusted(inner, commitment));
        self.cache_block(Arc::clone(&block));
        self.metrics.blocks_reconstructed_total.inc();
        Ok(Some(block))
    }

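The decode path above only needs `minimum_shards` of the full shard set. A toy, hedged illustration of that threshold property, using a 2-of-3 XOR parity code rather than the actual scheme `C` (which is generic and typically a Reed-Solomon-style code):

```rust
// 2-of-3 erasure toy: shards 0 and 1 are the data halves, shard 2 is their
// XOR, so any two shards recover the original blob. This only demonstrates
// the "any k of n" principle behind `C::decode`.
fn xor(a: &[u8], b: &[u8]) -> Vec<u8> {
    a.iter().zip(b).map(|(x, y)| x ^ y).collect()
}

fn encode(blob: &[u8; 8]) -> [Vec<u8>; 3] {
    let (d0, d1) = (blob[..4].to_vec(), blob[4..].to_vec());
    let parity = xor(&d0, &d1);
    [d0, d1, parity]
}

/// Reconstruct the blob from any two shards, given their (ascending) indices.
fn decode(shards: [(usize, &[u8]); 2]) -> Vec<u8> {
    let [(i, a), (j, b)] = shards;
    match (i, j) {
        (0, 1) => [a, b].concat(),               // both data halves present
        (0, 2) => [a.to_vec(), xor(a, b)].concat(), // d1 = d0 ^ parity
        (1, 2) => [xor(a, b), a.to_vec()].concat(), // d0 = d1 ^ parity
        _ => panic!("indices must be distinct and ascending"),
    }
}
```

Unlike this toy, the real `C::decode` also proves the recovered blob against the commitment root, which is what makes `CodedBlock::new_trusted` safe.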
    /// Handles leader announcements for a commitment and advances reconstruction.
    async fn handle_external_proposal<Sr: Sender<PublicKey = P>>(
        &mut self,
        sender: &mut WrappedSender<Sr, Shard<C, H>>,
        commitment: Commitment,
        leader: P,
        round: Round,
    ) {
        if self.reconstructed_blocks.contains_key(&commitment) {
            return;
        }
        let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
            warn!(%commitment, "no scheme for epoch, ignoring external proposal");
            return;
        };
        let participants = scheme.participants();
        if participants.index(&leader).is_none() {
            warn!(?leader, %commitment, "leader update for non-participant, ignoring");
            return;
        }
        if let Some(state) = self.state.get(&commitment) {
            if state.leader() != &leader {
                warn!(
                    existing = ?state.leader(),
                    ?leader,
                    %commitment,
                    "conflicting leader update, ignoring"
                );
            }
            return;
        }

        let participants_len =
            u64::try_from(participants.len()).expect("participant count impossibly out of bounds");
        self.state.insert(
            commitment,
            ReconstructionState::new(leader, round, participants_len),
        );
        let buffered_progress = self.ingest_buffered_shards(commitment).await;
        if buffered_progress {
            self.try_advance(sender, commitment).await;
        }
    }

    /// Buffer a shard from a peer until a leader is known.
    fn buffer_peer_shard(&mut self, peer: P, shard: Shard<C, H>) {
        if self.latest_primary_peers.position(&peer).is_none() {
            debug!(
                ?peer,
                "pre-leader shard from peer outside latest.primary not buffered"
            );
            return;
        }
        let queue = self.peer_buffers.entry(peer).or_default();
        if queue.len() >= self.peer_buffer_size.get() {
            let _ = queue.pop_front();
        }
        queue.push_back(shard);
    }

    /// Update the latest primary peer set, evicting buffers for departed peers.
    fn update_latest_primary_peers(&mut self, peers: Set<P>) {
        self.peer_buffers
            .retain(|peer, _| peers.position(peer).is_some());
        self.latest_primary_peers = peers;
    }

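The buffering and eviction logic above can be sketched standalone. A hedged simplification: peers are strings and shards are byte vectors, whereas the real engine keys by `P: PublicKey` and checks membership against a `Set<P>`:

```rust
use std::collections::{BTreeMap, VecDeque};

// Bounded per-peer pre-leader buffers: each peer gets a fixed-size ring
// (oldest shard evicted first), and peers that leave the primary set lose
// their buffers entirely.
struct PeerBuffers {
    buffers: BTreeMap<String, VecDeque<Vec<u8>>>,
    capacity: usize,
}

impl PeerBuffers {
    fn new(capacity: usize) -> Self {
        Self { buffers: BTreeMap::new(), capacity }
    }

    /// Buffer a shard, evicting the oldest when the ring is full.
    fn push(&mut self, peer: &str, shard: Vec<u8>) {
        let queue = self.buffers.entry(peer.to_string()).or_default();
        if queue.len() >= self.capacity {
            queue.pop_front();
        }
        queue.push_back(shard);
    }

    /// Drop buffers for peers no longer in the primary set.
    fn retain_peers(&mut self, primary: &[&str]) {
        self.buffers.retain(|peer, _| primary.contains(&peer.as_str()));
    }

    fn len(&self, peer: &str) -> usize {
        self.buffers.get(peer).map_or(0, VecDeque::len)
    }
}
```

The fixed capacity is what keeps a Byzantine peer from growing memory without bound before a leader is announced.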
    /// Ingest buffered pre-leader shards for a commitment into active state.
    async fn ingest_buffered_shards(&mut self, commitment: Commitment) -> bool {
        let mut buffered = Vec::new();
        for (peer, queue) in self.peer_buffers.iter_mut() {
            let mut i = 0;
            while i < queue.len() {
                if queue[i].commitment() != commitment {
                    i += 1;
                    continue;
                }
                let shard = queue.swap_remove_back(i).expect("index is valid");
                buffered.push((peer.clone(), shard));
            }
        }

        let Some(state) = self.state.get_mut(&commitment) else {
            return false;
        };
        let round = state.round();
        let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
            warn!(%commitment, "no scheme for epoch, dropping buffered shards");
            return false;
        };

        // Ingest buffered shards into the active reconstruction state. Batch verification
        // will be triggered if there are enough shards to meet the quorum threshold.
        let mut progressed = false;
        let ctx = InsertCtx::new(scheme.as_ref(), &self.strategy);
        for (peer, shard) in buffered {
            progressed |= state
                .on_network_shard(peer, shard, ctx, &mut self.blocker)
                .await;
        }
        progressed
    }

    /// Cache a block and notify all subscribers waiting on it.
    fn cache_block(&mut self, block: Arc<CodedBlock<B, C, H>>) {
        let commitment = block.commitment();
        self.reconstructed_blocks
            .insert(commitment, Arc::clone(&block));
        self.notify_block_subscribers(block);
    }

    /// Broadcasts the shards of a [`CodedBlock`] and caches the block.
    ///
    /// - Participants receive the shard matching their participant index.
    /// - Non-participants in aggregate membership receive the leader's shard.
    async fn broadcast_shards<Sr: Sender<PublicKey = P>>(
        &mut self,
        sender: &mut WrappedSender<Sr, Shard<C, H>>,
        round: Round,
        mut block: CodedBlock<B, C, H>,
    ) {
        let commitment = block.commitment();

        let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
            warn!(%commitment, "no scheme available, cannot broadcast shards");
            return;
        };
        let participants = scheme.participants();
        let Some(me) = scheme.me() else {
            warn!(
                %commitment,
                "cannot broadcast shards: local proposer is not a participant"
            );
            return;
        };

        let shard_count = block.shards(&self.strategy).len();
        if shard_count != participants.len() {
            warn!(
                %commitment,
                shard_count,
                participants = participants.len(),
                "cannot broadcast shards: participant/shard count mismatch"
            );
            return;
        }

        let my_index = me.get() as usize;
        let leader_shard = block
            .shard(my_index as u16)
            .expect("proposer's shard must exist");

        // Broadcast each participant their corresponding shard.
        for (index, peer) in participants.iter().enumerate() {
            if index == my_index {
                continue;
            }

            let Some(shard) = block.shard(index as u16) else {
                warn!(
                    %commitment,
                    index,
                    "cannot broadcast shards: missing shard for participant index"
                );
                return;
808            };
809            let _ = sender
810                .send(Recipients::One(peer.clone()), shard, true)
811                .await;
812        }
813
814        // Send the leader's shard to peers in aggregate membership who are not participants.
815        let non_participants: Vec<P> = self
816            .aggregate_peers
817            .iter()
818            .filter(|peer| participants.index(peer).is_none())
819            .cloned()
820            .collect();
821        if !non_participants.is_empty() {
822            let _ = sender
823                .send(Recipients::Some(non_participants), leader_shard, true)
824                .await;
825        }
826
827        // Cache the block so we don't have to reconstruct it again.
828        let block = Arc::new(block);
829        self.cache_block(block);
830
831        // Local proposals bypass reconstruction, so shard subscribers waiting
832        // for "our valid shard arrived" still need a notification.
833        self.notify_assigned_shard_verified_subscribers(commitment);
834
835        debug!(?commitment, "broadcasted shards");
836    }
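The distribution rule above (participant `i` receives shard `i`; every non-participant receives the leader's own shard) can be reduced to a pure routing function. A standalone sketch under illustrative names, not this crate's API:

```rust
/// Compute which shard index a recipient should be sent.
/// Participants are identified by their position in `participants`;
/// everyone else receives the leader's shard index.
fn shard_index_for(participants: &[&str], leader_index: usize, recipient: &str) -> usize {
    match participants.iter().position(|p| *p == recipient) {
        Some(i) => i,         // participant: their own indexed shard
        None => leader_index, // non-participant: the leader's shard
    }
}
```

This is also why non-participants later gossip nothing of their own: the only shard they can ever hold directly from the leader is the leader's indexed shard.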

    /// Gossips a validated [`Shard`] using [`commonware_p2p::Recipients::All`].
    async fn broadcast_shard<Sr: Sender<PublicKey = P>>(
        &mut self,
        sender: &mut WrappedSender<Sr, Shard<C, H>>,
        shard: Shard<C, H>,
    ) {
        let commitment = shard.commitment();
        if let Ok(peers) = sender.send(Recipients::All, shard, true).await {
            debug!(
                ?commitment,
                peers = peers.len(),
                "broadcasted shard to all peers"
            );
        }
    }

    /// Broadcasts any pending validated shard for the given commitment and attempts
    /// reconstruction. On success, subscribers are notified and the state is retained
    /// until finalization; on failure, the state is removed and its subscribers dropped.
    async fn try_advance<Sr: Sender<PublicKey = P>>(
        &mut self,
        sender: &mut WrappedSender<Sr, Shard<C, H>>,
        commitment: Commitment,
    ) {
        if let Some(state) = self.state.get_mut(&commitment) {
            match state.take_pending_action() {
                Some(AssignedShardVerifiedAction::Broadcast(shard)) => {
                    self.broadcast_shard(sender, shard).await;
                    self.notify_assigned_shard_verified_subscribers(commitment);
                }
                Some(AssignedShardVerifiedAction::NotifyOnly) => {
                    self.notify_assigned_shard_verified_subscribers(commitment);
                }
                None => {}
            }
        }

        match self.try_reconstruct(commitment) {
            Ok(Some(block)) => {
                // Do not prune other reconstruction state here. A Byzantine
                // leader can equivocate by proposing multiple commitments in
                // the same round, so more than one block may be reconstructed
                // for a given round. Pruning is deferred to `prune()`, which
                // is called once a commitment is finalized.
                debug!(
                    %commitment,
                    parent = %block.parent(),
                    height = %block.height(),
                    "successfully reconstructed block from shards"
                );
            }
            Ok(None) => {
                debug!(%commitment, "not enough checked shards to reconstruct block");
            }
            Err(err) => {
                warn!(%commitment, ?err, "failed to reconstruct block from checked shards");
                self.state.remove(&commitment);
                self.drop_subscriptions(commitment);
                self.metrics.reconstruction_failures_total.inc();
            }
        }
    }

    /// Handles registration of an assigned shard verification subscription.
    ///
    /// For participants this is tied to verification of the leader-delivered
    /// shard for the local index, not to generic block reconstruction.
    fn handle_assigned_shard_verified_subscription(
        &mut self,
        commitment: Commitment,
        response: oneshot::Sender<()>,
    ) {
        // Answer immediately if our own shard has been verified.
        let has_shard = self
            .state
            .get(&commitment)
            .is_some_and(|state| state.is_assigned_shard_verified());
        if has_shard {
            response.send_lossy(());
            return;
        }

        // When there is no reconstruction state but the block is already in
        // the cache, the local node was the proposer. Proposers trivially
        // have all shards, so resolve immediately.
        if !self.state.contains_key(&commitment)
            && self.reconstructed_blocks.contains_key(&commitment)
        {
            response.send_lossy(());
            return;
        }

        self.assigned_shard_verified_subscriptions
            .entry(commitment)
            .or_default()
            .push(response);
    }

    /// Handles registration of a block subscription.
    fn handle_block_subscription(
        &mut self,
        key: BlockSubscriptionKey<B::Digest>,
        response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
    ) {
        let block = match key {
            BlockSubscriptionKey::Commitment(commitment) => {
                self.reconstructed_blocks.get(&commitment)
            }
            BlockSubscriptionKey::Digest(digest) => self
                .reconstructed_blocks
                .iter()
                .find_map(|(_, block)| (block.digest() == digest).then_some(block)),
        };

        // Answer immediately if we have the block cached.
        if let Some(block) = block {
            response.send_lossy(Arc::clone(block));
            return;
        }

        self.block_subscriptions
            .entry(key)
            .or_default()
            .push(response);
    }

    /// Notifies and cleans up any subscriptions waiting for assigned shard
    /// verification.
    fn notify_assigned_shard_verified_subscribers(&mut self, commitment: Commitment) {
        if let Some(mut subscribers) = self
            .assigned_shard_verified_subscriptions
            .remove(&commitment)
        {
            for subscriber in subscribers.drain(..) {
                subscriber.send_lossy(());
            }
        }
    }

    /// Notifies and cleans up any subscriptions for a reconstructed block.
    fn notify_block_subscribers(&mut self, block: Arc<CodedBlock<B, C, H>>) {
        let commitment = block.commitment();
        let digest = block.digest();

        // Notify by-commitment subscribers.
        if let Some(mut subscribers) = self
            .block_subscriptions
            .remove(&BlockSubscriptionKey::Commitment(commitment))
        {
            for subscriber in subscribers.drain(..) {
                subscriber.send_lossy(Arc::clone(&block));
            }
        }

        // Notify by-digest subscribers.
        if let Some(mut subscribers) = self
            .block_subscriptions
            .remove(&BlockSubscriptionKey::Digest(digest))
        {
            for subscriber in subscribers.drain(..) {
                subscriber.send_lossy(Arc::clone(&block));
            }
        }
    }

    /// Drops all subscriptions associated with a commitment.
    ///
    /// Removing these entries drops all senders, causing receivers to resolve
    /// with cancellation (`RecvError`) instead of hanging indefinitely.
    fn drop_subscriptions(&mut self, commitment: Commitment) {
        self.assigned_shard_verified_subscriptions
            .remove(&commitment);
        self.block_subscriptions
            .remove(&BlockSubscriptionKey::Commitment(commitment));
        self.block_subscriptions
            .remove(&BlockSubscriptionKey::Digest(
                commitment.block::<B::Digest>(),
            ));
    }
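The cancellation behavior documented on `drop_subscriptions` relies on a standard channel property: dropping the sender wakes the receiver with an error instead of leaving it pending forever. A minimal illustration using the stdlib's `mpsc` channel (this crate uses its own oneshot type, so the names differ):

```rust
use std::sync::mpsc;

/// Returns true if the receiver observed a disconnect (cancellation)
/// after its sender was dropped without ever sending a value.
fn receiver_is_cancelled() -> bool {
    let (tx, rx) = mpsc::channel::<()>();
    drop(tx); // simulate removing the subscription entry from the map
    matches!(rx.recv(), Err(mpsc::RecvError))
}
```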

    /// Prunes all blocks in the reconstructed block cache that are older than the block
    /// with the given commitment. Also cleans up stale reconstruction state
    /// and subscriptions.
    ///
    /// This is the only place reconstruction state is pruned by round. We
    /// intentionally avoid pruning on reconstruction success because a
    /// Byzantine leader can equivocate, producing multiple valid commitments
    /// in the same round. Both must remain recoverable until finalization
    /// determines which one is canonical.
    fn prune(&mut self, through: Commitment) {
        if let Some(height) = self.reconstructed_blocks.get(&through).map(|b| b.height()) {
            self.reconstructed_blocks
                .retain(|_, block| block.height() > height);
        }

        // Always clear direct state/subscriptions for the pruned commitment.
        // This avoids dangling waiters when prune is called for a commitment
        // that was never reconstructed locally.
        self.drop_subscriptions(through);
        let Some(round) = self.state.remove(&through).map(|state| state.round()) else {
            return;
        };

        let mut pruned_commitments = Vec::new();
        self.state.retain(|c, s| {
            let keep = s.round() > round;
            if !keep {
                pruned_commitments.push(*c);
            }
            keep
        });
        for pruned in pruned_commitments {
            self.drop_subscriptions(pruned);
        }
    }
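`prune` keeps only strictly newer entries and collects the evicted keys so their waiters can be dropped afterward (mutating subscriptions inside the `retain` closure would require a second borrow). A standalone sketch of this retain-and-collect pattern, with plain integers standing in for commitments and rounds:

```rust
use std::collections::BTreeMap;

/// Retain entries with a round strictly newer than `through`,
/// returning the evicted keys so callers can clean up their waiters.
fn prune_through(state: &mut BTreeMap<u64, u64>, through: u64) -> Vec<u64> {
    let mut pruned = Vec::new();
    state.retain(|key, round| {
        let keep = *round > through;
        if !keep {
            pruned.push(*key);
        }
        keep
    });
    pruned
}
```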
}

/// Erasure coded block reconstruction state machine.
enum ReconstructionState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Stage 1: accumulate shards. The leader's shard for our index is
    /// verified immediately; all other shards are buffered until enough
    /// are available for batch verification.
    AwaitingQuorum(AwaitingQuorumState<P, C, H>),
    /// Stage 2: batch validation passed; checked shards are available for
    /// reconstruction.
    Ready(ReadyState<P, C, H>),
}

/// Action to take once assigned shard verification has been established.
///
/// Participants broadcast the shard to all peers, while non-participants
/// only notify local subscribers.
enum AssignedShardVerifiedAction<C: CodingScheme, H: Hasher> {
    /// Broadcast the shard to all peers and notify local subscribers.
    Broadcast(Shard<C, H>),
    /// Only notify local subscribers (non-participant validated the leader's shard).
    NotifyOnly,
}

/// A coding shard paired with its participant index.
struct IndexedShard<C: CodingScheme> {
    index: u16,
    data: C::Shard,
}

/// State shared across all reconstruction phases.
struct CommonState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// The leader associated with this reconstruction state.
    leader: P,
    /// Our validated shard and the action to take with it.
    pending_action: Option<AssignedShardVerifiedAction<C, H>>,
    /// Shards that have been verified and are ready to contribute to reconstruction.
    checked_shards: Vec<C::CheckedShard>,
    /// Bitmap tracking which participant indices have contributed a shard.
    contributed: BitMap,
    /// The round for which this commitment was externally proposed.
    round: Round,
    /// Raw shard data received per index, retained for equivocation detection.
    /// Keyed by shard index.
    received_shards: BTreeMap<u16, C::Shard>,
    /// Whether the leader's shard for our assigned index has been verified.
    assigned_shard_verified: bool,
}

/// Phase data for `ReconstructionState::AwaitingQuorum`.
///
/// In this phase, the leader is known. The leader's shard for our index is
/// verified eagerly via `C::check`. Other shards are buffered in
/// `pending_shards` until enough are available to attempt batch validation.
struct AwaitingQuorumState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    common: CommonState<P, C, H>,
    /// Shards pending batch validation, keyed by sender.
    pending_shards: BTreeMap<P, IndexedShard<C>>,
}

/// Phase data for `ReconstructionState::Ready`.
///
/// Batch validation has passed. Checked shards are available for
/// reconstruction.
struct ReadyState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    common: CommonState<P, C, H>,
}

impl<P, C, H> CommonState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Create a new empty common state for the provided leader and round.
    fn new(leader: P, round: Round, participants_len: u64) -> Self {
        Self {
            leader,
            pending_action: None,
            checked_shards: Vec::new(),
            contributed: BitMap::zeroes(participants_len),
            round,
            received_shards: BTreeMap::new(),
            assigned_shard_verified: false,
        }
    }
    /// Verify the leader's shard for our index and store it.
    ///
    /// When `is_participant` is true, the validated shard is stored for
    /// broadcasting to peers. When false (non-participant), only subscriber
    /// notification is scheduled.
    ///
    /// Returns `false` if verification fails (sender is blocked), `true` on
    /// success.
    async fn verify_assigned_shard(
        &mut self,
        sender: P,
        commitment: Commitment,
        shard: IndexedShard<C>,
        is_participant: bool,
        blocker: &mut impl Blocker<PublicKey = P>,
    ) -> bool {
        // Move the data into `received_shards` first (retained for
        // equivocation detection), then borrow it for the check. It is
        // cloned only if a broadcast action is scheduled below.
        self.received_shards.insert(shard.index, shard.data);
        let data = self.received_shards.get(&shard.index).unwrap();
        let Ok(checked) = C::check(&commitment.config(), &commitment.root(), shard.index, data)
        else {
            self.received_shards.remove(&shard.index);
            commonware_p2p::block!(blocker, sender, "invalid shard received from leader");
            return false;
        };

        self.contributed.set(u64::from(shard.index), true);
        self.checked_shards.push(checked);
        self.assigned_shard_verified = true;
        self.pending_action = Some(if is_participant {
            AssignedShardVerifiedAction::Broadcast(Shard::new(
                commitment,
                shard.index,
                data.clone(),
            ))
        } else {
            AssignedShardVerifiedAction::NotifyOnly
        });
        true
    }
}

impl<P, C, H> AwaitingQuorumState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Check whether quorum is met and, if so, batch-validate all pending
    /// shards in parallel. Returns `Some(ReadyState)` on successful transition.
    async fn try_transition(
        &mut self,
        commitment: Commitment,
        participants_len: u64,
        strategy: &impl Strategy,
        blocker: &mut impl Blocker<PublicKey = P>,
    ) -> Option<ReadyState<P, C, H>> {
        let minimum = usize::from(commitment.config().minimum_shards.get());
        if self.common.checked_shards.len() + self.pending_shards.len() < minimum {
            return None;
        }

        // Batch-validate all pending weak shards in parallel.
        let pending = std::mem::take(&mut self.pending_shards);
        let (new_checked, to_block) =
            strategy.map_partition_collect_vec(pending, |(peer, shard)| {
                let checked = C::check(
                    &commitment.config(),
                    &commitment.root(),
                    shard.index,
                    &shard.data,
                );
                (peer, checked.ok())
            });

        for peer in to_block {
            commonware_p2p::block!(blocker, peer, "invalid shard received");
        }
        for checked in new_checked {
            self.common.checked_shards.push(checked);
        }

        // After validation, some may have failed; recheck threshold.
        if self.common.checked_shards.len() < minimum {
            return None;
        }

        // Transition to Ready.
        let round = self.common.round;
        let leader = self.common.leader.clone();
        let common = std::mem::replace(
            &mut self.common,
            CommonState::new(leader, round, participants_len),
        );
        Some(ReadyState { common })
    }
}
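The two-stage threshold in `try_transition` (optimistically attempt batch validation once `checked + pending >= minimum`, then recheck after invalid shards are discarded) can be sketched standalone. The validator closure and names below are illustrative:

```rust
/// Batch-validate pending items once the optimistic quorum is reached.
/// Returns Some(checked) only when at least `minimum` items survive.
fn try_reach_quorum(
    mut checked: Vec<u16>,
    pending: Vec<u16>,
    minimum: usize,
    is_valid: impl Fn(u16) -> bool,
) -> Option<Vec<u16>> {
    // Not enough material to possibly reach quorum: keep buffering.
    if checked.len() + pending.len() < minimum {
        return None;
    }
    // Validate the whole pending batch, discarding invalid entries.
    checked.extend(pending.into_iter().filter(|s| is_valid(*s)));
    // Some pending items may have failed; recheck the threshold.
    (checked.len() >= minimum).then_some(checked)
}
```

The recheck is essential: without it, a quorum counted optimistically before validation could transition to `Ready` with fewer valid shards than reconstruction needs.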

/// Context required for processing incoming network shards.
struct InsertCtx<'a, Sch, S>
where
    Sch: CertificateScheme,
    S: Strategy,
{
    scheme: &'a Sch,
    strategy: &'a S,
    participants_len: u64,
}

impl<Sch: CertificateScheme, S: Strategy> Clone for InsertCtx<'_, Sch, S> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<Sch: CertificateScheme, S: Strategy> Copy for InsertCtx<'_, Sch, S> {}

impl<'a, Sch: CertificateScheme, S: Strategy> InsertCtx<'a, Sch, S> {
    fn new(scheme: &'a Sch, strategy: &'a S) -> Self {
        let participants_len = u64::try_from(scheme.participants().len())
            .expect("participant count impossibly out of bounds");
        Self {
            scheme,
            strategy,
            participants_len,
        }
    }
}

impl<P, C, H> ReconstructionState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Create an initial reconstruction state for a commitment.
    fn new(leader: P, round: Round, participants_len: u64) -> Self {
        Self::AwaitingQuorum(AwaitingQuorumState {
            common: CommonState::new(leader, round, participants_len),
            pending_shards: BTreeMap::new(),
        })
    }

    /// Access common state shared across all phases.
    const fn common(&self) -> &CommonState<P, C, H> {
        match self {
            Self::AwaitingQuorum(state) => &state.common,
            Self::Ready(state) => &state.common,
        }
    }

    /// Mutably access common state shared across all phases.
    const fn common_mut(&mut self) -> &mut CommonState<P, C, H> {
        match self {
            Self::AwaitingQuorum(state) => &mut state.common,
            Self::Ready(state) => &mut state.common,
        }
    }

    /// Return the leader associated with this state.
    const fn leader(&self) -> &P {
        &self.common().leader
    }

    /// Returns whether the leader's shard for our index has been verified.
    const fn is_assigned_shard_verified(&self) -> bool {
        self.common().assigned_shard_verified
    }

    /// Return the proposal round associated with this state.
    const fn round(&self) -> Round {
        self.common().round
    }

    /// Returns all verified shards accumulated for reconstruction.
    const fn checked_shards(&self) -> &[C::CheckedShard] {
        self.common().checked_shards.as_slice()
    }

    /// Takes the pending action for this commitment's validated shard.
    ///
    /// Returns [`None`] if the leader's shard hasn't been validated yet.
    const fn take_pending_action(&mut self) -> Option<AssignedShardVerifiedAction<C, H>> {
        self.common_mut().pending_action.take()
    }

    /// Handle an incoming network shard.
    ///
    /// Returns `true` only when the shard caused state progress (buffered,
    /// validated, or transitioned), and `false` when rejected/blocked.
    ///
    /// ## Peer Blocking Rules
    ///
    /// The `sender` may be blocked via the provided [`Blocker`] if any of
    /// the following rules are violated:
    ///
    /// - MUST be sent by a participant in the current epoch. Non-participant
    ///   senders are blocked.
    /// - If the sender is the leader: the shard index MUST match the
    ///   recipient's own participant index (when the recipient is a
    ///   participant) or the leader's participant index (when the recipient
    ///   is a non-participant).
    /// - If the sender is not the leader: the shard index MUST match the
    ///   sender's participant index. Each non-leader participant may only
    ///   gossip their own shard.
    /// - A mismatched shard index results in blocking the sender.
    /// - Each shard index may only contribute ONE shard per commitment.
    ///   Sending a second shard for the same index with different data
    ///   (equivocation) results in blocking the sender.
    /// - The leader's shard is verified eagerly via [`CodingScheme::check`].
    ///   If verification fails, the leader is blocked.
    /// - Non-leader shards are buffered in `pending_shards` and
    ///   batch-validated when quorum is reached. Invalid shards discovered
    ///   during batch validation result in blocking their respective
    ///   senders.
    ///
    /// ## Silent Discard Rules
    ///
    /// The following conditions cause a shard to be silently ignored
    /// without blocking the sender:
    ///
    /// - Exact duplicate of a previously received shard for the same index.
    /// - The index has already been marked as contributed (via the bitmap,
    ///   e.g. after batch validation).
    /// - Non-leader shards that arrive after the state has transitioned to
    ///   [`ReconstructionState::Ready`] (i.e., batch validation has already
    ///   passed). The leader's shard for our index is still accepted in
    ///   `Ready` state to ensure we verify and re-broadcast it.
    /// - When the leader is not yet known, shards are buffered at the
    ///   engine level in bounded per-peer queues until
    ///   [`Discovered`](super::Message::Discovered) creates a
    ///   reconstruction state for this commitment.
    async fn on_network_shard<Sch, S, X>(
        &mut self,
        sender: P,
        shard: Shard<C, H>,
        ctx: InsertCtx<'_, Sch, S>,
        blocker: &mut X,
    ) -> bool
    where
        Sch: CertificateScheme<PublicKey = P>,
        S: Strategy,
        X: Blocker<PublicKey = P>,
    {
        let Some(sender_index) = ctx.scheme.participants().index(&sender) else {
            commonware_p2p::block!(blocker, sender, "shard sent by non-participant");
            return false;
        };
        let commitment = shard.commitment();
        let indexed = IndexedShard {
            index: shard.index(),
            data: shard.into_inner(),
        };

        // Determine expected index based on sender role.
        let is_from_leader = sender == self.common().leader;
        let expected_participant = if is_from_leader {
            ctx.scheme.me().unwrap_or(sender_index)
        } else {
            sender_index
        };
        let expected_index: u16 = expected_participant
            .get()
            .try_into()
            .expect("participant index impossibly out of bounds");
        if indexed.index != expected_index {
            commonware_p2p::block!(
                blocker,
                sender,
                shard_index = indexed.index,
                expected_index,
                "shard index does not match expected index"
            );
            return false;
        }

        // Equivocation/duplicate check.
        if let Some(existing) = self.common().received_shards.get(&indexed.index) {
            if existing != &indexed.data {
                commonware_p2p::block!(blocker, sender, "shard equivocation");
            }
            return false;
        }

        // Check if this index already contributed (via batch validation).
        if self.common().contributed.get(u64::from(indexed.index)) {
            return false;
        }

        // Leader's shard for our index is always verified eagerly,
        // even after transitioning to Ready. This ensures we broadcast
        // our own shard to help slower peers reach quorum.
        if is_from_leader && !self.common().assigned_shard_verified {
            let progressed = self
                .common_mut()
                .verify_assigned_shard(
                    sender,
                    commitment,
                    indexed,
                    ctx.scheme.me().is_some(),
                    blocker,
                )
                .await;

            if progressed {
                if let Self::AwaitingQuorum(state) = self {
                    if let Some(ready) = state
                        .try_transition(commitment, ctx.participants_len, ctx.strategy, blocker)
                        .await
                    {
                        *self = Self::Ready(ready);
                    }
                }
            }
            return progressed;
        }

        // Non-leader shards are only accepted while awaiting quorum.
        let Self::AwaitingQuorum(state) = self else {
            return false;
        };

        // Buffer for batch validation.
        state
            .common
            .received_shards
            .insert(indexed.index, indexed.data.clone());
        state.common.contributed.set(u64::from(indexed.index), true);
        state.pending_shards.insert(sender, indexed);
        if let Some(ready) = state
            .try_transition(commitment, ctx.participants_len, ctx.strategy, blocker)
            .await
        {
            *self = Self::Ready(ready);
        }

        true
    }
}
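The admission rules documented on `on_network_shard` reduce to two pure decisions: which shard index a given sender is allowed to use, and whether a repeated index is a harmless duplicate or an equivocation. A standalone sketch of both checks (the `Verdict` enum and names are illustrative, not from this crate):

```rust
use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
enum Verdict {
    Accept,
    Ignore, // silent discard: exact duplicate
    Block,  // index mismatch or equivocation
}

/// The leader delivers our assigned index (or its own, when we are a
/// non-participant); every other sender may only gossip its own index.
fn expected_index(is_from_leader: bool, our_index: Option<u16>, sender_index: u16) -> u16 {
    if is_from_leader {
        our_index.unwrap_or(sender_index)
    } else {
        sender_index
    }
}

/// Admit a shard, detecting duplicates and equivocation by raw payload.
fn admit(received: &mut BTreeMap<u16, Vec<u8>>, index: u16, expected: u16, data: Vec<u8>) -> Verdict {
    if index != expected {
        return Verdict::Block;
    }
    match received.get(&index) {
        Some(existing) if *existing == data => Verdict::Ignore,
        Some(_) => Verdict::Block, // same index, different data: equivocation
        None => {
            received.insert(index, data);
            Verdict::Accept
        }
    }
}
```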
1508
1509#[cfg(test)]
1510mod tests {
1511    use super::*;
1512    use crate::{
1513        marshal::{
1514            coding::types::coding_config_for_participants, mocks::block::Block as MockBlock,
1515        },
1516        types::{Epoch, Height, View},
1517    };
1518    use bytes::Bytes;
1519    use commonware_codec::Encode;
1520    use commonware_coding::{
1521        CodecConfig, Config as CodingConfig, PhasedAsScheme, ReedSolomon, Zoda,
1522    };
1523    use commonware_cryptography::{
1524        certificate::Subject,
1525        ed25519::{PrivateKey, PublicKey},
1526        impl_certificate_ed25519,
1527        sha256::Digest as Sha256Digest,
1528        Committable, Digest, Sha256, Signer,
1529    };
1530    use commonware_macros::{select, test_traced};
1531    use commonware_p2p::{
1532        simulated::{self, Control, Link, Oracle},
1533        Manager as _, TrackedPeers,
1534    };
1535    use commonware_parallel::Sequential;
1536    use commonware_runtime::{deterministic, Quota, Runner};
1537    use commonware_utils::{
1538        channel::oneshot::error::TryRecvError, ordered::Set, NZUsize, Participant,
1539    };
1540    use std::{
1541        future::Future,
1542        marker::PhantomData,
1543        num::NonZeroU32,
1544        sync::atomic::{AtomicIsize, Ordering},
1545        time::Duration,
1546    };
1547
1548    #[derive(Clone, Debug)]
1549    pub struct TestSubject {
1550        pub message: Bytes,
1551    }
1552
1553    impl Subject for TestSubject {
1554        type Namespace = Vec<u8>;
1555
1556        fn namespace<'a>(&self, derived: &'a Self::Namespace) -> &'a [u8] {
1557            derived
1558        }
1559
1560        fn message(&self) -> Bytes {
1561            self.message.clone()
1562        }
1563    }
1564
1565    impl_certificate_ed25519!(TestSubject, Vec<u8>);
1566
1567    const SCHEME_NAMESPACE: &[u8] = b"_COMMONWARE_SHARD_ENGINE_TEST";
1568
1569    /// The max size of a shard sent over the wire.
1570    const MAX_SHARD_SIZE: usize = 1024 * 1024; // 1 MiB
1571
1572    /// The default link configuration for tests.
1573    const DEFAULT_LINK: Link = Link {
1574        latency: Duration::from_millis(50),
1575        jitter: Duration::ZERO,
1576        success_rate: 1.0,
1577    };
1578
1579    /// Rate limit quota for tests (effectively unlimited).
1580    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
1581
1582    /// The parallelization strategy used for tests.
1583    const STRATEGY: Sequential = Sequential;
1584
1585    /// A scheme provider that maps each epoch to a potentially different scheme.
1586    ///
1587    /// For most tests, only epoch 0 is registered, matching the previous
1588    /// `ConstantProvider` behavior. Cross-epoch tests register additional
1589    /// epochs with different participant sets.
1590    #[derive(Clone)]
1591    struct MultiEpochProvider {
1592        schemes: BTreeMap<Epoch, Arc<Scheme>>,
1593    }
1594
1595    impl MultiEpochProvider {
1596        fn single(scheme: Scheme) -> Self {
1597            let mut schemes = BTreeMap::new();
1598            schemes.insert(Epoch::zero(), Arc::new(scheme));
1599            Self { schemes }
1600        }
1601
1602        fn with_epoch(mut self, epoch: Epoch, scheme: Scheme) -> Self {
1603            self.schemes.insert(epoch, Arc::new(scheme));
1604            self
1605        }
1606    }
1607
1608    impl Provider for MultiEpochProvider {
1609        type Scope = Epoch;
1610        type Scheme = Scheme;
1611
1612        fn scoped(&self, scope: Epoch) -> Option<Arc<Scheme>> {
1613            self.schemes.get(&scope).cloned()
1614        }
1615    }
1616
1617    /// A single-epoch scheme provider that starts returning `None` after a
1618    /// fixed number of successful scope lookups, simulating provider churn.
1619    #[derive(Clone)]
1620    struct ChurningProvider {
1621        scheme: Arc<Scheme>,
1622        remaining_successes: Arc<AtomicIsize>,
1623    }
1624
1625    impl ChurningProvider {
1626        fn new(scheme: Scheme, successes: isize) -> Self {
1627            Self {
1628                scheme: Arc::new(scheme),
1629                remaining_successes: Arc::new(AtomicIsize::new(successes)),
1630            }
1631        }
1632    }
1633
1634    impl Provider for ChurningProvider {
1635        type Scope = Epoch;
1636        type Scheme = Scheme;
1637
1638        fn scoped(&self, scope: Epoch) -> Option<Arc<Scheme>> {
1639            if scope != Epoch::zero() {
1640                return None;
1641            }
1642            if self.remaining_successes.fetch_sub(1, Ordering::AcqRel) <= 0 {
1643                return None;
1644            }
1645            Some(Arc::clone(&self.scheme))
1646        }
1647    }
1648
1649    // Type aliases for test convenience.
1650    type B = MockBlock<Sha256Digest, ()>;
1651    type H = Sha256;
1652    type P = PublicKey;
1653    type C = ReedSolomon<H>;
1654    type X = Control<P, deterministic::Context>;
1655    type O = Oracle<P, deterministic::Context>;
1656    type Prov = MultiEpochProvider;
1657    type NetworkSender = simulated::Sender<P, deterministic::Context>;
1658    type D = simulated::Manager<P, deterministic::Context>;
1659    type ShardEngine<S> = Engine<deterministic::Context, Prov, X, D, S, H, B, P, Sequential>;
1660    type ChurningShardEngine<S> =
1661        Engine<deterministic::Context, ChurningProvider, X, D, S, H, B, P, Sequential>;
1662
1663    async fn assert_blocked(oracle: &O, blocker: &P, blocked: &P) {
1664        let blocked_peers = oracle.blocked().await.unwrap();
1665        let is_blocked = blocked_peers
1666            .iter()
1667            .any(|(a, b)| a == blocker && b == blocked);
1668        assert!(is_blocked, "expected {blocker} to have blocked {blocked}");
1669    }
1670
1671    /// A participant in the test network with its engine mailbox and blocker.
1672    struct Peer<S: CodingScheme = C> {
1673        /// The peer's public key.
1674        public_key: PublicKey,
1675        /// The peer's index in the participant set.
1676        index: Participant,
1677        /// The mailbox for sending messages to the peer's shard engine.
1678        mailbox: Mailbox<B, S, H, P>,
1679        /// Raw network sender for injecting messages (e.g., byzantine behavior).
1680        sender: NetworkSender,
1681    }
1682
1683    /// A non-participant in the test network with its engine mailbox.
1684    #[allow(dead_code)]
1685    struct NonParticipant<S: CodingScheme = C> {
1686        /// The peer's public key.
1687        public_key: PublicKey,
1688        /// The mailbox for sending messages to the peer's shard engine.
1689        mailbox: Mailbox<B, S, H, P>,
1690        /// Raw network sender for injecting messages.
1691        sender: NetworkSender,
1692    }
1693
1694    /// Test fixture for setting up multiple participants with shard engines.
1695    struct Fixture<S: CodingScheme = C> {
1696        /// Number of peers in the test network.
1697        num_peers: usize,
1698        /// Number of non-participant peers in the test network.
1699        num_non_participants: usize,
1700        /// Network link configuration.
1701        link: Link,
1702        /// Marker for the coding scheme type parameter.
1703        _marker: PhantomData<S>,
1704    }
1705
1706    impl<S: CodingScheme> Default for Fixture<S> {
1707        fn default() -> Self {
1708            Self {
1709                num_peers: 4,
1710                num_non_participants: 0,
1711                link: DEFAULT_LINK,
1712                _marker: PhantomData,
1713            }
1714        }
1715    }
1716
1717    impl<S: CodingScheme> Fixture<S> {
1718        pub fn start<F: Future<Output = ()>>(
1719            self,
1720            f: impl FnOnce(
1721                Self,
1722                deterministic::Context,
1723                O,
1724                Vec<Peer<S>>,
1725                Vec<NonParticipant<S>>,
1726                CodingConfig,
1727            ) -> F,
1728        ) {
1729            let executor = deterministic::Runner::default();
1730            executor.start(|context| async move {
1731                let mut private_keys = (0..self.num_peers)
1732                    .map(|i| PrivateKey::from_seed(i as u64))
1733                    .collect::<Vec<_>>();
1734                private_keys.sort_by_key(|s| s.public_key());
1735                let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
1736
1737                let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());
1738
1739                let mut np_private_keys = (0..self.num_non_participants)
1740                    .map(|i| PrivateKey::from_seed((self.num_peers + i) as u64))
1741                    .collect::<Vec<_>>();
1742                np_private_keys.sort_by_key(|s| s.public_key());
1743                let np_keys: Vec<P> = np_private_keys.iter().map(|k| k.public_key()).collect();
1744
1745                let (network, oracle) =
1746                    simulated::Network::<deterministic::Context, P>::new_with_split_peers(
1747                        context.with_label("network"),
1748                        simulated::Config {
1749                            max_size: MAX_SHARD_SIZE as u32,
1750                            disconnect_on_block: true,
1751                            tracked_peer_sets: NZUsize!(1),
1752                        },
1753                        peer_keys.clone(),
1754                        np_keys.clone(),
1755                    )
1756                    .await;
1757                network.start();
1758
1759                let all_keys: Vec<P> = peer_keys.iter().chain(np_keys.iter()).cloned().collect();
1760
1761                let mut registrations = BTreeMap::new();
1762                for key in all_keys.iter() {
1763                    let control = oracle.control(key.clone());
1764                    let (sender, receiver) = control
1765                        .register(0, TEST_QUOTA)
1766                        .await
1767                        .expect("registration should succeed");
1768                    registrations.insert(key.clone(), (control, sender, receiver));
1769                }
1770                for p1 in all_keys.iter() {
1771                    for p2 in all_keys.iter() {
1772                        if p2 == p1 {
1773                            continue;
1774                        }
1775                        oracle
1776                            .add_link(p1.clone(), p2.clone(), self.link.clone())
1777                            .await
1778                            .expect("link should be added");
1779                    }
1780                }
1781
1782                let coding_config =
1783                    coding_config_for_participants(u16::try_from(self.num_peers).unwrap());
1784
1785                let mut peers = Vec::with_capacity(self.num_peers);
1786                for (idx, peer_key) in peer_keys.iter().enumerate() {
1787                    let (control, sender, receiver) = registrations
1788                        .remove(peer_key)
1789                        .expect("peer should be registered");
1790
1791                    let participant = Participant::new(idx as u32);
1792                    let engine_context = context.with_label(&format!("peer_{}", idx));
1793
1794                    let scheme = Scheme::signer(
1795                        SCHEME_NAMESPACE,
1796                        participants.clone(),
1797                        private_keys[idx].clone(),
1798                    )
1799                    .expect("signer scheme should be created");
1800                    let scheme_provider: Prov = MultiEpochProvider::single(scheme);
1801
1802                    let config = Config {
1803                        scheme_provider,
1804                        blocker: control.clone(),
1805                        shard_codec_cfg: CodecConfig {
1806                            maximum_shard_size: MAX_SHARD_SIZE,
1807                        },
1808                        block_codec_cfg: (),
1809                        strategy: STRATEGY,
1810                        mailbox_size: 1024,
1811                        peer_buffer_size: NZUsize!(64),
1812                        background_channel_capacity: 1024,
1813                        peer_provider: oracle.manager(),
1814                    };
1815
1816                    let (engine, mailbox) = ShardEngine::new(engine_context, config);
1817                    let sender_clone = sender.clone();
1818                    engine.start((sender, receiver));
1819
1820                    peers.push(Peer {
1821                        public_key: peer_key.clone(),
1822                        index: participant,
1823                        mailbox,
1824                        sender: sender_clone,
1825                    });
1826                }
1827
1828                let mut non_participants = Vec::with_capacity(self.num_non_participants);
1829                for (idx, np_key) in np_keys.iter().enumerate() {
1830                    let (control, sender, receiver) = registrations
1831                        .remove(np_key)
1832                        .expect("non-participant should be registered");
1833
1834                    let engine_context = context.with_label(&format!("non_participant_{}", idx));
1835
1836                    let scheme = Scheme::verifier(SCHEME_NAMESPACE, participants.clone());
1837                    let scheme_provider: Prov = MultiEpochProvider::single(scheme);
1838
1839                    let config = Config {
1840                        scheme_provider,
1841                        blocker: control.clone(),
1842                        shard_codec_cfg: CodecConfig {
1843                            maximum_shard_size: MAX_SHARD_SIZE,
1844                        },
1845                        block_codec_cfg: (),
1846                        strategy: STRATEGY,
1847                        mailbox_size: 1024,
1848                        peer_buffer_size: NZUsize!(64),
1849                        background_channel_capacity: 1024,
1850                        peer_provider: oracle.manager(),
1851                    };
1852
1853                    let (engine, mailbox) = ShardEngine::new(engine_context, config);
1854                    let sender_clone = sender.clone();
1855                    engine.start((sender, receiver));
1856
1857                    non_participants.push(NonParticipant {
1858                        public_key: np_key.clone(),
1859                        mailbox,
1860                        sender: sender_clone,
1861                    });
1862                }
1863
1864                f(
1865                    self,
1866                    context,
1867                    oracle,
1868                    peers,
1869                    non_participants,
1870                    coding_config,
1871                )
1872                .await;
1873            });
1874        }
1875    }
1876
1877    #[test_traced]
1878    fn test_e2e_broadcast_and_reconstruction() {
1879        let fixture = Fixture {
1880            num_peers: 10,
1881            ..Default::default()
1882        };
1883
1884        fixture.start(
1885            |config, context, _, mut peers, _, coding_config| async move {
1886                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
1887                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
1888                let commitment = coded_block.commitment();
1889
1890                let leader = peers[0].public_key.clone();
1891                let round = Round::new(Epoch::zero(), View::new(1));
1892                peers[0].mailbox.proposed(round, coded_block.clone()).await;
1893
1894                // Inform all peers of the leader so shards are processed.
1895                for peer in peers[1..].iter_mut() {
1896                    peer.mailbox
1897                        .discovered(commitment, leader.clone(), round)
1898                        .await;
1899                }
1900                context.sleep(config.link.latency).await;
1901
1902                for peer in peers.iter_mut() {
1903                    peer.mailbox
1904                        .subscribe_assigned_shard_verified(commitment)
1905                        .await
1906                        .await
1907                        .expect("shard subscription should complete");
1908                }
1909                context.sleep(config.link.latency).await;
1910
1911                for peer in peers.iter_mut() {
1912                    let reconstructed = peer
1913                        .mailbox
1914                        .get(commitment)
1915                        .await
1916                        .expect("block should be reconstructed");
1917                    assert_eq!(reconstructed.commitment(), commitment);
1918                    assert_eq!(reconstructed.height(), coded_block.height());
1919                }
1920            },
1921        );
1922    }
1923
1924    #[test_traced]
1925    fn test_e2e_broadcast_and_reconstruction_zoda() {
1926        let fixture = Fixture {
1927            num_peers: 10,
1928            ..Default::default()
1929        };
1930
1931        fixture.start(
1932            |config, context, _, mut peers, _, coding_config| async move {
1933                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
1934                let coded_block = CodedBlock::<B, PhasedAsScheme<Zoda<H>>, H>::new(
1935                    inner,
1936                    coding_config,
1937                    &STRATEGY,
1938                );
1939                let commitment = coded_block.commitment();
1940
1941                let leader = peers[0].public_key.clone();
1942                let round = Round::new(Epoch::zero(), View::new(1));
1943                peers[0].mailbox.proposed(round, coded_block.clone()).await;
1944
1945                // Inform all peers of the leader so shards are processed.
1946                for peer in peers[1..].iter_mut() {
1947                    peer.mailbox
1948                        .discovered(commitment, leader.clone(), round)
1949                        .await;
1950                }
1951                context.sleep(config.link.latency).await;
1952
1953                for peer in peers.iter_mut() {
1954                    peer.mailbox
1955                        .subscribe_assigned_shard_verified(commitment)
1956                        .await
1957                        .await
1958                        .expect("shard subscription should complete");
1959                }
1960                context.sleep(config.link.latency).await;
1961
1962                for peer in peers.iter_mut() {
1963                    let reconstructed = peer
1964                        .mailbox
1965                        .get(commitment)
1966                        .await
1967                        .expect("block should be reconstructed");
1968                    assert_eq!(reconstructed.commitment(), commitment);
1969                    assert_eq!(reconstructed.height(), coded_block.height());
1970                }
1971            },
1972        );
1973    }
1974
1975    #[test_traced]
1976    fn test_block_subscriptions() {
1977        let fixture = Fixture {
1978            num_peers: 10,
1979            ..Default::default()
1980        };
1981
1982        fixture.start(
1983            |config, context, _, mut peers, _, coding_config| async move {
1984                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
1985                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
1986                let commitment = coded_block.commitment();
1987                let digest = coded_block.digest();
1988
1989                let leader = peers[0].public_key.clone();
1990                let round = Round::new(Epoch::zero(), View::new(1));
1991
1992                // Subscribe before broadcasting.
1993                let commitment_sub = peers[1].mailbox.subscribe(commitment).await;
1994                let digest_sub = peers[2].mailbox.subscribe_by_digest(digest).await;
1995
1996                peers[0].mailbox.proposed(round, coded_block.clone()).await;
1997
1998                // Inform all peers of the leader so shards are processed.
1999                for peer in peers[1..].iter_mut() {
2000                    peer.mailbox
2001                        .discovered(commitment, leader.clone(), round)
2002                        .await;
2003                }
2004                context.sleep(config.link.latency * 2).await;
2005
2006                for peer in peers.iter_mut() {
2007                    peer.mailbox
2008                        .subscribe_assigned_shard_verified(commitment)
2009                        .await
2010                        .await
2011                        .expect("shard subscription should complete");
2012                }
2013                context.sleep(config.link.latency).await;
2014
2015                let block_by_commitment =
2016                    commitment_sub.await.expect("subscription should resolve");
2017                assert_eq!(block_by_commitment.commitment(), commitment);
2018                assert_eq!(block_by_commitment.height(), coded_block.height());
2019
2020                let block_by_digest = digest_sub.await.expect("subscription should resolve");
2021                assert_eq!(block_by_digest.commitment(), commitment);
2022                assert_eq!(block_by_digest.height(), coded_block.height());
2023            },
2024        );
2025    }
2026
2027    #[test_traced]
2028    fn test_proposer_preproposal_subscriptions_resolve_after_local_cache() {
2029        let fixture = Fixture {
2030            num_peers: 10,
2031            ..Default::default()
2032        };
2033
2034        fixture.start(|config, context, _, peers, _, coding_config| async move {
2035            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2036            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2037            let commitment = coded_block.commitment();
2038            let digest = coded_block.digest();
2039            let round = Round::new(Epoch::zero(), View::new(1));
2040
2041            // Subscribe on the proposer before it caches the locally proposed block.
2042            let shard_sub = peers[0].mailbox.subscribe_assigned_shard_verified(commitment).await;
2043            let commitment_sub = peers[0].mailbox.subscribe(commitment).await;
2044            let digest_sub = peers[0].mailbox.subscribe_by_digest(digest).await;
2045
2046            peers[0].mailbox.proposed(round, coded_block.clone()).await;
2047            context.sleep(config.link.latency).await;
2048
2049            select! {
2050                result = shard_sub => {
2051                    result.expect("shard subscription should resolve");
2052                },
2053                _ = context.sleep(Duration::from_secs(5)) => {
2054                    panic!("shard subscription did not resolve after local proposal cache");
2055                }
2056            }
2057
2058            let block_by_commitment = select! {
2059                result = commitment_sub => {
2060                    result.expect("block subscription by commitment should resolve")
2061                },
2062                _ = context.sleep(Duration::from_secs(5)) => {
2063                    panic!("block subscription by commitment did not resolve after local proposal cache");
2064                }
2065            };
2066            assert_eq!(block_by_commitment.commitment(), commitment);
2067            assert_eq!(block_by_commitment.height(), coded_block.height());
2068
2069            let block_by_digest = select! {
2070                result = digest_sub => {
2071                    result.expect("block subscription by digest should resolve")
2072                },
2073                _ = context.sleep(Duration::from_secs(5)) => {
2074                    panic!("block subscription by digest did not resolve after local proposal cache");
2075                }
2076            };
2077            assert_eq!(block_by_digest.commitment(), commitment);
2078            assert_eq!(block_by_digest.height(), coded_block.height());
2079        });
2080    }
2081
2082    #[test_traced]
2083    fn test_shard_subscription_rejects_invalid_shard() {
2084        let fixture = Fixture::<C>::default();
2085        fixture.start(
2086            |config, context, oracle, mut peers, _, coding_config| async move {
2087                // peers[0] = byzantine
2088                // peers[1] = honest proposer
2089                // peers[2] = receiver
2090
2091                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2092                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2093                let commitment = coded_block.commitment();
2094                let receiver_index = peers[2].index.get() as u16;
2095
2096                let valid_shard = coded_block.shard(receiver_index).expect("missing shard");
2097
2098                // Corrupt the shard's index to one that doesn't match
2099                // peers[0]'s participant index, triggering a block.
2100                let mut invalid_shard = valid_shard.clone();
2101                invalid_shard.index = peers[3].index.get() as u16;
2102
2103                // Receiver subscribes to their shard and learns the leader.
2104                let receiver_pk = peers[2].public_key.clone();
2105                let leader = peers[1].public_key.clone();
2106                peers[2]
2107                    .mailbox
2108                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2109                    .await;
2110                let mut shard_sub = peers[2]
2111                    .mailbox
2112                    .subscribe_assigned_shard_verified(commitment)
2113                    .await;
2114
2115                // Byzantine peer sends the invalid shard.
2116                let invalid_bytes = invalid_shard.encode();
2117                peers[0]
2118                    .sender
2119                    .send(Recipients::One(receiver_pk.clone()), invalid_bytes, true)
2120                    .await
2121                    .expect("send failed");
2122
2123                context.sleep(config.link.latency * 2).await;
2124
2125                assert!(
2126                    matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
2127                    "subscription should not resolve from invalid shard"
2128                );
2129                assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;
2130
2131                // Honest proposer sends the valid shard.
2132                let valid_bytes = valid_shard.encode();
2133                peers[1]
2134                    .sender
2135                    .send(Recipients::One(receiver_pk), valid_bytes, true)
2136                    .await
2137                    .expect("send failed");
2138                context.sleep(config.link.latency * 2).await;
2139
2140                // Subscription should now resolve.
2141                select! {
2142                    _ = shard_sub => {},
2143                    _ = context.sleep(Duration::from_secs(5)) => {
2144                        panic!("subscription did not complete after valid shard arrival");
2145                    },
2146                };
2147            },
2148        );
2149    }
2150
2151    #[test_traced]
2152    fn test_durable_prunes_reconstructed_blocks() {
2153        let fixture = Fixture::<C>::default();
2154        fixture.start(|_, context, _, mut peers, _, coding_config| async move {
2155            // Create 3 blocks at heights 1, 2, 3.
2156            let block1 = CodedBlock::<B, C, H>::new(
2157                B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
2158                coding_config,
2159                &STRATEGY,
2160            );
2161            let block2 = CodedBlock::<B, C, H>::new(
2162                B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 100),
2163                coding_config,
2164                &STRATEGY,
2165            );
2166            let block3 = CodedBlock::<B, C, H>::new(
2167                B::new::<H>((), Sha256Digest::EMPTY, Height::new(3), 100),
2168                coding_config,
2169                &STRATEGY,
2170            );
2171            let commitment1 = block1.commitment();
2172            let commitment2 = block2.commitment();
2173            let commitment3 = block3.commitment();
2174
2175            // Cache all blocks via `proposed`.
2176            let peer = &mut peers[0];
2177            let round = Round::new(Epoch::zero(), View::new(1));
2178            peer.mailbox.proposed(round, block1).await;
2179            peer.mailbox.proposed(round, block2).await;
2180            peer.mailbox.proposed(round, block3).await;
2181            context.sleep(Duration::from_millis(10)).await;
2182
2183            // Verify all blocks are in the cache.
2184            assert!(
2185                peer.mailbox.get(commitment1).await.is_some(),
2186                "block1 should be cached"
2187            );
2188            assert!(
2189                peer.mailbox.get(commitment2).await.is_some(),
2190                "block2 should be cached"
2191            );
2192            assert!(
2193                peer.mailbox.get(commitment3).await.is_some(),
2194                "block3 should be cached"
2195            );
2196
2197            // Prune at height 2 (blocks with height <= 2 should be removed).
2198            peer.mailbox.prune(commitment2).await;
2199            context.sleep(Duration::from_millis(10)).await;
2200
2201            // Blocks at heights 1 and 2 should be pruned.
2202            assert!(
2203                peer.mailbox.get(commitment1).await.is_none(),
2204                "block1 should be pruned"
2205            );
2206            assert!(
2207                peer.mailbox.get(commitment2).await.is_none(),
2208                "block2 should be pruned"
2209            );
2210
2211            // Block at height 3 should still be cached.
2212            assert!(
2213                peer.mailbox.get(commitment3).await.is_some(),
2214                "block3 should still be cached"
2215            );
2216        });
2217    }
2218
2219    #[test_traced]
2220    fn test_duplicate_leader_shard_ignored() {
2221        let fixture = Fixture::<C>::default();
2222        fixture.start(
2223            |config, context, oracle, mut peers, _, coding_config| async move {
2224                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2225                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2226                let commitment = coded_block.commitment();
2227
2228                // Get peer 2's own-index shard (the one the leader sends them).
2229                let peer2_index = peers[2].index.get() as u16;
2230                let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2231                let shard_bytes = peer2_shard.encode();
2232
2233                let peer2_pk = peers[2].public_key.clone();
2234                let leader = peers[0].public_key.clone();
2235
2236                // Inform peer 2 that peer 0 is the leader.
2237                peers[2]
2238                    .mailbox
2239                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2240                    .await;
2241
2242                // Send peer 2 their shard from peer 0 (leader, first time - should succeed).
2243                peers[0]
2244                    .sender
2245                    .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true)
2246                    .await
2247                    .expect("send failed");
2248                context.sleep(config.link.latency * 2).await;
2249
2250                // Send the same shard again from peer 0 (leader duplicate - ignored).
2251                peers[0]
2252                    .sender
2253                    .send(Recipients::One(peer2_pk), shard_bytes, true)
2254                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // The leader should NOT be blocked for sending an identical duplicate.
                let blocked_peers = oracle.blocked().await.unwrap();
                let is_blocked = blocked_peers
                    .iter()
                    .any(|(a, b)| a == &peers[2].public_key && b == &peers[0].public_key);
                assert!(
                    !is_blocked,
                    "leader should not be blocked for duplicate shard"
                );
            },
        );
    }

    #[test_traced]
    fn test_equivocating_leader_shard_blocks_peer() {
        let fixture = Fixture::<C>::default();
        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
                let commitment = coded_block1.commitment();

                // Create a second block with different payload to get different shard data.
                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 200);
                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);

                // Get peer 2's shard from both blocks.
                let peer2_index = peers[2].index.get() as u16;
                let shard_bytes1 = coded_block1
                    .shard(peer2_index)
                    .expect("missing shard")
                    .encode();
                let mut equivocating_shard =
                    coded_block2.shard(peer2_index).expect("missing shard");
                // Override the commitment so it targets the same reconstruction state.
                equivocating_shard.commitment = commitment;
                let shard_bytes2 = equivocating_shard.encode();

                let peer2_pk = peers[2].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Inform peer 2 that peer 0 is the leader.
                peers[2]
                    .mailbox
                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;

                // Send peer 2 their shard from the leader (first time - succeeds).
                peers[0]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), shard_bytes1, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Send a different shard from the leader (equivocation - should block).
                peers[0]
                    .sender
                    .send(Recipients::One(peer2_pk), shard_bytes2, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Peer 2 should have blocked the leader for equivocation.
                assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;
            },
        );
    }
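
    // The duplicate-vs-equivocation distinction the two tests above exercise can
    // be sketched as a small standalone rule (names and types here are
    // hypothetical simplifications, not part of this engine): a repeat of the
    // exact same shard bytes for a (sender, commitment) slot is ignored, while
    // different bytes for the same slot are equivocation and block the sender.

    ```rust
    use std::collections::HashMap;

    // Hypothetical simplified outcome of ingesting a shard.
    enum ShardOutcome {
        Accepted,
        DuplicateIgnored,
        EquivocationBlock,
    }

    // Sketch only: sender/commitment are stand-in u64 keys, shards are raw bytes.
    fn ingest(
        seen: &mut HashMap<(u64, u64), Vec<u8>>,
        sender: u64,
        commitment: u64,
        bytes: Vec<u8>,
    ) -> ShardOutcome {
        match seen.get(&(sender, commitment)) {
            // First shard for this slot: record and accept it.
            None => {
                seen.insert((sender, commitment), bytes);
                ShardOutcome::Accepted
            }
            // Byte-identical resend: harmless, ignore without blocking.
            Some(prev) if *prev == bytes => ShardOutcome::DuplicateIgnored,
            // Different bytes for the same slot: equivocation, block the sender.
            Some(_) => ShardOutcome::EquivocationBlock,
        }
    }
    ```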

    #[test_traced]
    fn test_non_leader_wrong_index_shard_blocked() {
        // Test that a non-leader sending a shard with the wrong index is blocked.
        // Non-leaders must send shards at their own participant index.
        let fixture = Fixture::<C>::default();
        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                // Get peer 2's own-index shard.
                let peer2_index = peers[2].index.get() as u16;
                let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
                let shard_bytes = peer2_shard.encode();

                let peer2_pk = peers[2].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Inform peer 2 that peer 0 is the leader.
                peers[2]
                    .mailbox
                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;

                // Peer 1 (not the leader) sends peer 2 a shard with peer 2's index
                // (wrong: non-leaders must use their own index).
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk), shard_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Peer 1 should be blocked by peer 2 for wrong shard index.
                assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
            },
        );
    }

    #[test_traced]
    fn test_buffered_wrong_index_shard_blocked_on_leader_arrival() {
        // Test that when a non-leader's shard with the wrong index is buffered
        // (leader unknown) and then the leader arrives, the sender is blocked.
        let fixture = Fixture::<C>::default();
        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                // Get peer 2's own-index shard.
                let peer2_index = peers[2].index.get() as u16;
                let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
                let shard_bytes = peer2_shard.encode();

                let peer2_pk = peers[2].public_key.clone();

                // Peer 1 sends a shard with peer 2's index before the leader is known (buffered).
                // This is wrong: non-leaders must send at their own index.
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk), shard_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Nobody should be blocked yet (shard is buffered, leader unknown).
                let blocked = oracle.blocked().await.unwrap();
                assert!(
                    blocked.is_empty(),
                    "no peers should be blocked while leader is unknown"
                );

                // Now inform peer 2 that peer 0 is the leader.
                // This drains the buffer: peer 1's shard has peer 2's index but
                // peer 1 is not the leader, so expected index is peer 1's own index.
                let leader = peers[0].public_key.clone();
                peers[2]
                    .mailbox
                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;
                context.sleep(Duration::from_millis(10)).await;

                assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
            },
        );
    }

    #[test_traced]
    fn test_conflicting_external_proposed_ignored() {
        let fixture = Fixture::<C>::default();
        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                // Get the shard the leader would send to peer 2 (at peer 2's index).
                let peer2_index = peers[2].index.get() as u16;
                let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
                let shard_bytes = peer2_shard.encode();

                let peer2_pk = peers[2].public_key.clone();
                let leader_a = peers[0].public_key.clone();
                let leader_b = peers[1].public_key.clone();

                // Subscribe before shards arrive so we can verify acceptance.
                let shard_sub = peers[2]
                    .mailbox
                    .subscribe_assigned_shard_verified(commitment)
                    .await;

                // First leader update should stick.
                peers[2]
                    .mailbox
                    .discovered(
                        commitment,
                        leader_a.clone(),
                        Round::new(Epoch::zero(), View::new(1)),
                    )
                    .await;
                // Conflicting update should be ignored.
                peers[2]
                    .mailbox
                    .discovered(
                        commitment,
                        leader_b,
                        Round::new(Epoch::zero(), View::new(1)),
                    )
                    .await;

                // Original leader sends shard; this should still be accepted.
                peers[0]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Subscription should resolve from accepted leader shard.
                select! {
                    _ = shard_sub => {},
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("subscription did not complete after shard from original leader");
                    },
                };

                // The conflicting leader should still be treated as non-leader and blocked.
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk), shard_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;

                // Original leader should not be blocked.
                let blocked_peers = oracle.blocked().await.unwrap();
                let leader_a_blocked = blocked_peers
                    .iter()
                    .any(|(a, b)| a == &peers[2].public_key && b == &leader_a);
                assert!(
                    !leader_a_blocked,
                    "original leader should not be blocked after conflicting leader update"
                );
            },
        );
    }

    #[test_traced]
    fn test_non_participant_external_proposed_ignored() {
        let fixture = Fixture::<C>::default();
        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                // Get the shard the leader would send to peer 2 (at peer 2's index).
                let peer2_index = peers[2].index.get() as u16;
                let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
                let shard_bytes = peer2_shard.encode();

                let peer2_pk = peers[2].public_key.clone();
                let leader = peers[0].public_key.clone();
                let non_participant_leader = PrivateKey::from_seed(10_000).public_key();

                // Subscribe before shards arrive.
                let shard_sub = peers[2]
                    .mailbox
                    .subscribe_assigned_shard_verified(commitment)
                    .await;

                // A non-participant leader update should be ignored.
                peers[2]
                    .mailbox
                    .discovered(
                        commitment,
                        non_participant_leader,
                        Round::new(Epoch::zero(), View::new(1)),
                    )
                    .await;

                // Leader unknown path: this shard should be buffered, not blocked.
                peers[0]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                let blocked = oracle.blocked().await.unwrap();
                let leader_blocked = blocked
                    .iter()
                    .any(|(a, b)| a == &peers[2].public_key && b == &leader);
                assert!(
                    !leader_blocked,
                    "leader should not be blocked when non-participant update is ignored"
                );

                // A valid leader update should then process buffered shards and resolve subscription.
                peers[2]
                    .mailbox
                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;
                context.sleep(config.link.latency * 2).await;

                select! {
                    _ = shard_sub => {},
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("subscription did not complete after valid leader update");
                    },
                };
            },
        );
    }

    #[test_traced]
    fn test_shard_from_non_participant_blocks_peer() {
        let fixture = Fixture::<C>::default();
        fixture.start(
            |config, context, oracle, peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                let leader = peers[0].public_key.clone();
                let receiver_pk = peers[2].public_key.clone();

                let non_participant_key = PrivateKey::from_seed(10_000);
                let non_participant_pk = non_participant_key.public_key();

                let non_participant_control = oracle.control(non_participant_pk.clone());
                let (mut non_participant_sender, _non_participant_receiver) =
                    non_participant_control
                        .register(0, TEST_QUOTA)
                        .await
                        .expect("registration should succeed");
                oracle
                    .add_link(
                        non_participant_pk.clone(),
                        receiver_pk.clone(),
                        DEFAULT_LINK,
                    )
                    .await
                    .expect("link should be added");
                oracle
                    .manager()
                    .track(
                        2,
                        TrackedPeers::new(
                            Set::from_iter_dedup(peers.iter().map(|peer| peer.public_key.clone())),
                            Set::from_iter_dedup([non_participant_pk.clone()]),
                        ),
                    )
                    .await;
                context.sleep(Duration::from_millis(10)).await;

                peers[2]
                    .mailbox
                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;

                let peer2_index = peers[2].index.get() as u16;
                let shard = coded_block.shard(peer2_index).expect("missing shard");
                let shard_bytes = shard.encode();

                non_participant_sender
                    .send(Recipients::One(receiver_pk), shard_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                assert_blocked(&oracle, &peers[2].public_key, &non_participant_pk).await;
            },
        );
    }

    #[test_traced]
    fn test_preleader_shard_from_non_participant_is_not_buffered() {
        let fixture = Fixture::<C>::default();
        fixture.start(
            |config, context, oracle, peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                let leader = peers[0].public_key.clone();
                let receiver_pk = peers[2].public_key.clone();

                let non_participant_key = PrivateKey::from_seed(10_000);
                let non_participant_pk = non_participant_key.public_key();

                let non_participant_control = oracle.control(non_participant_pk.clone());
                let (mut non_participant_sender, _non_participant_receiver) =
                    non_participant_control
                        .register(0, TEST_QUOTA)
                        .await
                        .expect("registration should succeed");
                oracle
                    .add_link(
                        non_participant_pk.clone(),
                        receiver_pk.clone(),
                        DEFAULT_LINK,
                    )
                    .await
                    .expect("link should be added");
                oracle
                    .manager()
                    .track(
                        2,
                        TrackedPeers::new(
                            Set::from_iter_dedup(peers.iter().map(|peer| peer.public_key.clone())),
                            Set::from_iter_dedup([non_participant_pk.clone()]),
                        ),
                    )
                    .await;
                context.sleep(Duration::from_millis(10)).await;

                let peer2_index = peers[2].index.get() as u16;
                let shard = coded_block.shard(peer2_index).expect("missing shard");
                let shard_bytes = shard.encode();
                let mut shard_sub = peers[2]
                    .mailbox
                    .subscribe_assigned_shard_verified(commitment)
                    .await;

                non_participant_sender
                    .send(Recipients::One(receiver_pk), shard_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                peers[2]
                    .mailbox
                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;
                context.sleep(config.link.latency * 2).await;

                let blocked = oracle.blocked().await.unwrap();
                let non_participant_blocked = blocked
                    .iter()
                    .any(|(a, b)| a == &peers[2].public_key && b == &non_participant_pk);
                assert!(
                    !non_participant_blocked,
                    "non-participant should not be blocked when its pre-leader shard is ignored"
                );
                assert!(
                    matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
                    "pre-leader shard from non-participant should not be buffered"
                );
            },
        );
    }

    #[test_traced]
    fn test_duplicate_shard_ignored() {
        // Use 10 peers so minimum_shards=4, giving us time to send duplicate before reconstruction.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);

                // Get peer 2's shard (from the leader).
                let peer2_index = peers[2].index.get() as u16;
                let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");

                // Get peer 1's shard.
                let peer1_index = peers[1].index.get() as u16;
                let peer1_shard = coded_block.shard(peer1_index).expect("missing shard");

                let peer2_pk = peers[2].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Inform peer 2 of the leader.
                peers[2]
                    .mailbox
                    .discovered(
                        coded_block.commitment(),
                        leader,
                        Round::new(Epoch::zero(), View::new(1)),
                    )
                    .await;

                // Send peer 2 their shard from the leader (1 checked shard).
                let leader_shard_bytes = peer2_shard.encode();
                peers[0]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), leader_shard_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Send peer 1's shard to peer 2 (first time - should succeed, 2 checked shards).
                let peer1_shard_bytes = peer1_shard.encode();
                peers[1]
                    .sender
                    .send(
                        Recipients::One(peer2_pk.clone()),
                        peer1_shard_bytes.clone(),
                        true,
                    )
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Send the same shard again (exact duplicate - should be ignored, not blocked).
                // With 10 peers, minimum_shards=4, so we haven't reconstructed yet.
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk), peer1_shard_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Peer 1 should NOT be blocked for sending an identical duplicate.
                let blocked_peers = oracle.blocked().await.unwrap();
                let is_blocked = blocked_peers
                    .iter()
                    .any(|(a, b)| a == &peers[2].public_key && b == &peers[1].public_key);
                assert!(
                    !is_blocked,
                    "peer should not be blocked for exact duplicate shard"
                );
            },
        );
    }
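
    // The `minimum_shards=4` figure the 10-peer tests rely on follows the
    // standard BFT quorum arithmetic spelled out in the comments below:
    // (n - 1) / 3 + 1. As a sketch (the helper name is hypothetical, not part
    // of this engine):

    ```rust
    // Sketch: with n participants, up to f = (n - 1) / 3 may be Byzantine,
    // so reconstruction waits for f + 1 valid shards.
    fn minimum_shards_sketch(num_peers: u32) -> u32 {
        (num_peers - 1) / 3 + 1
    }
    ```

    // With `num_peers = 10` this yields (10 - 1) / 3 + 1 = 4, which is why the
    // tests can deliver three shards (and duplicates) without triggering
    // reconstruction early.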

    #[test_traced]
    fn test_equivocating_shard_blocks_peer() {
        // Use 10 peers so minimum_shards=4, giving us time to send equivocating shard.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);

                // Create a second block with different payload to get different shard data.
                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 200);
                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);

                // Get peer 1's shard from block 1.
                let peer1_index = peers[1].index.get() as u16;
                let peer1_shard = coded_block1.shard(peer1_index).expect("missing shard");

                // Get peer 1's shard from block 2 (different data, same index).
                let mut peer1_equivocating_shard =
                    coded_block2.shard(peer1_index).expect("missing shard");
                // Override the commitment to match block 1 so the shard targets
                // the same reconstruction state.
                peer1_equivocating_shard.commitment = coded_block1.commitment();

                let peer2_pk = peers[2].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Inform peer 2 of the leader.
                peers[2]
                    .mailbox
                    .discovered(
                        coded_block1.commitment(),
                        leader,
                        Round::new(Epoch::zero(), View::new(1)),
                    )
                    .await;

                // Send peer 2 the leader's shard (verified immediately).
                let peer2_index = peers[2].index.get() as u16;
                let leader_shard = coded_block1.shard(peer2_index).expect("missing shard");
                let leader_shard_bytes = leader_shard.encode();
                peers[0]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), leader_shard_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Send peer 1's valid shard to peer 2 (first time - succeeds).
                let shard_bytes = peer1_shard.encode();
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), shard_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Send a different shard from peer 1 (equivocation - should block).
                let equivocating_bytes = peer1_equivocating_shard.encode();
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk), equivocating_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Peer 2 should have blocked peer 1 for equivocation.
                assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
            },
        );
    }

    #[test_traced]
    fn test_reconstruction_states_pruned_at_or_below_reconstructed_view() {
        // Use 10 peers so minimum_shards=4.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                // Commitment A at lower view (1).
                let block_a = CodedBlock::<B, C, H>::new(
                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
                    coding_config,
                    &STRATEGY,
                );
                let commitment_a = block_a.commitment();

                // Commitment B at higher view (2), which we will reconstruct.
                let block_b = CodedBlock::<B, C, H>::new(
                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200),
                    coding_config,
                    &STRATEGY,
                );
                let commitment_b = block_b.commitment();

                let peer2_pk = peers[2].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Create state for A and ingest one shard from peer1.
                peers[2]
                    .mailbox
                    .discovered(
                        commitment_a,
                        leader.clone(),
                        Round::new(Epoch::zero(), View::new(1)),
                    )
                    .await;
                let shard_a = block_a
                    .shard(peers[1].index.get() as u16)
                    .expect("missing shard")
                    .encode();
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), shard_a.clone(), true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Create/reconstruct B at higher view.
                peers[2]
                    .mailbox
                    .discovered(
                        commitment_b,
                        leader,
                        Round::new(Epoch::zero(), View::new(2)),
                    )
                    .await;
                // Leader's shard for peer2.
                let leader_shard_b = block_b
                    .shard(peers[2].index.get() as u16)
                    .expect("missing shard")
                    .encode();
                peers[0]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), leader_shard_b, true)
                    .await
                    .expect("send failed");

                // Three shards for minimum threshold (4 total with leader's).
                for i in [1usize, 3usize, 4usize] {
                    let shard = block_b
                        .shard(peers[i].index.get() as u16)
                        .expect("missing shard")
                        .encode();
                    peers[i]
                        .sender
                        .send(Recipients::One(peer2_pk.clone()), shard, true)
                        .await
                        .expect("send failed");
                }
                context.sleep(config.link.latency * 4).await;

                // B should reconstruct.
                let reconstructed = peers[2]
                    .mailbox
                    .get(commitment_b)
                    .await
                    .expect("block B should reconstruct");
                assert_eq!(reconstructed.commitment(), commitment_b);

                // A state should be pruned (at/below reconstructed view). Sending the same
                // shard for A again should NOT be treated as duplicate.
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk), shard_a, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                let blocked = oracle.blocked().await.unwrap();
                let blocked_peer1 = blocked
                    .iter()
                    .any(|(a, b)| a == &peers[2].public_key && b == &peers[1].public_key);
                assert!(
                    !blocked_peer1,
                    "peer1 should not be blocked after lower-view state was pruned"
                );
            },
        );
    }
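
    // The pruning behavior exercised above can be sketched as an operation on a
    // view-keyed map (a hypothetical simplification; the engine's actual state
    // types differ): once a block at some view reconstructs, every
    // reconstruction state at or below that view is dropped.

    ```rust
    use std::collections::BTreeMap;

    // Sketch: `states` maps view -> per-commitment reconstruction state
    // (a String stand-in here). split_off keeps keys >= its argument, so
    // splitting at `reconstructed_view + 1` drops everything at or below it.
    fn prune_at_or_below(states: &mut BTreeMap<u64, String>, reconstructed_view: u64) {
        *states = states.split_off(&(reconstructed_view + 1));
    }
    ```

    // After pruning, a re-sent shard for a dropped view hits no existing state,
    // which is why the test above expects no duplicate-shard blocking for
    // commitment A once B (at a higher view) has reconstructed.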
2969
    #[test_traced]
    fn test_pending_shards_batch_validated_at_quorum() {
        // Test that shards buffered in pending_shards are batch-validated once
        // the minimum shard threshold is met, enabling reconstruction.
        //
        // With 10 peers: minimum_shards = (10-1)/3 + 1 = 4
        // The leader (peer 0) sends peer 3 their own-index shard (verified
        // immediately). Peers 1, 2, 4 send their own shards (buffered in
        // pending_shards). Once the leader's shard + 3 pending shards >= 4,
        // batch validation fires and reconstruction succeeds.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                let peer3_pk = peers[3].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Inform peer 3 that peer 0 is the leader.
                peers[3]
                    .mailbox
                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;

                // Send shards from peers 1, 2, 4 (their own indices).
                // These are buffered in pending_shards for batch validation.
                for &sender_idx in &[1, 2, 4] {
                    let shard = coded_block
                        .shard(peers[sender_idx].index.get() as u16)
                        .expect("missing shard");
                    let shard_bytes = shard.encode();
                    peers[sender_idx]
                        .sender
                        .send(Recipients::One(peer3_pk.clone()), shard_bytes, true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                // Block should not be reconstructed yet (no leader shard verified).
                let block = peers[3].mailbox.get(commitment).await;
                assert!(block.is_none(), "block should not be reconstructed yet");

                // Now the leader (peer 0) sends peer 3's own-index shard.
                // This is verified immediately, and with the 3 pending shards
                // we reach minimum_shards=4 -> batch validation + reconstruction.
                let peer3_index = peers[3].index.get() as u16;
                let leader_shard = coded_block.shard(peer3_index).expect("missing shard");
                let leader_shard_bytes = leader_shard.encode();
                peers[0]
                    .sender
                    .send(Recipients::One(peer3_pk), leader_shard_bytes, true)
                    .await
                    .expect("send failed");

                context.sleep(config.link.latency * 2).await;

                // No peers should be blocked (all shards were valid).
                let blocked = oracle.blocked().await.unwrap();
                assert!(
                    blocked.is_empty(),
                    "no peers should be blocked for valid pending shards"
                );

                // Block should now be reconstructed (4 checked shards >= minimum_shards).
                let block = peers[3].mailbox.get(commitment).await;
                assert!(
                    block.is_some(),
                    "block should be reconstructed after batch validation"
                );

                // Verify the reconstructed block has the correct commitment.
                let reconstructed = block.unwrap();
                assert_eq!(
                    reconstructed.commitment(),
                    commitment,
                    "reconstructed block should have correct commitment"
                );
            },
        );
    }

    #[test_traced]
    fn test_peer_shards_buffered_until_external_proposed() {
        // Test that shards received before leader announcement do not progress
        // reconstruction until Discovered is delivered.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                let receiver_idx = 3usize;
                let receiver_pk = peers[receiver_idx].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Subscribe before any shards arrive.
                let mut shard_sub = peers[receiver_idx]
                    .mailbox
                    .subscribe_assigned_shard_verified(commitment)
                    .await;

                // Send the leader's shard (for the receiver's index) and three shards,
                // all before leader announcement.
                let leader_shard = coded_block
                    .shard(peers[receiver_idx].index.get() as u16)
                    .expect("missing shard")
                    .encode();
                peers[0]
                    .sender
                    .send(Recipients::One(receiver_pk.clone()), leader_shard, true)
                    .await
                    .expect("send failed");

                for i in [1usize, 2usize, 4usize] {
                    let shard = coded_block
                        .shard(peers[i].index.get() as u16)
                        .expect("missing shard")
                        .encode();
                    peers[i]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), shard, true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                // No leader yet: shard subscription should still be pending and block unavailable.
                assert!(
                    matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
                    "shard subscription should not resolve before leader announcement"
                );
                assert!(
                    peers[receiver_idx].mailbox.get(commitment).await.is_none(),
                    "block should not reconstruct before leader announcement"
                );

                // Announce the leader, which drains buffered shards and should progress immediately.
                peers[receiver_idx]
                    .mailbox
                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;

                select! {
                    _ = shard_sub => {},
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("shard subscription did not resolve after leader announcement");
                    },
                }

                context.sleep(config.link.latency * 2).await;
                assert!(
                    peers[receiver_idx].mailbox.get(commitment).await.is_some(),
                    "block should reconstruct after buffered shards are ingested"
                );

                // All shards were valid and from participants.
                assert!(
                    oracle.blocked().await.unwrap().is_empty(),
                    "no peers should be blocked for valid buffered shards"
                );
            },
        );
    }

    #[test_traced]
    fn test_post_leader_shards_processed_immediately() {
        // Test that shards arriving after leader announcement are processed
        // without waiting for any extra trigger.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                let receiver_idx = 3usize;
                let receiver_pk = peers[receiver_idx].public_key.clone();
                let leader = peers[0].public_key.clone();

                let shard_sub = peers[receiver_idx]
                    .mailbox
                    .subscribe_assigned_shard_verified(commitment)
                    .await;
                peers[receiver_idx]
                    .mailbox
                    .discovered(
                        commitment,
                        leader.clone(),
                        Round::new(Epoch::zero(), View::new(1)),
                    )
                    .await;

                // Send the leader's shard (for the receiver's index) after the leader is known.
                let leader_shard = coded_block
                    .shard(peers[receiver_idx].index.get() as u16)
                    .expect("missing shard")
                    .encode();
                peers[0]
                    .sender
                    .send(Recipients::One(receiver_pk.clone()), leader_shard, true)
                    .await
                    .expect("send failed");

                // Subscription should resolve from the leader's shard.
                select! {
                    _ = shard_sub => {},
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("shard subscription did not resolve after post-leader shard");
                    },
                }

                // Send enough shards after the leader is known to reconstruct.
                for i in [1usize, 2usize, 4usize] {
                    let shard = coded_block
                        .shard(peers[i].index.get() as u16)
                        .expect("missing shard")
                        .encode();
                    peers[i]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), shard, true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;
                let reconstructed = peers[receiver_idx]
                    .mailbox
                    .get(commitment)
                    .await
                    .expect("block should reconstruct from post-leader shards");
                assert_eq!(reconstructed.commitment(), commitment);

                assert!(
                    oracle.blocked().await.unwrap().is_empty(),
                    "no peers should be blocked for valid post-leader shards"
                );
            },
        );
    }

    #[test_traced]
    fn test_invalid_shard_codec_blocks_peer() {
        // Test that receiving an invalid shard (codec failure) blocks the sender.
        let fixture: Fixture<C> = Fixture {
            num_peers: 4,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, _coding_config| async move {
                let peer0_pk = peers[0].public_key.clone();
                let peer1_pk = peers[1].public_key.clone();

                // Send garbage bytes that will fail codec decoding.
                let garbage = Bytes::from(vec![0xFF, 0xFE, 0xFD, 0xFC, 0xFB]);
                peers[1]
                    .sender
                    .send(Recipients::One(peer0_pk.clone()), garbage, true)
                    .await
                    .expect("send failed");

                context.sleep(config.link.latency * 2).await;

                // Peer 1 should be blocked by peer 0 for sending an invalid shard.
                assert_blocked(&oracle, &peer0_pk, &peer1_pk).await;
            },
        );
    }

    #[test_traced]
    fn test_duplicate_buffered_shard_does_not_block_before_leader() {
        // Test that duplicate shards before leader announcement are
        // buffered and do not immediately block the sender.
        let fixture: Fixture<C> = Fixture {
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);

                // Get peer 2's shard.
                let peer2_index = peers[2].index.get() as u16;
                let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
                let shard_bytes = peer2_shard.encode();

                let peer2_pk = peers[2].public_key.clone();

                // Do NOT set a leader: shards should be buffered.

                // Peer 1 sends the shard to peer 2 (buffered, leader unknown).
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // No one should be blocked yet.
                let blocked = oracle.blocked().await.unwrap();
                assert!(blocked.is_empty(), "no peers should be blocked yet");

                // Peer 1 sends the same shard AGAIN (duplicate while leader unknown).
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk), shard_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Still no blocking before a leader is known.
                let blocked = oracle.blocked().await.unwrap();
                assert!(
                    blocked.is_empty(),
                    "no peers should be blocked before leader"
                );
            },
        );
    }

    #[test_traced]
    fn test_invalid_leader_shard_crypto_blocks_leader() {
        // Test that a leader shard failing cryptographic verification
        // results in the leader being blocked.
        let fixture: Fixture<C> = Fixture {
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                // Create two different blocks: a shard from block2 won't verify
                // against the commitment from block1.
                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
                let commitment1 = coded_block1.commitment();

                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);

                // Get peer 2's shard from block2, but re-wrap it with
                // block1's commitment so it fails verification.
                let peer2_index = peers[2].index.get() as u16;
                let mut wrong_shard = coded_block2.shard(peer2_index).expect("missing shard");
                wrong_shard.commitment = commitment1;
                let wrong_bytes = wrong_shard.encode();

                let peer2_pk = peers[2].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Inform peer 2 that peer 0 is the leader.
                peers[2]
                    .mailbox
                    .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;

                // Leader (peer 0) sends the invalid shard.
                peers[0]
                    .sender
                    .send(Recipients::One(peer2_pk), wrong_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Peer 0 (leader) should be blocked for invalid crypto.
                assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;
            },
        );
    }

    #[test_traced]
    fn test_shard_index_mismatch_blocks_peer() {
        // Test that a shard whose index doesn't match the sender's
        // participant index results in blocking the sender.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                // Get peer 3's leader shard so peer 3 can validate shards.
                let peer3_index = peers[3].index.get() as u16;
                let leader_shard = coded_block.shard(peer3_index).expect("missing shard");

                // Get peer 1's valid shard, then change the index to peer 4's index.
                let peer1_index = peers[1].index.get() as u16;
                let mut wrong_index_shard = coded_block.shard(peer1_index).expect("missing shard");
                // Mutate the index so it doesn't match the sender (peer 1).
                wrong_index_shard.index = peers[4].index.get() as u16;
                let wrong_bytes = wrong_index_shard.encode();

                let peer3_pk = peers[3].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Inform peer 3 of the leader and send them the leader shard.
                peers[3]
                    .mailbox
                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;
                let shard_bytes = leader_shard.encode();
                peers[0]
                    .sender
                    .send(Recipients::One(peer3_pk.clone()), shard_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Peer 1 sends a shard with a mismatched index to peer 3.
                peers[1]
                    .sender
                    .send(Recipients::One(peer3_pk), wrong_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Peer 1 should be blocked for the shard index mismatch.
                assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
            },
        );
    }

    #[test_traced]
    fn test_invalid_shard_crypto_blocks_peer() {
        // Test that a shard failing cryptographic verification
        // results in blocking the sender once batch validation fires at quorum.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                // Create two different blocks.
                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
                let commitment1 = coded_block1.commitment();

                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);

                // Get peer 3's leader shard from block1 (valid).
                let peer3_index = peers[3].index.get() as u16;
                let leader_shard = coded_block1.shard(peer3_index).expect("missing shard");

                // Get peer 1's shard from block2, but re-wrap with block1's
                // commitment so verification fails.
                let peer1_index = peers[1].index.get() as u16;
                let mut wrong_shard = coded_block2.shard(peer1_index).expect("missing shard");
                wrong_shard.commitment = commitment1;
                let wrong_bytes = wrong_shard.encode();

                let peer3_pk = peers[3].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Inform peer 3 of the leader and send the valid leader shard.
                peers[3]
                    .mailbox
                    .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;
                let shard_bytes = leader_shard.encode();
                peers[0]
                    .sender
                    .send(Recipients::One(peer3_pk.clone()), shard_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Peer 1 sends the invalid shard.
                peers[1]
                    .sender
                    .send(Recipients::One(peer3_pk.clone()), wrong_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // No block yet: batch validation is deferred until quorum.
                // Send valid shards from peers 2 and 4 to reach quorum
                // (minimum_shards = 4: 1 leader + 3 pending).
                for &idx in &[2, 4] {
                    let peer_index = peers[idx].index.get() as u16;
                    let shard = coded_block1.shard(peer_index).expect("missing shard");
                    let bytes = shard.encode();
                    peers[idx]
                        .sender
                        .send(Recipients::One(peer3_pk.clone()), bytes, true)
                        .await
                        .expect("send failed");
                }
                context.sleep(config.link.latency * 2).await;

                // Peer 1 should be blocked for invalid shard crypto.
                assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
            },
        );
    }

    #[test_traced]
    fn test_reconstruction_recovers_after_quorum_with_one_invalid_shard() {
        // With 10 peers, minimum_shards = 4.
        // Contribute exactly 4 shards first (1 leader + 3 pending), with one invalid:
        // quorum is reached, but checked_shards stays at 3 after batch validation.
        // Then send one more valid shard to meet the reconstruction threshold.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
                let commitment1 = coded_block1.commitment();

                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);

                let receiver_idx = 3usize;
                let receiver_pk = peers[receiver_idx].public_key.clone();

                // Prepare one invalid shard: shard data from block2, commitment from block1.
                let peer1_index = peers[1].index.get() as u16;
                let mut invalid_shard = coded_block2.shard(peer1_index).expect("missing shard");
                invalid_shard.commitment = commitment1;

                // Announce the leader and deliver the receiver's leader shard.
                let leader = peers[0].public_key.clone();
                peers[receiver_idx]
                    .mailbox
                    .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;
                let leader_shard = coded_block1
                    .shard(peers[receiver_idx].index.get() as u16)
                    .expect("missing shard")
                    .encode();
                peers[0]
                    .sender
                    .send(Recipients::One(receiver_pk.clone()), leader_shard, true)
                    .await
                    .expect("send failed");

                // Contribute exactly minimum_shards total:
                // - invalid shard from peer1
                // - valid shard from peer2
                // - valid shard from peer4
                peers[1]
                    .sender
                    .send(
                        Recipients::One(receiver_pk.clone()),
                        invalid_shard.encode(),
                        true,
                    )
                    .await
                    .expect("send failed");
                for idx in [2usize, 4usize] {
                    let shard = coded_block1
                        .shard(peers[idx].index.get() as u16)
                        .expect("missing shard")
                        .encode();
                    peers[idx]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), shard, true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                // The sender of the invalid shard should be blocked, and
                // reconstruction should not happen yet.
                assert_blocked(
                    &oracle,
                    &peers[receiver_idx].public_key,
                    &peers[1].public_key,
                )
                .await;
                assert!(
                    peers[receiver_idx].mailbox.get(commitment1).await.is_none(),
                    "block should not reconstruct with only 3 checked shards"
                );

                // Send one additional valid shard; this should now satisfy the checked threshold.
                let extra_shard = coded_block1
                    .shard(peers[5].index.get() as u16)
                    .expect("missing shard")
                    .encode();
                peers[5]
                    .sender
                    .send(Recipients::One(receiver_pk), extra_shard, true)
                    .await
                    .expect("send failed");

                context.sleep(config.link.latency * 2).await;

                let reconstructed = peers[receiver_idx]
                    .mailbox
                    .get(commitment1)
                    .await
                    .expect("block should reconstruct after additional valid shard");
                assert_eq!(reconstructed.commitment(), commitment1);
            },
        );
    }

    #[test_traced]
    fn test_invalid_pending_shard_blocked_on_drain() {
        // Test that the sender of a shard buffered in pending shards (before
        // checking data is available) is blocked when batch validation runs at
        // quorum and verification fails.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                // Create two different blocks.
                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
                let commitment1 = coded_block1.commitment();

                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);

                // Get peer 1's shard from block2, but wrap it with block1's commitment.
                let peer1_index = peers[1].index.get() as u16;
                let mut wrong_shard = coded_block2.shard(peer1_index).expect("missing shard");
                wrong_shard.commitment = commitment1;
                let wrong_bytes = wrong_shard.encode();

                let peer3_pk = peers[3].public_key.clone();

                // Send the invalid shard BEFORE the leader shard (no checking data yet,
                // so it gets buffered in pending shards).
                peers[1]
                    .sender
                    .send(Recipients::One(peer3_pk.clone()), wrong_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // No one should be blocked yet (the shard is buffered).
                let blocked = oracle.blocked().await.unwrap();
                assert!(blocked.is_empty(), "no peers should be blocked yet");

                // Send valid shards from peers 2 and 4 so the pending count
                // reaches quorum once the leader shard arrives
                // (minimum_shards = 4: 1 leader + 3 pending).
                for &idx in &[2, 4] {
                    let peer_index = peers[idx].index.get() as u16;
                    let shard = coded_block1.shard(peer_index).expect("missing shard");
                    let bytes = shard.encode();
                    peers[idx]
                        .sender
                        .send(Recipients::One(peer3_pk.clone()), bytes, true)
                        .await
                        .expect("send failed");
                }
                context.sleep(config.link.latency * 2).await;

                // No one should be blocked yet (all shards are buffered pending the leader).
                let blocked = oracle.blocked().await.unwrap();
                assert!(blocked.is_empty(), "no peers should be blocked yet");

                // Now inform peer 3 of the leader and send the valid leader shard.
                let leader = peers[0].public_key.clone();
                peers[3]
                    .mailbox
                    .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;
                let peer3_index = peers[3].index.get() as u16;
                let leader_shard = coded_block1.shard(peer3_index).expect("missing shard");
                let shard_bytes = leader_shard.encode();
                peers[0]
                    .sender
                    .send(Recipients::One(peer3_pk), shard_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Peer 1 should be blocked after batch validation validates and
                // rejects their invalid shard.
                assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
3673            },
3674        );
3675    }
3676
    #[test_traced]
    fn test_cross_epoch_buffered_shard_not_blocked() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(1),
                },
            );
            network.start();

            // Epoch 0 participants: the four keys from seeds 0..4, sorted by public key.
            // Epoch 1 participants: the first three epoch-0 keys plus the seed-4 key.
            let mut epoch0_keys: Vec<PrivateKey> = (0..4).map(PrivateKey::from_seed).collect();
            epoch0_keys.sort_by_key(|s| s.public_key());
            let epoch0_pks: Vec<P> = epoch0_keys.iter().map(|c| c.public_key()).collect();
            let epoch0_set: Set<P> = Set::from_iter_dedup(epoch0_pks.clone());

            let future_peer_key = PrivateKey::from_seed(4);
            let future_peer_pk = future_peer_key.public_key();
            let mut epoch1_pks: Vec<P> = epoch0_pks[..3]
                .iter()
                .cloned()
                .chain(std::iter::once(future_peer_pk.clone()))
                .collect();
            epoch1_pks.sort();
            let epoch1_set: Set<P> = Set::from_iter_dedup(epoch1_pks);

            let receiver_idx_in_epoch0 = epoch0_set
                .index(&epoch0_pks[0])
                .expect("receiver must be in epoch 0")
                .get() as usize;
            let receiver_key = epoch0_keys[receiver_idx_in_epoch0].clone();
            let receiver_pk = receiver_key.public_key();

            let receiver_control = oracle.control(receiver_pk.clone());
            let (sender_handle, receiver_handle) = receiver_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            let future_peer_control = oracle.control(future_peer_pk.clone());
            let (mut future_peer_sender, _future_peer_receiver) = future_peer_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");
            oracle
                .add_link(future_peer_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
                .await
                .expect("link should be added");
            oracle
                .manager()
                .track(
                    0,
                    Set::from_iter_dedup([receiver_pk.clone(), future_peer_pk.clone()]),
                )
                .await;
            context.sleep(Duration::from_millis(10)).await;

            // Set up the receiver's engine with a multi-epoch provider.
            let scheme_epoch0 =
                Scheme::signer(SCHEME_NAMESPACE, epoch0_set.clone(), receiver_key.clone())
                    .expect("signer scheme should be created");
            let scheme_epoch1 =
                Scheme::signer(SCHEME_NAMESPACE, epoch1_set.clone(), receiver_key.clone())
                    .expect("signer scheme should be created");
            let scheme_provider =
                MultiEpochProvider::single(scheme_epoch0).with_epoch(Epoch::new(1), scheme_epoch1);

            let config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider,
                blocker: receiver_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: 1024,
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: 1024,
                peer_provider: oracle.manager(),
            };

            let (engine, mailbox) = ShardEngine::new(context.with_label("receiver"), config);
            engine.start((sender_handle, receiver_handle));

            // Build a coded block using epoch 1's participant set.
            let coding_config = coding_config_for_participants(epoch1_set.len() as u16);
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();

            // The future peer creates a shard at their epoch 1 index.
            let future_peer_index = epoch1_set
                .index(&future_peer_pk)
                .expect("future peer must be in epoch 1");
            let future_shard = coded_block
                .shard(future_peer_index.get() as u16)
                .expect("missing shard");
            let shard_bytes = future_shard.encode();

            // Send the shard BEFORE the leader is discovered (goes to the pre-leader buffer).
            future_peer_sender
                .send(Recipients::One(receiver_pk.clone()), shard_bytes, true)
                .await
                .expect("send failed");
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // No one should be blocked yet (shard is buffered, leader unknown).
            let blocked = oracle.blocked().await.unwrap();
            assert!(
                blocked.is_empty(),
                "no peers should be blocked while shard is buffered"
            );

            // Announce the leader with an epoch 1 round.
            let leader = epoch0_pks[1].clone();
            mailbox
                .discovered(commitment, leader, Round::new(Epoch::new(1), View::new(1)))
                .await;
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // The future peer is a valid participant in epoch 1, so they must NOT
            // be blocked after their buffered shard is ingested.
            let blocked = oracle.blocked().await.unwrap();
            assert!(
                blocked.is_empty(),
                "future-epoch participant should not be blocked: {blocked:?}"
            );
        });
    }

    #[test_traced]
    fn test_shard_broadcast_survives_provider_churn() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(1),
                },
            );
            network.start();

            let mut private_keys: Vec<PrivateKey> = (0..4).map(PrivateKey::from_seed).collect();
            private_keys.sort_by_key(|s| s.public_key());
            let peer_keys: Vec<P> = private_keys.iter().map(|k| k.public_key()).collect();
            let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());

            let leader_idx = 0usize;
            let broadcaster_idx = 1usize;
            let receiver_idx = 2usize;

            let leader_pk = peer_keys[leader_idx].clone();
            let broadcaster_pk = peer_keys[broadcaster_idx].clone();
            let receiver_pk = peer_keys[receiver_idx].clone();

            let mut registrations = BTreeMap::new();
            for key in &peer_keys {
                let control = oracle.control(key.clone());
                let (sender, receiver) = control
                    .register(0, TEST_QUOTA)
                    .await
                    .expect("registration should succeed");
                registrations.insert(key.clone(), (control, sender, receiver));
            }

            for src in &peer_keys {
                for dst in &peer_keys {
                    if src == dst {
                        continue;
                    }
                    oracle
                        .add_link(src.clone(), dst.clone(), DEFAULT_LINK)
                        .await
                        .expect("link should be added");
                }
            }
            oracle.manager().track(0, participants.clone()).await;
            context.sleep(Duration::from_millis(10)).await;

            let (_leader_control, mut leader_sender, _leader_receiver) = registrations
                .remove(&leader_pk)
                .expect("leader should be registered");
            let (broadcaster_control, broadcaster_sender, broadcaster_receiver) = registrations
                .remove(&broadcaster_pk)
                .expect("broadcaster should be registered");
            let (receiver_control, receiver_sender, receiver_receiver) = registrations
                .remove(&receiver_pk)
                .expect("receiver should be registered");

            let broadcaster_scheme = Scheme::signer(
                SCHEME_NAMESPACE,
                participants.clone(),
                private_keys[broadcaster_idx].clone(),
            )
            .expect("signer scheme should be created");
            // `discovered` performs two scoped lookups (`handle_external_proposal`
            // and `ingest_buffered_shards`). Leader-shard validation is the third.
            // Any additional lookup for epoch 0 churns to `None`.
            let broadcaster_provider = ChurningProvider::new(broadcaster_scheme, 3);
            let broadcaster_config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider: broadcaster_provider,
                blocker: broadcaster_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: 1024,
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: 1024,
                peer_provider: oracle.manager(),
            };
            let (broadcaster_engine, broadcaster_mailbox) =
                ChurningShardEngine::new(context.with_label("broadcaster"), broadcaster_config);
            broadcaster_engine.start((broadcaster_sender, broadcaster_receiver));

            let receiver_scheme = Scheme::signer(
                SCHEME_NAMESPACE,
                participants.clone(),
                private_keys[receiver_idx].clone(),
            )
            .expect("signer scheme should be created");
            let receiver_config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider: MultiEpochProvider::single(receiver_scheme),
                blocker: receiver_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: 1024,
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: 1024,
                peer_provider: oracle.manager(),
            };
            let (receiver_engine, receiver_mailbox) =
                ShardEngine::new(context.with_label("receiver"), receiver_config);
            receiver_engine.start((receiver_sender, receiver_receiver));

            let coding_config = coding_config_for_participants(peer_keys.len() as u16);
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();
            let round = Round::new(Epoch::zero(), View::new(1));

            broadcaster_mailbox
                .discovered(commitment, leader_pk.clone(), round)
                .await;
            receiver_mailbox
                .discovered(commitment, leader_pk.clone(), round)
                .await;
            context.sleep(DEFAULT_LINK.latency).await;

            let broadcaster_index = participants
                .index(&broadcaster_pk)
                .expect("broadcaster must be a participant")
                .get() as u16;
            let broadcaster_shard = coded_block
                .shard(broadcaster_index)
                .expect("missing shard")
                .encode();
            leader_sender
                .send(Recipients::One(broadcaster_pk), broadcaster_shard, true)
                .await
                .expect("send failed");

            let receiver_index = participants
                .index(&receiver_pk)
                .expect("receiver must be a participant")
                .get() as u16;
            let receiver_shard = coded_block
                .shard(receiver_index)
                .expect("missing shard")
                .encode();
            leader_sender
                .send(Recipients::One(receiver_pk.clone()), receiver_shard, true)
                .await
                .expect("send failed");

            context.sleep(DEFAULT_LINK.latency * 3).await;

            let reconstructed = receiver_mailbox.get(commitment).await;
            assert!(
                reconstructed.is_some(),
                "receiver should reconstruct after broadcaster validates and broadcasts shard"
            );
        });
    }

    #[test_traced]
    fn test_failed_reconstruction_digest_mismatch_then_recovery() {
        // Byzantine scenario: all shards pass coding verification (correct root) but the
        // decoded blob has a different digest than what the commitment claims. This triggers
        // Error::DigestMismatch in try_reconstruct. Verify that:
        //   1. The failed commitment's state is cleaned up
        //   2. Subscriptions for the failed commitment never resolve
        //   3. A subsequent valid commitment reconstructs successfully
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, _oracle, mut peers, _, coding_config| async move {
                // Block 1: the "claimed" block (its digest goes in the fake commitment).
                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);

                // Block 2: the actual data behind the shards.
                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
                let real_commitment2 = coded_block2.commitment();

                // Build a fake commitment: block1's digest + block2's coding root/context/config.
                // Shards from block2 will verify against block2's root (present in the fake
                // commitment), but try_reconstruct will decode block2 and find that its digest
                // differs from block1's claimed digest.
                let fake_commitment = Commitment::from((
                    coded_block1.digest(),
                    real_commitment2.root::<Sha256Digest>(),
                    real_commitment2.context::<Sha256Digest>(),
                    coding_config,
                ));

                let receiver_idx = 3usize;
                let receiver_pk = peers[receiver_idx].public_key.clone();
                let leader = peers[0].public_key.clone();
                let round = Round::new(Epoch::zero(), View::new(1));

                // Discover the fake commitment.
                peers[receiver_idx]
                    .mailbox
                    .discovered(fake_commitment, leader.clone(), round)
                    .await;

                // Open block and digest subscriptions before sending shards.
                let mut block_sub = peers[receiver_idx].mailbox.subscribe(fake_commitment).await;
                let mut digest_sub = peers[receiver_idx]
                    .mailbox
                    .subscribe_by_digest(coded_block1.digest())
                    .await;

                // Send the receiver's shard (from block2, with fake commitment).
                let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
                let mut leader_shard = coded_block2
                    .shard(receiver_shard_idx)
                    .expect("missing shard");
                leader_shard.commitment = fake_commitment;
                peers[0]
                    .sender
                    .send(
                        Recipients::One(receiver_pk.clone()),
                        leader_shard.encode(),
                        true,
                    )
                    .await
                    .expect("send failed");

                // Send enough shards to reach minimum_shards (4 for 10 peers).
                // Need 3 more shards after the leader's shard.
                for &idx in &[1usize, 2, 4] {
                    let peer_shard_idx = peers[idx].index.get() as u16;
                    let mut shard = coded_block2.shard(peer_shard_idx).expect("missing shard");
                    shard.commitment = fake_commitment;
                    peers[idx]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), shard.encode(), true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                // Reconstruction should have failed with DigestMismatch.
                // State for fake_commitment should be removed (engine.rs:792).
                assert!(
                    peers[receiver_idx]
                        .mailbox
                        .get(fake_commitment)
                        .await
                        .is_none(),
                    "block should not be available after DigestMismatch"
                );

                // Block subscription should be closed after failed reconstruction cleanup.
                assert!(
                    matches!(block_sub.try_recv(), Err(TryRecvError::Closed)),
                    "subscription should close for failed reconstruction"
                );
                assert!(
                    matches!(digest_sub.try_recv(), Err(TryRecvError::Closed)),
                    "digest subscription should close after failed reconstruction"
                );

                // Now verify the engine is not stuck: send valid shards for block1's real
                // commitment and confirm reconstruction succeeds.
                let real_commitment1 = coded_block1.commitment();
                let round2 = Round::new(Epoch::zero(), View::new(2));
                peers[receiver_idx]
                    .mailbox
                    .discovered(real_commitment1, leader.clone(), round2)
                    .await;

                let leader_shard1 = coded_block1
                    .shard(receiver_shard_idx)
                    .expect("missing shard");
                peers[0]
                    .sender
                    .send(
                        Recipients::One(receiver_pk.clone()),
                        leader_shard1.encode(),
                        true,
                    )
                    .await
                    .expect("send failed");

                for &idx in &[1usize, 2, 4] {
                    let peer_shard_idx = peers[idx].index.get() as u16;
                    let shard = coded_block1.shard(peer_shard_idx).expect("missing shard");
                    peers[idx]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), shard.encode(), true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                let reconstructed = peers[receiver_idx]
                    .mailbox
                    .get(real_commitment1)
                    .await
                    .expect("valid block should reconstruct after prior failure");
                assert_eq!(reconstructed.commitment(), real_commitment1);
            },
        );
    }

    #[test_traced]
    fn test_failed_reconstruction_context_mismatch_then_recovery() {
        // Byzantine scenario: shards decode to a block whose digest and coding root/config
        // match the commitment, but the commitment carries a mismatched context digest.
        // The engine must reject reconstruction and keep the commitment unresolved.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, _oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let real_commitment = coded_block.commitment();

                let wrong_context_digest = Sha256::hash(b"wrong_context");
                assert_ne!(
                    real_commitment.context::<Sha256Digest>(),
                    wrong_context_digest,
                    "test requires a distinct context digest"
                );
                let fake_commitment = Commitment::from((
                    coded_block.digest(),
                    real_commitment.root::<Sha256Digest>(),
                    wrong_context_digest,
                    coding_config,
                ));

                let receiver_idx = 3usize;
                let receiver_pk = peers[receiver_idx].public_key.clone();
                let leader = peers[0].public_key.clone();
                let round = Round::new(Epoch::zero(), View::new(1));

                peers[receiver_idx]
                    .mailbox
                    .discovered(fake_commitment, leader.clone(), round)
                    .await;
                let mut block_sub = peers[receiver_idx].mailbox.subscribe(fake_commitment).await;

                let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
                let mut leader_shard = coded_block
                    .shard(receiver_shard_idx)
                    .expect("missing shard");
                leader_shard.commitment = fake_commitment;
                peers[0]
                    .sender
                    .send(
                        Recipients::One(receiver_pk.clone()),
                        leader_shard.encode(),
                        true,
                    )
                    .await
                    .expect("send failed");

                for &idx in &[1usize, 2, 4] {
                    let peer_shard_idx = peers[idx].index.get() as u16;
                    let mut shard = coded_block.shard(peer_shard_idx).expect("missing shard");
                    shard.commitment = fake_commitment;
                    peers[idx]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), shard.encode(), true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                assert!(
                    peers[receiver_idx]
                        .mailbox
                        .get(fake_commitment)
                        .await
                        .is_none(),
                    "block should not be available after ContextMismatch"
                );
                assert!(
                    matches!(block_sub.try_recv(), Err(TryRecvError::Closed)),
                    "subscription should close for context-mismatched commitment"
                );

                // Verify the receiver still reconstructs valid commitments afterward.
                let round2 = Round::new(Epoch::zero(), View::new(2));
                peers[receiver_idx]
                    .mailbox
                    .discovered(real_commitment, leader.clone(), round2)
                    .await;

                let real_leader_shard = coded_block
                    .shard(receiver_shard_idx)
                    .expect("missing shard");
                peers[0]
                    .sender
                    .send(
                        Recipients::One(receiver_pk.clone()),
                        real_leader_shard.encode(),
                        true,
                    )
                    .await
                    .expect("send failed");

                for &idx in &[1usize, 2, 4] {
                    let peer_shard_idx = peers[idx].index.get() as u16;
                    let shard = coded_block.shard(peer_shard_idx).expect("missing shard");
                    peers[idx]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), shard.encode(), true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                let reconstructed = peers[receiver_idx]
                    .mailbox
                    .get(real_commitment)
                    .await
                    .expect("valid block should reconstruct after prior context mismatch");
                assert_eq!(reconstructed.commitment(), real_commitment);
            },
        );
    }

    #[test_traced]
    fn test_same_round_equivocation_preserves_certifiable_recovery() {
        // Regression coverage for same-round leader equivocation:
        // - leader equivocates across two commitments in the same round
        // - we receive a shard for commitment B (the certifiable one)
        // - commitment A reconstructs first
        // - commitment B must still remain recoverable
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, _oracle, mut peers, _, coding_config| async move {
                let receiver_idx = 3usize;
                let receiver_pk = peers[receiver_idx].public_key.clone();
                let receiver_shard_idx = peers[receiver_idx].index.get() as u16;

                let leader = peers[0].public_key.clone();
                let round = Round::new(Epoch::zero(), View::new(7));

                // Two different commitments in the same round (equivocation scenario).
                let block_a = CodedBlock::<B, C, H>::new(
                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 111),
                    coding_config,
                    &STRATEGY,
                );
                let commitment_a = block_a.commitment();
                let block_b = CodedBlock::<B, C, H>::new(
                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 222),
                    coding_config,
                    &STRATEGY,
                );
                let commitment_b = block_b.commitment();

                // Receiver learns both commitments in the same round.
                peers[receiver_idx]
                    .mailbox
                    .discovered(commitment_a, leader.clone(), round)
                    .await;
                peers[receiver_idx]
                    .mailbox
                    .discovered(commitment_b, leader.clone(), round)
                    .await;

                // Subscribe to the certifiable commitment before any reconstruction.
                let certifiable_sub = peers[receiver_idx].mailbox.subscribe(commitment_b).await;

                // We receive our shard for commitment B from the equivocating leader.
                let shard_b = block_b
                    .shard(receiver_shard_idx)
                    .expect("missing shard")
                    .encode();
                peers[0]
                    .sender
                    .send(Recipients::One(receiver_pk.clone()), shard_b, true)
                    .await
                    .expect("send failed");

                // Reconstruct conflicting commitment A first.
                let shard_a = block_a
                    .shard(receiver_shard_idx)
                    .expect("missing shard")
                    .encode();
                peers[0]
                    .sender
                    .send(Recipients::One(receiver_pk.clone()), shard_a, true)
                    .await
                    .expect("send failed");
                for i in [1usize, 2usize, 4usize] {
                    let shard_a = block_a
                        .shard(peers[i].index.get() as u16)
                        .expect("missing shard")
                        .encode();
                    peers[i]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), shard_a, true)
                        .await
                        .expect("send failed");
                }
                context.sleep(config.link.latency * 4).await;
                let reconstructed_a = peers[receiver_idx]
                    .mailbox
                    .get(commitment_a)
                    .await
                    .expect("conflicting commitment should reconstruct first");
                assert_eq!(reconstructed_a.commitment(), commitment_a);

                // Commitment B should still be recoverable after A reconstructed.
4333                for i in [1usize, 2usize, 4usize] {
4334                    let shard_b = block_b
4335                        .shard(peers[i].index.get() as u16)
4336                        .expect("missing shard")
4337                        .encode();
4338                    peers[i]
4339                        .sender
4340                        .send(Recipients::One(receiver_pk.clone()), shard_b, true)
4341                        .await
4342                        .expect("send failed");
4343                }
4344
4345                select! {
4346                    result = certifiable_sub => {
4347                        let reconstructed_b =
4348                            result.expect("certifiable commitment should remain recoverable");
4349                        assert_eq!(reconstructed_b.commitment(), commitment_b);
4350                    },
4351                    _ = context.sleep(Duration::from_secs(5)) => {
4352                        panic!("certifiable commitment was not recoverable after same-round equivocation");
4353                    },
4354                }
4355            },
4356        );
4357    }

    #[test_traced]
    fn test_leader_unrelated_shard_blocks_peer() {
        // Regression test: if the leader sends an unrelated/invalid shard
        // (i.e. a shard for a different participant index), the receiver must
        // block the leader.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                // Commitment being tracked by the receiver.
                let tracked_block = CodedBlock::<B, C, H>::new(
                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
                    coding_config,
                    &STRATEGY,
                );
                let tracked_commitment = tracked_block.commitment();

                // Separate block used to source "unrelated" shard data.
                let unrelated_block = CodedBlock::<B, C, H>::new(
                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200),
                    coding_config,
                    &STRATEGY,
                );

                let receiver_idx = 3usize;
                let receiver_pk = peers[receiver_idx].public_key.clone();
                let leader_idx = 0usize;
                let leader_pk = peers[leader_idx].public_key.clone();

                // Receiver tracks the commitment with peer0 as leader.
                peers[receiver_idx]
                    .mailbox
                    .discovered(
                        tracked_commitment,
                        leader_pk.clone(),
                        Round::new(Epoch::zero(), View::new(1)),
                    )
                    .await;

                // Construct an unrelated shard from peer1's slot and retarget
                // its commitment to the tracked commitment so it hits active state.
                let mut unrelated_shard = unrelated_block
                    .shard(peers[1].index.get() as u16)
                    .expect("missing shard");
                unrelated_shard.commitment = tracked_commitment;

                // Leader sends this unrelated/invalid shard to the receiver.
                // The shard index no longer matches the sender's participant index,
                // so the leader must be blocked.
                peers[leader_idx]
                    .sender
                    .send(Recipients::One(receiver_pk), unrelated_shard.encode(), true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                assert_blocked(&oracle, &peers[receiver_idx].public_key, &leader_pk).await;
            },
        );
    }

    #[test_traced]
    fn test_withholding_leader_victim_reconstructs_via_gossip() {
        // A Byzantine leader withholds the shard destined for one participant.
        // That participant should still reconstruct the block from shards
        // gossiped by other participants (sent via Recipients::All) without
        // any backfill mechanism.
        let fixture = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();
                let round = Round::new(Epoch::zero(), View::new(1));

                let leader = peers[0].public_key.clone();
                let victim = peers[1].public_key.clone();

                // Sever the link from leader to victim so the leader's
                // direct shard never arrives.
                oracle
                    .remove_link(leader.clone(), victim.clone())
                    .await
                    .expect("remove_link should succeed");

                // Leader proposes. The victim will not receive a direct shard
                // because the link is severed.
                peers[0].mailbox.proposed(round, coded_block.clone()).await;

                // Inform all non-leader peers of the leader so they validate
                // and re-broadcast their shards via Recipients::All.
                for peer in peers[1..].iter_mut() {
                    peer.mailbox
                        .discovered(commitment, leader.clone(), round)
                        .await;
                }
                context.sleep(config.link.latency * 2).await;

                // The victim should reconstruct via gossiped shards from other
                // participants even though the leader withheld.
                let block_sub = peers[1].mailbox.subscribe(commitment).await;
                select! {
                    result = block_sub => {
                        let reconstructed = result.expect("block subscription should resolve");
                        assert_eq!(reconstructed.commitment(), commitment);
                        assert_eq!(reconstructed.height(), coded_block.height());
                    },
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("victim did not reconstruct block despite withholding leader");
                    },
                }

                // All other participants should also have reconstructed.
                for peer in peers[2..].iter_mut() {
                    let reconstructed = peer
                        .mailbox
                        .get(commitment)
                        .await
                        .expect("block should be reconstructed");
                    assert_eq!(reconstructed.commitment(), commitment);
                }

                // No peer should be blocked: withholding is not detectable.
                let blocked = oracle.blocked().await.unwrap();
                assert!(
                    blocked.is_empty(),
                    "no peer should be blocked in withholding leader test"
                );
            },
        );
    }

    /// When the leader withholds its shard from a participant, the block
    /// can still be reconstructed from gossiped shards. However, the shard
    /// subscription must NOT resolve because the participant's own shard was
    /// never verified. Voting requires own-shard verification to ensure the
    /// participant re-broadcasts its shard and helps slower peers reach quorum.
    #[test_traced]
    fn test_shard_subscription_pending_after_reconstruction_without_leader_shard() {
        let fixture = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();
                let round = Round::new(Epoch::zero(), View::new(1));

                let leader = peers[0].public_key.clone();
                let victim = peers[1].public_key.clone();

                // Remove the link from leader to victim so the leader's shard
                // never reaches the victim directly.
                oracle
                    .remove_link(leader.clone(), victim.clone())
                    .await
                    .expect("remove_link should succeed");

                // Subscribe to the shard and block BEFORE any broadcasting.
                let mut shard_sub = peers[1]
                    .mailbox
                    .subscribe_assigned_shard_verified(commitment)
                    .await;
                let block_sub = peers[1].mailbox.subscribe(commitment).await;

                // Leader broadcasts.
                peers[0].mailbox.proposed(round, coded_block.clone()).await;

                // All non-leader peers discover the leader.
                for peer in peers[1..].iter_mut() {
                    peer.mailbox
                        .discovered(commitment, leader.clone(), round)
                        .await;
                }

                // Wait for gossip to propagate.
                context.sleep(config.link.latency * 4).await;

                // Block subscription should resolve (victim reconstructs from
                // gossiped shards).
                let reconstructed = block_sub.await.expect("block subscription should resolve");
                assert_eq!(reconstructed.commitment(), commitment);

                // Shard subscription must NOT resolve because the leader
                // never sent the victim its own shard.
                assert!(
                    matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
                    "shard subscription must not resolve without own shard verification"
                );
            },
        );
    }

    #[test_traced]
    fn test_broadcast_routes_participant_and_non_participant_shards() {
        let fixture = Fixture {
            num_non_participants: 1,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, non_participants, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                let leader = peers[0].public_key.clone();
                let round = Round::new(Epoch::zero(), View::new(1));
                peers[0].mailbox.proposed(round, coded_block.clone()).await;

                for peer in peers[1..].iter_mut() {
                    peer.mailbox
                        .discovered(commitment, leader.clone(), round)
                        .await;
                }
                for np in non_participants.iter() {
                    np.mailbox
                        .discovered(commitment, leader.clone(), round)
                        .await;
                }
                context.sleep(config.link.latency * 2).await;

                // Participants should receive and validate their own shards.
                for peer in peers.iter_mut() {
                    peer.mailbox
                        .subscribe_assigned_shard_verified(commitment)
                        .await
                        .await
                        .expect("participant shard subscription should complete");
                }

                // Non-participant should receive and validate the leader's shard.
                for np in non_participants.iter() {
                    np.mailbox
                        .subscribe_assigned_shard_verified(commitment)
                        .await
                        .await
                        .expect("non-participant shard subscription should complete");
                }
                context.sleep(config.link.latency).await;

                // Non-participant should reconstruct the block from received shards.
                for np in non_participants.iter() {
                    let reconstructed = np
                        .mailbox
                        .get(commitment)
                        .await
                        .expect("non-participant should reconstruct block");
                    assert_eq!(reconstructed.commitment(), commitment);
                }

                let blocked = oracle.blocked().await.unwrap();
                assert!(
                    blocked.is_empty(),
                    "no peer should be blocked in participant/non-participant shard routing test"
                );
            },
        );
    }

    #[test_traced]
    fn test_non_participant_reconstructs_after_discovered() {
        let fixture = Fixture {
            num_non_participants: 1,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, non_participants, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();
                let round = Round::new(Epoch::zero(), View::new(1));

                let leader = peers[0].public_key.clone();
                peers[0].mailbox.proposed(round, coded_block.clone()).await;

                // Inform participants of the leader so they validate and re-broadcast
                // shards.
                for peer in peers[1..].iter_mut() {
                    peer.mailbox
                        .discovered(commitment, leader.clone(), round)
                        .await;
                }
                context.sleep(config.link.latency).await;

                // Non-participant discovers the leader after shards are already
                // propagating through the network.
                let np = &non_participants[0];
                let block_sub = np.mailbox.subscribe(commitment).await;
                np.mailbox
                    .discovered(commitment, leader.clone(), round)
                    .await;

                // Wait for enough shards (leader's shard + shards from
                // participants) to arrive and reconstruct.
                select! {
                    result = block_sub => {
                        let reconstructed = result.expect("block subscription should resolve");
                        assert_eq!(reconstructed.commitment(), commitment);
                        assert_eq!(reconstructed.height(), coded_block.height());
                    },
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("non-participant block subscription did not resolve");
                    },
                }

                let blocked = oracle.blocked().await.unwrap();
                assert!(
                    blocked.is_empty(),
                    "no peer should be blocked in non-participant reconstruction test"
                );
            },
        );
    }

    #[test_traced]
    fn test_peer_set_update_evicts_peer_buffers() {
        // Shards buffered before leader announcement should be evicted when
        // the sender leaves latest.primary. Even if the overlap window keeps
        // the sender connected, fresh pre-leader shards from that peer must
        // not recreate the buffer.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let num_peers = 10usize;
            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(2),
                },
            );
            network.start();

            let mut private_keys = (0..num_peers)
                .map(|i| PrivateKey::from_seed(i as u64))
                .collect::<Vec<_>>();
            private_keys.sort_by_key(|s| s.public_key());
            let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
            let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());

            // Test from the perspective of a single receiver (peer 3).
            let receiver_idx = 3usize;
            let receiver_pk = peer_keys[receiver_idx].clone();
            let leader_pk = peer_keys[0].clone();

            let receiver_control = oracle.control(receiver_pk.clone());
            let (sender_handle, receiver_handle) = receiver_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            // Register the leader so it can send shards.
            let leader_control = oracle.control(leader_pk.clone());
            let (mut leader_sender, _leader_receiver) = leader_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");
            oracle
                .add_link(leader_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
                .await
                .expect("link should be added");

            // Track the full participant set so the engine sees all peers.
            oracle.manager().track(0, participants.clone()).await;
            context.sleep(Duration::from_millis(10)).await;

            let scheme = Scheme::signer(
                SCHEME_NAMESPACE,
                participants.clone(),
                private_keys[receiver_idx].clone(),
            )
            .expect("signer scheme should be created");

            let config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider: MultiEpochProvider::single(scheme),
                blocker: receiver_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: 1024,
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: 1024,
                peer_provider: oracle.manager(),
            };

            let (engine, mailbox) = ShardEngine::new(context.with_label("receiver"), config);
            engine.start((sender_handle, receiver_handle));

            // Build a coded block and extract the shard destined for the receiver.
            let coding_config = coding_config_for_participants(num_peers as u16);
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();

            let receiver_participant = participants
                .index(&receiver_pk)
                .expect("receiver must be a participant");
            let leader_shard = coded_block
                .shard(receiver_participant.get() as u16)
                .expect("missing shard");
            let shard_bytes = leader_shard.encode();

            // Send the shard BEFORE leader announcement (it gets buffered).
            leader_sender
                .send(
                    Recipients::One(receiver_pk.clone()),
                    shard_bytes.clone(),
                    true,
                )
                .await
                .expect("send failed");
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // Now send a peer set update that excludes the leader.
            let remaining: Set<P> =
                Set::from_iter_dedup(peer_keys.iter().filter(|pk| **pk != leader_pk).cloned());
            oracle.manager().track(1, remaining).await;
            context.sleep(Duration::from_millis(10)).await;

            // The retained overlap window still lets the leader reach the receiver,
            // but this fresh pre-leader shard must not be buffered again.
            leader_sender
                .send(Recipients::One(receiver_pk.clone()), shard_bytes, true)
                .await
                .expect("send failed");
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // Announce the leader. Buffered shards from the leader should have been
            // evicted, so the shard will NOT be ingested.
            let mut shard_sub = mailbox.subscribe_assigned_shard_verified(commitment).await;
            mailbox
                .discovered(
                    commitment,
                    leader_pk.clone(),
                    Round::new(Epoch::zero(), View::new(1)),
                )
                .await;
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // The shard subscription should still be pending (no shard was ingested).
            assert!(
                matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
                "shard subscription should not resolve after evicting the leader's buffer"
            );
            assert!(
                mailbox.get(commitment).await.is_none(),
                "block should not reconstruct from evicted buffers"
            );
        });
    }

    #[test_traced]
    fn test_empty_peer_buffer_is_retained_until_peer_leaves_latest_primary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(1),
                },
            );
            network.start();

            let mut private_keys = (0..4)
                .map(|i| PrivateKey::from_seed(i as u64))
                .collect::<Vec<_>>();
            private_keys.sort_by_key(|s| s.public_key());
            let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
            let receiver_pk = peer_keys[0].clone();
            let sender_pk = peer_keys[1].clone();
            let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());

            let receiver_control = oracle.control(receiver_pk.clone());
            let scheme = Scheme::signer(
                SCHEME_NAMESPACE,
                participants.clone(),
                private_keys[0].clone(),
            )
            .expect("signer scheme should be created");

            let config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider: MultiEpochProvider::single(scheme),
                blocker: receiver_control,
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: 16,
                peer_buffer_size: NZUsize!(4),
                background_channel_capacity: 16,
                peer_provider: oracle.manager(),
            };

            let (mut engine, _mailbox) = ShardEngine::new(context.with_label("engine"), config);

            // Only `sender_pk` is in `latest.primary`, so only that peer may retain a pre-leader
            // buffer row (`buffer_peer_shard` / `peer_buffers`).
            engine.update_latest_primary_peers(Set::from_iter_dedup([sender_pk.clone()]));

            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(
                inner,
                coding_config_for_participants(participants.len() as u16),
                &STRATEGY,
            );
            let commitment = coded_block.commitment();
            let shard = coded_block.shard(0).expect("missing shard");

            // Pre-leader path: buffer one shard before any `ReconstructionState` exists for this
            // commitment.
            engine.buffer_peer_shard(sender_pk.clone(), shard);
            assert_eq!(
                engine.peer_buffers.get(&sender_pk).map(VecDeque::len),
                Some(1),
                "peer buffer should contain the buffered shard"
            );

            // No reconstruction state yet: `ingest_buffered_shards` drains matching shards from the
            // per-peer queues then returns without applying them, leaving an empty deque under the
            // same map key while the sender stays in `latest.primary`.
            let progressed = engine.ingest_buffered_shards(commitment).await;
            assert!(
                !progressed,
                "ingest should not progress without reconstruction state"
            );
            assert!(
                engine.peer_buffers.contains_key(&sender_pk),
                "empty peer buffer should be retained while sender remains in latest.primary"
            );
            assert!(
                engine
                    .peer_buffers
                    .get(&sender_pk)
                    .is_some_and(VecDeque::is_empty),
                "retained peer buffer should now be empty"
            );

            // Empty primary: no peer may retain buffers; `update_latest_primary_peers` drops the
            // empty deque entry for `sender_pk`.
            engine.update_latest_primary_peers(Set::default());
            assert!(
                !engine.peer_buffers.contains_key(&sender_pk),
                "peer buffer should be evicted once sender leaves latest.primary"
            );
        });
    }

    #[test_traced]
    fn test_old_epoch_buffered_shards_are_dropped_after_cutover() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let num_peers = 6usize;
            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(2),
                },
            );
            network.start();

            let mut private_keys = (0..num_peers)
                .map(|i| PrivateKey::from_seed(i as u64))
                .collect::<Vec<_>>();
            private_keys.sort_by_key(|s| s.public_key());
            let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();

            // Epoch 0: first five peers. Epoch 1: swap out `peer_keys[0]` for `peer_keys[5]` so the
            // cutover changes who is in `latest.primary` while `tracked_peer_sets` retains overlap.
            let epoch0_set: Set<P> = Set::from_iter_dedup(peer_keys[..5].iter().cloned());
            let epoch1_set: Set<P> = Set::from_iter_dedup([
                peer_keys[1].clone(),
                peer_keys[2].clone(),
                peer_keys[3].clone(),
                peer_keys[4].clone(),
                peer_keys[5].clone(),
            ]);

            let receiver_idx = 3usize;
            let receiver_pk = peer_keys[receiver_idx].clone();
            let receiver_key = private_keys[receiver_idx].clone();
            let leader_pk = peer_keys[0].clone();

            let receiver_control = oracle.control(receiver_pk.clone());
            let (sender_handle, receiver_handle) = receiver_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            let leader_control = oracle.control(leader_pk.clone());
            let (mut leader_sender, _leader_receiver) = leader_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");
            oracle
                .add_link(leader_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
                .await
                .expect("link should be added");

            // Peer-set id 0: epoch 0 primaries before any cutover.
            oracle.manager().track(0, epoch0_set.clone()).await;
            context.sleep(Duration::from_millis(10)).await;

            let scheme_epoch0 =
                Scheme::signer(SCHEME_NAMESPACE, epoch0_set.clone(), receiver_key.clone())
                    .expect("epoch 0 signer scheme should be created");
            let scheme_epoch1 =
                Scheme::signer(SCHEME_NAMESPACE, epoch1_set.clone(), receiver_key.clone())
                    .expect("epoch 1 signer scheme should be created");

            let config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider: MultiEpochProvider::single(scheme_epoch0)
                    .with_epoch(Epoch::new(1), scheme_epoch1),
                blocker: receiver_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: 1024,
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: 1024,
                peer_provider: oracle.manager(),
            };

            // Receiver engine: schemes for both epochs so post-cutover validation can run if needed.
            let (engine, mailbox) = ShardEngine::new(context.with_label("receiver"), config);
            engine.start((sender_handle, receiver_handle));

            let coding_config = coding_config_for_participants(epoch0_set.len() as u16);
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();

            let receiver_participant = epoch0_set
                .index(&receiver_pk)
                .expect("receiver must be an epoch 0 participant");
            let leader_shard = coded_block
                .shard(receiver_participant.get() as u16)
                .expect("missing shard");

            // Inbound: epoch-0 leader shard arrives before `Discovered` (pre-leader buffer path).
            leader_sender
                .send(
                    Recipients::One(receiver_pk.clone()),
                    leader_shard.encode(),
                    true,
                )
                .await
                .expect("send failed");
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // Cutover to epoch 1 primaries before `Discovered`: `leader_pk` (epoch-0-only) is no
            // longer in `latest.primary`, so overlap-buffered shards for that sender must not feed
            // reconstruction.
            oracle.manager().track(1, epoch1_set).await;
            context.sleep(Duration::from_millis(10)).await;

            // Leader announcement for the old commitment: should not complete reconstruction from
            // dropped pre-cutover buffers.
            let mut shard_sub = mailbox.subscribe_assigned_shard_verified(commitment).await;
            mailbox
                .discovered(
                    commitment,
                    leader_pk,
                    Round::new(Epoch::zero(), View::new(1)),
                )
                .await;
            context.sleep(DEFAULT_LINK.latency * 2).await;

            assert!(
                matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
                "old-epoch shard subscription should stay pending after cutover"
            );
            assert!(
                mailbox.get(commitment).await.is_none(),
                "old-epoch commitment should not reconstruct from overlap-only buffered shards"
            );
        });
    }

    /// If a node is evicted from the
    /// [`commonware_p2p::PeerSetUpdate::latest`] primary set, it must still
    /// reconstruct once the leader is discovered, provided enough buffered
    /// shards came from peers that remain in `latest.primary`.
    ///
    /// Reconstruction must not rely on a self-buffered shard or a
    /// leader-delivered shard: it should succeed from the remaining buffered
    /// peer shards alone.
    #[test_traced]
    fn test_evicted_node_still_reconstructs_from_buffered_peer_shards() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let num_peers = 10usize;
            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(2),
                },
            );
            network.start();

            let mut private_keys = (0..num_peers)
                .map(|i| PrivateKey::from_seed(i as u64))
                .collect::<Vec<_>>();
            private_keys.sort_by_key(|s| s.public_key());
            let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
            let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());

            // Receiver (`peer_keys[1]`) is evicted from `latest.primary` after shards are buffered.
            // The leader (`peer_keys[0]`) has no link to the receiver, so reconstruction cannot use a
            // leader-delivered shard or a self-buffered shard; it must use gossip from peers 2/4/5/6 only.
            let receiver_idx = 1usize;
            let receiver_pk = peer_keys[receiver_idx].clone();
            let leader_pk = peer_keys[0].clone();
            let peer2_pk = peer_keys[2].clone();
            let peer4_pk = peer_keys[4].clone();
            let peer5_pk = peer_keys[5].clone();
            let peer6_pk = peer_keys[6].clone();

            let receiver_control = oracle.control(receiver_pk.clone());
            let (evicted_sender, evicted_receiver) = receiver_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            let peer2_control = oracle.control(peer2_pk.clone());
            let (mut peer2_sender, _peer2_receiver) = peer2_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            let peer4_control = oracle.control(peer4_pk.clone());
            let (mut peer4_sender, _peer4_receiver) = peer4_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            let peer5_control = oracle.control(peer5_pk.clone());
            let (mut peer5_sender, _peer5_receiver) = peer5_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            let peer6_control = oracle.control(peer6_pk.clone());
            let (mut peer6_sender, _peer6_receiver) = peer6_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            // Only secondary peers that will forward shards are connected to the receiver (not the leader).
            for sender in [&peer2_pk, &peer4_pk, &peer5_pk, &peer6_pk] {
                oracle
                    .add_link(sender.clone(), receiver_pk.clone(), DEFAULT_LINK)
                    .await
                    .expect("link should be added");
            }

            // Start with the full committee so the receiver's signer scheme matches the coded block.
            oracle.manager().track(0, participants.clone()).await;
            context.sleep(Duration::from_millis(10)).await;

            let scheme = Scheme::signer(
                SCHEME_NAMESPACE,
                participants.clone(),
                private_keys[receiver_idx].clone(),
            )
            .expect("signer scheme should be created");

            let config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider: MultiEpochProvider::single(scheme),
                blocker: receiver_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: 1024,
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: 1024,
                peer_provider: oracle.manager(),
            };

            let (engine, mailbox) = ShardEngine::new(context.with_label("evicted"), config);
            engine.start((evicted_sender, evicted_receiver));

            let coding_config = coding_config_for_participants(num_peers as u16);
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();

            let peer2_shard = coded_block.shard(2).expect("missing shard 2").encode();
            let peer4_shard = coded_block.shard(4).expect("missing shard 4").encode();
            let peer5_shard = coded_block.shard(5).expect("missing shard 5").encode();
            let peer6_shard = coded_block.shard(6).expect("missing shard 6").encode();

            let block_sub = mailbox.subscribe(commitment).await;

            // Pre-`Discovered` path: four shards from peers that will still be in `latest.primary` after
            // the receiver is evicted (indices 2, 4, 5, 6). Together they are enough to reconstruct.
            for (sender, shard) in [
                (&mut peer2_sender, peer2_shard),
                (&mut peer4_sender, peer4_shard),
                (&mut peer5_sender, peer5_shard),
                (&mut peer6_sender, peer6_shard),
            ] {
                sender
                    .send(Recipients::One(receiver_pk.clone()), shard, true)
                    .await
                    .expect("send failed");
            }
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // Evict the receiver from `latest.primary`: buffered shards from remaining primaries must
            // still count toward reconstruction once the leader is known.
            let latest_primary: Set<P> = Set::from_iter_dedup(
                peer_keys
                    .iter()
                    .filter(|pk| **pk != receiver_pk)
                    .cloned(),
            );
            oracle.manager().track(1, latest_primary).await;
            context.sleep(Duration::from_millis(10)).await;

            // Leader announcement drains overlap-buffered peer shards; the evicted receiver should
            // still reach quorum without ever receiving the leader's direct shard.
            mailbox
                .discovered(
                    commitment,
                    leader_pk.clone(),
                    Round::new(Epoch::zero(), View::new(1)),
                )
                .await;

            select! {
                _ = block_sub => {},
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("block subscription did not resolve after leader discovery");
                },
            }

            context.sleep(DEFAULT_LINK.latency * 2).await;
            let block = mailbox.get(commitment).await;
            assert!(
                block.is_some(),
                "evicted node should reconstruct from buffered shards sent by remaining latest.primary peers"
            );
            assert_eq!(block.unwrap().commitment(), commitment);

            assert!(
                oracle.blocked().await.unwrap().is_empty(),
                "no peer should be blocked when overlapping shards are valid"
            );
        });
    }
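The shard counts in this test line up with standard BFT arithmetic. As a hedged aside (the real thresholds come from `coding_config_for_participants`, which is not shown here), a committee of `n = 3f + 1` participants that can reconstruct from any `f + 1` erasure-coded shards needs exactly four shards when `n = 10`, which is why gossip from peers 2, 4, 5, and 6 alone suffices:

```rust
/// Minimum shards needed to reconstruct, assuming an `n = 3f + 1` committee
/// where any `f + 1` erasure-coded shards recover the block. This is an
/// illustrative assumption, not the engine's actual coding configuration.
fn min_shards_for_reconstruction(participants: u16) -> u16 {
    // `f` is the maximum number of Byzantine participants tolerated.
    let f = (participants - 1) / 3;
    f + 1
}
```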

    /// When peer gossip shards arrive before the leader's direct shard,
    /// the state may transition to `Ready` before the leader shard is
    /// processed. The late leader shard must still be accepted, verified,
    /// and broadcast so that slower peers can reach quorum.
    #[test_traced]
    fn test_late_leader_shard_accepted_after_quorum_transition() {
        let fixture = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();
                let round = Round::new(Epoch::zero(), View::new(1));

                let leader_idx = 0usize;
                let victim_idx = 1usize;
                let leader = peers[leader_idx].public_key.clone();
                let victim = peers[victim_idx].public_key.clone();

                // Sever the link from leader to victim so the leader's
                // direct shard does not arrive initially.
                oracle
                    .remove_link(leader.clone(), victim.clone())
                    .await
                    .expect("remove_link should succeed");

                // Leader proposes. All peers except the victim get their
                // shard from the leader, verify it, and gossip it.
                peers[leader_idx]
                    .mailbox
                    .proposed(round, coded_block.clone())
                    .await;

                // Inform all non-leader peers of the leader.
                for peer in peers[1..].iter_mut() {
                    peer.mailbox
                        .discovered(commitment, leader.clone(), round)
                        .await;
                }

                // Wait for gossip to propagate. The victim should
                // reconstruct the block from gossiped peer shards,
                // transitioning to `Ready` without its own shard.
                context.sleep(config.link.latency * 4).await;

                let block_sub = peers[victim_idx].mailbox.subscribe(commitment).await;
                select! {
                    result = block_sub => {
                        let reconstructed = result.expect("block subscription should resolve");
                        assert_eq!(reconstructed.commitment(), commitment);
                    },
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("victim did not reconstruct block from gossip");
                    },
                }

                // The shard subscription should NOT have resolved yet
                // because the victim has not verified its own shard.
                let mut shard_sub = peers[victim_idx]
                    .mailbox
                    .subscribe_assigned_shard_verified(commitment)
                    .await;
                assert!(
                    matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
                    "shard subscription must not resolve before own shard is verified"
                );

                // Now restore the link so the leader's shard arrives late.
                oracle
                    .add_link(leader.clone(), victim.clone(), DEFAULT_LINK)
                    .await
                    .expect("add_link should succeed");

                // Re-send the leader's shard manually via the leader's
                // network sender (the engine already broadcast it earlier,
                // but the link was down).
                let leader_shard = coded_block
                    .shard(peers[victim_idx].index.get() as u16)
                    .expect("missing victim shard");
                peers[leader_idx]
                    .sender
                    .send(Recipients::One(victim.clone()), leader_shard.encode(), true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // The shard subscription should now resolve because the
                // late leader shard was accepted and verified.
                select! {
                    _ = shard_sub => {},
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("shard subscription did not resolve after late leader shard");
                    },
                }

                // No peer should be blocked.
                let blocked = oracle.blocked().await.unwrap();
                assert!(
                    blocked.is_empty(),
                    "no peer should be blocked in late leader shard test"
                );

                // After both reconstruction and assigned shard readiness,
                // additional gossip shards should be silently ignored.
                let extra_sender_idx = 2usize;
                let extra_shard = coded_block
                    .shard(peers[extra_sender_idx].index.get() as u16)
                    .expect("missing shard");
                peers[extra_sender_idx]
                    .sender
                    .send(Recipients::One(victim.clone()), extra_shard.encode(), true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // The gossip shard should be silently dropped (not blocked).
                let blocked = oracle.blocked().await.unwrap();
                assert!(
                    blocked.is_empty(),
                    "gossip shard after full reconstruction should be silently ignored"
                );
            },
        );
    }
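The final assertions above encode a small decision rule for redundant gossip. A sketch under illustrative names (the engine's real state machine is richer than this): once the block is reconstructed and the node's assigned shard is verified, a further valid gossip shard is dropped silently rather than treated as misbehavior.

```rust
/// Outcome for a *valid* gossip shard; invalid shards are out of scope here.
#[derive(Debug, PartialEq)]
enum GossipOutcome {
    /// Shard may still contribute to reconstruction or assigned-shard readiness.
    Accepted,
    /// Valid but redundant: dropped silently, sender is not blocked.
    Ignored,
}

/// Decide what to do with a valid gossip shard, given whether the block is
/// already reconstructed and the node's own assigned shard already verified.
fn handle_valid_gossip(reconstructed: bool, assigned_verified: bool) -> GossipOutcome {
    if reconstructed && assigned_verified {
        GossipOutcome::Ignored
    } else {
        GossipOutcome::Accepted
    }
}
```

This matches the test: the late leader shard is still accepted after reconstruction (the assigned shard was not yet verified), while the extra gossip shard after both milestones is ignored without blocking its sender.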
}