// commonware_consensus/marshal/coding/shards/engine.rs
1//! Shard engine for erasure-coded block distribution and reconstruction.
2//!
3//! This module implements the core logic for distributing blocks as erasure-coded
4//! shards and reconstructing blocks from received shards.
5//!
6//! # Overview
7//!
8//! The shard engine serves two primary functions:
9//! 1. Broadcast: When a node proposes a block, the engine broadcasts
10//!    erasure-coded shards to all participants and tracked non-participants.
11//! 2. Block Reconstruction: When a node receives shards from peers, the engine
12//!    validates them incrementally and reconstructs the original block once
13//!    enough valid shards are available. Both participants and non-participants
14//!    can reconstruct blocks: participants receive their own indexed strong
15//!    shard while non-participants receive the leader's strong shard. Both
16//!    collect weak shards gossiped by participants to reach the reconstruction
17//!    threshold.
18//!
19//! # Shard Types
20//!
21//! The engine distinguishes between two shard types:
22//!
23//! - Strong shards (`Scheme::StrongShard`): Original erasure-coded shards sent by the proposer.
24//!   These contain the data needed to derive checking data for validation.
25//!
26//! - Weak shards (`Scheme::WeakShard`): Shards that have been validated and re-broadcast
27//!   by participants. These require checking data (derived from a strong shard)
28//!   for validation.
29//!
30//! _These are separated because some coding schemes enable the proposer to send extra data along
31//! with the shard, reducing redundant transmission of checking data from multiple participants._
32//!
33//! # Message Flow
34//!
35//! ```text
36//!                           PROPOSER
37//!                              |
38//!                              | Proposed(block)
39//!                              v
40//!                    +------------------+
41//!                    |   Shard Engine   |
42//!                    +------------------+
43//!                              |
44//!            broadcast_shards (strong shards to each participant)
45//!                              |
46//!         +--------------------+--------------------+
47//!         |                    |                    |
48//!         v                    v                    v
49//!    Participant 0        Participant 1        Participant N
50//!         |                    |                    |
51//!         | (receive strong    | (receive strong    |
52//!         |  shard for self)   |  shard for self)   |
53//!         v                    v                    v
54//!    +----------+         +----------+         +----------+
55//!    | Buffer   |         | Buffer   |         | Buffer   |
56//!    | (await   |         | (await   |         | (await   |
57//!    |  leader) |         |  leader) |         |  leader) |
58//!    +----------+         +----------+         +----------+
59//!         |                    |                    |
60//!         | Discovered         | Discovered         |
61//!         | (leader identity)  | (leader identity)  |
62//!         v                    v                    v
63//!    +----------+         +----------+         +----------+
64//!    | Validate |         | Validate |         | Validate |
65//!    | (weaken) |         | (weaken) |         | (weaken) |
66//!    +----------+         +----------+         +----------+
67//!         |                    |                    |
68//!         | Store checking     | Store checking     |
69//!         | data + checked     | data + checked     |
70//!         | shard              | shard              |
71//!         |                    |                    |
72//!         +--------------------+--------------------+
73//!                              |
74//!                    (gossip weak shards)
75//!                              |
76//!         +--------------------+--------------------+
77//!         |                    |                    |
78//!         v                    v                    v
79//!    +----------+         +----------+         +----------+
80//!    | Validate |         | Validate |         | Validate |
81//!    | (check)  |         | (check)  |         | (check)  |
82//!    +----------+         +----------+         +----------+
83//!         |                    |                    |
84//!         v                    v                    v
85//!    Accumulate checked shards until minimum_shards reached
86//!         |                    |                    |
87//!         v                    v                    v
88//!    +-------------+      +-------------+      +-------------+
89//!    | Reconstruct |      | Reconstruct |      | Reconstruct |
90//!    |    Block    |      |    Block    |      |    Block    |
91//!    +-------------+      +-------------+      +-------------+
92//! ```
93//!
94//! # Reconstruction State Machine
95//!
96//! For each [`Commitment`] with a known leader, nodes (both participants
97//! and non-participants) maintain a [`ReconstructionState`]. Before leader announcement, shards are buffered in
98//! bounded per-peer queues:
99//!
100//! ```text
101//!    +----------------------+
102//!    | AwaitingQuorum       |
103//!    | - leader known       |
104//!    | - buffer weak        |  <--- pre-leader buffered shards are ingested here
105//!    | - checking_data when |
106//!    |   strong verified    |
107//!    +----------------------+
108//!               |
109//!               | quorum met + batch validation passes
110//!               v
111//!    +----------------------+
112//!    | Ready                |
113//!    | - has checking_data  |
114//!    | - checked shards     |
115//!    | (frozen; no new weak |
116//!    |  shards accepted)    |
117//!    +----------------------+
118//!               |
119//!               | checked_shards.len() >= minimum_shards
120//!               v
121//!    +----------------------+
//!    | Reconstruction       |
//!    | Attempt              |
124//!    +----------------------+
125//!               |
126//!          +----+----+
127//!          |         |
128//!          v         v
129//!       Success    Failure
130//!          |         |
131//!          v         v
132//!       Cache      Remove
133//!       Block      State
134//! ```
135//!
136//! # Peer Validation and Blocking Rules
137//!
138//! The engine enforces strict validation to prevent Byzantine attacks:
139//!
140//! - All shards MUST be sent by participants in the current epoch.
141//! - Strong shards MUST correspond to the recipient's index for participants.
142//! - For non-participants, strong shards MUST correspond to the leader's index.
143//! - Weak shards MUST be sent by the participant whose index matches
144//!   the shard index.
145//! - All shards MUST pass cryptographic verification against the commitment.
146//! - Each participant may only contribute ONE weak shard per commitment.
147//! - Sending a second shard (strong or weak) with different data than the
148//!   first (equivocation) results in blocking. Exact duplicates are silently
149//!   ignored.
150//!
151//! Peers violating these rules are blocked via the [`Blocker`] trait.
152//! Validation and blocking rules are applied while a commitment is actively
153//! tracked in reconstruction state. Once a block is already reconstructed and
154//! cached, additional shards for that commitment are ignored.
155//!
156//! _Strong shards are only accepted from the leader. If the leader is not
157//! yet known, shards are buffered in fixed-size per-peer queues until consensus
158//! signals the leader via [`Discovered`]. Once leader is known, buffered
159//! shards for that commitment are ingested into the active state machine._
160//!
161//! [`Discovered`]: super::Message::Discovered
162
163use super::{
164    mailbox::{Mailbox, Message},
165    metrics::{Peer, ShardMetrics},
166};
167use crate::{
168    marshal::coding::{
169        types::{CodedBlock, DistributionShard, Shard},
170        validation::{validate_reconstruction, ReconstructionError as InvariantError},
171    },
172    types::{coding::Commitment, Epoch, Round},
173    Block, CertifiableBlock, Heightable,
174};
175use commonware_codec::{Decode, Error as CodecError, Read};
176use commonware_coding::{Config as CodingConfig, Scheme as CodingScheme};
177use commonware_cryptography::{
178    certificate::{Provider, Scheme as CertificateScheme},
179    Committable, Digestible, Hasher, PublicKey,
180};
181use commonware_macros::select_loop;
182use commonware_p2p::{
183    utils::codec::{WrappedBackgroundReceiver, WrappedSender},
184    Blocker, Provider as PeerProvider, Receiver, Recipients, Sender,
185};
186use commonware_parallel::Strategy;
187use commonware_runtime::{
188    spawn_cell,
189    telemetry::metrics::{histogram::HistogramExt, status::GaugeExt},
190    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
191};
192use commonware_utils::{
193    bitmap::BitMap,
194    channel::{fallible::OneshotExt, mpsc, oneshot},
195    ordered::{Quorum, Set},
196    Participant,
197};
198use rand::Rng;
199use std::{
200    collections::{BTreeMap, VecDeque},
201    num::NonZeroUsize,
202    sync::Arc,
203};
204use thiserror::Error;
205use tracing::{debug, warn};
206
207/// An error that can occur during reconstruction of a [`CodedBlock`] from [`Shard`]s
208#[derive(Debug, Error)]
209pub enum Error<C: CodingScheme> {
210    /// An error occurred while recovering the encoded blob from the [`Shard`]s
211    #[error(transparent)]
212    Coding(C::Error),
213
214    /// An error occurred while decoding the reconstructed blob into a [`CodedBlock`]
215    #[error(transparent)]
216    Codec(#[from] CodecError),
217
218    /// The reconstructed block's digest does not match the commitment's block digest
219    #[error("block digest mismatch: reconstructed block does not match commitment digest")]
220    DigestMismatch,
221
222    /// The reconstructed block's config does not match the commitment's coding config
223    #[error("block config mismatch: reconstructed config does not match commitment config")]
224    ConfigMismatch,
225
226    /// The reconstructed block's embedded context does not match the commitment context digest
227    #[error("block context mismatch: reconstructed context does not match commitment context")]
228    ContextMismatch,
229}
230
/// Key for a block subscription: callers may wait for a block either by its
/// coding [`Commitment`] or by its digest.
///
/// Variant order matters: `Ord` is derived and instances key a `BTreeMap`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum BlockSubscriptionKey<D> {
    /// Subscription keyed by the block's coding commitment.
    Commitment(Commitment),
    /// Subscription keyed by the block's digest.
    Digest(D),
}
236
237/// Configuration for the [`Engine`].
238pub struct Config<P, S, X, D, C, H, B, T>
239where
240    P: PublicKey,
241    S: Provider<Scope = Epoch>,
242    X: Blocker<PublicKey = P>,
243    D: PeerProvider<PublicKey = P>,
244    C: CodingScheme,
245    H: Hasher,
246    B: CertifiableBlock,
247    T: Strategy,
248{
249    /// The scheme provider.
250    pub scheme_provider: S,
251
252    /// The peer blocker.
253    pub blocker: X,
254
255    /// [`Read`] configuration for decoding [`Shard`]s.
256    pub shard_codec_cfg: <Shard<C, H> as Read>::Cfg,
257
258    /// [`commonware_codec::Read`] configuration for decoding blocks.
259    pub block_codec_cfg: B::Cfg,
260
261    /// The strategy used for parallel computation.
262    pub strategy: T,
263
264    /// The size of the mailbox buffer.
265    pub mailbox_size: usize,
266
267    /// Number of shards to buffer per peer.
268    ///
269    /// Shards for commitments without a reconstruction state are buffered per
270    /// peer in a fixed-size ring to bound memory under Byzantine spam. These
271    /// shards are only ingested when consensus provides a leader via
272    /// [`Discovered`](super::Message::Discovered).
273    ///
274    /// The worst-case total memory usage for the set of shard buffers is
275    /// `num_participants * peer_buffer_size * max_shard_size`.
276    pub peer_buffer_size: NonZeroUsize,
277
278    /// Capacity of the channel between the background receiver and the engine.
279    ///
280    /// The background receiver decodes incoming network messages in a separate
281    /// task and forwards them to the engine over an `mpsc` channel with this
282    /// capacity.
283    pub background_channel_capacity: usize,
284
285    /// Provider for peer set information. Per-peer shard buffers
286    /// are freed when a peer leaves all tracked peer sets.
287    pub peer_provider: D,
288}
289
/// A network layer for broadcasting and receiving [`CodedBlock`]s as [`Shard`]s.
///
/// When enough [`Shard`]s are present in the mailbox, the [`Engine`] may facilitate
/// reconstruction of the original [`CodedBlock`] and notify any subscribers waiting for it.
pub struct Engine<E, S, X, D, C, H, B, P, T>
where
    E: BufferPooler + Rng + Spawner + Metrics + Clock,
    S: Provider<Scope = Epoch>,
    S::Scheme: CertificateScheme<PublicKey = P>,
    X: Blocker,
    D: PeerProvider<PublicKey = P>,
    C: CodingScheme,
    H: Hasher,
    B: CertifiableBlock,
    P: PublicKey,
    T: Strategy,
{
    /// Context held by the actor.
    context: ContextCell<E>,

    /// Receiver for incoming messages to the actor.
    mailbox: mpsc::Receiver<Message<B, C, H, P>>,

    /// The scheme provider.
    scheme_provider: S,

    /// The peer blocker.
    blocker: X,

    /// [`Read`] configuration for decoding [`Shard`]s.
    shard_codec_cfg: <Shard<C, H> as Read>::Cfg,

    /// [`Read`] configuration for decoding [`CodedBlock`]s.
    block_codec_cfg: B::Cfg,

    /// The strategy used for parallel shard verification.
    strategy: T,

    /// A map of [`Commitment`]s to [`ReconstructionState`]s.
    ///
    /// An entry exists only after consensus has announced a leader for the
    /// commitment (see `handle_external_proposal`).
    state: BTreeMap<Commitment, ReconstructionState<P, C, H>>,

    /// Per-peer ring buffers for shards received before leader announcement.
    peer_buffers: BTreeMap<P, VecDeque<Shard<C, H>>>,

    /// Maximum buffered pre-leader shards per peer.
    peer_buffer_size: NonZeroUsize,

    /// Provider for peer set information.
    peer_provider: D,

    /// Latest union of tracked peers from the peer set subscription.
    tracked_peers: Set<P>,

    /// Capacity of the background receiver channel.
    background_channel_capacity: usize,

    /// An ephemeral cache of reconstructed blocks, keyed by commitment.
    ///
    /// These blocks are evicted after a durability signal from the marshal.
    /// Wrapped in [`Arc`] to enable cheap cloning when serving multiple subscribers.
    reconstructed_blocks: BTreeMap<Commitment, Arc<CodedBlock<B, C, H>>>,

    /// Open subscriptions for the receipt of our valid shard corresponding
    /// to the keyed [`Commitment`] from the leader.
    shard_subscriptions: BTreeMap<Commitment, Vec<oneshot::Sender<()>>>,

    /// Open subscriptions for the reconstruction of a [`CodedBlock`], keyed
    /// by either its [`Commitment`] or its block digest
    /// (see [`BlockSubscriptionKey`]).
    #[allow(clippy::type_complexity)]
    block_subscriptions:
        BTreeMap<BlockSubscriptionKey<B::Digest>, Vec<oneshot::Sender<Arc<CodedBlock<B, C, H>>>>>,

    /// Metrics for the shard engine.
    metrics: ShardMetrics,
}
365
366impl<E, S, X, D, C, H, B, P, T> Engine<E, S, X, D, C, H, B, P, T>
367where
368    E: BufferPooler + Rng + Spawner + Metrics + Clock,
369    S: Provider<Scope = Epoch>,
370    S::Scheme: CertificateScheme<PublicKey = P>,
371    X: Blocker<PublicKey = P>,
372    D: PeerProvider<PublicKey = P>,
373    C: CodingScheme,
374    H: Hasher,
375    B: CertifiableBlock,
376    P: PublicKey,
377    T: Strategy,
378{
379    /// Create a new [`Engine`] with the given configuration.
380    pub fn new(context: E, config: Config<P, S, X, D, C, H, B, T>) -> (Self, Mailbox<B, C, H, P>) {
381        let metrics = ShardMetrics::new(&context);
382        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
383        (
384            Self {
385                context: ContextCell::new(context),
386                mailbox,
387                scheme_provider: config.scheme_provider,
388                blocker: config.blocker,
389                shard_codec_cfg: config.shard_codec_cfg,
390                block_codec_cfg: config.block_codec_cfg,
391                strategy: config.strategy,
392                state: BTreeMap::new(),
393                peer_buffers: BTreeMap::new(),
394                peer_buffer_size: config.peer_buffer_size,
395                peer_provider: config.peer_provider,
396                tracked_peers: Set::default(),
397                background_channel_capacity: config.background_channel_capacity,
398                reconstructed_blocks: BTreeMap::new(),
399                shard_subscriptions: BTreeMap::new(),
400                block_subscriptions: BTreeMap::new(),
401                metrics,
402            },
403            Mailbox::new(sender),
404        )
405    }
406
    /// Start the engine.
    ///
    /// Consumes `self` and spawns the event loop ([`Self::run`]) on the
    /// actor's context via `spawn_cell!`, returning a [`Handle`] for the
    /// spawned task.
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }
414
    /// Run the shard engine's event loop.
    ///
    /// Wraps the raw network pair into a codec-aware sender and a background
    /// decoding receiver, then multiplexes between peer set updates, mailbox
    /// messages from the marshal, and decoded shards arriving from peers.
    /// The loop exits when a shutdown is signaled or any input channel closes.
    async fn run(
        mut self,
        (sender, receiver): (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) {
        // Encode outgoing shards using the shared network buffer pool.
        let mut sender = WrappedSender::<_, Shard<C, H>>::new(
            self.context.network_buffer_pool().clone(),
            sender,
        );
        // Decode incoming messages in a separate task; decoded shards arrive
        // over a bounded channel (see `background_channel_capacity`).
        let (receiver_service, mut receiver): (_, mpsc::Receiver<(P, Shard<C, H>)>) =
            WrappedBackgroundReceiver::new(
                self.context.with_label("shard_ingress"),
                receiver,
                self.shard_codec_cfg.clone(),
                self.blocker.clone(),
                self.background_channel_capacity,
                &self.strategy,
            );
        // Keep the handle alive to prevent the background receiver from being aborted.
        let _receiver_handle = receiver_service.start();
        let mut peer_set_subscription = self.peer_provider.subscribe().await;

        select_loop! {
            self.context,
            on_start => {
                // Refresh state gauges; set failures are intentionally ignored.
                let _ = self
                    .metrics
                    .reconstruction_states_count
                    .try_set(self.state.len());
                let _ = self
                    .metrics
                    .reconstructed_blocks_cache_count
                    .try_set(self.reconstructed_blocks.len());

                // Clean up closed subscriptions.
                self.block_subscriptions.retain(|_, subscribers| {
                    subscribers.retain(|tx| !tx.is_closed());
                    !subscribers.is_empty()
                });
                self.shard_subscriptions.retain(|_, subscribers| {
                    subscribers.retain(|tx| !tx.is_closed());
                    !subscribers.is_empty()
                });
            },
            on_stopped => {
                debug!("received shutdown signal, stopping shard engine");
            },
            Some((_, _, tracked_peers)) = peer_set_subscription.recv() else {
                debug!("peer set subscription closed");
                return;
            } => {
                // Free pre-leader buffers for peers that left all tracked peer sets.
                self.peer_buffers
                    .retain(|peer, _| tracked_peers.as_ref().contains(peer));
                self.tracked_peers = tracked_peers;
            },
            Some(message) = self.mailbox.recv() else {
                debug!("shard mailbox closed, stopping shard engine");
                return;
            } => match message {
                // We proposed a block: erasure-code and broadcast its shards.
                Message::Proposed { block, round } => {
                    self.broadcast_shards(&mut sender, round, block).await;
                }
                // Consensus announced a leader for a commitment.
                Message::Discovered {
                    commitment,
                    leader,
                    round,
                } => {
                    self.handle_external_proposal(&mut sender, commitment, leader, round)
                        .await;
                }
                Message::GetByCommitment {
                    commitment,
                    response,
                } => {
                    let block = self.reconstructed_blocks.get(&commitment).cloned();
                    response.send_lossy(block);
                }
                Message::GetByDigest { digest, response } => {
                    // Linear scan: the cache is keyed by commitment, not digest.
                    let block = self
                        .reconstructed_blocks
                        .iter()
                        .find_map(|(_, b)| (b.digest() == digest).then_some(b))
                        .cloned();
                    response.send_lossy(block);
                }
                Message::SubscribeShard {
                    commitment,
                    response,
                } => {
                    self.handle_shard_subscription(commitment, response);
                }
                Message::SubscribeByCommitment {
                    commitment,
                    response,
                } => {
                    self.handle_block_subscription(
                        BlockSubscriptionKey::Commitment(commitment),
                        response,
                    );
                }
                Message::SubscribeByDigest { digest, response } => {
                    self.handle_block_subscription(BlockSubscriptionKey::Digest(digest), response);
                }
                Message::Prune { through } => {
                    self.prune(through);
                }
            },
            Some((peer, shard)) = receiver.recv() else {
                debug!("receiver closed, stopping shard engine");
                return;
            } => {
                // Track shard receipt per peer.
                self.metrics
                    .shards_received
                    .get_or_create(&Peer::new(&peer))
                    .inc();

                // Shards for already-reconstructed, cached blocks are ignored.
                let commitment = shard.commitment();
                if self.reconstructed_blocks.contains_key(&commitment) {
                    continue;
                }

                if let Some(state) = self.state.get_mut(&commitment) {
                    let round = state.round();
                    let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
                        warn!(%commitment, "no scheme for epoch, ignoring shard");
                        continue;
                    };
                    let progressed = state
                        .on_network_shard(
                            peer,
                            shard,
                            InsertCtx::new(scheme.as_ref(), &self.strategy),
                            &mut self.blocker,
                        )
                        .await;
                    if progressed {
                        self.try_advance(&mut sender, commitment).await;
                    }
                } else {
                    // No leader known yet for this commitment: buffer per peer.
                    self.buffer_peer_shard(peer, shard);
                }
            },
        }
    }
560
    /// Attempts to reconstruct a [`CodedBlock`] from the checked [`Shard`]s present in the
    /// [`ReconstructionState`].
    ///
    /// # Returns
    /// - `Ok(Some(block))` if reconstruction was successful or the block was already reconstructed.
    /// - `Ok(None)` if reconstruction could not be attempted due to insufficient checked shards.
    /// - `Err(_)` if reconstruction was attempted but failed.
    #[allow(clippy::type_complexity)]
    fn try_reconstruct(
        &mut self,
        commitment: Commitment,
    ) -> Result<Option<Arc<CodedBlock<B, C, H>>>, Error<C>> {
        // Serve from the cache if this block was already rebuilt.
        if let Some(block) = self.reconstructed_blocks.get(&commitment) {
            return Ok(Some(Arc::clone(block)));
        }
        let Some(state) = self.state.get(&commitment) else {
            return Ok(None);
        };
        if state.checked_shards().len() < usize::from(commitment.config().minimum_shards.get()) {
            debug!(%commitment, "not enough checked shards to reconstruct block");
            return Ok(None);
        }
        // Per the state machine (see module docs), checked shards only
        // accumulate once checking data has been derived from a verified
        // strong shard, so this cannot fail here.
        let checking_data = state
            .checking_data()
            .expect("checking data must be present");

        // Attempt to reconstruct the encoded blob
        let start = self.context.current();
        let blob = C::decode(
            &commitment.config(),
            &commitment.root(),
            checking_data.clone(),
            state.checked_shards(),
            &self.strategy,
        )
        .map_err(Error::Coding)?;
        self.metrics
            .erasure_decode_duration
            .observe_between(start, self.context.current());

        // Attempt to decode the block from the encoded blob
        let (inner, config): (B, CodingConfig) =
            Decode::decode_cfg(&mut blob.as_slice(), &(self.block_codec_cfg.clone(), ()))?;

        // Verify the decoded block against the commitment's digests and config.
        match validate_reconstruction::<H, _>(&inner, config, commitment) {
            Ok(()) => {}
            Err(InvariantError::BlockDigest) => {
                return Err(Error::DigestMismatch);
            }
            Err(InvariantError::CodingConfig) => {
                warn!(
                    %commitment,
                    expected_config = ?commitment.config(),
                    actual_config = ?config,
                    "reconstructed block config does not match commitment config, but digest matches"
                );
                return Err(Error::ConfigMismatch);
            }
            Err(InvariantError::ContextDigest(expected, actual)) => {
                warn!(
                    %commitment,
                    expected_context_digest = ?expected,
                    actual_context_digest = ?actual,
                    "reconstructed block context digest does not match commitment context digest"
                );
                return Err(Error::ContextMismatch);
            }
        }

        // Construct a coding block with a _trusted_ commitment. `C::decode` verified the blob's
        // integrity against the commitment, so shards can be lazily re-constructed if need be.
        let block = Arc::new(CodedBlock::new_trusted(inner, commitment));
        self.cache_block(Arc::clone(&block));
        self.metrics.blocks_reconstructed_total.inc();
        Ok(Some(block))
    }
637
638    /// Handles leader announcements for a commitment and advances reconstruction.
639    async fn handle_external_proposal<Sr: Sender<PublicKey = P>>(
640        &mut self,
641        sender: &mut WrappedSender<Sr, Shard<C, H>>,
642        commitment: Commitment,
643        leader: P,
644        round: Round,
645    ) {
646        if self.reconstructed_blocks.contains_key(&commitment) {
647            return;
648        }
649        let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
650            warn!(%commitment, "no scheme for epoch, ignoring external proposal");
651            return;
652        };
653        let participants = scheme.participants();
654        if participants.index(&leader).is_none() {
655            warn!(?leader, %commitment, "leader update for non-participant, ignoring");
656            return;
657        }
658        if let Some(state) = self.state.get(&commitment) {
659            if state.leader() != &leader {
660                warn!(
661                    existing = ?state.leader(),
662                    ?leader,
663                    %commitment,
664                    "conflicting leader update, ignoring"
665                );
666            }
667            return;
668        }
669
670        let participants_len =
671            u64::try_from(participants.len()).expect("participant count impossibly out of bounds");
672        self.state.insert(
673            commitment,
674            ReconstructionState::new(leader, round, participants_len),
675        );
676        let buffered_progress = self.ingest_buffered_shards(commitment).await;
677        if buffered_progress {
678            self.try_advance(sender, commitment).await;
679        }
680    }
681
682    /// Buffer a shard from a peer until a leader is known.
683    fn buffer_peer_shard(&mut self, peer: P, shard: Shard<C, H>) {
684        let queue = self.peer_buffers.entry(peer).or_default();
685        if queue.len() >= self.peer_buffer_size.get() {
686            let _ = queue.pop_front();
687        }
688        queue.push_back(shard);
689    }
690
    /// Ingest buffered pre-leader shards for a commitment into active state.
    ///
    /// Drains every shard matching `commitment` out of the per-peer buffers
    /// and feeds them to the commitment's [`ReconstructionState`]. Returns
    /// whether any ingested shard made progress.
    async fn ingest_buffered_shards(&mut self, commitment: Commitment) -> bool {
        let mut buffered_weak = Vec::new();
        let mut buffered_strong = Vec::new();
        for (peer, queue) in self.peer_buffers.iter_mut() {
            let mut i = 0;
            while i < queue.len() {
                if queue[i].commitment() != commitment {
                    i += 1;
                    continue;
                }
                // `swap_remove_back` fills slot `i` with the back element, so
                // `i` is deliberately not advanced: the swapped-in element
                // still needs to be examined.
                let shard = queue.swap_remove_back(i).expect("index is valid");
                if shard.is_strong() {
                    buffered_strong.push((peer.clone(), shard));
                } else {
                    buffered_weak.push((peer.clone(), shard));
                }
            }
        }
        // Drop peers whose buffers were fully drained.
        self.peer_buffers.retain(|_, queue| !queue.is_empty());

        // The state may have been removed between buffering and ingestion.
        let Some(state) = self.state.get_mut(&commitment) else {
            return false;
        };
        let round = state.round();
        let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
            warn!(%commitment, "no scheme for epoch, dropping buffered shards");
            return false;
        };

        // Ingest weak shards first so they populate pending_weak_shards before
        // the strong shard sets checking_data and triggers batch validation.
        let mut progressed = false;
        let ctx = InsertCtx::new(scheme.as_ref(), &self.strategy);
        for (peer, shard) in buffered_weak.into_iter().chain(buffered_strong) {
            progressed |= state
                .on_network_shard(peer, shard, ctx, &mut self.blocker)
                .await;
        }
        progressed
    }
732
733    /// Cache a block and notify block subscribers waiting on it.
734    fn cache_block(&mut self, block: Arc<CodedBlock<B, C, H>>) {
735        let commitment = block.commitment();
736        self.reconstructed_blocks
737            .insert(commitment, Arc::clone(&block));
738        self.notify_block_subscribers(block);
739    }
740
741    /// Broadcasts the shards of a [`CodedBlock`] and caches the block.
742    ///
743    /// - Participants receive the strong shard matching their participant index.
744    /// - Tracked non-participants receive the leader's strong shard.
745    async fn broadcast_shards<Sr: Sender<PublicKey = P>>(
746        &mut self,
747        sender: &mut WrappedSender<Sr, Shard<C, H>>,
748        round: Round,
749        mut block: CodedBlock<B, C, H>,
750    ) {
751        let commitment = block.commitment();
752
753        let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
754            warn!(%commitment, "no scheme available, cannot broadcast shards");
755            return;
756        };
757        let participants = scheme.participants();
758        let Some(me) = scheme.me() else {
759            warn!(
760                %commitment,
761                "cannot broadcast shards: local proposer is not a participant"
762            );
763            return;
764        };
765
766        let shard_count = block.shards(&self.strategy).len();
767        if shard_count != participants.len() {
768            warn!(
769                %commitment,
770                shard_count,
771                participants = participants.len(),
772                "cannot broadcast shards: participant/shard count mismatch"
773            );
774            return;
775        }
776
777        let my_index = me.get() as usize;
778        let leader_shard = block
779            .shard(my_index as u16)
780            .expect("proposer's shard must exist");
781
782        // Broadcast each participant their corresponding shard.
783        for (index, peer) in participants.iter().enumerate() {
784            if index == my_index {
785                continue;
786            }
787
788            let Some(shard) = block.shard(index as u16) else {
789                warn!(
790                    %commitment,
791                    index,
792                    "cannot broadcast shards: missing shard for participant index"
793                );
794                return;
795            };
796            let _ = sender
797                .send(Recipients::One(peer.clone()), shard, true)
798                .await;
799        }
800
801        // Send the leader's strong shard to tracked peers outside the participant set.
802        let non_participants: Vec<P> = self
803            .tracked_peers
804            .iter()
805            .filter(|peer| participants.index(peer).is_none())
806            .cloned()
807            .collect();
808        if !non_participants.is_empty() {
809            let _ = sender
810                .send(Recipients::Some(non_participants), leader_shard, true)
811                .await;
812        }
813
814        // Cache the block so we don't have to reconstruct it again.
815        let block = Arc::new(block);
816        self.cache_block(block);
817
818        // Local proposals bypass reconstruction, so shard subscribers waiting
819        // for "our valid shard arrived" still need a notification.
820        self.notify_shard_subscribers(commitment);
821
822        debug!(?commitment, "broadcasted shards");
823    }
824
825    /// Broadcasts a [`Shard`] to all participants.
826    async fn broadcast_weak_shard<Sr: Sender<PublicKey = P>>(
827        &mut self,
828        sender: &mut WrappedSender<Sr, Shard<C, H>>,
829        shard: Shard<C, H>,
830    ) {
831        let commitment = shard.commitment();
832        if let Ok(peers) = sender.send(Recipients::All, shard, true).await {
833            debug!(
834                ?commitment,
835                peers = peers.len(),
836                "broadcasted shard to all participants"
837            );
838        }
839    }
840
841    /// Broadcasts any pending weak shard for the given commitment and attempts
842    /// reconstruction. If reconstruction succeeds or fails, the state is cleaned
843    /// up and subscribers are notified.
844    async fn try_advance<Sr: Sender<PublicKey = P>>(
845        &mut self,
846        sender: &mut WrappedSender<Sr, Shard<C, H>>,
847        commitment: Commitment,
848    ) {
849        if let Some(state) = self.state.get_mut(&commitment) {
850            match state.take_pending_action() {
851                Some(ValidatedShardAction::Broadcast(shard)) => {
852                    self.broadcast_weak_shard(sender, shard).await;
853                    self.notify_shard_subscribers(commitment);
854                }
855                Some(ValidatedShardAction::NotifyOnly) => {
856                    self.notify_shard_subscribers(commitment);
857                }
858                None => {}
859            }
860        }
861
862        match self.try_reconstruct(commitment) {
863            Ok(Some(block)) => {
864                // Do not prune other reconstruction state here. A Byzantine
865                // leader can equivocate by proposing multiple commitments in
866                // the same round, so more than one block may be reconstructed
867                // for a given round. Pruning is deferred to `prune()`, which
868                // is called once a commitment is finalized.
869                debug!(
870                    %commitment,
871                    parent = %block.parent(),
872                    height = %block.height(),
873                    "successfully reconstructed block from shards"
874                );
875            }
876            Ok(None) => {
877                debug!(%commitment, "not enough checked shards to reconstruct block");
878            }
879            Err(err) => {
880                warn!(%commitment, ?err, "failed to reconstruct block from checked shards");
881                self.state.remove(&commitment);
882                self.drop_subscriptions(commitment);
883                self.metrics.reconstruction_failures_total.inc();
884            }
885        }
886    }
887
888    /// Handles the registry of a shard subscription.
889    fn handle_shard_subscription(&mut self, commitment: Commitment, response: oneshot::Sender<()>) {
890        // Answer immediately if we have our shard or the block has already
891        // been reconstructed (implies that our shard arrived and was verified).
892        let has_shard = self
893            .state
894            .get(&commitment)
895            .is_some_and(|state| state.checking_data().is_some());
896        let block_reconstructed = self.reconstructed_blocks.contains_key(&commitment);
897        if has_shard || block_reconstructed {
898            response.send_lossy(());
899            return;
900        }
901
902        self.shard_subscriptions
903            .entry(commitment)
904            .or_default()
905            .push(response);
906    }
907
908    /// Handles the registry of a block subscription.
909    fn handle_block_subscription(
910        &mut self,
911        key: BlockSubscriptionKey<B::Digest>,
912        response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
913    ) {
914        let block = match key {
915            BlockSubscriptionKey::Commitment(commitment) => {
916                self.reconstructed_blocks.get(&commitment)
917            }
918            BlockSubscriptionKey::Digest(digest) => self
919                .reconstructed_blocks
920                .iter()
921                .find_map(|(_, block)| (block.digest() == digest).then_some(block)),
922        };
923
924        // Answer immediately if we have the block cached.
925        if let Some(block) = block {
926            response.send_lossy(Arc::clone(block));
927            return;
928        }
929
930        self.block_subscriptions
931            .entry(key)
932            .or_default()
933            .push(response);
934    }
935
936    /// Notifies and cleans up any subscriptions for a valid shard.
937    fn notify_shard_subscribers(&mut self, commitment: Commitment) {
938        if let Some(mut subscribers) = self.shard_subscriptions.remove(&commitment) {
939            for subscriber in subscribers.drain(..) {
940                subscriber.send_lossy(());
941            }
942        }
943    }
944
945    /// Notifies and cleans up any subscriptions for a reconstructed block.
946    fn notify_block_subscribers(&mut self, block: Arc<CodedBlock<B, C, H>>) {
947        let commitment = block.commitment();
948        let digest = block.digest();
949
950        // Notify by-commitment subscribers.
951        if let Some(mut subscribers) = self
952            .block_subscriptions
953            .remove(&BlockSubscriptionKey::Commitment(commitment))
954        {
955            for subscriber in subscribers.drain(..) {
956                subscriber.send_lossy(Arc::clone(&block));
957            }
958        }
959
960        // Notify by-digest subscribers.
961        if let Some(mut subscribers) = self
962            .block_subscriptions
963            .remove(&BlockSubscriptionKey::Digest(digest))
964        {
965            for subscriber in subscribers.drain(..) {
966                subscriber.send_lossy(Arc::clone(&block));
967            }
968        }
969    }
970
971    /// Drops all subscriptions associated with a commitment.
972    ///
973    /// Removing these entries drops all senders, causing receivers to resolve
974    /// with cancellation (`RecvError`) instead of hanging indefinitely.
975    fn drop_subscriptions(&mut self, commitment: Commitment) {
976        self.shard_subscriptions.remove(&commitment);
977        self.block_subscriptions
978            .remove(&BlockSubscriptionKey::Commitment(commitment));
979        self.block_subscriptions
980            .remove(&BlockSubscriptionKey::Digest(
981                commitment.block::<B::Digest>(),
982            ));
983    }
984
985    /// Prunes all blocks in the reconstructed block cache that are older than the block
986    /// with the given commitment. Also cleans up stale reconstruction state
987    /// and subscriptions.
988    ///
989    /// This is the only place reconstruction state is pruned by round. We
990    /// intentionally avoid pruning on reconstruction success because a
991    /// Byzantine leader can equivocate, producing multiple valid commitments
992    /// in the same round. Both must remain recoverable until finalization
993    /// determines which one is canonical.
994    fn prune(&mut self, through: Commitment) {
995        if let Some(height) = self.reconstructed_blocks.get(&through).map(|b| b.height()) {
996            self.reconstructed_blocks
997                .retain(|_, block| block.height() > height);
998        }
999
1000        // Always clear direct state/subscriptions for the pruned commitment.
1001        // This avoids dangling waiters when prune is called for a commitment
1002        // that was never reconstructed locally.
1003        self.drop_subscriptions(through);
1004        let Some(round) = self.state.remove(&through).map(|state| state.round()) else {
1005            return;
1006        };
1007
1008        let mut pruned_commitments = Vec::new();
1009        self.state.retain(|c, s| {
1010            let keep = s.round() > round;
1011            if !keep {
1012                pruned_commitments.push(*c);
1013            }
1014            keep
1015        });
1016        for pruned in pruned_commitments {
1017            self.drop_subscriptions(pruned);
1018        }
1019    }
1020}
1021
/// Erasure coded block reconstruction state machine.
///
/// Transitions are one-way: `AwaitingQuorum` -> `Ready` (performed in place by
/// `on_network_shard` once batch validation succeeds).
enum ReconstructionState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Stage 1: leader known; buffer weak shards and optionally hold checking
    /// data from a verified strong shard. Transitions to `Ready` when quorum
    /// is met and batch validation succeeds.
    AwaitingQuorum(AwaitingQuorumState<P, C, H>),
    /// Stage 2: batch validation passed; checked shards are available for
    /// reconstruction.
    Ready(ReadyState<P, C, H>),
}
1037
/// Action to take with a validated weak shard after strong shard verification.
///
/// Participants broadcast their weak shard to all peers, while non-participants
/// only need to notify local subscribers (they don't gossip).
enum ValidatedShardAction<C: CodingScheme, H: Hasher> {
    /// Broadcast the weak shard to all peers and notify local subscribers.
    Broadcast(Shard<C, H>),
    /// Only notify local subscribers (non-participant validated a strong shard).
    NotifyOnly,
}
1048
/// State shared across all reconstruction phases.
struct CommonState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// The leader associated with this reconstruction state.
    leader: P,
    /// Our validated weak shard and the action to take with it.
    /// Set by strong shard verification; consumed by `take_pending_action`.
    pending_action: Option<ValidatedShardAction<C, H>>,
    /// Shards that have been verified and are ready to contribute to reconstruction.
    checked_shards: Vec<C::CheckedShard>,
    /// Bitmap tracking which participant indices have contributed a valid shard.
    /// A slot stays consumed even when a late shard is discarded.
    contributed: BitMap,
    /// The round for which this commitment was externally proposed.
    round: Round,
    /// The strong shard data received from the leader, retained for equivocation detection.
    received_strong: Option<C::StrongShard>,
}
1069
/// Phase data for `ReconstructionState::AwaitingQuorum`.
///
/// In this phase, the leader is known. Weak shards are buffered until enough
/// shards (strong + pending weak) are available to attempt batch validation.
/// `checking_data` is populated once the leader's strong shard is verified via
/// `C::weaken`.
struct AwaitingQuorumState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// State shared with the `Ready` phase (leader, round, checked shards, ...).
    common: CommonState<P, C, H>,
    /// Weak shards buffered per sender, awaiting batch validation. Keyed by
    /// sender so a differing duplicate can be detected as equivocation.
    pending_weak_shards: BTreeMap<P, WeakShard<C>>,
    /// Checking data derived from the leader's verified strong shard; `None`
    /// until that shard arrives and passes `C::weaken`.
    checking_data: Option<C::CheckingData>,
}
1086
/// Phase data for `ReconstructionState::Ready`.
///
/// Batch validation has passed. Checked shards are available for
/// reconstruction.
struct ReadyState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// State carried over from the `AwaitingQuorum` phase.
    common: CommonState<P, C, H>,
    /// Checking data derived from the leader's strong shard (always present
    /// in this phase).
    checking_data: C::CheckingData,
}
1100
/// Parsed strong shard payload used by internal state-machine handlers.
struct StrongShard<C>
where
    C: CodingScheme,
{
    /// Commitment of the block this shard belongs to.
    commitment: Commitment,
    /// Erasure-coding index of the shard (matches a participant index).
    index: u16,
    /// The strong shard data itself.
    data: C::StrongShard,
}
1110
/// Parsed weak shard payload used by internal state-machine handlers.
struct WeakShard<C>
where
    C: CodingScheme,
{
    /// Erasure-coding index of the shard (matches the sender's participant index).
    index: u16,
    /// The weak shard data itself.
    data: C::WeakShard,
}
1119
1120impl<P, C, H> CommonState<P, C, H>
1121where
1122    P: PublicKey,
1123    C: CodingScheme,
1124    H: Hasher,
1125{
1126    /// Create a new empty common state for the provided leader and round.
1127    fn new(leader: P, round: Round, participants_len: u64) -> Self {
1128        Self {
1129            leader,
1130            pending_action: None,
1131            checked_shards: Vec::new(),
1132            contributed: BitMap::zeroes(participants_len),
1133            round,
1134            received_strong: None,
1135        }
1136    }
1137}
1138
impl<P, C, H> AwaitingQuorumState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Verify the leader's strong shard and store checking data.
    ///
    /// When `is_participant` is true, the validated weak shard is stored for
    /// broadcasting to peers. When false (non-participant), only subscriber
    /// notification is scheduled.
    ///
    /// Returns `false` if verification fails (sender is blocked), `true` on
    /// success. Does not transition state; the caller should invoke
    /// `try_transition` after this returns `true`.
    async fn verify_strong_shard(
        &mut self,
        sender: P,
        shard: StrongShard<C>,
        is_participant: bool,
        blocker: &mut impl Blocker<PublicKey = P>,
    ) -> bool {
        let StrongShard {
            commitment,
            index,
            data,
        } = shard;
        // Clone before `C::weaken` consumes `data`; only persisted below if
        // verification passes.
        let received_strong = data.clone();
        // `C::weaken` verifies the strong shard against the commitment and
        // yields the checking data, a checked shard, and our weak shard data.
        let Ok((checking_data, checked, weak_shard_data)) =
            C::weaken(&commitment.config(), &commitment.root(), index, data)
        else {
            commonware_p2p::block!(blocker, sender, "invalid strong shard received");
            return false;
        };

        // Only persist the strong shard (for later equivocation detection) after
        // it has passed `C::weaken` verification.
        self.common.received_strong = Some(received_strong);
        self.common.contributed.set(u64::from(index), true);
        self.common.checked_shards.push(checked);
        // Participants re-broadcast their derived weak shard; non-participants
        // only notify local subscribers.
        self.common.pending_action = Some(if is_participant {
            ValidatedShardAction::Broadcast(Shard::new(
                commitment,
                index,
                DistributionShard::Weak(weak_shard_data),
            ))
        } else {
            ValidatedShardAction::NotifyOnly
        });
        self.checking_data = Some(checking_data);
        true
    }

    /// Check whether quorum is met and, if so, batch-validate all pending weak
    /// shards in parallel. Returns `Some(ReadyState)` on successful transition.
    async fn try_transition(
        &mut self,
        commitment: Commitment,
        participants_len: u64,
        strategy: &impl Strategy,
        blocker: &mut impl Blocker<PublicKey = P>,
    ) -> Option<ReadyState<P, C, H>> {
        // Batch validation is impossible until the leader's verified strong
        // shard has supplied checking data.
        self.checking_data.as_ref()?;
        let minimum = usize::from(commitment.config().minimum_shards.get());
        if self.common.checked_shards.len() + self.pending_weak_shards.len() < minimum {
            return None;
        }

        // Batch-validate all pending weak shards in parallel.
        let pending = std::mem::take(&mut self.pending_weak_shards);
        let checking_data = self.checking_data.as_ref().unwrap();
        let (new_checked, to_block) =
            strategy.map_partition_collect_vec(pending, |(peer, shard)| {
                let checked = C::check(
                    &commitment.config(),
                    &commitment.root(),
                    checking_data,
                    shard.index,
                    shard.data,
                );
                (peer, checked.ok())
            });

        // Peers whose buffered shards failed validation are blocked.
        for peer in to_block {
            commonware_p2p::block!(blocker, peer, "invalid shard received");
        }
        self.common.checked_shards.extend(new_checked);

        // After validation, some may have failed; recheck threshold.
        if self.common.checked_shards.len() < minimum {
            return None;
        }

        // Transition to Ready. Move the accumulated common state into the new
        // phase, leaving a fresh placeholder behind; the caller replaces
        // `self` entirely on transition.
        let checking_data = self.checking_data.take().unwrap();
        let round = self.common.round;
        let leader = self.common.leader.clone();
        let common = std::mem::replace(
            &mut self.common,
            CommonState::new(leader, round, participants_len),
        );
        Some(ReadyState {
            common,
            checking_data,
        })
    }
}
1246
/// Context required for processing incoming network shards.
struct InsertCtx<'a, Sch, S>
where
    Sch: CertificateScheme,
    S: Strategy,
{
    /// Certificate scheme for the epoch (participant set and local identity).
    scheme: &'a Sch,
    /// Parallelization strategy used for batch shard validation.
    strategy: &'a S,
    /// Cached participant count (used to size contribution bitmaps).
    participants_len: u64,
}
1257
// Manual `Clone`/`Copy` impls: a derive would put `Clone`/`Copy` bounds on the
// `Sch`/`S` type parameters even though the fields are only references, which
// are always copyable.
impl<Sch: CertificateScheme, S: Strategy> Clone for InsertCtx<'_, Sch, S> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<Sch: CertificateScheme, S: Strategy> Copy for InsertCtx<'_, Sch, S> {}
1265
1266impl<'a, Sch: CertificateScheme, S: Strategy> InsertCtx<'a, Sch, S> {
1267    fn new(scheme: &'a Sch, strategy: &'a S) -> Self {
1268        let participants_len = u64::try_from(scheme.participants().len())
1269            .expect("participant count impossibly out of bounds");
1270        Self {
1271            scheme,
1272            strategy,
1273            participants_len,
1274        }
1275    }
1276}
1277
impl<P, C, H> ReconstructionState<P, C, H>
where
    P: PublicKey,
    C: CodingScheme,
    H: Hasher,
{
    /// Create an initial reconstruction state for a commitment.
    ///
    /// All states begin in `AwaitingQuorum` with no buffered shards and no
    /// checking data.
    fn new(leader: P, round: Round, participants_len: u64) -> Self {
        Self::AwaitingQuorum(AwaitingQuorumState {
            common: CommonState::new(leader, round, participants_len),
            pending_weak_shards: BTreeMap::new(),
            checking_data: None,
        })
    }

    /// Access common state shared across all phases.
    const fn common(&self) -> &CommonState<P, C, H> {
        match self {
            Self::AwaitingQuorum(state) => &state.common,
            Self::Ready(state) => &state.common,
        }
    }

    /// Mutably access common state shared across all phases.
    const fn common_mut(&mut self) -> &mut CommonState<P, C, H> {
        match self {
            Self::AwaitingQuorum(state) => &mut state.common,
            Self::Ready(state) => &mut state.common,
        }
    }

    /// Return the leader associated with this state.
    const fn leader(&self) -> &P {
        &self.common().leader
    }

    /// Returns checking data when available.
    ///
    /// In `AwaitingQuorum`, this is `Some` once the leader's strong shard has
    /// been verified. In `Ready`, it is always `Some`.
    const fn checking_data(&self) -> Option<&C::CheckingData> {
        match self {
            Self::AwaitingQuorum(state) => state.checking_data.as_ref(),
            Self::Ready(state) => Some(&state.checking_data),
        }
    }

    /// Return the proposal round associated with this state.
    const fn round(&self) -> Round {
        self.common().round
    }

    /// Returns all verified shards accumulated for reconstruction.
    ///
    /// This slice grows as valid strong/weak shards are accepted.
    const fn checked_shards(&self) -> &[C::CheckedShard] {
        self.common().checked_shards.as_slice()
    }

    /// Takes the pending action for this commitment's validated weak shard.
    ///
    /// Returns [`None`] if the strong shard hasn't been validated yet.
    const fn take_pending_action(&mut self) -> Option<ValidatedShardAction<C, H>> {
        self.common_mut().pending_action.take()
    }

    /// Inserts a [`Shard`] into the state.
    ///
    /// ## Peer Blocking Rules
    ///
    /// The `sender` may be blocked via the provided [`Blocker`] if any of the following rules are violated:
    ///
    /// Strong shards (`CodingScheme::StrongShard`):
    /// - MUST be sent by a participant.
    /// - MUST correspond to self's index when self is a participant.
    /// - MUST correspond to the sender's participant index (which MUST be the leader)
    ///   when self is not a participant
    /// - MUST be sent by the leader (when the leader is known). Non-leader senders
    ///   are blocked.
    /// - The leader may only send ONE strong shard. Sending a second strong shard
    ///   with different data (equivocation) results in blocking the sender. Exact
    ///   duplicates are silently ignored.
    /// - MUST pass cryptographic verification via [`CodingScheme::weaken`].
    /// - When leader is unknown, buffering happens at the engine level in
    ///   bounded pre-leader queues until [`Discovered`](super::Message::Discovered)
    ///   creates a reconstruction state for this commitment.
    ///
    /// Weak shards (`CodingScheme::WeakShard`):
    /// - MUST be sent by a participant.
    /// - MUST be sent by the participant whose index matches the shard index.
    /// - MUST pass cryptographic verification via [`CodingScheme::check`].
    /// - Each participant may only contribute ONE weak shard per commitment.
    ///   Sending a second weak shard with different data (equivocation) results
    ///   in blocking the sender. Exact duplicates are silently ignored.
    /// - Weak shards that arrive after the state has transitioned to `Ready`
    ///   (i.e., batch validation has already passed) are silently discarded.
    ///   The sender's contribution slot is still consumed, preventing future
    ///   submissions from the same participant.
    ///
    /// Handle an incoming network shard.
    ///
    /// Returns `true` only when the shard caused state progress (buffered,
    /// validated, or transitioned), and `false` when rejected/blocked.
    async fn on_network_shard<Sch, S, X>(
        &mut self,
        sender: P,
        shard: Shard<C, H>,
        ctx: InsertCtx<'_, Sch, S>,
        blocker: &mut X,
    ) -> bool
    where
        Sch: CertificateScheme<PublicKey = P>,
        S: Strategy,
        X: Blocker<PublicKey = P>,
    {
        // All shard traffic must originate from a participant in this epoch.
        let Some(sender_index) = ctx.scheme.participants().index(&sender) else {
            commonware_p2p::block!(blocker, sender, "shard sent by non-participant");
            return false;
        };
        let commitment = shard.commitment();
        let index = shard.index();

        // Dispatch by shard type; each handler enforces its own blocking rules.
        let progressed = match shard.into_inner() {
            DistributionShard::Strong(data) => {
                let strong = StrongShard {
                    commitment,
                    index,
                    data,
                };
                self.insert_strong_shard(
                    ctx.scheme.me().as_ref(),
                    (sender, sender_index),
                    strong,
                    blocker,
                )
                .await
            }
            DistributionShard::Weak(data) => {
                let weak = WeakShard { index, data };
                self.insert_weak_shard((sender, sender_index), weak, blocker)
                    .await
            }
        };

        // Any progress may have satisfied the quorum threshold; attempt the
        // AwaitingQuorum -> Ready transition (batch validation) in place.
        if progressed {
            if let Self::AwaitingQuorum(state) = self {
                if let Some(ready) = state
                    .try_transition(commitment, ctx.participants_len, ctx.strategy, blocker)
                    .await
                {
                    *self = Self::Ready(ready);
                }
            }
        }

        progressed
    }

    /// Insert a strong shard according to the current phase.
    ///
    /// Returns `true` only when this progresses reconstruction state.
    async fn insert_strong_shard(
        &mut self,
        me: Option<&Participant>,
        (sender, sender_index): (P, Participant),
        shard: StrongShard<C>,
        blocker: &mut impl Blocker<PublicKey = P>,
    ) -> bool {
        // Participants expect a strong shard for their own index; when we are
        // not a participant (`me` is None), we expect the sender's (leader's)
        // index instead.
        let expected = me.copied().unwrap_or(sender_index);
        let expected_index: u16 = expected
            .get()
            .try_into()
            .expect("participant index impossibly out of bounds");
        if shard.index != expected_index {
            commonware_p2p::block!(
                blocker,
                sender,
                shard_index = shard.index,
                expected_index = expected.get() as usize,
                "strong shard index does not match expected index"
            );
            return false;
        }

        let common = self.common();
        // Only the leader may distribute strong shards.
        if sender != common.leader {
            commonware_p2p::block!(
                blocker,
                sender,
                leader = ?common.leader,
                "strong shard from non-leader"
            );
            return false;
        }
        // A second strong shard is either an exact duplicate (silently
        // ignored) or an equivocation (sender blocked); neither is progress.
        if let Some(received_strong) = common.received_strong.as_ref() {
            if received_strong != &shard.data {
                commonware_p2p::block!(blocker, sender, "strong shard equivocation from leader");
            }
            return false;
        }

        match self {
            Self::AwaitingQuorum(state) => {
                state
                    .verify_strong_shard(sender, shard, me.is_some(), blocker)
                    .await
            }
            // Already batch-validated; a late strong shard adds nothing.
            Self::Ready(_) => false,
        }
    }

    /// Insert a weak shard according to the current phase.
    ///
    /// Returns `true` only when this progresses reconstruction state.
    async fn insert_weak_shard(
        &mut self,
        (sender, sender_index): (P, Participant),
        shard: WeakShard<C>,
        blocker: &mut impl Blocker<PublicKey = P>,
    ) -> bool {
        // A weak shard must carry the index of the participant that sent it.
        let expected_index: u16 = sender_index
            .get()
            .try_into()
            .expect("participant index impossibly out of bounds");
        if shard.index != expected_index {
            commonware_p2p::block!(
                blocker,
                sender,
                shard_index = shard.index,
                expected_index = sender_index.get() as usize,
                "weak shard index does not match participant index"
            );
            return false;
        }

        // One contribution slot per participant: a differing duplicate while
        // still buffering is an equivocation; other duplicates are ignored.
        if self.common().contributed.get(u64::from(sender_index.get())) {
            let equivocated = matches!(
                self,
                Self::AwaitingQuorum(state)
                    if state.pending_weak_shards.get(&sender).is_some_and(|existing| existing.data != shard.data)
            );
            if equivocated {
                commonware_p2p::block!(blocker, sender, "duplicate weak shard with different data");
            }
            return false;
        }
        self.common_mut()
            .contributed
            .set(u64::from(sender_index.get()), true);

        match self {
            Self::AwaitingQuorum(state) => {
                state.pending_weak_shards.insert(sender, shard);
                true
            }
            // Batch validation already passed: consume the slot but discard
            // the shard (no progress).
            Self::Ready(_) => false,
        }
    }
}
1537
1538#[cfg(test)]
1539mod tests {
1540    use super::*;
1541    use crate::{
1542        marshal::{
1543            coding::types::coding_config_for_participants, mocks::block::Block as MockBlock,
1544        },
1545        types::{Height, View},
1546    };
1547    use bytes::Bytes;
1548    use commonware_codec::Encode;
1549    use commonware_coding::{CodecConfig, Config as CodingConfig, ReedSolomon, Zoda};
1550    use commonware_cryptography::{
1551        certificate::Subject,
1552        ed25519::{PrivateKey, PublicKey},
1553        impl_certificate_ed25519,
1554        sha256::Digest as Sha256Digest,
1555        Committable, Digest, Sha256, Signer,
1556    };
1557    use commonware_macros::{select, test_traced};
1558    use commonware_p2p::{
1559        simulated::{self, Control, Link, Oracle},
1560        Manager as _,
1561    };
1562    use commonware_parallel::Sequential;
1563    use commonware_runtime::{deterministic, Quota, Runner};
1564    use commonware_utils::{
1565        channel::oneshot::error::TryRecvError, ordered::Set, NZUsize, Participant,
1566    };
1567    use std::{
1568        future::Future,
1569        marker::PhantomData,
1570        num::NonZeroU32,
1571        sync::atomic::{AtomicIsize, Ordering},
1572        time::Duration,
1573    };
1574
1575    #[derive(Clone, Debug)]
1576    pub struct TestSubject {
1577        pub message: Bytes,
1578    }
1579
1580    impl Subject for TestSubject {
1581        type Namespace = Vec<u8>;
1582
1583        fn namespace<'a>(&self, derived: &'a Self::Namespace) -> &'a [u8] {
1584            derived
1585        }
1586
1587        fn message(&self) -> Bytes {
1588            self.message.clone()
1589        }
1590    }
1591
    // Wire up ed25519-backed certificate support for `TestSubject`.
    impl_certificate_ed25519!(TestSubject, Vec<u8>);

    /// Domain-separation namespace for signatures produced in these tests.
    const SCHEME_NAMESPACE: &[u8] = b"_COMMONWARE_SHARD_ENGINE_TEST";

    /// The max size of a shard sent over the wire.
    const MAX_SHARD_SIZE: usize = 1024 * 1024; // 1 MiB

    /// The default link configuration for tests: 50ms latency, no jitter,
    /// lossless delivery.
    const DEFAULT_LINK: Link = Link {
        latency: Duration::from_millis(50),
        jitter: Duration::ZERO,
        success_rate: 1.0,
    };

    /// Rate limit quota for tests (effectively unlimited).
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);

    /// The parallelization strategy used for tests.
    const STRATEGY: Sequential = Sequential;
1611
1612    /// A scheme provider that maps each epoch to a potentially different scheme.
1613    ///
1614    /// For most tests only epoch 0 is registered, matching the previous
1615    /// `ConstantProvider` behaviour. Cross-epoch tests register additional
1616    /// epochs with different participant sets.
1617    #[derive(Clone)]
1618    struct MultiEpochProvider {
1619        schemes: BTreeMap<Epoch, Arc<Scheme>>,
1620    }
1621
1622    impl MultiEpochProvider {
1623        fn single(scheme: Scheme) -> Self {
1624            let mut schemes = BTreeMap::new();
1625            schemes.insert(Epoch::zero(), Arc::new(scheme));
1626            Self { schemes }
1627        }
1628
1629        fn with_epoch(mut self, epoch: Epoch, scheme: Scheme) -> Self {
1630            self.schemes.insert(epoch, Arc::new(scheme));
1631            self
1632        }
1633    }
1634
1635    impl Provider for MultiEpochProvider {
1636        type Scope = Epoch;
1637        type Scheme = Scheme;
1638
1639        fn scoped(&self, scope: Epoch) -> Option<Arc<Scheme>> {
1640            self.schemes.get(&scope).cloned()
1641        }
1642    }
1643
1644    /// A one-epoch scheme provider that churns to `None` after a fixed number
1645    /// of successful scope lookups.
1646    #[derive(Clone)]
1647    struct ChurningProvider {
1648        scheme: Arc<Scheme>,
1649        remaining_successes: Arc<AtomicIsize>,
1650    }
1651
1652    impl ChurningProvider {
1653        fn new(scheme: Scheme, successes: isize) -> Self {
1654            Self {
1655                scheme: Arc::new(scheme),
1656                remaining_successes: Arc::new(AtomicIsize::new(successes)),
1657            }
1658        }
1659    }
1660
1661    impl Provider for ChurningProvider {
1662        type Scope = Epoch;
1663        type Scheme = Scheme;
1664
1665        fn scoped(&self, scope: Epoch) -> Option<Arc<Scheme>> {
1666            if scope != Epoch::zero() {
1667                return None;
1668            }
1669            if self.remaining_successes.fetch_sub(1, Ordering::AcqRel) <= 0 {
1670                return None;
1671            }
1672            Some(Arc::clone(&self.scheme))
1673        }
1674    }
1675
    // Type aliases for test convenience.
    /// Mock block type under test.
    type B = MockBlock<Sha256Digest, ()>;
    /// Hasher used for commitments and digests.
    type H = Sha256;
    /// Participant public key type.
    type P = PublicKey;
    /// Default erasure-coding scheme (Reed-Solomon over the test hasher).
    type C = ReedSolomon<H>;
    /// Per-peer network control handle (also used as the engine's blocker).
    type X = Control<P, deterministic::Context>;
    /// Simulated-network oracle.
    type O = Oracle<P, deterministic::Context>;
    /// Scheme provider used by most tests.
    type Prov = MultiEpochProvider;
    /// Raw network sender, used to inject messages directly (e.g., byzantine behavior).
    type NetworkSender = simulated::Sender<P, deterministic::Context>;
    /// Peer-set manager from the simulated network.
    type D = simulated::Manager<P, deterministic::Context>;
    /// Shard engine wired with the multi-epoch provider.
    type ShardEngine<S> = Engine<deterministic::Context, Prov, X, D, S, H, B, P, Sequential>;
    /// Shard engine wired with the churning provider.
    type ChurningShardEngine<S> =
        Engine<deterministic::Context, ChurningProvider, X, D, S, H, B, P, Sequential>;
1689
1690    async fn assert_blocked(oracle: &O, blocker: &P, blocked: &P) {
1691        let blocked_peers = oracle.blocked().await.unwrap();
1692        let is_blocked = blocked_peers
1693            .iter()
1694            .any(|(a, b)| a == blocker && b == blocked);
1695        assert!(is_blocked, "expected {blocker} to have blocked {blocked}");
1696    }
1697
    /// A participant in the test network with its engine mailbox and blocker.
    ///
    /// `S` defaults to the Reed-Solomon test scheme `C`.
    struct Peer<S: CodingScheme = C> {
        /// The peer's public key.
        public_key: PublicKey,
        /// The peer's index in the participant set.
        index: Participant,
        /// The mailbox for sending messages to the peer's shard engine.
        mailbox: Mailbox<B, S, H, P>,
        /// Raw network sender for injecting messages (e.g., byzantine behavior).
        sender: NetworkSender,
    }
1709
    /// A non-participant in the test network with its engine mailbox.
    ///
    /// Non-participants run a verifier-only scheme: they can reconstruct
    /// blocks from gossiped shards but never sign their own.
    #[allow(dead_code)]
    struct NonParticipant<S: CodingScheme = C> {
        /// The peer's public key.
        public_key: PublicKey,
        /// The mailbox for sending messages to the peer's shard engine.
        mailbox: Mailbox<B, S, H, P>,
        /// Raw network sender for injecting messages.
        sender: NetworkSender,
    }
1720
    /// Test fixture for setting up multiple participants with shard engines.
    ///
    /// Construct via struct-update syntax over [`Default`], then call `start`
    /// to spin up the simulated network and per-peer engines.
    struct Fixture<S: CodingScheme = C> {
        /// Number of peers in the test network.
        num_peers: usize,
        /// Number of non-participant peers in the test network.
        num_non_participants: usize,
        /// Network link configuration.
        link: Link,
        /// Marker for the coding scheme type parameter.
        _marker: PhantomData<S>,
    }
1732
1733    impl<S: CodingScheme> Default for Fixture<S> {
1734        fn default() -> Self {
1735            Self {
1736                num_peers: 4,
1737                num_non_participants: 0,
1738                link: DEFAULT_LINK,
1739                _marker: PhantomData,
1740            }
1741        }
1742    }
1743
    impl<S: CodingScheme> Fixture<S> {
        /// Builds the simulated network, starts a shard engine per participant
        /// and non-participant, then runs `f` with handles to everything.
        ///
        /// `f` receives the fixture itself, the runtime context, the network
        /// oracle, the participant peers, the non-participant peers, and the
        /// coding configuration derived from the participant count.
        pub fn start<F: Future<Output = ()>>(
            self,
            f: impl FnOnce(
                Self,
                deterministic::Context,
                O,
                Vec<Peer<S>>,
                Vec<NonParticipant<S>>,
                CodingConfig,
            ) -> F,
        ) {
            let executor = deterministic::Runner::default();
            executor.start(|context| async move {
                // Peer-set tracking is only needed when non-participants exist.
                let tracked_peer_sets = if self.num_non_participants > 0 {
                    Some(1)
                } else {
                    None
                };
                let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                    context.with_label("network"),
                    simulated::Config {
                        max_size: MAX_SHARD_SIZE as u32,
                        disconnect_on_block: true,
                        tracked_peer_sets,
                    },
                );
                network.start();

                // Deterministic participant keys, sorted by public key so peer
                // index order is stable across runs.
                let mut private_keys = (0..self.num_peers)
                    .map(|i| PrivateKey::from_seed(i as u64))
                    .collect::<Vec<_>>();
                private_keys.sort_by_key(|s| s.public_key());
                let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();

                let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());

                // Non-participant keys use seeds after the participant range
                // so the two sets never collide.
                let mut np_private_keys = (0..self.num_non_participants)
                    .map(|i| PrivateKey::from_seed((self.num_peers + i) as u64))
                    .collect::<Vec<_>>();
                np_private_keys.sort_by_key(|s| s.public_key());
                let np_keys: Vec<P> = np_private_keys.iter().map(|k| k.public_key()).collect();

                let all_keys: Vec<P> = peer_keys.iter().chain(np_keys.iter()).cloned().collect();

                // Register every key with the network and keep the handles for
                // engine construction below.
                let mut registrations = BTreeMap::new();
                for key in all_keys.iter() {
                    let control = oracle.control(key.clone());
                    let (sender, receiver) = control
                        .register(0, TEST_QUOTA)
                        .await
                        .expect("registration should succeed");
                    registrations.insert(key.clone(), (control, sender, receiver));
                }
                // Fully connect the network with the fixture's link config.
                for p1 in all_keys.iter() {
                    for p2 in all_keys.iter() {
                        if p2 == p1 {
                            continue;
                        }
                        oracle
                            .add_link(p1.clone(), p2.clone(), self.link.clone())
                            .await
                            .expect("link should be added");
                    }
                }

                let coding_config =
                    coding_config_for_participants(u16::try_from(self.num_peers).unwrap());

                // Participants run signer schemes: they validate and re-sign
                // shards for gossip.
                let mut peers = Vec::with_capacity(self.num_peers);
                for (idx, peer_key) in peer_keys.iter().enumerate() {
                    let (control, sender, receiver) = registrations
                        .remove(peer_key)
                        .expect("peer should be registered");

                    let participant = Participant::new(idx as u32);
                    let engine_context = context.with_label(&format!("peer_{}", idx));

                    let scheme = Scheme::signer(
                        SCHEME_NAMESPACE,
                        participants.clone(),
                        private_keys[idx].clone(),
                    )
                    .expect("signer scheme should be created");
                    let scheme_provider: Prov = MultiEpochProvider::single(scheme);

                    let config = Config {
                        scheme_provider,
                        blocker: control.clone(),
                        shard_codec_cfg: CodecConfig {
                            maximum_shard_size: MAX_SHARD_SIZE,
                        },
                        block_codec_cfg: (),
                        strategy: STRATEGY,
                        mailbox_size: 1024,
                        peer_buffer_size: NZUsize!(64),
                        background_channel_capacity: 1024,
                        peer_provider: oracle.manager(),
                    };

                    let (engine, mailbox) = ShardEngine::new(engine_context, config);
                    // Keep a raw sender so tests can inject arbitrary messages.
                    let sender_clone = sender.clone();
                    engine.start((sender, receiver));

                    peers.push(Peer {
                        public_key: peer_key.clone(),
                        index: participant,
                        mailbox,
                        sender: sender_clone,
                    });
                }

                // Non-participants run verifier-only schemes: they can check
                // shards and reconstruct, but hold no signing key.
                let mut non_participants = Vec::with_capacity(self.num_non_participants);
                for (idx, np_key) in np_keys.iter().enumerate() {
                    let (control, sender, receiver) = registrations
                        .remove(np_key)
                        .expect("non-participant should be registered");

                    let engine_context = context.with_label(&format!("non_participant_{}", idx));

                    let scheme = Scheme::verifier(SCHEME_NAMESPACE, participants.clone());
                    let scheme_provider: Prov = MultiEpochProvider::single(scheme);

                    let config = Config {
                        scheme_provider,
                        blocker: control.clone(),
                        shard_codec_cfg: CodecConfig {
                            maximum_shard_size: MAX_SHARD_SIZE,
                        },
                        block_codec_cfg: (),
                        strategy: STRATEGY,
                        mailbox_size: 1024,
                        peer_buffer_size: NZUsize!(64),
                        background_channel_capacity: 1024,
                        peer_provider: oracle.manager(),
                    };

                    let (engine, mailbox) = ShardEngine::new(engine_context, config);
                    let sender_clone = sender.clone();
                    engine.start((sender, receiver));

                    non_participants.push(NonParticipant {
                        public_key: np_key.clone(),
                        mailbox,
                        sender: sender_clone,
                    });
                }

                // Publish the tracked peer set so non-participants are known
                // to the network, then give the manager a moment to settle.
                if self.num_non_participants > 0 {
                    let all_tracked: Set<P> = Set::from_iter_dedup(all_keys);
                    oracle.manager().track(1, all_tracked).await;
                    context.sleep(Duration::from_millis(10)).await;
                }

                f(
                    self,
                    context,
                    oracle,
                    peers,
                    non_participants,
                    coding_config,
                )
                .await;
            });
        }
    }
1910
1911    #[test_traced]
1912    fn test_e2e_broadcast_and_reconstruction() {
1913        let fixture = Fixture {
1914            num_peers: 10,
1915            ..Default::default()
1916        };
1917
1918        fixture.start(
1919            |config, context, _, mut peers, _, coding_config| async move {
1920                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
1921                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
1922                let commitment = coded_block.commitment();
1923
1924                let leader = peers[0].public_key.clone();
1925                let round = Round::new(Epoch::zero(), View::new(1));
1926                peers[0].mailbox.proposed(round, coded_block.clone()).await;
1927
1928                // Inform all peers of the leader so strong shards are processed.
1929                for peer in peers[1..].iter_mut() {
1930                    peer.mailbox
1931                        .discovered(commitment, leader.clone(), round)
1932                        .await;
1933                }
1934                context.sleep(config.link.latency).await;
1935
1936                for peer in peers.iter_mut() {
1937                    peer.mailbox
1938                        .subscribe_shard(commitment)
1939                        .await
1940                        .await
1941                        .expect("shard subscription should complete");
1942                }
1943                context.sleep(config.link.latency).await;
1944
1945                for peer in peers.iter_mut() {
1946                    let reconstructed = peer
1947                        .mailbox
1948                        .get(commitment)
1949                        .await
1950                        .expect("block should be reconstructed");
1951                    assert_eq!(reconstructed.commitment(), commitment);
1952                    assert_eq!(reconstructed.height(), coded_block.height());
1953                }
1954            },
1955        );
1956    }
1957
1958    #[test_traced]
1959    fn test_e2e_broadcast_and_reconstruction_zoda() {
1960        let fixture = Fixture {
1961            num_peers: 10,
1962            ..Default::default()
1963        };
1964
1965        fixture.start(
1966            |config, context, _, mut peers, _, coding_config| async move {
1967                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
1968                let coded_block = CodedBlock::<B, Zoda<H>, H>::new(inner, coding_config, &STRATEGY);
1969                let commitment = coded_block.commitment();
1970
1971                let leader = peers[0].public_key.clone();
1972                let round = Round::new(Epoch::zero(), View::new(1));
1973                peers[0].mailbox.proposed(round, coded_block.clone()).await;
1974
1975                // Inform all peers of the leader so strong shards are processed.
1976                for peer in peers[1..].iter_mut() {
1977                    peer.mailbox
1978                        .discovered(commitment, leader.clone(), round)
1979                        .await;
1980                }
1981                context.sleep(config.link.latency).await;
1982
1983                for peer in peers.iter_mut() {
1984                    peer.mailbox
1985                        .subscribe_shard(commitment)
1986                        .await
1987                        .await
1988                        .expect("shard subscription should complete");
1989                }
1990                context.sleep(config.link.latency).await;
1991
1992                for peer in peers.iter_mut() {
1993                    let reconstructed = peer
1994                        .mailbox
1995                        .get(commitment)
1996                        .await
1997                        .expect("block should be reconstructed");
1998                    assert_eq!(reconstructed.commitment(), commitment);
1999                    assert_eq!(reconstructed.height(), coded_block.height());
2000                }
2001            },
2002        );
2003    }
2004
    #[test_traced]
    fn test_block_subscriptions() {
        let fixture = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, _, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();
                let digest = coded_block.digest();

                let leader = peers[0].public_key.clone();
                let round = Round::new(Epoch::zero(), View::new(1));

                // Subscribe before broadcasting.
                let commitment_sub = peers[1].mailbox.subscribe(commitment).await;
                let digest_sub = peers[2].mailbox.subscribe_by_digest(digest).await;

                peers[0].mailbox.proposed(round, coded_block.clone()).await;

                // Inform all peers of the leader so strong shards are processed.
                for peer in peers[1..].iter_mut() {
                    peer.mailbox
                        .discovered(commitment, leader.clone(), round)
                        .await;
                }
                context.sleep(config.link.latency * 2).await;

                // Wait until every peer has received its shard so that
                // reconstruction can complete on the subscribed peers.
                for peer in peers.iter_mut() {
                    peer.mailbox
                        .subscribe_shard(commitment)
                        .await
                        .await
                        .expect("shard subscription should complete");
                }
                context.sleep(config.link.latency).await;

                // Both the commitment-keyed and digest-keyed subscriptions
                // should now resolve to the reconstructed block.
                let block_by_commitment =
                    commitment_sub.await.expect("subscription should resolve");
                assert_eq!(block_by_commitment.commitment(), commitment);
                assert_eq!(block_by_commitment.height(), coded_block.height());

                let block_by_digest = digest_sub.await.expect("subscription should resolve");
                assert_eq!(block_by_digest.commitment(), commitment);
                assert_eq!(block_by_digest.height(), coded_block.height());
            },
        );
    }
2056
    #[test_traced]
    fn test_proposer_preproposal_subscriptions_resolve_after_local_cache() {
        let fixture = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(|config, context, _, peers, _, coding_config| async move {
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();
            let digest = coded_block.digest();
            let round = Round::new(Epoch::zero(), View::new(1));

            // Subscribe on the proposer before it caches the locally proposed block.
            let shard_sub = peers[0].mailbox.subscribe_shard(commitment).await;
            let commitment_sub = peers[0].mailbox.subscribe(commitment).await;
            let digest_sub = peers[0].mailbox.subscribe_by_digest(digest).await;

            // The local proposal alone must satisfy all three subscriptions;
            // no shards from other peers are required.
            peers[0].mailbox.proposed(round, coded_block.clone()).await;
            context.sleep(config.link.latency).await;

            // Each pending subscription is raced against a generous timeout so
            // a regression fails fast with a descriptive panic.
            select! {
                result = shard_sub => {
                    result.expect("shard subscription should resolve");
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("shard subscription did not resolve after local proposal cache");
                }
            }

            let block_by_commitment = select! {
                result = commitment_sub => {
                    result.expect("block subscription by commitment should resolve")
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("block subscription by commitment did not resolve after local proposal cache");
                }
            };
            assert_eq!(block_by_commitment.commitment(), commitment);
            assert_eq!(block_by_commitment.height(), coded_block.height());

            let block_by_digest = select! {
                result = digest_sub => {
                    result.expect("block subscription by digest should resolve")
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("block subscription by digest did not resolve after local proposal cache");
                }
            };
            assert_eq!(block_by_digest.commitment(), commitment);
            assert_eq!(block_by_digest.height(), coded_block.height());
        });
    }
2111
    #[test_traced]
    fn test_shard_subscription_rejects_invalid_shard() {
        let fixture = Fixture::<C>::default();
        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                // Roles:
                // peers[0] = byzantine
                // peers[1] = honest proposer
                // peers[2] = receiver

                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();
                let receiver_index = peers[2].index.get() as u16;

                let valid_shard = coded_block.shard(receiver_index).expect("missing shard");

                // corrupt the shard's index so it no longer matches the receiver
                let mut invalid_shard = valid_shard.clone();
                invalid_shard.index = 0;

                // Receiver subscribes to their shard and learns the leader.
                let receiver_pk = peers[2].public_key.clone();
                let leader = peers[1].public_key.clone();
                peers[2]
                    .mailbox
                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;
                let mut shard_sub = peers[2].mailbox.subscribe_shard(commitment).await;

                // Byzantine peer sends the invalid shard.
                let invalid_bytes = invalid_shard.encode();
                peers[0]
                    .sender
                    .send(Recipients::One(receiver_pk.clone()), invalid_bytes, true)
                    .await
                    .expect("send failed");

                context.sleep(config.link.latency * 2).await;

                // The invalid shard must neither resolve the subscription nor
                // go unpunished: the sender gets blocked.
                assert!(
                    matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
                    "subscription should not resolve from invalid shard"
                );
                assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;

                // Honest proposer sends the valid shard.
                let valid_bytes = valid_shard.encode();
                peers[1]
                    .sender
                    .send(Recipients::One(receiver_pk), valid_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Subscription should now resolve.
                select! {
                    _ = shard_sub => {},
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("subscription did not complete after valid shard arrival");
                    },
                };
            },
        );
    }
2176
2177    #[test_traced]
2178    fn test_durable_prunes_reconstructed_blocks() {
2179        let fixture = Fixture::<C>::default();
2180        fixture.start(|_, context, _, mut peers, _, coding_config| async move {
2181            // Create 3 blocks at heights 1, 2, 3.
2182            let block1 = CodedBlock::<B, C, H>::new(
2183                B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
2184                coding_config,
2185                &STRATEGY,
2186            );
2187            let block2 = CodedBlock::<B, C, H>::new(
2188                B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 100),
2189                coding_config,
2190                &STRATEGY,
2191            );
2192            let block3 = CodedBlock::<B, C, H>::new(
2193                B::new::<H>((), Sha256Digest::EMPTY, Height::new(3), 100),
2194                coding_config,
2195                &STRATEGY,
2196            );
2197            let commitment1 = block1.commitment();
2198            let commitment2 = block2.commitment();
2199            let commitment3 = block3.commitment();
2200
2201            // Cache all blocks via `proposed`.
2202            let peer = &mut peers[0];
2203            let round = Round::new(Epoch::zero(), View::new(1));
2204            peer.mailbox.proposed(round, block1).await;
2205            peer.mailbox.proposed(round, block2).await;
2206            peer.mailbox.proposed(round, block3).await;
2207            context.sleep(Duration::from_millis(10)).await;
2208
2209            // Verify all blocks are in the cache.
2210            assert!(
2211                peer.mailbox.get(commitment1).await.is_some(),
2212                "block1 should be cached"
2213            );
2214            assert!(
2215                peer.mailbox.get(commitment2).await.is_some(),
2216                "block2 should be cached"
2217            );
2218            assert!(
2219                peer.mailbox.get(commitment3).await.is_some(),
2220                "block3 should be cached"
2221            );
2222
2223            // Prune at height 2 (blocks with height <= 2 should be removed).
2224            peer.mailbox.prune(commitment2).await;
2225            context.sleep(Duration::from_millis(10)).await;
2226
2227            // Blocks at heights 1 and 2 should be pruned.
2228            assert!(
2229                peer.mailbox.get(commitment1).await.is_none(),
2230                "block1 should be pruned"
2231            );
2232            assert!(
2233                peer.mailbox.get(commitment2).await.is_none(),
2234                "block2 should be pruned"
2235            );
2236
2237            // Block at height 3 should still be cached.
2238            assert!(
2239                peer.mailbox.get(commitment3).await.is_some(),
2240                "block3 should still be cached"
2241            );
2242        });
2243    }
2244
    #[test_traced]
    fn test_duplicate_leader_strong_shard_ignored() {
        let fixture = Fixture::<C>::default();
        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                // Get peer 2's strong shard.
                let peer2_index = peers[2].index.get() as u16;
                let peer2_strong_shard = coded_block.shard(peer2_index).expect("missing shard");
                let strong_bytes = peer2_strong_shard.encode();

                let peer2_pk = peers[2].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Inform peer 2 that peer 0 is the leader.
                peers[2]
                    .mailbox
                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;

                // Send peer 2 their strong shard from peer 0 (leader, first time - should succeed).
                peers[0]
                    .sender
                    .send(
                        Recipients::One(peer2_pk.clone()),
                        strong_bytes.clone(),
                        true,
                    )
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Send the same strong shard again from peer 0 (leader duplicate - ignored).
                // A byte-identical re-send is benign (e.g., network retry) and
                // must not be treated as equivocation.
                peers[0]
                    .sender
                    .send(Recipients::One(peer2_pk), strong_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // The leader should NOT be blocked for sending an identical duplicate.
                let blocked_peers = oracle.blocked().await.unwrap();
                let is_blocked = blocked_peers
                    .iter()
                    .any(|(a, b)| a == &peers[2].public_key && b == &peers[0].public_key);
                assert!(
                    !is_blocked,
                    "leader should not be blocked for duplicate strong shard"
                );
            },
        );
    }
2300
2301    #[test_traced]
2302    fn test_equivocating_leader_strong_shard_blocks_peer() {
2303        let fixture = Fixture::<C>::default();
2304        fixture.start(
2305            |config, context, oracle, mut peers, _, coding_config| async move {
2306                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2307                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
2308                let commitment = coded_block1.commitment();
2309
2310                // Create a second block with different payload to get different shard data.
2311                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 200);
2312                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
2313
2314                // Get peer 2's strong shard from both blocks.
2315                let peer2_index = peers[2].index.get() as u16;
2316                let strong_bytes1 = coded_block1
2317                    .shard(peer2_index)
2318                    .expect("missing shard")
2319                    .encode();
2320                let mut equivocating_shard =
2321                    coded_block2.shard(peer2_index).expect("missing shard");
2322                // Override the commitment so it targets the same reconstruction state.
2323                equivocating_shard.commitment = commitment;
2324                let strong_bytes2 = equivocating_shard.encode();
2325
2326                let peer2_pk = peers[2].public_key.clone();
2327                let leader = peers[0].public_key.clone();
2328
2329                // Inform peer 2 that peer 0 is the leader.
2330                peers[2]
2331                    .mailbox
2332                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2333                    .await;
2334
2335                // Send peer 2 their strong shard from the leader (first time - succeeds).
2336                peers[0]
2337                    .sender
2338                    .send(Recipients::One(peer2_pk.clone()), strong_bytes1, true)
2339                    .await
2340                    .expect("send failed");
2341                context.sleep(config.link.latency * 2).await;
2342
2343                // Send a different strong shard from the leader (equivocation - should block).
2344                peers[0]
2345                    .sender
2346                    .send(Recipients::One(peer2_pk), strong_bytes2, true)
2347                    .await
2348                    .expect("send failed");
2349                context.sleep(config.link.latency * 2).await;
2350
2351                // Peer 2 should have blocked the leader for equivocation.
2352                assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;
2353            },
2354        );
2355    }
2356
2357    #[test_traced]
2358    fn test_non_leader_strong_shard_blocked() {
2359        // Test that a non-leader sending a strong shard is blocked.
2360        let fixture = Fixture::<C>::default();
2361        fixture.start(
2362            |config, context, oracle, mut peers, _, coding_config| async move {
2363                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2364                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2365                let commitment = coded_block.commitment();
2366
2367                // Get peer 2's strong shard.
2368                let peer2_index = peers[2].index.get() as u16;
2369                let peer2_strong_shard = coded_block.shard(peer2_index).expect("missing shard");
2370                let strong_bytes = peer2_strong_shard.encode();
2371
2372                let peer2_pk = peers[2].public_key.clone();
2373                let leader = peers[0].public_key.clone();
2374
2375                // Inform peer 2 that peer 0 is the leader.
2376                peers[2]
2377                    .mailbox
2378                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2379                    .await;
2380
2381                // Peer 1 (not the leader) sends peer 2 their strong shard.
2382                peers[1]
2383                    .sender
2384                    .send(Recipients::One(peer2_pk), strong_bytes, true)
2385                    .await
2386                    .expect("send failed");
2387                context.sleep(config.link.latency * 2).await;
2388
2389                // Peer 1 should be blocked by peer 2 for being a non-leader.
2390                assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2391            },
2392        );
2393    }
2394
2395    #[test_traced]
2396    fn test_buffered_non_leader_blocked_on_leader_arrival() {
2397        // Test that when a non-leader's strong shard is buffered (leader unknown)
2398        // and then the leader arrives, the non-leader is blocked.
2399        let fixture = Fixture::<C>::default();
2400        fixture.start(
2401            |config, context, oracle, mut peers, _, coding_config| async move {
2402                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2403                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2404                let commitment = coded_block.commitment();
2405
2406                // Get peer 2's strong shard.
2407                let peer2_index = peers[2].index.get() as u16;
2408                let peer2_strong_shard = coded_block.shard(peer2_index).expect("missing shard");
2409                let strong_bytes = peer2_strong_shard.encode();
2410
2411                let peer2_pk = peers[2].public_key.clone();
2412
2413                // Peer 1 sends the strong shard before the leader is known (buffered).
2414                peers[1]
2415                    .sender
2416                    .send(Recipients::One(peer2_pk), strong_bytes, true)
2417                    .await
2418                    .expect("send failed");
2419                context.sleep(config.link.latency * 2).await;
2420
2421                // Nobody should be blocked yet (shard is buffered, leader unknown).
2422                let blocked = oracle.blocked().await.unwrap();
2423                assert!(
2424                    blocked.is_empty(),
2425                    "no peers should be blocked while leader is unknown"
2426                );
2427
2428                // Now inform peer 2 that peer 0 is the leader.
2429                // This drains the buffer: peer 1's shard is from a non-leader, so
2430                // peer 1 should be blocked.
2431                let leader = peers[0].public_key.clone();
2432                peers[2]
2433                    .mailbox
2434                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2435                    .await;
2436                context.sleep(Duration::from_millis(10)).await;
2437
2438                assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2439            },
2440        );
2441    }
2442
2443    #[test_traced]
2444    fn test_conflicting_external_proposed_ignored() {
2445        let fixture = Fixture::<C>::default();
2446        fixture.start(
2447            |config, context, oracle, mut peers, _, coding_config| async move {
2448                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2449                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2450                let commitment = coded_block.commitment();
2451
2452                // Get peer 2's strong shard.
2453                let peer2_index = peers[2].index.get() as u16;
2454                let peer2_strong_shard = coded_block.shard(peer2_index).expect("missing shard");
2455                let strong_bytes = peer2_strong_shard.encode();
2456
2457                let peer2_pk = peers[2].public_key.clone();
2458                let leader_a = peers[0].public_key.clone();
2459                let leader_b = peers[1].public_key.clone();
2460
2461                // Subscribe before shards arrive so we can verify acceptance.
2462                let shard_sub = peers[2].mailbox.subscribe_shard(commitment).await;
2463
2464                // First leader update should stick.
2465                peers[2]
2466                    .mailbox
2467                    .discovered(
2468                        commitment,
2469                        leader_a.clone(),
2470                        Round::new(Epoch::zero(), View::new(1)),
2471                    )
2472                    .await;
2473                // Conflicting update should be ignored.
2474                peers[2]
2475                    .mailbox
2476                    .discovered(
2477                        commitment,
2478                        leader_b,
2479                        Round::new(Epoch::zero(), View::new(1)),
2480                    )
2481                    .await;
2482
2483                // Original leader sends strong shard; this should still be accepted.
2484                peers[0]
2485                    .sender
2486                    .send(
2487                        Recipients::One(peer2_pk.clone()),
2488                        strong_bytes.clone(),
2489                        true,
2490                    )
2491                    .await
2492                    .expect("send failed");
2493                context.sleep(config.link.latency * 2).await;
2494
2495                // Subscription should resolve from accepted strong shard.
2496                select! {
2497                    _ = shard_sub => {},
2498                    _ = context.sleep(Duration::from_secs(5)) => {
2499                        panic!(
2500                            "subscription did not complete after strong shard from original leader"
2501                        );
2502                    },
2503                };
2504
2505                // The conflicting leader should still be treated as non-leader and blocked.
2506                peers[1]
2507                    .sender
2508                    .send(Recipients::One(peer2_pk), strong_bytes, true)
2509                    .await
2510                    .expect("send failed");
2511                context.sleep(config.link.latency * 2).await;
2512
2513                assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2514
2515                // Original leader should not be blocked.
2516                let blocked_peers = oracle.blocked().await.unwrap();
2517                let leader_a_blocked = blocked_peers
2518                    .iter()
2519                    .any(|(a, b)| a == &peers[2].public_key && b == &leader_a);
2520                assert!(
2521                    !leader_a_blocked,
2522                    "original leader should not be blocked after conflicting leader update"
2523                );
2524            },
2525        );
2526    }
2527
2528    #[test_traced]
2529    fn test_non_participant_external_proposed_ignored() {
2530        let fixture = Fixture::<C>::default();
2531        fixture.start(
2532            |config, context, oracle, mut peers, _, coding_config| async move {
2533                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2534                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2535                let commitment = coded_block.commitment();
2536
2537                // Get peer 2's strong shard.
2538                let peer2_index = peers[2].index.get() as u16;
2539                let peer2_strong_shard = coded_block.shard(peer2_index).expect("missing shard");
2540                let strong_bytes = peer2_strong_shard.encode();
2541
2542                let peer2_pk = peers[2].public_key.clone();
2543                let leader = peers[0].public_key.clone();
2544                let non_participant_leader = PrivateKey::from_seed(10_000).public_key();
2545
2546                // Subscribe before shards arrive.
2547                let shard_sub = peers[2].mailbox.subscribe_shard(commitment).await;
2548
2549                // A non-participant leader update should be ignored.
2550                peers[2]
2551                    .mailbox
2552                    .discovered(
2553                        commitment,
2554                        non_participant_leader,
2555                        Round::new(Epoch::zero(), View::new(1)),
2556                    )
2557                    .await;
2558
2559                // Leader unknown path: this strong shard should be buffered, not blocked.
2560                peers[0]
2561                    .sender
2562                    .send(
2563                        Recipients::One(peer2_pk.clone()),
2564                        strong_bytes.clone(),
2565                        true,
2566                    )
2567                    .await
2568                    .expect("send failed");
2569                context.sleep(config.link.latency * 2).await;
2570
2571                let blocked = oracle.blocked().await.unwrap();
2572                let leader_blocked = blocked
2573                    .iter()
2574                    .any(|(a, b)| a == &peers[2].public_key && b == &leader);
2575                assert!(
2576                    !leader_blocked,
2577                    "leader should not be blocked when non-participant update is ignored"
2578                );
2579
2580                // A valid leader update should then process buffered shards and resolve subscription.
2581                peers[2]
2582                    .mailbox
2583                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2584                    .await;
2585                context.sleep(config.link.latency * 2).await;
2586
2587                select! {
2588                    _ = shard_sub => {},
2589                    _ = context.sleep(Duration::from_secs(5)) => {
2590                        panic!("subscription did not complete after valid leader update");
2591                    },
2592                };
2593            },
2594        );
2595    }
2596
2597    #[test_traced]
2598    fn test_shard_from_non_participant_blocks_peer() {
2599        let fixture = Fixture::<C>::default();
2600        fixture.start(
2601            |config, context, oracle, peers, _, coding_config| async move {
2602                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2603                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2604                let commitment = coded_block.commitment();
2605
2606                let leader = peers[0].public_key.clone();
2607                let receiver_pk = peers[2].public_key.clone();
2608
2609                let non_participant_key = PrivateKey::from_seed(10_000);
2610                let non_participant_pk = non_participant_key.public_key();
2611
2612                let non_participant_control = oracle.control(non_participant_pk.clone());
2613                let (mut non_participant_sender, _non_participant_receiver) =
2614                    non_participant_control
2615                        .register(0, TEST_QUOTA)
2616                        .await
2617                        .expect("registration should succeed");
2618                oracle
2619                    .add_link(
2620                        non_participant_pk.clone(),
2621                        receiver_pk.clone(),
2622                        DEFAULT_LINK,
2623                    )
2624                    .await
2625                    .expect("link should be added");
2626
2627                peers[2]
2628                    .mailbox
2629                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2630                    .await;
2631
2632                let peer2_index = peers[2].index.get() as u16;
2633                let strong_shard = coded_block.shard(peer2_index).expect("missing shard");
2634                let weak_shard = strong_shard
2635                    .verify_into_weak()
2636                    .expect("verify_into_weak failed");
2637                let weak_bytes = weak_shard.encode();
2638
2639                non_participant_sender
2640                    .send(Recipients::One(receiver_pk), weak_bytes, true)
2641                    .await
2642                    .expect("send failed");
2643                context.sleep(config.link.latency * 2).await;
2644
2645                assert_blocked(&oracle, &peers[2].public_key, &non_participant_pk).await;
2646            },
2647        );
2648    }
2649
2650    #[test_traced]
2651    fn test_buffered_shard_from_non_participant_blocks_peer() {
2652        let fixture = Fixture::<C>::default();
2653        fixture.start(
2654            |config, context, oracle, peers, _, coding_config| async move {
2655                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2656                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2657                let commitment = coded_block.commitment();
2658
2659                let leader = peers[0].public_key.clone();
2660                let receiver_pk = peers[2].public_key.clone();
2661
2662                let non_participant_key = PrivateKey::from_seed(10_000);
2663                let non_participant_pk = non_participant_key.public_key();
2664
2665                let non_participant_control = oracle.control(non_participant_pk.clone());
2666                let (mut non_participant_sender, _non_participant_receiver) =
2667                    non_participant_control
2668                        .register(0, TEST_QUOTA)
2669                        .await
2670                        .expect("registration should succeed");
2671                oracle
2672                    .add_link(
2673                        non_participant_pk.clone(),
2674                        receiver_pk.clone(),
2675                        DEFAULT_LINK,
2676                    )
2677                    .await
2678                    .expect("link should be added");
2679
2680                let peer2_index = peers[2].index.get() as u16;
2681                let strong_shard = coded_block.shard(peer2_index).expect("missing shard");
2682                let weak_shard = strong_shard
2683                    .verify_into_weak()
2684                    .expect("verify_into_weak failed");
2685                let weak_bytes = weak_shard.encode();
2686
2687                non_participant_sender
2688                    .send(Recipients::One(receiver_pk), weak_bytes, true)
2689                    .await
2690                    .expect("send failed");
2691                context.sleep(config.link.latency * 2).await;
2692
2693                peers[2]
2694                    .mailbox
2695                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
2696                    .await;
2697                context.sleep(config.link.latency * 2).await;
2698
2699                assert_blocked(&oracle, &peers[2].public_key, &non_participant_pk).await;
2700            },
2701        );
2702    }
2703
2704    #[test_traced]
2705    fn test_duplicate_weak_shard_ignored() {
2706        // Use 10 peers so minimum_shards=4, giving us time to send duplicate before reconstruction.
2707        let fixture: Fixture<C> = Fixture {
2708            num_peers: 10,
2709            ..Default::default()
2710        };
2711
2712        fixture.start(
2713            |config, context, oracle, mut peers, _, coding_config| async move {
2714                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2715                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2716
2717                // Get peer 2's strong shard (to initialize their checking_data).
2718                let peer2_index = peers[2].index.get() as u16;
2719                let peer2_strong_shard = coded_block.shard(peer2_index).expect("missing shard");
2720
2721                // Get peer 1's weak shard.
2722                let peer1_index = peers[1].index.get() as u16;
2723                let peer1_strong_shard = coded_block.shard(peer1_index).expect("missing shard");
2724                let peer1_weak_shard = peer1_strong_shard
2725                    .verify_into_weak()
2726                    .expect("verify_into_weak failed");
2727
2728                let peer2_pk = peers[2].public_key.clone();
2729                let leader = peers[0].public_key.clone();
2730
2731                // Inform peer 2 of the leader.
2732                peers[2]
2733                    .mailbox
2734                    .discovered(
2735                        coded_block.commitment(),
2736                        leader,
2737                        Round::new(Epoch::zero(), View::new(1)),
2738                    )
2739                    .await;
2740
2741                // Send peer 2 their strong shard (initializes checking_data, 1 checked shard).
2742                let strong_bytes = peer2_strong_shard.encode();
2743                peers[0]
2744                    .sender
2745                    .send(Recipients::One(peer2_pk.clone()), strong_bytes, true)
2746                    .await
2747                    .expect("send failed");
2748                context.sleep(config.link.latency * 2).await;
2749
2750                // Send peer 1's weak shard to peer 2 (first time - should succeed, 2 checked shards).
2751                let weak_shard_bytes = peer1_weak_shard.encode();
2752                peers[1]
2753                    .sender
2754                    .send(
2755                        Recipients::One(peer2_pk.clone()),
2756                        weak_shard_bytes.clone(),
2757                        true,
2758                    )
2759                    .await
2760                    .expect("send failed");
2761                context.sleep(config.link.latency * 2).await;
2762
2763                // Send the same weak shard again (exact duplicate - should be ignored, not blocked).
2764                // With 10 peers, minimum_shards=4, so we haven't reconstructed yet.
2765                peers[1]
2766                    .sender
2767                    .send(Recipients::One(peer2_pk), weak_shard_bytes, true)
2768                    .await
2769                    .expect("send failed");
2770                context.sleep(config.link.latency * 2).await;
2771
2772                // Peer 1 should NOT be blocked for sending an identical duplicate.
2773                let blocked_peers = oracle.blocked().await.unwrap();
2774                let is_blocked = blocked_peers
2775                    .iter()
2776                    .any(|(a, b)| a == &peers[2].public_key && b == &peers[1].public_key);
2777                assert!(
2778                    !is_blocked,
2779                    "peer should not be blocked for exact duplicate weak shard"
2780                );
2781            },
2782        );
2783    }
2784
2785    #[test_traced]
2786    fn test_equivocating_weak_shard_blocks_peer() {
2787        // Use 10 peers so minimum_shards=4, giving us time to send equivocating shard.
2788        let fixture: Fixture<C> = Fixture {
2789            num_peers: 10,
2790            ..Default::default()
2791        };
2792
2793        fixture.start(
2794            |config, context, oracle, mut peers, _, coding_config| async move {
2795                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2796                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
2797
2798                // Create a second block with different payload to get different shard data.
2799                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 200);
2800                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
2801
2802                // Get peer 2's strong shard from block 1 (to initialize their checking_data).
2803                let peer2_index = peers[2].index.get() as u16;
2804                let peer2_strong_shard = coded_block1.shard(peer2_index).expect("missing shard");
2805
2806                // Get peer 1's weak shard from block 1.
2807                let peer1_index = peers[1].index.get() as u16;
2808                let peer1_strong_shard = coded_block1.shard(peer1_index).expect("missing shard");
2809                let peer1_weak_shard = peer1_strong_shard
2810                    .verify_into_weak()
2811                    .expect("verify_into_weak failed");
2812
2813                // Get peer 1's weak shard from block 2 (different data, same index).
2814                let peer1_strong_shard2 = coded_block2.shard(peer1_index).expect("missing shard");
2815                let mut peer1_equivocating_shard = peer1_strong_shard2
2816                    .verify_into_weak()
2817                    .expect("verify_into_weak failed");
2818                // Override the commitment to match block 1 so the shard targets
2819                // the same reconstruction state.
2820                peer1_equivocating_shard.commitment = coded_block1.commitment();
2821
2822                let peer2_pk = peers[2].public_key.clone();
2823                let leader = peers[0].public_key.clone();
2824
2825                // Inform peer 2 of the leader.
2826                peers[2]
2827                    .mailbox
2828                    .discovered(
2829                        coded_block1.commitment(),
2830                        leader,
2831                        Round::new(Epoch::zero(), View::new(1)),
2832                    )
2833                    .await;
2834
2835                // Send peer 2 their strong shard (initializes checking_data).
2836                let strong_bytes = peer2_strong_shard.encode();
2837                peers[0]
2838                    .sender
2839                    .send(Recipients::One(peer2_pk.clone()), strong_bytes, true)
2840                    .await
2841                    .expect("send failed");
2842                context.sleep(config.link.latency * 2).await;
2843
2844                // Send peer 1's valid weak shard to peer 2 (first time - succeeds).
2845                let weak_shard_bytes = peer1_weak_shard.encode();
2846                peers[1]
2847                    .sender
2848                    .send(Recipients::One(peer2_pk.clone()), weak_shard_bytes, true)
2849                    .await
2850                    .expect("send failed");
2851                context.sleep(config.link.latency * 2).await;
2852
2853                // Send a different weak shard from peer 1 (equivocation - should block).
2854                let equivocating_bytes = peer1_equivocating_shard.encode();
2855                peers[1]
2856                    .sender
2857                    .send(Recipients::One(peer2_pk), equivocating_bytes, true)
2858                    .await
2859                    .expect("send failed");
2860                context.sleep(config.link.latency * 2).await;
2861
2862                // Peer 2 should have blocked peer 1 for equivocation.
2863                assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2864            },
2865        );
2866    }
2867
    #[test_traced]
    fn test_reconstruction_states_pruned_at_or_below_reconstructed_view() {
        // Once a block reconstructs at view V, reconstruction states at or below
        // V should be pruned; a re-sent shard targeting a pruned state must not
        // hit stale duplicate/equivocation tracking.
        //
        // Use 10 peers so minimum_shards=4.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                // Commitment A at lower view (1).
                let block_a = CodedBlock::<B, C, H>::new(
                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
                    coding_config,
                    &STRATEGY,
                );
                let commitment_a = block_a.commitment();

                // Commitment B at higher view (2), which we will reconstruct.
                let block_b = CodedBlock::<B, C, H>::new(
                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200),
                    coding_config,
                    &STRATEGY,
                );
                let commitment_b = block_b.commitment();

                let peer2_pk = peers[2].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Create state for A and ingest one weak shard from peer1.
                peers[2]
                    .mailbox
                    .discovered(
                        commitment_a,
                        leader.clone(),
                        Round::new(Epoch::zero(), View::new(1)),
                    )
                    .await;
                let peer1_strong_a = block_a
                    .shard(peers[1].index.get() as u16)
                    .expect("missing shard");
                let weak_a = peer1_strong_a
                    .verify_into_weak()
                    .expect("verify_into_weak failed")
                    .encode();
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), weak_a.clone(), true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Create/reconstruct B at higher view.
                peers[2]
                    .mailbox
                    .discovered(
                        commitment_b,
                        leader,
                        Round::new(Epoch::zero(), View::new(2)),
                    )
                    .await;
                // Strong shard for peer2 from leader.
                let strong_b = block_b
                    .shard(peers[2].index.get() as u16)
                    .expect("missing shard")
                    .encode();
                peers[0]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), strong_b, true)
                    .await
                    .expect("send failed");

                // Three weak shards for minimum threshold (4 total with strong).
                // Indices 1, 3, 4 avoid re-using peer 2 (already covered by the
                // strong shard above).
                for i in [1usize, 3usize, 4usize] {
                    let weak = block_b
                        .shard(peers[i].index.get() as u16)
                        .expect("missing shard")
                        .verify_into_weak()
                        .expect("verify_into_weak failed")
                        .encode();
                    peers[i]
                        .sender
                        .send(Recipients::One(peer2_pk.clone()), weak, true)
                        .await
                        .expect("send failed");
                }
                context.sleep(config.link.latency * 4).await;

                // B should reconstruct.
                let reconstructed = peers[2]
                    .mailbox
                    .get(commitment_b)
                    .await
                    .expect("block B should reconstruct");
                assert_eq!(reconstructed.commitment(), commitment_b);

                // A state should be pruned (at/below reconstructed view). Sending the same
                // weak shard for A again should NOT be treated as duplicate.
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk), weak_a, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // NOTE(review): per test_duplicate_weak_shard_ignored, an exact
                // duplicate against a *live* state is ignored rather than blocked,
                // so this no-block assertion may pass even without pruning — it is
                // a smoke check; confirm pruning directly (e.g. via metrics or
                // state inspection) if stronger coverage is needed.
                let blocked = oracle.blocked().await.unwrap();
                let blocked_peer1 = blocked
                    .iter()
                    .any(|(a, b)| a == &peers[2].public_key && b == &peers[1].public_key);
                assert!(
                    !blocked_peer1,
                    "peer1 should not be blocked after lower-view state was pruned"
                );
            },
        );
    }
2984
2985    #[test_traced]
2986    fn test_drain_pending_validates_weak_shards_after_strong_shard() {
2987        // Test that weak shards arriving BEFORE the strong shard are validated
2988        // via drain_pending once the strong shard arrives, enabling reconstruction.
2989        //
2990        // With 10 peers: minimum_shards = (10-1)/3 + 1 = 4
2991        // We send 3 pending weak shards + 1 strong shard = 4 shards -> reconstruction.
2992        let fixture: Fixture<C> = Fixture {
2993            num_peers: 10,
2994            ..Default::default()
2995        };
2996
2997        fixture.start(
2998            |config, context, oracle, mut peers, _, coding_config| async move {
2999                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3000                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3001                let commitment = coded_block.commitment();
3002
3003                // Get peer 3's strong shard.
3004                let peer3_index = peers[3].index.get() as u16;
3005                let peer3_strong_shard = coded_block.shard(peer3_index).expect("missing shard");
3006
3007                // Get weak shards from peers 0, 1, and 2 (3 total to meet minimum_shards=4).
3008                let weak_shards: Vec<_> = [0, 1, 2]
3009                    .iter()
3010                    .map(|&i| {
3011                        coded_block
3012                            .shard(peers[i].index.get() as u16)
3013                            .expect("missing shard")
3014                            .verify_into_weak()
3015                            .expect("verify_into_weak failed")
3016                    })
3017                    .collect();
3018
3019                let peer3_pk = peers[3].public_key.clone();
3020
3021                // Send weak shards to peer 3 BEFORE their strong shard arrives.
3022                // These will be stored in pending_weak_shards since there's no checking data yet.
3023                for (i, weak_shard) in weak_shards.iter().enumerate() {
3024                    let sender_idx = [0, 1, 2][i];
3025                    let weak_shard_bytes = weak_shard.encode();
3026                    peers[sender_idx]
3027                        .sender
3028                        .send(Recipients::One(peer3_pk.clone()), weak_shard_bytes, true)
3029                        .await
3030                        .expect("send failed");
3031                }
3032
3033                context.sleep(config.link.latency * 2).await;
3034
3035                // Block should not be reconstructed yet (no checking data from strong shard).
3036                let block = peers[3].mailbox.get(commitment).await;
3037                assert!(block.is_none(), "block should not be reconstructed yet");
3038
3039                // Inform peer 3 that peer 2 is the leader.
3040                let leader = peers[2].public_key.clone();
3041                peers[3]
3042                    .mailbox
3043                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
3044                    .await;
3045
3046                // Now send peer 2's strong shard. This should:
3047                // 1. Provide checking data
3048                // 2. Trigger drain_pending which validates the 3 pending weak shards
3049                // 3. With 4 checked shards (1 strong + 3 from pending), trigger reconstruction
3050                let strong_bytes = peer3_strong_shard.encode();
3051                peers[2]
3052                    .sender
3053                    .send(Recipients::One(peer3_pk), strong_bytes, true)
3054                    .await
3055                    .expect("send failed");
3056
3057                context.sleep(config.link.latency * 2).await;
3058
3059                // No peers should be blocked (all weak shards were valid).
3060                let blocked = oracle.blocked().await.unwrap();
3061                assert!(
3062                    blocked.is_empty(),
3063                    "no peers should be blocked for valid pending weak shards"
3064                );
3065
3066                // Block should now be reconstructed (4 checked shards >= minimum_shards).
3067                let block = peers[3].mailbox.get(commitment).await;
3068                assert!(
3069                    block.is_some(),
3070                    "block should be reconstructed after drain_pending"
3071                );
3072
3073                // Verify the reconstructed block has the correct commitment.
3074                let reconstructed = block.unwrap();
3075                assert_eq!(
3076                    reconstructed.commitment(),
3077                    commitment,
3078                    "reconstructed block should have correct commitment"
3079                );
3080            },
3081        );
3082    }
3083
3084    #[test_traced]
3085    fn test_peer_shards_buffered_until_external_proposed() {
3086        // Test that shards received before leader announcement do not progress
3087        // reconstruction until Discovered is delivered.
3088        let fixture: Fixture<C> = Fixture {
3089            num_peers: 10,
3090            ..Default::default()
3091        };
3092
3093        fixture.start(
3094            |config, context, oracle, mut peers, _, coding_config| async move {
3095                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3096                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3097                let commitment = coded_block.commitment();
3098
3099                let receiver_idx = 3usize;
3100                let receiver_pk = peers[receiver_idx].public_key.clone();
3101                let leader = peers[0].public_key.clone();
3102
3103                // Subscribe before any shards arrive.
3104                let mut shard_sub = peers[receiver_idx]
3105                    .mailbox
3106                    .subscribe_shard(commitment)
3107                    .await;
3108
3109                // Send one strong shard from the eventual leader and three weak shards,
3110                // all before leader announcement.
3111                let strong = coded_block
3112                    .shard(peers[receiver_idx].index.get() as u16)
3113                    .expect("missing shard")
3114                    .encode();
3115                peers[0]
3116                    .sender
3117                    .send(Recipients::One(receiver_pk.clone()), strong, true)
3118                    .await
3119                    .expect("send failed");
3120
3121                for i in [1usize, 2usize, 4usize] {
3122                    let weak = coded_block
3123                        .shard(peers[i].index.get() as u16)
3124                        .expect("missing shard")
3125                        .verify_into_weak()
3126                        .expect("verify_into_weak failed")
3127                        .encode();
3128                    peers[i]
3129                        .sender
3130                        .send(Recipients::One(receiver_pk.clone()), weak, true)
3131                        .await
3132                        .expect("send failed");
3133                }
3134
3135                context.sleep(config.link.latency * 2).await;
3136
3137                // No leader yet: shard subscription should still be pending and block unavailable.
3138                assert!(
3139                    matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
3140                    "shard subscription should not resolve before leader announcement"
3141                );
3142                assert!(
3143                    peers[receiver_idx].mailbox.get(commitment).await.is_none(),
3144                    "block should not reconstruct before leader announcement"
3145                );
3146
3147                // Announce leader, which drains buffered shards and should progress immediately.
3148                peers[receiver_idx]
3149                    .mailbox
3150                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
3151                    .await;
3152
3153                select! {
3154                    _ = shard_sub => {},
3155                    _ = context.sleep(Duration::from_secs(5)) => {
3156                        panic!("shard subscription did not resolve after leader announcement");
3157                    },
3158                }
3159
3160                context.sleep(config.link.latency * 2).await;
3161                assert!(
3162                    peers[receiver_idx].mailbox.get(commitment).await.is_some(),
3163                    "block should reconstruct after buffered shards are ingested"
3164                );
3165
3166                // All shards were valid and from participants.
3167                assert!(
3168                    oracle.blocked().await.unwrap().is_empty(),
3169                    "no peers should be blocked for valid buffered shards"
3170                );
3171            },
3172        );
3173    }
3174
3175    #[test_traced]
3176    fn test_post_leader_shards_processed_immediately() {
3177        // Test that shards arriving after leader announcement are processed
3178        // without waiting for any extra trigger.
3179        let fixture: Fixture<C> = Fixture {
3180            num_peers: 10,
3181            ..Default::default()
3182        };
3183
3184        fixture.start(
3185            |config, context, oracle, mut peers, _, coding_config| async move {
3186                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3187                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3188                let commitment = coded_block.commitment();
3189
3190                let receiver_idx = 3usize;
3191                let receiver_pk = peers[receiver_idx].public_key.clone();
3192                let leader = peers[0].public_key.clone();
3193
3194                let shard_sub = peers[receiver_idx]
3195                    .mailbox
3196                    .subscribe_shard(commitment)
3197                    .await;
3198                peers[receiver_idx]
3199                    .mailbox
3200                    .discovered(
3201                        commitment,
3202                        leader.clone(),
3203                        Round::new(Epoch::zero(), View::new(1)),
3204                    )
3205                    .await;
3206
3207                // Send leader strong shard after leader is known.
3208                let strong = coded_block
3209                    .shard(peers[receiver_idx].index.get() as u16)
3210                    .expect("missing shard")
3211                    .encode();
3212                peers[0]
3213                    .sender
3214                    .send(Recipients::One(receiver_pk.clone()), strong, true)
3215                    .await
3216                    .expect("send failed");
3217
3218                // Subscription should resolve from the strong shard.
3219                select! {
3220                    _ = shard_sub => {},
3221                    _ = context.sleep(Duration::from_secs(5)) => {
3222                        panic!("shard subscription did not resolve after post-leader strong shard");
3223                    },
3224                }
3225
3226                // Send enough weak shards after leader known to reconstruct.
3227                for i in [1usize, 2usize, 4usize] {
3228                    let weak = coded_block
3229                        .shard(peers[i].index.get() as u16)
3230                        .expect("missing shard")
3231                        .verify_into_weak()
3232                        .expect("verify_into_weak failed")
3233                        .encode();
3234                    peers[i]
3235                        .sender
3236                        .send(Recipients::One(receiver_pk.clone()), weak, true)
3237                        .await
3238                        .expect("send failed");
3239                }
3240
3241                context.sleep(config.link.latency * 2).await;
3242                let reconstructed = peers[receiver_idx]
3243                    .mailbox
3244                    .get(commitment)
3245                    .await
3246                    .expect("block should reconstruct from post-leader shards");
3247                assert_eq!(reconstructed.commitment(), commitment);
3248
3249                assert!(
3250                    oracle.blocked().await.unwrap().is_empty(),
3251                    "no peers should be blocked for valid post-leader shards"
3252                );
3253            },
3254        );
3255    }
3256
3257    #[test_traced]
3258    fn test_invalid_shard_codec_blocks_peer() {
3259        // Test that receiving an invalid shard (codec failure) blocks the sender.
3260        let fixture: Fixture<C> = Fixture {
3261            num_peers: 4,
3262            ..Default::default()
3263        };
3264
3265        fixture.start(
3266            |config, context, oracle, mut peers, _, _coding_config| async move {
3267                let peer0_pk = peers[0].public_key.clone();
3268                let peer1_pk = peers[1].public_key.clone();
3269
3270                // Send garbage bytes that will fail codec decoding.
3271                let garbage = Bytes::from(vec![0xFF, 0xFE, 0xFD, 0xFC, 0xFB]);
3272                peers[1]
3273                    .sender
3274                    .send(Recipients::One(peer0_pk.clone()), garbage, true)
3275                    .await
3276                    .expect("send failed");
3277
3278                context.sleep(config.link.latency * 2).await;
3279
3280                // Peer 1 should be blocked by peer 0 for sending invalid shard.
3281                assert_blocked(&oracle, &peer0_pk, &peer1_pk).await;
3282            },
3283        );
3284    }
3285
3286    #[test_traced]
3287    fn test_duplicate_buffered_strong_shard_does_not_block_before_leader() {
3288        // Test that duplicate strong shards before leader announcement are
3289        // buffered and do not immediately block the sender.
3290        let fixture: Fixture<C> = Fixture {
3291            ..Default::default()
3292        };
3293
3294        fixture.start(
3295            |config, context, oracle, mut peers, _, coding_config| async move {
3296                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3297                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3298
3299                // Get peer 2's strong shard.
3300                let peer2_index = peers[2].index.get() as u16;
3301                let peer2_strong_shard = coded_block.shard(peer2_index).expect("missing shard");
3302                let strong_bytes = peer2_strong_shard.encode();
3303
3304                let peer2_pk = peers[2].public_key.clone();
3305
3306                // Do NOT set a leader — shards should be buffered.
3307
3308                // Peer 1 sends the strong shard to peer 2 (buffered, leader unknown).
3309                peers[1]
3310                    .sender
3311                    .send(
3312                        Recipients::One(peer2_pk.clone()),
3313                        strong_bytes.clone(),
3314                        true,
3315                    )
3316                    .await
3317                    .expect("send failed");
3318                context.sleep(config.link.latency * 2).await;
3319
3320                // No one should be blocked yet.
3321                let blocked = oracle.blocked().await.unwrap();
3322                assert!(blocked.is_empty(), "no peers should be blocked yet");
3323
3324                // Peer 1 sends the same strong shard AGAIN (duplicate while leader unknown).
3325                peers[1]
3326                    .sender
3327                    .send(Recipients::One(peer2_pk), strong_bytes, true)
3328                    .await
3329                    .expect("send failed");
3330                context.sleep(config.link.latency * 2).await;
3331
3332                // Still no blocking before a leader is known.
3333                let blocked = oracle.blocked().await.unwrap();
3334                assert!(
3335                    blocked.is_empty(),
3336                    "no peers should be blocked before leader"
3337                );
3338            },
3339        );
3340    }
3341
3342    #[test_traced]
3343    fn test_invalid_strong_shard_crypto_blocks_leader() {
3344        // Test that a strong shard failing cryptographic verification (C::weaken)
3345        // results in the leader being blocked.
3346        let fixture: Fixture<C> = Fixture {
3347            ..Default::default()
3348        };
3349
3350        fixture.start(
3351            |config, context, oracle, mut peers, _, coding_config| async move {
3352                // Create two different blocks — shard from block2 won't verify
3353                // against commitment from block1.
3354                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3355                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3356                let commitment1 = coded_block1.commitment();
3357
3358                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3359                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3360
3361                // Get peer 2's strong shard from block2, but re-wrap it with
3362                // block1's commitment so it fails C::weaken.
3363                let peer2_index = peers[2].index.get() as u16;
3364                let mut wrong_shard = coded_block2.shard(peer2_index).expect("missing shard");
3365                wrong_shard.commitment = commitment1;
3366                let wrong_bytes = wrong_shard.encode();
3367
3368                let peer2_pk = peers[2].public_key.clone();
3369                let leader = peers[0].public_key.clone();
3370
3371                // Inform peer 2 that peer 0 is the leader.
3372                peers[2]
3373                    .mailbox
3374                    .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
3375                    .await;
3376
3377                // Leader (peer 0) sends the invalid strong shard.
3378                peers[0]
3379                    .sender
3380                    .send(Recipients::One(peer2_pk), wrong_bytes, true)
3381                    .await
3382                    .expect("send failed");
3383                context.sleep(config.link.latency * 2).await;
3384
3385                // Peer 0 (leader) should be blocked for invalid crypto.
3386                assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;
3387            },
3388        );
3389    }
3390
    #[test_traced]
    fn test_weak_shard_index_mismatch_blocks_peer() {
        // Test that a weak shard whose shard index doesn't match the sender's
        // participant index results in blocking the sender.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                // Get peer 3's strong shard so peer 3 can derive checking data
                // and validate incoming weak shards (the leader sends it below).
                let peer3_index = peers[3].index.get() as u16;
                let peer3_strong_shard = coded_block.shard(peer3_index).expect("missing shard");

                // Get peer 1's valid weak shard, then change the index to peer 4's index.
                let peer1_index = peers[1].index.get() as u16;
                let mut wrong_index_weak_shard = coded_block
                    .shard(peer1_index)
                    .expect("missing shard")
                    .verify_into_weak()
                    .expect("verify_into_weak failed");
                // Mutate the index so it doesn't match sender (peer 1).
                wrong_index_weak_shard.index = peers[4].index.get() as u16;
                let wrong_bytes = wrong_index_weak_shard.encode();

                let peer3_pk = peers[3].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Inform peer 3 of the leader and send them the strong shard.
                peers[3]
                    .mailbox
                    .discovered(commitment, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;
                let strong_bytes = peer3_strong_shard.encode();
                peers[0]
                    .sender
                    .send(Recipients::One(peer3_pk.clone()), strong_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Peer 1 sends a weak shard with a mismatched index to peer 3.
                peers[1]
                    .sender
                    .send(Recipients::One(peer3_pk), wrong_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Peer 1 should be blocked for weak shard index mismatch.
                assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
            },
        );
    }
3450
    #[test_traced]
    fn test_invalid_weak_shard_crypto_blocks_peer() {
        // Test that a weak shard failing cryptographic verification (C::check)
        // results in blocking the sender once batch validation fires at quorum.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                // Create two different blocks so a shard from one fails
                // verification against the other's commitment.
                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
                let commitment1 = coded_block1.commitment();

                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);

                // Get peer 3's strong shard from block1 (valid); it supplies the
                // checking data peer 3 needs to validate weak shards later.
                let peer3_index = peers[3].index.get() as u16;
                let peer3_strong_shard = coded_block1.shard(peer3_index).expect("missing shard");

                // Get peer 1's weak shard from block2, but re-wrap with block1's
                // commitment so C::check fails.
                let peer1_index = peers[1].index.get() as u16;
                let mut wrong_weak_shard = coded_block2
                    .shard(peer1_index)
                    .expect("missing shard")
                    .verify_into_weak()
                    .expect("verify_into_weak failed");
                wrong_weak_shard.commitment = commitment1;
                let wrong_bytes = wrong_weak_shard.encode();

                let peer3_pk = peers[3].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Inform peer 3 of the leader and send the valid strong shard.
                peers[3]
                    .mailbox
                    .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;
                let strong_bytes = peer3_strong_shard.encode();
                peers[0]
                    .sender
                    .send(Recipients::One(peer3_pk.clone()), strong_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // Peer 1 sends the invalid weak shard. Blocking is NOT expected
                // yet: validation is batched and deferred until quorum.
                peers[1]
                    .sender
                    .send(Recipients::One(peer3_pk.clone()), wrong_bytes, true)
                    .await
                    .expect("send failed");
                context.sleep(config.link.latency * 2).await;

                // No block yet: batch validation deferred until quorum.
                // Send valid weak shards from peers 2 and 4 to reach quorum
                // (minimum_shards = 4: 1 strong + 3 pending weak).
                for &idx in &[2, 4] {
                    let peer_index = peers[idx].index.get() as u16;
                    let weak = coded_block1
                        .shard(peer_index)
                        .expect("missing shard")
                        .verify_into_weak()
                        .expect("verify_into_weak failed");
                    let bytes = weak.encode();
                    peers[idx]
                        .sender
                        .send(Recipients::One(peer3_pk.clone()), bytes, true)
                        .await
                        .expect("send failed");
                }
                context.sleep(config.link.latency * 2).await;

                // Quorum reached -> batch validation ran -> peer 1 should be
                // blocked for invalid weak shard crypto.
                assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
            },
        );
    }
3533
    #[test_traced]
    fn test_reconstruction_recovers_after_quorum_with_one_invalid_weak_shard() {
        // With 10 peers, minimum_shards=4.
        // Contribute exactly 4 shards first (1 strong + 3 weak), with one weak invalid:
        // quorum is reached, but checked_shards stays at 3 after batch validation.
        // Then send one more valid weak shard to meet reconstruction threshold.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                // Two distinct blocks: block2's shard data paired with block1's
                // commitment produces a shard that fails crypto checks.
                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
                let commitment1 = coded_block1.commitment();

                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);

                let receiver_idx = 3usize;
                let receiver_pk = peers[receiver_idx].public_key.clone();

                // Prepare one invalid weak shard: shard data from block2, commitment from block1.
                let peer1_index = peers[1].index.get() as u16;
                let mut invalid_weak = coded_block2
                    .shard(peer1_index)
                    .expect("missing shard")
                    .verify_into_weak()
                    .expect("verify_into_weak failed");
                invalid_weak.commitment = commitment1;

                // Announce leader and deliver receiver's strong shard.
                let leader = peers[0].public_key.clone();
                peers[receiver_idx]
                    .mailbox
                    .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
                    .await;
                let receiver_strong = coded_block1
                    .shard(peers[receiver_idx].index.get() as u16)
                    .expect("missing shard")
                    .encode();
                peers[0]
                    .sender
                    .send(Recipients::One(receiver_pk.clone()), receiver_strong, true)
                    .await
                    .expect("send failed");

                // Contribute exactly minimum_shards total:
                // - invalid weak from peer1
                // - valid weak from peer2
                // - valid weak from peer4
                peers[1]
                    .sender
                    .send(
                        Recipients::One(receiver_pk.clone()),
                        invalid_weak.encode(),
                        true,
                    )
                    .await
                    .expect("send failed");
                for idx in [2usize, 4usize] {
                    let weak = coded_block1
                        .shard(peers[idx].index.get() as u16)
                        .expect("missing shard")
                        .verify_into_weak()
                        .expect("verify_into_weak failed")
                        .encode();
                    peers[idx]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), weak, true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                // Quorum fired batch validation: the invalid weak shard should
                // have its sender blocked, leaving only 3 checked shards — one
                // short of the reconstruction threshold.
                assert_blocked(
                    &oracle,
                    &peers[receiver_idx].public_key,
                    &peers[1].public_key,
                )
                .await;
                assert!(
                    peers[receiver_idx].mailbox.get(commitment1).await.is_none(),
                    "block should not reconstruct with only 3 checked shards"
                );

                // Send one additional valid weak shard; this should now satisfy checked threshold.
                let extra_valid = coded_block1
                    .shard(peers[5].index.get() as u16)
                    .expect("missing shard")
                    .verify_into_weak()
                    .expect("verify_into_weak failed")
                    .encode();
                peers[5]
                    .sender
                    .send(Recipients::One(receiver_pk), extra_valid, true)
                    .await
                    .expect("send failed");

                context.sleep(config.link.latency * 2).await;

                // Recovery: reconstruction succeeds despite the earlier invalid shard.
                let reconstructed = peers[receiver_idx]
                    .mailbox
                    .get(commitment1)
                    .await
                    .expect("block should reconstruct after additional valid weak shard");
                assert_eq!(reconstructed.commitment(), commitment1);
            },
        );
    }
3647
3648    #[test_traced]
3649    fn test_invalid_pending_weak_shard_blocked_on_drain() {
3650        // Test that a weak shard buffered in pending_weak_shards (before checking data) is
3651        // blocked when batch validation runs at quorum and C::check fails.
3652        let fixture: Fixture<C> = Fixture {
3653            num_peers: 10,
3654            ..Default::default()
3655        };
3656
3657        fixture.start(
3658            |config, context, oracle, mut peers, _, coding_config| async move {
3659                // Create two different blocks.
3660                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3661                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3662                let commitment1 = coded_block1.commitment();
3663
3664                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3665                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3666
3667                // Get peer 1's weak shard from block2, but wrap with block1's commitment.
3668                let peer1_index = peers[1].index.get() as u16;
3669                let mut wrong_weak_shard = coded_block2
3670                    .shard(peer1_index)
3671                    .expect("missing shard")
3672                    .verify_into_weak()
3673                    .expect("verify_into_weak failed");
3674                wrong_weak_shard.commitment = commitment1;
3675                let wrong_bytes = wrong_weak_shard.encode();
3676
3677                let peer3_pk = peers[3].public_key.clone();
3678
3679                // Send the invalid weak shard BEFORE the strong shard (no checking data yet,
3680                // so it gets buffered in pending_weak_shards).
3681                peers[1]
3682                    .sender
3683                    .send(Recipients::One(peer3_pk.clone()), wrong_bytes, true)
3684                    .await
3685                    .expect("send failed");
3686                context.sleep(config.link.latency * 2).await;
3687
3688                // No one should be blocked yet (weak shard is buffered).
3689                let blocked = oracle.blocked().await.unwrap();
3690                assert!(blocked.is_empty(), "no peers should be blocked yet");
3691
3692                // Send valid weak shards from peers 2 and 4 so the pending count
3693                // reaches quorum once the strong shard arrives
3694                // (minimum_shards = 4: 1 strong + 3 pending weak).
3695                for &idx in &[2, 4] {
3696                    let peer_index = peers[idx].index.get() as u16;
3697                    let weak = coded_block1
3698                        .shard(peer_index)
3699                        .expect("missing shard")
3700                        .verify_into_weak()
3701                        .expect("verify_into_weak failed");
3702                    let bytes = weak.encode();
3703                    peers[idx]
3704                        .sender
3705                        .send(Recipients::One(peer3_pk.clone()), bytes, true)
3706                        .await
3707                        .expect("send failed");
3708                }
3709                context.sleep(config.link.latency * 2).await;
3710
3711                // No one should be blocked yet (all shards are buffered pending leader).
3712                let blocked = oracle.blocked().await.unwrap();
3713                assert!(blocked.is_empty(), "no peers should be blocked yet");
3714
3715                // Now inform peer 3 of the leader and send the valid strong shard.
3716                let leader = peers[0].public_key.clone();
3717                peers[3]
3718                    .mailbox
3719                    .discovered(commitment1, leader, Round::new(Epoch::zero(), View::new(1)))
3720                    .await;
3721                let peer3_index = peers[3].index.get() as u16;
3722                let peer3_strong_shard = coded_block1.shard(peer3_index).expect("missing shard");
3723                let strong_bytes = peer3_strong_shard.encode();
3724                peers[0]
3725                    .sender
3726                    .send(Recipients::One(peer3_pk), strong_bytes, true)
3727                    .await
3728                    .expect("send failed");
3729                context.sleep(config.link.latency * 2).await;
3730
3731                // Peer 1 should be blocked after batch validation validates and
3732                // rejects their invalid weak shard.
3733                assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
3734            },
3735        );
3736    }
3737
    #[test_traced]
    fn test_cross_epoch_buffered_shard_not_blocked() {
        // A peer that only participates in a FUTURE epoch sends a weak shard before the
        // leader (and thus the round's epoch) is known. The shard is buffered; once the
        // leader is announced with an epoch-1 round, the buffered shard is ingested and
        // validated against epoch 1's participant set — so the sender must NOT be blocked.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets: None,
                },
            );
            network.start();

            // Epoch 0 participants: peers 0..4 (seeds 0..4).
            // Epoch 1 participants: peers 0..3 + peer 4 (seed 4 replaces seed 3).
            let mut epoch0_keys: Vec<PrivateKey> = (0..4).map(PrivateKey::from_seed).collect();
            epoch0_keys.sort_by_key(|s| s.public_key());
            let epoch0_pks: Vec<P> = epoch0_keys.iter().map(|c| c.public_key()).collect();
            let epoch0_set: Set<P> = Set::from_iter_dedup(epoch0_pks.clone());

            let future_peer_key = PrivateKey::from_seed(4);
            let future_peer_pk = future_peer_key.public_key();
            let mut epoch1_pks: Vec<P> = epoch0_pks[..3]
                .iter()
                .cloned()
                .chain(std::iter::once(future_peer_pk.clone()))
                .collect();
            epoch1_pks.sort();
            let epoch1_set: Set<P> = Set::from_iter_dedup(epoch1_pks);

            // Resolve epoch0_pks[0] through the set's index to pick the matching private
            // key (guards against `Set` ordering differing from the sorted Vec).
            let receiver_idx_in_epoch0 = epoch0_set
                .index(&epoch0_pks[0])
                .expect("receiver must be in epoch 0")
                .get() as usize;
            let receiver_key = epoch0_keys[receiver_idx_in_epoch0].clone();
            let receiver_pk = receiver_key.public_key();

            let receiver_control = oracle.control(receiver_pk.clone());
            let (sender_handle, receiver_handle) = receiver_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            // Only the future peer -> receiver link is needed for this scenario.
            let future_peer_control = oracle.control(future_peer_pk.clone());
            let (mut future_peer_sender, _future_peer_receiver) = future_peer_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");
            oracle
                .add_link(future_peer_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
                .await
                .expect("link should be added");

            // Set up the receiver's engine with a multi-epoch provider.
            let scheme_epoch0 =
                Scheme::signer(SCHEME_NAMESPACE, epoch0_set.clone(), receiver_key.clone())
                    .expect("signer scheme should be created");
            let scheme_epoch1 =
                Scheme::signer(SCHEME_NAMESPACE, epoch1_set.clone(), receiver_key.clone())
                    .expect("signer scheme should be created");
            let scheme_provider =
                MultiEpochProvider::single(scheme_epoch0).with_epoch(Epoch::new(1), scheme_epoch1);

            let config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider,
                blocker: receiver_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: 1024,
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: 1024,
                peer_provider: oracle.manager(),
            };

            let (engine, mailbox) = ShardEngine::new(context.with_label("receiver"), config);
            engine.start((sender_handle, receiver_handle));

            // Build a coded block using epoch 1's participant set.
            let coding_config = coding_config_for_participants(epoch1_set.len() as u16);
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();

            // The future peer creates a weak shard at their epoch 1 index.
            let future_peer_index = epoch1_set
                .index(&future_peer_pk)
                .expect("future peer must be in epoch 1");
            let strong_shard = coded_block
                .shard(future_peer_index.get() as u16)
                .expect("missing shard");
            let weak_shard = strong_shard
                .verify_into_weak()
                .expect("verify_into_weak failed");
            let weak_bytes = weak_shard.encode();

            // Send the shard BEFORE external_proposed (goes to pre-leader buffer).
            future_peer_sender
                .send(Recipients::One(receiver_pk.clone()), weak_bytes, true)
                .await
                .expect("send failed");
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // No one should be blocked yet (shard is buffered, leader unknown).
            let blocked = oracle.blocked().await.unwrap();
            assert!(
                blocked.is_empty(),
                "no peers should be blocked while shard is buffered"
            );

            // Announce the leader with an epoch 1 round. Note: epoch0_pks[1] is also an
            // epoch 1 participant (epoch 1 keeps epoch0_pks[..3]).
            let leader = epoch0_pks[1].clone();
            mailbox
                .discovered(commitment, leader, Round::new(Epoch::new(1), View::new(1)))
                .await;
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // The future peer is a valid participant in epoch 1, so they must NOT
            // be blocked after their buffered shard is ingested.
            let blocked = oracle.blocked().await.unwrap();
            assert!(
                blocked.is_empty(),
                "future-epoch participant should not be blocked: {blocked:?}"
            );
        });
    }
3867
    #[test_traced]
    fn test_weak_shard_broadcast_survives_provider_churn() {
        // The broadcaster's scheme provider is wrapped in `ChurningProvider::new(_, 3)`,
        // which (per the comment below) returns `None` after three scoped lookups for
        // epoch 0. The broadcaster must still validate its strong shard and gossip the
        // resulting weak shard so the receiver can reconstruct — i.e. weak-shard
        // broadcast must not depend on an extra provider lookup.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets: None,
                },
            );
            network.start();

            let mut private_keys: Vec<PrivateKey> = (0..4).map(PrivateKey::from_seed).collect();
            private_keys.sort_by_key(|s| s.public_key());
            let peer_keys: Vec<P> = private_keys.iter().map(|k| k.public_key()).collect();
            let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());

            // Roles: leader sends strong shards; broadcaster validates and gossips;
            // receiver reconstructs from leader + broadcaster traffic.
            let leader_idx = 0usize;
            let broadcaster_idx = 1usize;
            let receiver_idx = 2usize;

            let leader_pk = peer_keys[leader_idx].clone();
            let broadcaster_pk = peer_keys[broadcaster_idx].clone();
            let receiver_pk = peer_keys[receiver_idx].clone();

            let mut registrations = BTreeMap::new();
            for key in &peer_keys {
                let control = oracle.control(key.clone());
                let (sender, receiver) = control
                    .register(0, TEST_QUOTA)
                    .await
                    .expect("registration should succeed");
                registrations.insert(key.clone(), (control, sender, receiver));
            }

            // Fully connect all peers.
            for src in &peer_keys {
                for dst in &peer_keys {
                    if src == dst {
                        continue;
                    }
                    oracle
                        .add_link(src.clone(), dst.clone(), DEFAULT_LINK)
                        .await
                        .expect("link should be added");
                }
            }

            let (_leader_control, mut leader_sender, _leader_receiver) = registrations
                .remove(&leader_pk)
                .expect("leader should be registered");
            let (broadcaster_control, broadcaster_sender, broadcaster_receiver) = registrations
                .remove(&broadcaster_pk)
                .expect("broadcaster should be registered");
            let (receiver_control, receiver_sender, receiver_receiver) = registrations
                .remove(&receiver_pk)
                .expect("receiver should be registered");

            let broadcaster_scheme = Scheme::signer(
                SCHEME_NAMESPACE,
                participants.clone(),
                private_keys[broadcaster_idx].clone(),
            )
            .expect("signer scheme should be created");
            // `discovered` performs two scoped lookups (`handle_external_proposal`
            // and `ingest_buffered_shards`). Strong-shard validation is the third.
            // Any additional lookup for epoch 0 churns to `None`.
            let broadcaster_provider = ChurningProvider::new(broadcaster_scheme, 3);
            let broadcaster_config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider: broadcaster_provider,
                blocker: broadcaster_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: 1024,
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: 1024,
                peer_provider: oracle.manager(),
            };
            let (broadcaster_engine, broadcaster_mailbox) =
                ChurningShardEngine::new(context.with_label("broadcaster"), broadcaster_config);
            broadcaster_engine.start((broadcaster_sender, broadcaster_receiver));

            // The receiver uses an ordinary (non-churning) provider as the control.
            let receiver_scheme = Scheme::signer(
                SCHEME_NAMESPACE,
                participants.clone(),
                private_keys[receiver_idx].clone(),
            )
            .expect("signer scheme should be created");
            let receiver_config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider: MultiEpochProvider::single(receiver_scheme),
                blocker: receiver_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: 1024,
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: 1024,
                peer_provider: oracle.manager(),
            };
            let (receiver_engine, receiver_mailbox) =
                ShardEngine::new(context.with_label("receiver"), receiver_config);
            receiver_engine.start((receiver_sender, receiver_receiver));

            let coding_config = coding_config_for_participants(peer_keys.len() as u16);
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();
            let round = Round::new(Epoch::zero(), View::new(1));

            // Both engines learn the leader before any shards arrive.
            broadcaster_mailbox
                .discovered(commitment, leader_pk.clone(), round)
                .await;
            receiver_mailbox
                .discovered(commitment, leader_pk.clone(), round)
                .await;
            context.sleep(DEFAULT_LINK.latency).await;

            // Leader delivers the broadcaster's indexed strong shard.
            let broadcaster_index = participants
                .index(&broadcaster_pk)
                .expect("broadcaster must be a participant")
                .get() as u16;
            let broadcaster_strong = coded_block
                .shard(broadcaster_index)
                .expect("missing shard")
                .encode();
            leader_sender
                .send(Recipients::One(broadcaster_pk), broadcaster_strong, true)
                .await
                .expect("send failed");

            // Leader also delivers the receiver's own strong shard.
            let receiver_index = participants
                .index(&receiver_pk)
                .expect("receiver must be a participant")
                .get() as u16;
            let receiver_strong = coded_block
                .shard(receiver_index)
                .expect("missing shard")
                .encode();
            leader_sender
                .send(Recipients::One(receiver_pk.clone()), receiver_strong, true)
                .await
                .expect("send failed");

            // Extra latency budget: leader->broadcaster, broadcaster gossip->receiver.
            context.sleep(DEFAULT_LINK.latency * 3).await;

            let reconstructed = receiver_mailbox.get(commitment).await;
            assert!(
                reconstructed.is_some(),
                "receiver should reconstruct after broadcaster validates and gossips weak shard"
            );
        });
    }
4026
    #[test_traced]
    fn test_failed_reconstruction_digest_mismatch_then_recovery() {
        // Byzantine scenario: all shards pass coding verification (correct root) but the
        // decoded blob has a different digest than what the commitment claims. This triggers
        // Error::DigestMismatch in try_reconstruct. Verify that:
        //   1. The failed commitment's state is cleaned up
        //   2. Subscriptions for the failed commitment never resolve
        //   3. A subsequent valid commitment reconstructs successfully
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, _oracle, mut peers, _, coding_config| async move {
                // Block 1: the "claimed" block (its digest goes in the fake commitment).
                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);

                // Block 2: the actual data behind the shards.
                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
                let real_commitment2 = coded_block2.commitment();

                // Build a fake commitment: block1's digest + block2's coding root/context/config.
                // Shards from block2 will verify against block2's root (present in the fake
                // commitment), but try_reconstruct will decode block2 and find its digest != D1.
                let fake_commitment = Commitment::from((
                    coded_block1.digest(),
                    real_commitment2.root::<Sha256Digest>(),
                    real_commitment2.context::<Sha256Digest>(),
                    coding_config,
                ));

                let receiver_idx = 3usize;
                let receiver_pk = peers[receiver_idx].public_key.clone();
                let leader = peers[0].public_key.clone();
                let round = Round::new(Epoch::zero(), View::new(1));

                // Discover the fake commitment.
                peers[receiver_idx]
                    .mailbox
                    .discovered(fake_commitment, leader.clone(), round)
                    .await;

                // Open a block subscription before sending shards.
                let mut block_sub = peers[receiver_idx].mailbox.subscribe(fake_commitment).await;
                let mut digest_sub = peers[receiver_idx]
                    .mailbox
                    .subscribe_by_digest(coded_block1.digest())
                    .await;

                // Send the receiver's strong shard (from block2, with fake commitment).
                let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
                let mut strong_shard = coded_block2
                    .shard(receiver_shard_idx)
                    .expect("missing shard");
                strong_shard.commitment = fake_commitment;
                peers[0]
                    .sender
                    .send(
                        Recipients::One(receiver_pk.clone()),
                        strong_shard.encode(),
                        true,
                    )
                    .await
                    .expect("send failed");

                // Send enough weak shards to reach minimum_shards (4 for 10 peers).
                // Need 3 more weak shards after the strong shard.
                for &idx in &[1usize, 2, 4] {
                    let peer_shard_idx = peers[idx].index.get() as u16;
                    let mut weak = coded_block2
                        .shard(peer_shard_idx)
                        .expect("missing shard")
                        .verify_into_weak()
                        .expect("verify_into_weak failed");
                    weak.commitment = fake_commitment;
                    peers[idx]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), weak.encode(), true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                // Reconstruction should have failed with DigestMismatch, and the engine's
                // failed-reconstruction cleanup should have removed fake_commitment's state.
                assert!(
                    peers[receiver_idx]
                        .mailbox
                        .get(fake_commitment)
                        .await
                        .is_none(),
                    "block should not be available after DigestMismatch"
                );

                // Block subscription should be closed after failed reconstruction cleanup.
                assert!(
                    matches!(block_sub.try_recv(), Err(TryRecvError::Closed)),
                    "subscription should close for failed reconstruction"
                );
                assert!(
                    matches!(digest_sub.try_recv(), Err(TryRecvError::Closed)),
                    "digest subscription should close after failed reconstruction"
                );

                // Now verify the engine is not stuck: send valid shards for block1's real
                // commitment and confirm reconstruction succeeds.
                let real_commitment1 = coded_block1.commitment();
                let round2 = Round::new(Epoch::zero(), View::new(2));
                peers[receiver_idx]
                    .mailbox
                    .discovered(real_commitment1, leader.clone(), round2)
                    .await;

                let strong1 = coded_block1
                    .shard(receiver_shard_idx)
                    .expect("missing shard");
                peers[0]
                    .sender
                    .send(Recipients::One(receiver_pk.clone()), strong1.encode(), true)
                    .await
                    .expect("send failed");

                // Same shard mix as before, but untampered (real commitment).
                for &idx in &[1usize, 2, 4] {
                    let peer_shard_idx = peers[idx].index.get() as u16;
                    let weak = coded_block1
                        .shard(peer_shard_idx)
                        .expect("missing shard")
                        .verify_into_weak()
                        .expect("verify_into_weak failed");
                    peers[idx]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), weak.encode(), true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                let reconstructed = peers[receiver_idx]
                    .mailbox
                    .get(real_commitment1)
                    .await
                    .expect("valid block should reconstruct after prior failure");
                assert_eq!(reconstructed.commitment(), real_commitment1);
            },
        );
    }
4178
    #[test_traced]
    fn test_failed_reconstruction_context_mismatch_then_recovery() {
        // Byzantine scenario: shards decode to a block whose digest and coding root/config
        // match the commitment, but the commitment carries a mismatched context digest.
        // The engine must reject reconstruction and keep the commitment unresolved.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, _oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let real_commitment = coded_block.commitment();

                // Sanity-check the forged context digest actually differs from the real one.
                let wrong_context_digest = Sha256::hash(b"wrong_context");
                assert_ne!(
                    real_commitment.context::<Sha256Digest>(),
                    wrong_context_digest,
                    "test requires a distinct context digest"
                );
                // Fake commitment: correct digest + root + config, wrong context digest.
                let fake_commitment = Commitment::from((
                    coded_block.digest(),
                    real_commitment.root::<Sha256Digest>(),
                    wrong_context_digest,
                    coding_config,
                ));

                let receiver_idx = 3usize;
                let receiver_pk = peers[receiver_idx].public_key.clone();
                let leader = peers[0].public_key.clone();
                let round = Round::new(Epoch::zero(), View::new(1));

                peers[receiver_idx]
                    .mailbox
                    .discovered(fake_commitment, leader.clone(), round)
                    .await;
                // Subscribe before shard delivery so we can observe cleanup closing it.
                let mut block_sub = peers[receiver_idx].mailbox.subscribe(fake_commitment).await;

                // Strong shard relabeled with the fake commitment.
                let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
                let mut strong_shard = coded_block
                    .shard(receiver_shard_idx)
                    .expect("missing shard");
                strong_shard.commitment = fake_commitment;
                peers[0]
                    .sender
                    .send(
                        Recipients::One(receiver_pk.clone()),
                        strong_shard.encode(),
                        true,
                    )
                    .await
                    .expect("send failed");

                // Three weak shards (also relabeled) to reach the reconstruction threshold.
                for &idx in &[1usize, 2, 4] {
                    let peer_shard_idx = peers[idx].index.get() as u16;
                    let mut weak = coded_block
                        .shard(peer_shard_idx)
                        .expect("missing shard")
                        .verify_into_weak()
                        .expect("verify_into_weak failed");
                    weak.commitment = fake_commitment;
                    peers[idx]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), weak.encode(), true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                assert!(
                    peers[receiver_idx]
                        .mailbox
                        .get(fake_commitment)
                        .await
                        .is_none(),
                    "block should not be available after ContextMismatch"
                );
                assert!(
                    matches!(block_sub.try_recv(), Err(TryRecvError::Closed)),
                    "subscription should close for context-mismatched commitment"
                );

                // Verify the receiver still reconstructs valid commitments afterward.
                let round2 = Round::new(Epoch::zero(), View::new(2));
                peers[receiver_idx]
                    .mailbox
                    .discovered(real_commitment, leader.clone(), round2)
                    .await;

                let strong_real = coded_block
                    .shard(receiver_shard_idx)
                    .expect("missing shard");
                peers[0]
                    .sender
                    .send(
                        Recipients::One(receiver_pk.clone()),
                        strong_real.encode(),
                        true,
                    )
                    .await
                    .expect("send failed");

                // Same weak shards, untampered this time.
                for &idx in &[1usize, 2, 4] {
                    let peer_shard_idx = peers[idx].index.get() as u16;
                    let weak = coded_block
                        .shard(peer_shard_idx)
                        .expect("missing shard")
                        .verify_into_weak()
                        .expect("verify_into_weak failed");
                    peers[idx]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), weak.encode(), true)
                        .await
                        .expect("send failed");
                }

                context.sleep(config.link.latency * 2).await;

                let reconstructed = peers[receiver_idx]
                    .mailbox
                    .get(real_commitment)
                    .await
                    .expect("valid block should reconstruct after prior context mismatch");
                assert_eq!(reconstructed.commitment(), real_commitment);
            },
        );
    }
4309
4310    #[test_traced]
4311    fn test_same_round_equivocation_preserves_certifiable_recovery() {
4312        // Regression coverage for same-round leader equivocation:
4313        // - leader equivocates across two commitments in the same round
4314        // - we receive a shard for commitment B (the certifiable one)
4315        // - commitment A reconstructs first
4316        // - commitment B must still remain recoverable
4317        let fixture: Fixture<C> = Fixture {
4318            num_peers: 10,
4319            ..Default::default()
4320        };
4321
4322        fixture.start(
4323            |config, context, _oracle, mut peers, _, coding_config| async move {
4324                let receiver_idx = 3usize;
4325                let receiver_pk = peers[receiver_idx].public_key.clone();
4326                let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
4327
4328                let leader = peers[0].public_key.clone();
4329                let round = Round::new(Epoch::zero(), View::new(7));
4330
4331                // Two different commitments in the same round (equivocation scenario).
4332                let block_a = CodedBlock::<B, C, H>::new(
4333                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 111),
4334                    coding_config,
4335                    &STRATEGY,
4336                );
4337                let commitment_a = block_a.commitment();
4338                let block_b = CodedBlock::<B, C, H>::new(
4339                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 222),
4340                    coding_config,
4341                    &STRATEGY,
4342                );
4343                let commitment_b = block_b.commitment();
4344
4345                // Receiver learns both commitments in the same round.
4346                peers[receiver_idx]
4347                    .mailbox
4348                    .discovered(commitment_a, leader.clone(), round)
4349                    .await;
4350                peers[receiver_idx]
4351                    .mailbox
4352                    .discovered(commitment_b, leader.clone(), round)
4353                    .await;
4354
4355                // Subscribe to the certifiable commitment before any reconstruction.
4356                let certifiable_sub = peers[receiver_idx].mailbox.subscribe(commitment_b).await;
4357
4358                // We receive our strong shard for commitment B from the equivocating leader.
4359                let strong_b = block_b
4360                    .shard(receiver_shard_idx)
4361                    .expect("missing shard")
4362                    .encode();
4363                peers[0]
4364                    .sender
4365                    .send(Recipients::One(receiver_pk.clone()), strong_b, true)
4366                    .await
4367                    .expect("send failed");
4368
4369                // Reconstruct conflicting commitment A first.
4370                let strong_a = block_a
4371                    .shard(receiver_shard_idx)
4372                    .expect("missing shard")
4373                    .encode();
4374                peers[0]
4375                    .sender
4376                    .send(Recipients::One(receiver_pk.clone()), strong_a, true)
4377                    .await
4378                    .expect("send failed");
4379                for i in [1usize, 2usize, 4usize] {
4380                    let weak_a = block_a
4381                        .shard(peers[i].index.get() as u16)
4382                        .expect("missing shard")
4383                        .verify_into_weak()
4384                        .expect("verify_into_weak failed")
4385                        .encode();
4386                    peers[i]
4387                        .sender
4388                        .send(Recipients::One(receiver_pk.clone()), weak_a, true)
4389                        .await
4390                        .expect("send failed");
4391                }
4392                context.sleep(config.link.latency * 4).await;
4393                let reconstructed_a = peers[receiver_idx]
4394                    .mailbox
4395                    .get(commitment_a)
4396                    .await
4397                    .expect("conflicting commitment should reconstruct first");
4398                assert_eq!(reconstructed_a.commitment(), commitment_a);
4399
4400                // Commitment B should still be recoverable after A reconstructed.
4401                for i in [1usize, 2usize, 4usize] {
4402                    let weak_b = block_b
4403                        .shard(peers[i].index.get() as u16)
4404                        .expect("missing shard")
4405                        .verify_into_weak()
4406                        .expect("verify_into_weak failed")
4407                        .encode();
4408                    peers[i]
4409                        .sender
4410                        .send(Recipients::One(receiver_pk.clone()), weak_b, true)
4411                        .await
4412                        .expect("send failed");
4413                }
4414
4415                select! {
4416                    result = certifiable_sub => {
4417                        let reconstructed_b =
4418                            result.expect("certifiable commitment should remain recoverable");
4419                        assert_eq!(reconstructed_b.commitment(), commitment_b);
4420                    },
4421                    _ = context.sleep(Duration::from_secs(5)) => {
4422                        panic!("certifiable commitment was not recoverable after same-round equivocation");
4423                    },
4424                }
4425            },
4426        );
4427    }
4428
4429    #[test_traced]
4430    fn test_leader_unrelated_weak_shard_blocks_peer() {
4431        // Regression test: if the leader sends an unrelated/invalid weak shard
4432        // (i.e. a shard for a different participant index), the receiver must
4433        // block the leader.
4434        let fixture: Fixture<C> = Fixture {
4435            num_peers: 10,
4436            ..Default::default()
4437        };
4438
4439        fixture.start(
4440            |config, context, oracle, mut peers, _, coding_config| async move {
4441                // Commitment being tracked by the receiver.
4442                let tracked_block = CodedBlock::<B, C, H>::new(
4443                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
4444                    coding_config,
4445                    &STRATEGY,
4446                );
4447                let tracked_commitment = tracked_block.commitment();
4448
4449                // Separate block used to source "unrelated" shard data.
4450                let unrelated_block = CodedBlock::<B, C, H>::new(
4451                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200),
4452                    coding_config,
4453                    &STRATEGY,
4454                );
4455
4456                let receiver_idx = 3usize;
4457                let receiver_pk = peers[receiver_idx].public_key.clone();
4458                let leader_idx = 0usize;
4459                let leader_pk = peers[leader_idx].public_key.clone();
4460
4461                // Receiver tracks the commitment with peer0 as leader.
4462                peers[receiver_idx]
4463                    .mailbox
4464                    .discovered(
4465                        tracked_commitment,
4466                        leader_pk.clone(),
4467                        Round::new(Epoch::zero(), View::new(1)),
4468                    )
4469                    .await;
4470
4471                // Construct an unrelated weak shard from peer1's slot and retarget
4472                // its commitment to the tracked commitment so it hits active state.
4473                let mut unrelated_weak = unrelated_block
4474                    .shard(peers[1].index.get() as u16)
4475                    .expect("missing shard")
4476                    .verify_into_weak()
4477                    .expect("verify_into_weak failed");
4478                unrelated_weak.commitment = tracked_commitment;
4479
4480                // Leader sends this unrelated/invalid weak shard to receiver.
4481                // The shard index no longer matches sender's participant index,
4482                // so leader must be blocked.
4483                peers[leader_idx]
4484                    .sender
4485                    .send(Recipients::One(receiver_pk), unrelated_weak.encode(), true)
4486                    .await
4487                    .expect("send failed");
4488                context.sleep(config.link.latency * 2).await;
4489
4490                assert_blocked(&oracle, &peers[receiver_idx].public_key, &leader_pk).await;
4491            },
4492        );
4493    }
4494
4495    #[test_traced]
4496    fn test_broadcast_routes_participant_and_non_participant_shards() {
4497        let fixture = Fixture {
4498            num_non_participants: 1,
4499            ..Default::default()
4500        };
4501
4502        fixture.start(
4503            |config, context, oracle, mut peers, non_participants, coding_config| async move {
4504                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4505                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4506                let commitment = coded_block.commitment();
4507
4508                let leader = peers[0].public_key.clone();
4509                let round = Round::new(Epoch::zero(), View::new(1));
4510                peers[0].mailbox.proposed(round, coded_block.clone()).await;
4511
4512                for peer in peers[1..].iter_mut() {
4513                    peer.mailbox
4514                        .discovered(commitment, leader.clone(), round)
4515                        .await;
4516                }
4517                for np in non_participants.iter() {
4518                    np.mailbox
4519                        .discovered(commitment, leader.clone(), round)
4520                        .await;
4521                }
4522                context.sleep(config.link.latency * 2).await;
4523
4524                // Participants should receive and validate their own strong shards.
4525                for peer in peers.iter_mut() {
4526                    peer.mailbox
4527                        .subscribe_shard(commitment)
4528                        .await
4529                        .await
4530                        .expect("participant shard subscription should complete");
4531                }
4532
4533                // Non-participant should receive and validate the leader's strong shard.
4534                for np in non_participants.iter() {
4535                    np.mailbox
4536                        .subscribe_shard(commitment)
4537                        .await
4538                        .await
4539                        .expect("non-participant shard subscription should complete");
4540                }
4541                context.sleep(config.link.latency).await;
4542
4543                // Non-participant should reconstruct the block from received shards.
4544                for np in non_participants.iter() {
4545                    let reconstructed = np
4546                        .mailbox
4547                        .get(commitment)
4548                        .await
4549                        .expect("non-participant should reconstruct block");
4550                    assert_eq!(reconstructed.commitment(), commitment);
4551                }
4552
4553                let blocked = oracle.blocked().await.unwrap();
4554                assert!(
4555                    blocked.is_empty(),
4556                    "no peer should be blocked in participant/non-participant shard routing test"
4557                );
4558            },
4559        );
4560    }
4561
4562    #[test_traced]
4563    fn test_non_participant_reconstructs_after_discovered() {
4564        let fixture = Fixture {
4565            num_non_participants: 1,
4566            ..Default::default()
4567        };
4568
4569        fixture.start(
4570            |config, context, oracle, mut peers, non_participants, coding_config| async move {
4571                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4572                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4573                let commitment = coded_block.commitment();
4574                let round = Round::new(Epoch::zero(), View::new(1));
4575
4576                let leader = peers[0].public_key.clone();
4577                peers[0].mailbox.proposed(round, coded_block.clone()).await;
4578
4579                // Inform participants of the leader so they validate and re-broadcast
4580                // weak shards.
4581                for peer in peers[1..].iter_mut() {
4582                    peer.mailbox
4583                        .discovered(commitment, leader.clone(), round)
4584                        .await;
4585                }
4586                context.sleep(config.link.latency).await;
4587
4588                // Non-participant discovers the leader after shards are already
4589                // propagating through the network.
4590                let np = &non_participants[0];
4591                let block_sub = np.mailbox.subscribe(commitment).await;
4592                np.mailbox
4593                    .discovered(commitment, leader.clone(), round)
4594                    .await;
4595
4596                // Wait for enough shards (strong from leader + weak from
4597                // participants) to arrive and reconstruct.
4598                select! {
4599                    result = block_sub => {
4600                        let reconstructed = result.expect("block subscription should resolve");
4601                        assert_eq!(reconstructed.commitment(), commitment);
4602                        assert_eq!(reconstructed.height(), coded_block.height());
4603                    },
4604                    _ = context.sleep(Duration::from_secs(5)) => {
4605                        panic!("non-participant block subscription did not resolve");
4606                    },
4607                }
4608
4609                let blocked = oracle.blocked().await.unwrap();
4610                assert!(
4611                    blocked.is_empty(),
4612                    "no peer should be blocked in non-participant reconstruction test"
4613                );
4614            },
4615        );
4616    }
4617
    #[test_traced]
    fn test_peer_set_update_evicts_peer_buffers() {
        // Shards buffered before leader announcement should be evicted when
        // the sender leaves the tracked peer set. After eviction, announcing
        // the leader should NOT reconstruct the block (the buffered shard is
        // gone), but sending the shard again post-leader should succeed.
        //
        // NOTE(review): the "sending the shard again post-leader should
        // succeed" half of this comment is not actually exercised below —
        // the test ends after asserting eviction. Confirm whether a re-send
        // step was intended here.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let num_peers = 10usize;
            // Only one peer set is tracked so the later `track(1, ...)` call
            // replaces the earlier set instead of coexisting with it.
            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets: Some(1),
                },
            );
            network.start();

            // Deterministic keys, sorted by public key so participant indices
            // are stable across runs.
            let mut private_keys = (0..num_peers)
                .map(|i| PrivateKey::from_seed(i as u64))
                .collect::<Vec<_>>();
            private_keys.sort_by_key(|s| s.public_key());
            let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
            let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());

            // Test from the perspective of a single receiver (peer 3).
            let receiver_idx = 3usize;
            let receiver_pk = peer_keys[receiver_idx].clone();
            let leader_pk = peer_keys[0].clone();

            // Only the receiver runs a real shard engine; its channel handles
            // are passed to the engine below.
            let receiver_control = oracle.control(receiver_pk.clone());
            let (sender_handle, receiver_handle) = receiver_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            // Register the leader so it can send shards.
            let leader_control = oracle.control(leader_pk.clone());
            let (mut leader_sender, _leader_receiver) = leader_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");
            oracle
                .add_link(leader_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
                .await
                .expect("link should be added");

            // Track the full participant set so the engine sees all peers.
            oracle.manager().track(0, participants.clone()).await;
            context.sleep(Duration::from_millis(10)).await;

            // The receiver signs with its own key over the full participant set.
            let scheme = Scheme::signer(
                SCHEME_NAMESPACE,
                participants.clone(),
                private_keys[receiver_idx].clone(),
            )
            .expect("signer scheme should be created");

            let config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider: MultiEpochProvider::single(scheme),
                blocker: receiver_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: 1024,
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: 1024,
                peer_provider: oracle.manager(),
            };

            let (engine, mailbox) = ShardEngine::new(context.with_label("receiver"), config);
            engine.start((sender_handle, receiver_handle));

            // Build a coded block and extract the strong shard destined for the receiver.
            let coding_config = coding_config_for_participants(num_peers as u16);
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();

            let receiver_participant = participants
                .index(&receiver_pk)
                .expect("receiver must be a participant");
            let strong_shard = coded_block
                .shard(receiver_participant.get() as u16)
                .expect("missing shard");
            let strong_bytes = strong_shard.encode();

            // Send the strong shard BEFORE leader announcement (it gets buffered).
            leader_sender
                .send(
                    Recipients::One(receiver_pk.clone()),
                    strong_bytes.clone(),
                    true,
                )
                .await
                .expect("send failed");
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // Now send a peer set update that excludes the leader.
            let remaining: Set<P> =
                Set::from_iter_dedup(peer_keys.iter().filter(|pk| **pk != leader_pk).cloned());
            oracle.manager().track(1, remaining).await;
            context.sleep(Duration::from_millis(10)).await;

            // Announce the leader. Buffered shards from the leader should have been
            // evicted, so the strong shard will NOT be ingested.
            let mut shard_sub = mailbox.subscribe_shard(commitment).await;
            mailbox
                .discovered(
                    commitment,
                    leader_pk.clone(),
                    Round::new(Epoch::zero(), View::new(1)),
                )
                .await;
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // The shard subscription should still be pending (no shard was ingested).
            assert!(
                matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
                "shard subscription should not resolve after evicted leader's buffer"
            );
            assert!(
                mailbox.get(commitment).await.is_none(),
                "block should not reconstruct from evicted buffers"
            );
        });
    }
4748}