// commonware_consensus/marshal/coding/shards/engine.rs

//! Shard engine for erasure-coded block distribution and reconstruction.
//!
//! This module implements the core logic for distributing blocks as erasure-coded
//! shards and reconstructing blocks from received shards.
//!
//! # Overview
//!
//! The shard engine serves two primary functions:
//! 1. Broadcast: When a node proposes a block, the engine broadcasts
//!    erasure-coded shards to all participants and to non-participants in
//!    aggregate membership (peers in [`commonware_p2p::PeerSetUpdate::all`]
//!    but not in the epoch participant list).
//!    The leader sends each participant their indexed shard.
//! 2. Block Reconstruction: When a node receives shards from peers, the engine
//!    validates them and reconstructs the original block once enough valid
//!    shards are available. Both participants and non-participants can
//!    reconstruct blocks: participants receive their own indexed shard from
//!    the leader, while non-participants reconstruct from shards gossiped
//!    by participants. All participants gossip their validated shard to peers.
//!
//! # Message Flow
//!
//! ```text
//!                           PROPOSER
//!                              |
//!                              | Proposed(block)
//!                              v
//!                    +------------------+
//!                    |   Shard Engine   |
//!                    +------------------+
//!                              |
//!            broadcast_shards (each participant's indexed shard)
//!                              |
//!         +--------------------+--------------------+
//!         |                    |                    |
//!         v                    v                    v
//!    Participant 0        Participant 1        Participant N
//!         |                    |                    |
//!         | (receive shard     | (receive shard     |
//!         |  for own index)    |  for own index)    |
//!         v                    v                    v
//!    +----------+         +----------+         +----------+
//!    | Validate |         | Validate |         | Validate |
//!    | (check)  |         | (check)  |         | (check)  |
//!    +----------+         +----------+         +----------+
//!         |                    |                    |
//!         +--------------------+--------------------+
//!                              |
//!                    (gossip validated shards)
//!                              |
//!         +--------------------+--------------------+
//!         |                    |                    |
//!         v                    v                    v
//!    Accumulate checked shards until minimum_shards reached
//!         |                    |                    |
//!         v                    v                    v
//!            Batch verify pending shards at quorum
//!         |                    |                    |
//!         v                    v                    v
//!    +-------------+      +-------------+      +-------------+
//!    | Reconstruct |      | Reconstruct |      | Reconstruct |
//!    |    Block    |      |    Block    |      |    Block    |
//!    +-------------+      +-------------+      +-------------+
//! ```
//!
//! # Reconstruction State Machine
//!
//! For each [`Commitment`] that is either leader-discovered or notarized, nodes
//! (both participants and non-participants) maintain a [`ReconstructionState`].
//! Before either consensus signal is observed (a leader announcement or a
//! notarization for the commitment), shards are buffered in bounded per-peer
//! queues:
//!
//! ```text
//!    +----------------------+
//!    | AwaitingQuorum       |
//!    | - leader known       |
//!    | - leader's shard     |  <--- verified immediately on receipt
//!    |   verified eagerly   |
//!    | - other shards       |  <--- buffered in pending_shards
//!    |   buffered           |
//!    +----------------------+
//!               |
//!               | quorum met + batch validation passes
//!               v
//!    +----------------------+
//!    | Ready                |
//!    | - checked shards     |
//!    | (frozen; no new      |
//!    |  shards accepted)    |
//!    +----------------------+
//!               |
//!               | checked_shards.len() >= minimum_shards
//!               v
//!    +----------------------+
//!    | Reconstruction       |
//!    | Attempt              |
//!    +----------------------+
//!               |
//!          +----+----+
//!          |         |
//!          v         v
//!       Success    Failure
//!          |         |
//!          v         v
//!       Cache      Remove
//!       Block      State
//! ```
//!
//! _Per-peer buffers are only kept for peers in `latest.primary`, matching [`commonware_broadcast::buffered`].
//! When a peer is no longer in `latest.primary`, all its buffered shards are evicted._
//!
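//! The state machine diagram above can be sketched in a few lines. This is a
//! simplified illustration, not the engine's implementation: `Phase`,
//! `Tracker`, and `on_checked_shard` are hypothetical names, shards are reduced
//! to their indices, and batch verification is assumed to have already passed.
//!
//! ```rust
//! use std::collections::BTreeSet;
//!
//! /// Hypothetical phases mirroring the diagram.
//! #[derive(Debug, PartialEq, Eq)]
//! enum Phase {
//!     AwaitingQuorum,
//!     Ready,
//! }
//!
//! /// Hypothetical tracker for a single commitment.
//! struct Tracker {
//!     phase: Phase,
//!     minimum_shards: usize,
//!     checked: BTreeSet<u16>,
//! }
//!
//! impl Tracker {
//!     fn new(minimum_shards: usize) -> Self {
//!         Self {
//!             phase: Phase::AwaitingQuorum,
//!             minimum_shards,
//!             checked: BTreeSet::new(),
//!         }
//!     }
//!
//!     /// Records a checked shard index and returns `true` if progress was made.
//!     fn on_checked_shard(&mut self, index: u16) -> bool {
//!         // `Ready` is frozen: no new shards are accepted.
//!         if self.phase == Phase::Ready {
//!             return false;
//!         }
//!         // Each shard index may contribute only once.
//!         if !self.checked.insert(index) {
//!             return false;
//!         }
//!         if self.checked.len() >= self.minimum_shards {
//!             self.phase = Phase::Ready;
//!         }
//!         true
//!     }
//! }
//! ```
//!
//! A caller in this sketch would attempt reconstruction once `phase` becomes
//! `Phase::Ready`, then either cache the block or drop the tracker on failure.
//!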
//! # Peer Validation and Blocking Rules
//!
//! The engine enforces strict validation to prevent Byzantine attacks:
//!
//! - All shards MUST be sent by participants in the current epoch.
//! - If the sender is the leader: the shard index MUST match the recipient's
//!   participant index (for participants) or the leader's index (for
//!   non-participants).
//! - If the sender is not the leader: the shard index MUST match the sender's
//!   participant index (each participant can only gossip their own shard).
//! - All shards MUST pass cryptographic verification against the commitment.
//! - Each shard index may only contribute ONE shard per commitment.
//! - Sending a second shard for the same index with different data
//!   (equivocation) results in blocking. Exact duplicates are silently
//!   ignored.
//!
//! Peers violating these rules are blocked via the [`Blocker`] trait.
//! Validation and blocking rules are applied while a commitment is actively
//! tracked in reconstruction state. Once a block is already reconstructed and
//! cached, additional shards for that commitment are ignored.
//!
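//! The two index rules above can be condensed into one helper. This is a sketch
//! under stated assumptions rather than the engine's code:
//! `expected_shard_index` is a hypothetical function, and participant indices
//! are represented as plain `u16` values.
//!
//! ```rust
//! /// Returns the only shard index `sender` may deliver to the local node.
//! ///
//! /// - `sender`: participant index of the sending peer.
//! /// - `leader`: participant index of the leader.
//! /// - `recipient`: local participant index, or `None` for a non-participant.
//! fn expected_shard_index(sender: u16, leader: u16, recipient: Option<u16>) -> u16 {
//!     if sender == leader {
//!         // The leader delivers each participant's assigned shard; a
//!         // non-participant expects the shard at the leader's own index.
//!         recipient.unwrap_or(leader)
//!     } else {
//!         // Any other participant may only gossip its own shard.
//!         sender
//!     }
//! }
//! ```
//!
//! For example, with leader index 2, a participant at index 5 expects index 5
//! from the leader and index 4 from the participant at index 4.
//!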
//! _If the leader is not yet known, shards are buffered in fixed-size per-peer
//! queues until consensus signals either the leader via [`Mailbox::discovered`]
//! or a notarization via [`Mailbox::notarized`]. A notarization activates
//! reconstruction interest without a leader, so only sender-indexed gossip
//! shards can be ingested. Assigned shard verification still requires leader
//! discovery._
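//!
//! The bounded per-peer buffering described above can be sketched with one
//! `VecDeque` per peer. The names here are hypothetical (`PeerBuffers`, with
//! peers as `u32` and shards as `u64`), and the sketch assumes the oldest shard
//! is dropped when a queue is full, which the notes above do not spell out.
//!
//! ```rust
//! use std::collections::{BTreeMap, BTreeSet, VecDeque};
//! use std::num::NonZeroUsize;
//!
//! /// Hypothetical stand-in for the engine's pre-leader shard buffers.
//! struct PeerBuffers {
//!     capacity: NonZeroUsize,
//!     primary: BTreeSet<u32>,
//!     buffers: BTreeMap<u32, VecDeque<u64>>,
//! }
//!
//! impl PeerBuffers {
//!     fn new(capacity: NonZeroUsize, primary: BTreeSet<u32>) -> Self {
//!         Self {
//!             capacity,
//!             primary,
//!             buffers: BTreeMap::new(),
//!         }
//!     }
//!
//!     /// Buffers a shard; returns `false` for peers outside the primary set.
//!     fn push(&mut self, peer: u32, shard: u64) -> bool {
//!         if !self.primary.contains(&peer) {
//!             return false;
//!         }
//!         let queue = self.buffers.entry(peer).or_default();
//!         if queue.len() >= self.capacity.get() {
//!             // Assumption: make room by dropping the oldest shard.
//!             queue.pop_front();
//!         }
//!         queue.push_back(shard);
//!         true
//!     }
//!
//!     /// Replaces the primary set and evicts buffers of departed peers.
//!     fn set_primary(&mut self, primary: BTreeSet<u32>) {
//!         self.buffers.retain(|peer, _| primary.contains(peer));
//!         self.primary = primary;
//!     }
//! }
//! ```
//!
//! Because every queue is capped, a spamming peer can occupy at most `capacity`
//! slots in this sketch no matter how many shards it sends.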

use super::{
    mailbox::{Mailbox, Message},
    metrics::ShardMetrics,
};
use crate::{
    marshal::coding::{
        types::{CodedBlock, Shard},
        validation::{validate_reconstruction, ReconstructionError as InvariantError},
    },
    types::{coding::Commitment, Epoch, Round},
    Block, CertifiableBlock, Heightable,
};
use commonware_actor::mailbox;
use commonware_codec::{Decode, Error as CodecError, Read};
use commonware_coding::{Config as CodingConfig, Scheme as CodingScheme};
use commonware_cryptography::{
    certificate::{Provider, Scheme as CertificateScheme},
    Committable, Digestible, Hasher, PublicKey,
};
use commonware_macros::select_loop;
use commonware_p2p::{
    utils::codec::{WrappedBackgroundReceiver, WrappedSender},
    Blocker, Provider as PeerProvider, Receiver, Recipients, Sender,
};
use commonware_parallel::Strategy;
use commonware_runtime::{
    spawn_cell,
    telemetry::metrics::{GaugeExt, HistogramExt},
    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
};
use commonware_utils::{
    bitmap::BitMap,
    channel::{fallible::OneshotExt, oneshot},
    ordered::{Quorum, Set},
};
use rand::Rng;
use std::{
    collections::{BTreeMap, VecDeque},
    num::NonZeroUsize,
};
use thiserror::Error;
use tracing::{debug, warn};

/// An error that can occur during reconstruction of a [`CodedBlock`] from [`Shard`]s.
#[derive(Debug, Error)]
pub enum Error<C: CodingScheme> {
    /// An error occurred while recovering the encoded blob from the [`Shard`]s.
    #[error(transparent)]
    Coding(C::Error),

    /// An error occurred while decoding the reconstructed blob into a [`CodedBlock`].
    #[error(transparent)]
    Codec(#[from] CodecError),

    /// The reconstructed block's digest does not match the commitment's block digest.
    #[error("block digest mismatch: reconstructed block does not match commitment digest")]
    DigestMismatch,

    /// The reconstructed block's config does not match the commitment's coding config.
    #[error("block config mismatch: reconstructed config does not match commitment config")]
    ConfigMismatch,

    /// The reconstructed block's embedded context does not match the commitment context digest.
    #[error("block context mismatch: reconstructed context does not match commitment context")]
    ContextMismatch,
}

/// Key identifying a block subscription, either by [`Commitment`] or by block digest.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum BlockSubscriptionKey<D> {
    Commitment(Commitment),
    Digest(D),
}

/// Configuration for the [`Engine`].
pub struct Config<P, S, X, D, C, H, B, T>
where
    P: PublicKey,
    S: Provider<Scope = Epoch>,
    X: Blocker<PublicKey = P>,
    D: PeerProvider<PublicKey = P>,
    C: CodingScheme,
    H: Hasher,
    B: CertifiableBlock,
    T: Strategy,
{
    /// The scheme provider.
    pub scheme_provider: S,

    /// The peer blocker.
    pub blocker: X,

    /// [`Read`] configuration for decoding [`Shard`]s.
    pub shard_codec_cfg: <Shard<C, H> as Read>::Cfg,

    /// [`commonware_codec::Read`] configuration for decoding blocks.
    pub block_codec_cfg: B::Cfg,

    /// The strategy used for parallel computation.
    pub strategy: T,

    /// The size of the mailbox buffer.
    pub mailbox_size: NonZeroUsize,

    /// Number of shards to buffer per peer.
    ///
    /// Shards for commitments without a reconstruction state are buffered per
    /// peer in a fixed-size ring to bound memory under Byzantine spam. These
    /// shards are only ingested when consensus provides a leader via
    /// [`Mailbox::discovered`] or reports a notarization via
    /// [`Mailbox::notarized`].
    ///
    /// The worst-case total memory usage for the set of shard buffers is
    /// `num_participants * peer_buffer_size * max_shard_size`.
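    ///
    /// As a rough illustration of this bound, the product can be computed with
    /// overflow checks. The helper name and the figures in the note below are
    /// made up for the example and are not values used by the engine.
    ///
    /// ```rust
    /// /// Upper bound, in bytes, on memory held by all per-peer shard buffers.
    /// /// Returns `None` if the product overflows `u64`.
    /// fn worst_case_buffer_bytes(
    ///     num_participants: u64,
    ///     peer_buffer_size: u64,
    ///     max_shard_size: u64,
    /// ) -> Option<u64> {
    ///     num_participants
    ///         .checked_mul(peer_buffer_size)?
    ///         .checked_mul(max_shard_size)
    /// }
    /// ```
    ///
    /// With 100 participants, 64 buffered shards per peer, and 1 MiB shards,
    /// the bound is 6,400 MiB, so this value should be sized with the largest
    /// expected shard in mind.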
    pub peer_buffer_size: NonZeroUsize,

    /// Capacity of the channel between the background receiver and the engine.
    ///
    /// The background receiver decodes incoming network messages in a separate
    /// task and forwards them to the engine over a mailbox with this
    /// capacity.
    pub background_channel_capacity: NonZeroUsize,

    /// Provider for peer set information. Pre-leader shards are buffered per
    /// peer only while that peer appears in the
    /// [`commonware_p2p::PeerSetUpdate::latest`] primary set, matching
    /// [`commonware_broadcast::buffered::Engine`]. Broadcast delivery uses the
    /// aggregate [`commonware_p2p::PeerSetUpdate::all`] union.
    pub peer_provider: D,
}

/// A cached reconstructed block and the consensus round that produced it.
struct ReconstructedBlock<B, C, H>
where
    B: Block,
    C: CodingScheme,
    H: Hasher,
{
    round: Round,
    block: CodedBlock<B, C, H>,
}

/// A network layer for broadcasting and receiving [`CodedBlock`]s as [`Shard`]s.
///
/// Once enough valid [`Shard`]s for a commitment have been received, the [`Engine`]
/// reconstructs the original [`CodedBlock`] and notifies any subscribers waiting for it.
pub struct Engine<E, S, X, D, C, H, B, P, T>
where
    E: BufferPooler + Rng + Spawner + Metrics + Clock,
    S: Provider<Scope = Epoch>,
    S::Scheme: CertificateScheme<PublicKey = P>,
    X: Blocker,
    D: PeerProvider<PublicKey = P>,
    C: CodingScheme,
    H: Hasher,
    B: CertifiableBlock,
    P: PublicKey,
    T: Strategy,
{
    /// Context held by the actor.
    context: ContextCell<E>,

    /// Receiver for incoming messages to the actor.
    mailbox: mailbox::Receiver<Message<B, C, H, P>>,

    /// The scheme provider.
    scheme_provider: S,

    /// The peer blocker.
    blocker: X,

    /// [`Read`] configuration for decoding [`Shard`]s.
    shard_codec_cfg: <Shard<C, H> as Read>::Cfg,

    /// [`Read`] configuration for decoding [`CodedBlock`]s.
    block_codec_cfg: B::Cfg,

    /// The strategy used for parallel shard verification.
    strategy: T,

    /// A map of [`Commitment`]s to [`ReconstructionState`]s.
    state: BTreeMap<Commitment, ReconstructionState<P, C, H>>,

    /// Per-peer ring buffers for shards received before leader announcement.
    ///
    /// Empty buffers are retained for active peers and only evicted when the
    /// peer leaves `latest.primary`.
    peer_buffers: BTreeMap<P, VecDeque<Shard<C, H>>>,

    /// Maximum buffered pre-leader shards per peer.
    peer_buffer_size: NonZeroUsize,

    /// Provider for peer set information.
    peer_provider: D,

    /// Latest union of peer membership from the peer set subscription
    /// ([`commonware_p2p::PeerSetUpdate::all`]).
    aggregate_peers: Set<P>,

    /// Latest primary peers allowed to retain pre-leader shard buffers.
    latest_primary_peers: Set<P>,

    /// Capacity of the background receiver channel.
    background_channel_capacity: NonZeroUsize,

    /// An ephemeral cache of reconstructed blocks, keyed by commitment.
    ///
    /// These blocks are evicted after a durability signal from the marshal.
    reconstructed_blocks: BTreeMap<Commitment, ReconstructedBlock<B, C, H>>,

    /// Open subscriptions for assigned shard verification for the keyed
    /// [`Commitment`].
    ///
    /// For participants, readiness is satisfied once the leader-delivered
    /// shard for the local participant index has been verified. Block
    /// reconstruction from peer gossip is tracked separately and does not
    /// satisfy this readiness condition.
    ///
    /// Proposers are a special case: they satisfy readiness once their local
    /// proposal is cached because they already hold all shards.
    assigned_shard_verified_subscriptions: BTreeMap<Commitment, Vec<oneshot::Sender<()>>>,

    /// Open subscriptions for the reconstruction of a [`CodedBlock`] with
    /// the keyed [`Commitment`].
    #[allow(clippy::type_complexity)]
    block_subscriptions:
        BTreeMap<BlockSubscriptionKey<B::Digest>, Vec<oneshot::Sender<CodedBlock<B, C, H>>>>,

    /// Metrics for the shard engine.
    metrics: ShardMetrics<P>,
}

impl<E, S, X, D, C, H, B, P, T> Engine<E, S, X, D, C, H, B, P, T>
where
    E: BufferPooler + Rng + Spawner + Metrics + Clock,
    S: Provider<Scope = Epoch>,
    S::Scheme: CertificateScheme<PublicKey = P>,
    X: Blocker<PublicKey = P>,
    D: PeerProvider<PublicKey = P>,
    C: CodingScheme,
    H: Hasher,
    B: CertifiableBlock,
    P: PublicKey,
    T: Strategy,
{
    /// Create a new [`Engine`] with the given configuration.
    pub fn new(context: E, config: Config<P, S, X, D, C, H, B, T>) -> (Self, Mailbox<B, C, H, P>) {
        let metrics = ShardMetrics::new(&context);
        let (sender, mailbox) = mailbox::new(context.child("mailbox"), config.mailbox_size);
        (
            Self {
                context: ContextCell::new(context),
                mailbox,
                scheme_provider: config.scheme_provider,
                blocker: config.blocker,
                shard_codec_cfg: config.shard_codec_cfg,
                block_codec_cfg: config.block_codec_cfg,
                strategy: config.strategy,
                state: BTreeMap::new(),
                peer_buffers: BTreeMap::new(),
                peer_buffer_size: config.peer_buffer_size,
                peer_provider: config.peer_provider,
                aggregate_peers: Set::default(),
                latest_primary_peers: Set::default(),
                background_channel_capacity: config.background_channel_capacity,
                reconstructed_blocks: BTreeMap::new(),
                assigned_shard_verified_subscriptions: BTreeMap::new(),
                block_subscriptions: BTreeMap::new(),
                metrics,
            },
            Mailbox::new(sender),
        )
    }

    /// Start the engine.
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network))
    }

    /// Run the shard engine's event loop.
    async fn run(
        mut self,
        (sender, receiver): (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) {
        let mut sender = WrappedSender::<_, Shard<C, H>>::new(
            self.context.network_buffer_pool().clone(),
            sender,
        );
        let (receiver_service, mut receiver) =
            WrappedBackgroundReceiver::<_, P, X, _, Shard<C, H>>::new(
                self.context.child("shard_ingress"),
                receiver,
                self.shard_codec_cfg.clone(),
                self.blocker.clone(),
                self.background_channel_capacity,
                &self.strategy,
            );
        // Keep the handle alive to prevent the background receiver from being aborted.
        let _receiver_handle = receiver_service.start();
        let mut peer_set_subscription = self.peer_provider.subscribe().await;

        select_loop! {
            self.context,
            on_start => {
                let _ = self
                    .metrics
                    .reconstruction_states_count
                    .try_set(self.state.len());
                let _ = self
                    .metrics
                    .reconstructed_blocks_cache_count
                    .try_set(self.reconstructed_blocks.len());

                // Clean up closed subscriptions.
                self.block_subscriptions.retain(|_, subscribers| {
                    subscribers.retain(|tx| !tx.is_closed());
                    !subscribers.is_empty()
                });
                self.assigned_shard_verified_subscriptions
                    .retain(|_, subscribers| {
                        subscribers.retain(|tx| !tx.is_closed());
                        !subscribers.is_empty()
                    });
            },
            on_stopped => {
                debug!("received shutdown signal, stopping shard engine");
            },
            Some(update) = peer_set_subscription.recv() else {
                debug!("peer set subscription closed");
                return;
            } => {
                let all_peers = update.all.union();
                self.update_latest_primary_peers(update.latest.primary);
                self.aggregate_peers = all_peers;
            },
            Some(message) = self.mailbox.recv() else {
                debug!("shard mailbox closed, stopping shard engine");
                return;
            } => {
                if message.response_closed() {
                    continue;
                }

                match message {
                    Message::Proposed { block, round } => {
                        self.broadcast_shards(&mut sender, round, block);
                    }
                    Message::Discovered {
                        commitment,
                        leader,
                        round,
                    } => {
                        self.handle_external_proposal(&mut sender, commitment, leader, round);
                    }
                    Message::Notarized { commitment, round } => {
                        self.handle_notarized_commitment(&mut sender, commitment, round);
                    }
                    Message::GetByCommitment {
                        commitment,
                        response,
                    } => {
                        let block = self
                            .reconstructed_blocks
                            .get(&commitment)
                            .map(|entry| entry.block.clone());
                        response.send_lossy(block);
                    }
                    Message::GetByDigest { digest, response } => {
                        let block = self
                            .reconstructed_blocks
                            .values()
                            .find_map(|entry| {
                                (entry.block.digest() == digest).then_some(entry.block.clone())
                            });
                        response.send_lossy(block);
                    }
                    Message::SubscribeAssignedShardVerified {
                        commitment,
                        response,
                    } => {
                        self.handle_assigned_shard_verified_subscription(commitment, response);
                    }
                    Message::SubscribeByCommitment {
                        commitment,
                        response,
                    } => {
                        self.handle_block_subscription(
                            BlockSubscriptionKey::Commitment(commitment),
                            response,
                        );
                    }
                    Message::SubscribeByDigest { digest, response } => {
                        self.handle_block_subscription(
                            BlockSubscriptionKey::Digest(digest),
                            response,
                        );
                    }
                    Message::Prune { through } => {
                        self.prune(through);
                    }
                }
            },
            Some((peer, shard)) = receiver.recv() else {
                debug!("receiver closed, stopping shard engine");
                return;
            } => {
                self.handle_network_shard(&mut sender, peer, shard);
            },
        }
    }

    /// Handles a decoded shard received from the network.
    fn handle_network_shard<Sr: Sender<PublicKey = P>>(
        &mut self,
        sender: &mut WrappedSender<Sr, Shard<C, H>>,
        peer: P,
        shard: Shard<C, H>,
    ) {
        self.metrics.shards_received.get_or_create_by(&peer).inc();

        let commitment = shard.commitment();
        if !self.should_handle_network_shard(commitment) {
            return;
        }

        if let Some(existing) = self.state.get(&commitment) {
            let round = existing.round();
            let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
                warn!(%commitment, "no scheme for epoch, ignoring shard");
                return;
            };

            // Notarized recovery can create state before leader discovery. Until
            // the leader is known, only sender-indexed gossip shards are safe to
            // ingest: a participant may only gossip its own shard.
            if existing.leader().is_none() {
                if let Some(sender_index) = scheme.participants().index(&peer) {
                    let expected_index: u16 = sender_index
                        .get()
                        .try_into()
                        .expect("participant index impossibly out of bounds");
                    if shard.index() != expected_index {
                        // A mismatched shard is invalid for a non-leader, but it may be
                        // the assigned shard if this peer later turns out to be the leader.
                        // Keep it buffered until the sender's role is known.
                        self.buffer_peer_shard(peer, shard);
                        return;
                    }
                }
            }

            let state = self
                .state
                .get_mut(&commitment)
                .expect("state checked as present");
            let progressed = state.on_network_shard(
                peer,
                shard,
                InsertCtx::new(scheme.as_ref(), &self.strategy),
                &mut self.blocker,
            );
            if progressed {
                self.try_advance(sender, commitment);
            }
        } else {
            self.buffer_peer_shard(peer, shard);
        }
    }

    /// Returns whether an incoming network shard should still be processed.
    ///
    /// Shards for reconstructed commitments are normally ignored. The only
    /// exception is the late leader-delivered shard for the assigned index,
    /// which we still accept so we can notify readiness and gossip it to
    /// slower peers.
    fn should_handle_network_shard(&self, commitment: Commitment) -> bool {
        if self.reconstructed_blocks.contains_key(&commitment) {
            // State can be populated without a leader when a notarization arrives
            // before the leader announcement, or when our assigned shard was not
            // verified. Keep handling shards until the leader-dependent state is
            // complete.
            return self
                .state
                .get(&commitment)
                .is_some_and(|s| !s.is_assigned_shard_verified());
        }
        true
    }

    /// Attempts to reconstruct a [`CodedBlock`] from the checked [`Shard`]s present in the
    /// [`ReconstructionState`].
    ///
    /// # Returns
    /// - `Ok(Some(block))` if reconstruction was successful or the block was already reconstructed.
    /// - `Ok(None)` if reconstruction could not be attempted due to insufficient checked shards.
    /// - `Err(_)` if reconstruction was attempted but failed.
    #[allow(clippy::type_complexity)]
    fn try_reconstruct(
        &mut self,
        commitment: Commitment,
    ) -> Result<Option<CodedBlock<B, C, H>>, Error<C>> {
        if let Some(entry) = self.reconstructed_blocks.get(&commitment) {
            return Ok(Some(entry.block.clone()));
        }
        let Some(state) = self.state.get_mut(&commitment) else {
            return Ok(None);
        };
        let round = state.round();
        if state.checked_shards().len() < usize::from(commitment.config().minimum_shards.get()) {
            debug!(%commitment, "not enough checked shards to reconstruct block");
            return Ok(None);
        }
        // Attempt to reconstruct the encoded blob
        let start = self.context.current();
        let blob = C::decode(
            &commitment.config(),
            &commitment.root(),
            state.checked_shards().iter(),
            &self.strategy,
        )
        .map_err(Error::Coding)?;
        self.metrics
            .erasure_decode_duration
            .observe_between(start, self.context.current());

        // Attempt to decode the block from the encoded blob
        let (inner, config): (B, CodingConfig) =
            Decode::decode_cfg(&mut blob.as_slice(), &(self.block_codec_cfg.clone(), ()))?;

        match validate_reconstruction::<H, _>(&inner, config, commitment) {
            Ok(()) => {}
            Err(InvariantError::BlockDigest) => {
                return Err(Error::DigestMismatch);
            }
            Err(InvariantError::CodingConfig) => {
                warn!(
                    %commitment,
                    expected_config = ?commitment.config(),
                    actual_config = ?config,
                    "reconstructed block config does not match commitment config, but digest matches"
                );
                return Err(Error::ConfigMismatch);
            }
            Err(InvariantError::ContextDigest(expected, actual)) => {
                warn!(
                    %commitment,
                    expected_context_digest = ?expected,
                    actual_context_digest = ?actual,
                    "reconstructed block context digest does not match commitment context digest"
                );
                return Err(Error::ContextMismatch);
            }
        }

        // Construct a coded block with a _trusted_ commitment. `C::decode` verified the blob's
        // integrity against the commitment, so shards can be lazily reconstructed if need be.
        let block = CodedBlock::new_trusted(inner, commitment);
        self.cache_block(round, block.clone());
        self.metrics.blocks_reconstructed_total.inc();
        Ok(Some(block))
    }

    /// Handles leader announcements for a commitment and advances reconstruction.
    fn handle_external_proposal<Sr: Sender<PublicKey = P>>(
        &mut self,
        sender: &mut WrappedSender<Sr, Shard<C, H>>,
        commitment: Commitment,
        leader: P,
        round: Round,
    ) {
        // A reconstructed block normally makes duplicate leader announcements
        // redundant, unless notarized recovery created leaderless state first.
        // In that case, the leader announcement must still populate the
        // leader-dependent path.
        if self.reconstructed_blocks.contains_key(&commitment)
            && self
                .state
                .get(&commitment)
                .is_none_or(|state| state.leader().is_some())
        {
            return;
        }
        let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
            warn!(%commitment, "no scheme for epoch, ignoring external proposal");
            return;
        };
        let participants = scheme.participants();
        if participants.index(&leader).is_none() {
            warn!(?leader, %commitment, "leader update for non-participant, ignoring");
            return;
        }
        if let Some(state) = self.state.get_mut(&commitment) {
            if let Some(existing) = state.leader() {
                if existing != &leader {
                    warn!(
                        existing = ?existing,
                        ?leader,
                        %commitment,
                        "conflicting leader update, ignoring"
                    );
                }
                return;
            }
            state
                .set_leader(leader)
                .expect("leader was checked as absent");
        } else {
            let participants_len = u64::try_from(participants.len())
                .expect("participant count impossibly out of bounds");
            self.state.insert(
                commitment,
                ReconstructionState::new(Some(leader), round, participants_len),
            );
        }
        let buffered_progress = self.ingest_buffered_shards(commitment);
        if buffered_progress {
            self.try_advance(sender, commitment);
        }
    }

763    /// Handles notarized reconstruction interest before the leader is known.
764    ///
765    /// This is intentionally narrower than leader discovery: it may reconstruct
766    /// the block from sender-indexed gossip shards, but it cannot mark the
767    /// local assigned shard as verified.
768    fn handle_notarized_commitment<Sr: Sender<PublicKey = P>>(
769        &mut self,
770        sender: &mut WrappedSender<Sr, Shard<C, H>>,
771        commitment: Commitment,
772        round: Round,
773    ) {
774        if self.reconstructed_blocks.contains_key(&commitment) {
775            return;
776        }
777        if self.state.contains_key(&commitment) {
778            let buffered_progress = self.ingest_buffered_shards(commitment);
779            if buffered_progress {
780                self.try_advance(sender, commitment);
781            }
782            return;
783        }
784        let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
785            warn!(%commitment, "no scheme for epoch, ignoring notarized commitment");
786            return;
787        };
788        let participants_len = u64::try_from(scheme.participants().len())
789            .expect("participant count impossibly out of bounds");
790        self.state.insert(
791            commitment,
792            ReconstructionState::new(None, round, participants_len),
793        );
794        let buffered_progress = self.ingest_buffered_shards(commitment);
795        if buffered_progress {
796            self.try_advance(sender, commitment);
797        }
798    }
799
800    /// Buffer a shard from a peer until a leader is known.
801    fn buffer_peer_shard(&mut self, peer: P, shard: Shard<C, H>) {
802        if self.latest_primary_peers.position(&peer).is_none() {
803            debug!(
804                ?peer,
805                "pre-leader shard from peer outside latest.primary not buffered"
806            );
807            return;
808        }
809        let queue = self.peer_buffers.entry(peer).or_default();
810        if queue.len() >= self.peer_buffer_size.get() {
811            let _ = queue.pop_front();
812        }
813        queue.push_back(shard);
814    }
815
816    fn update_latest_primary_peers(&mut self, peers: Set<P>) {
817        self.peer_buffers
818            .retain(|peer, _| peers.position(peer).is_some());
819        self.latest_primary_peers = peers;
820    }
821
822    /// Ingest buffered pre-leader shards for a commitment into active state.
823    ///
824    /// The buffers are per sender and may contain shards from many peers. Before
825    /// the leader is known, the only actionable shard from each sender is the
826    /// one at that sender's participant index, because that is the shard a
827    /// non-leader is allowed to gossip. This still lets a notarized commitment
828    /// reconstruct from many peer-gossiped shards. The local assigned shard is
829    /// different: it is only valid when it came from the leader, and the leader's
830    /// identity is needed before it can be accepted as assigned-shard evidence.
831    fn ingest_buffered_shards(&mut self, commitment: Commitment) -> bool {
832        let state = self
833            .state
834            .get(&commitment)
835            .expect("buffered shards can only be ingested with reconstruction state");
836        let round = state.round();
837        let leader_known = state.leader().is_some();
838        let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
839            warn!(%commitment, "no scheme for epoch, dropping buffered shards");
840            return false;
841        };
842
843        let mut buffered = Vec::new();
844        for (peer, queue) in self.peer_buffers.iter_mut() {
845            let mut i = 0;
846            while i < queue.len() {
847                if queue[i].commitment() != commitment {
848                    i += 1;
849                    continue;
850                }
851                if !leader_known {
852                    let Some(sender_index) = scheme.participants().index(peer) else {
853                        i += 1;
854                        continue;
855                    };
856                    let expected_index: u16 = sender_index
857                        .get()
858                        .try_into()
859                        .expect("participant index impossibly out of bounds");
860                    if queue[i].index() != expected_index {
861                        i += 1;
862                        continue;
863                    }
864                }
865                let shard = queue.swap_remove_back(i).expect("index is valid");
866                buffered.push((peer.clone(), shard));
867            }
868        }
869
870        let state = self
871            .state
872            .get_mut(&commitment)
873            .expect("reconstruction state checked before buffered shard drain");
874
875        // Ingest buffered shards into the active reconstruction state. Batch verification
876        // will be triggered if there are enough shards to meet the quorum threshold.
877        let mut progressed = false;
878        let ctx = InsertCtx::new(scheme.as_ref(), &self.strategy);
879        for (peer, shard) in buffered {
880            progressed |= state.on_network_shard(peer, shard, ctx, &mut self.blocker);
881        }
882        progressed
883    }
884
885    /// Cache a block and notify all subscribers waiting on it.
886    fn cache_block(&mut self, round: Round, block: CodedBlock<B, C, H>) {
887        let commitment = block.commitment();
888        self.reconstructed_blocks.insert(
889            commitment,
890            ReconstructedBlock {
891                round,
892                block: block.clone(),
893            },
894        );
895        self.notify_block_subscribers(block);
896    }
897
898    /// Broadcasts the shards of a [`CodedBlock`] and caches the block.
899    ///
900    /// - Participants receive the shard matching their participant index.
901    /// - Non-participants in aggregate membership receive the leader's shard.
902    fn broadcast_shards<Sr: Sender<PublicKey = P>>(
903        &mut self,
904        sender: &mut WrappedSender<Sr, Shard<C, H>>,
905        round: Round,
906        mut block: CodedBlock<B, C, H>,
907    ) {
908        let commitment = block.commitment();
909
910        let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else {
911            warn!(%commitment, "no scheme available, cannot broadcast shards");
912            return;
913        };
914        let participants = scheme.participants();
915        let Some(me) = scheme.me() else {
916            warn!(
917                %commitment,
918                "cannot broadcast shards: local proposer is not a participant"
919            );
920            return;
921        };
922
923        let shard_count = block.shards(&self.strategy).len();
924        if shard_count != participants.len() {
925            warn!(
926                %commitment,
927                shard_count,
928                participants = participants.len(),
929                "cannot broadcast shards: participant/shard count mismatch"
930            );
931            return;
932        }
933
934        let my_index = me.get() as usize;
935        let leader_shard = block
936            .shard(my_index as u16)
937            .expect("proposer's shard must exist");
938
939        // Broadcast each participant their corresponding shard.
940        for (index, peer) in participants.iter().enumerate() {
941            if index == my_index {
942                continue;
943            }
944
945            let Some(shard) = block.shard(index as u16) else {
946                warn!(
947                    %commitment,
948                    index,
949                    "cannot broadcast shards: missing shard for participant index"
950                );
951                return;
952            };
953            let _ = sender.send(Recipients::One(peer.clone()), shard, true);
954        }
955
956        // Send the leader's shard to peers in aggregate membership who are not participants.
957        let non_participants: Vec<P> = self
958            .aggregate_peers
959            .iter()
960            .filter(|peer| participants.index(peer).is_none())
961            .cloned()
962            .collect();
963        if !non_participants.is_empty() {
964            let _ = sender.send(Recipients::Some(non_participants), leader_shard, true);
965        }
966
967        // Cache the block so we don't have to reconstruct it again.
968        self.cache_block(round, block);
969
970        // Local proposals bypass reconstruction, so shard subscribers waiting
971        // for "our valid shard arrived" still need a notification.
972        self.notify_assigned_shard_verified_subscribers(commitment);
973
974        debug!(?commitment, "broadcasted shards");
975    }
976
977    /// Gossips a validated [`Shard`] using [`commonware_p2p::Recipients::All`].
978    fn broadcast_shard<Sr: Sender<PublicKey = P>>(
979        &mut self,
980        sender: &mut WrappedSender<Sr, Shard<C, H>>,
981        shard: Shard<C, H>,
982    ) {
983        let commitment = shard.commitment();
984        let peers = sender.send(Recipients::All, shard, true);
985        debug!(
986            ?commitment,
987            peers = peers.len(),
988            "broadcasted shard to all peers"
989        );
990    }
991
992    /// Broadcasts any pending validated shard for the given commitment and attempts
993    /// reconstruction. On success, the state is left for `prune()` to clean up.
994    /// On failure, the state is removed and its subscriptions are dropped.
995    fn try_advance<Sr: Sender<PublicKey = P>>(
996        &mut self,
997        sender: &mut WrappedSender<Sr, Shard<C, H>>,
998        commitment: Commitment,
999    ) {
1000        if let Some(state) = self.state.get_mut(&commitment) {
1001            match state.take_pending_action() {
1002                Some(AssignedShardVerifiedAction::Broadcast(shard)) => {
1003                    self.broadcast_shard(sender, shard);
1004                    self.notify_assigned_shard_verified_subscribers(commitment);
1005                }
1006                Some(AssignedShardVerifiedAction::NotifyOnly) => {
1007                    self.notify_assigned_shard_verified_subscribers(commitment);
1008                }
1009                None => {}
1010            }
1011        }
1012
1013        match self.try_reconstruct(commitment) {
1014            Ok(Some(block)) => {
1015                // Do not prune other reconstruction state here. A Byzantine
1016                // leader can equivocate by proposing multiple commitments in
1017                // the same round, so more than one block may be reconstructed
1018                // for a given round. Pruning is deferred to `prune()`, which
1019                // is called once a commitment is finalized.
1020                debug!(
1021                    %commitment,
1022                    parent = %block.parent(),
1023                    height = %block.height(),
1024                    "successfully reconstructed block from shards"
1025                );
1026            }
1027            Ok(None) => {
1028                debug!(%commitment, "not enough checked shards to reconstruct block");
1029            }
1030            Err(err) => {
1031                warn!(%commitment, ?err, "failed to reconstruct block from checked shards");
1032                self.state.remove(&commitment);
1033                self.drop_subscriptions(commitment);
1034                self.metrics.reconstruction_failures_total.inc();
1035            }
1036        }
1037    }
1038
1039    /// Handles the registration of an assigned-shard verification subscription.
1040    ///
1041    /// For participants this is tied to verification of the leader-delivered
1042    /// shard for the local index, not to generic block reconstruction.
1043    fn handle_assigned_shard_verified_subscription(
1044        &mut self,
1045        commitment: Commitment,
1046        response: oneshot::Sender<()>,
1047    ) {
1048        // Answer immediately if our own shard has been verified.
1049        let has_shard = self
1050            .state
1051            .get(&commitment)
1052            .is_some_and(|state| state.is_assigned_shard_verified());
1053        if has_shard {
1054            response.send_lossy(());
1055            return;
1056        }
1057
1058        // When there is no reconstruction state but the block is already in
1059        // the cache, the local node was the proposer. Proposers trivially
1060        // have all shards, so resolve immediately.
1061        if !self.state.contains_key(&commitment)
1062            && self.reconstructed_blocks.contains_key(&commitment)
1063        {
1064            response.send_lossy(());
1065            return;
1066        }
1067
1068        self.assigned_shard_verified_subscriptions
1069            .entry(commitment)
1070            .or_default()
1071            .push(response);
1072    }
1073
1074    /// Handles the registration of a block subscription.
1075    fn handle_block_subscription(
1076        &mut self,
1077        key: BlockSubscriptionKey<B::Digest>,
1078        response: oneshot::Sender<CodedBlock<B, C, H>>,
1079    ) {
1080        let block = match key {
1081            BlockSubscriptionKey::Commitment(commitment) => self
1082                .reconstructed_blocks
1083                .get(&commitment)
1084                .map(|entry| &entry.block),
1085            BlockSubscriptionKey::Digest(digest) => self
1086                .reconstructed_blocks
1087                .values()
1088                .find_map(|entry| (entry.block.digest() == digest).then_some(&entry.block)),
1089        };
1090
1091        // Answer immediately if we have the block cached.
1092        if let Some(block) = block {
1093            response.send_lossy(block.clone());
1094            return;
1095        }
1096
1097        self.block_subscriptions
1098            .entry(key)
1099            .or_default()
1100            .push(response);
1101    }
1102
1103    /// Notifies and cleans up any subscriptions waiting for assigned shard
1104    /// verification.
1105    fn notify_assigned_shard_verified_subscribers(&mut self, commitment: Commitment) {
1106        if let Some(mut subscribers) = self
1107            .assigned_shard_verified_subscriptions
1108            .remove(&commitment)
1109        {
1110            for subscriber in subscribers.drain(..) {
1111                subscriber.send_lossy(());
1112            }
1113        }
1114    }
1115
1116    /// Notifies and cleans up any subscriptions for a reconstructed block.
1117    fn notify_block_subscribers(&mut self, block: CodedBlock<B, C, H>) {
1118        let commitment = block.commitment();
1119        let digest = block.digest();
1120
1121        // Notify by-commitment subscribers.
1122        if let Some(mut subscribers) = self
1123            .block_subscriptions
1124            .remove(&BlockSubscriptionKey::Commitment(commitment))
1125        {
1126            for subscriber in subscribers.drain(..) {
1127                subscriber.send_lossy(block.clone());
1128            }
1129        }
1130
1131        // Notify by-digest subscribers.
1132        if let Some(mut subscribers) = self
1133            .block_subscriptions
1134            .remove(&BlockSubscriptionKey::Digest(digest))
1135        {
1136            for subscriber in subscribers.drain(..) {
1137                subscriber.send_lossy(block.clone());
1138            }
1139        }
1140    }
1141
1142    /// Drops all subscriptions associated with a commitment.
1143    ///
1144    /// Removing these entries drops all senders, causing receivers to resolve
1145    /// with cancellation (`RecvError`) instead of hanging indefinitely.
1146    fn drop_subscriptions(&mut self, commitment: Commitment) {
1147        self.assigned_shard_verified_subscriptions
1148            .remove(&commitment);
1149        self.block_subscriptions
1150            .remove(&BlockSubscriptionKey::Commitment(commitment));
1151        self.block_subscriptions
1152            .remove(&BlockSubscriptionKey::Digest(
1153                commitment.block::<B::Digest>(),
1154            ));
1155    }
1156
1157    /// Prunes all blocks in the reconstructed block cache at or below the height of
1158    /// the block with the given commitment (including that block itself). Also cleans
1159    /// up stale reconstruction state and subscriptions.
1160    ///
1161    /// This is the only place reconstruction state is pruned by round. We
1162    /// intentionally avoid pruning on reconstruction success because a
1163    /// Byzantine leader can equivocate, producing multiple valid commitments
1164    /// in the same round. Both must remain recoverable until finalization
1165    /// determines which one is canonical.
1166    fn prune(&mut self, through: Commitment) {
1167        let cached = self
1168            .reconstructed_blocks
1169            .get(&through)
1170            .map(|entry| (entry.round, entry.block.height()));
1171        if let Some((_, height)) = cached {
1172            self.reconstructed_blocks
1173                .retain(|_, entry| entry.block.height() > height);
1174        }
1175
1176        // Always clear direct state/subscriptions for the pruned commitment.
1177        // This avoids dangling waiters when prune is called for a commitment
1178        // that was never reconstructed locally.
1179        self.drop_subscriptions(through);
1180        let state_round = self.state.remove(&through).map(|state| state.round());
1181        let cached_round = cached.map(|(round, _)| round);
1182        let Some(round) = state_round.or(cached_round) else {
1183            return;
1184        };
1185
1186        let mut pruned_commitments = Vec::new();
1187        self.state.retain(|c, s| {
1188            let keep = s.round() > round;
1189            if !keep {
1190                pruned_commitments.push(*c);
1191            }
1192            keep
1193        });
1194        for pruned in pruned_commitments {
1195            self.drop_subscriptions(pruned);
1196        }
1197    }
1198}
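
The two retention rules in `prune` can be sketched in isolation. This is a minimal sketch, not the engine's code: `CachedSketch`, `prune_sketch`, the `u32` commitment ids, and the plain `u64` rounds and heights are all hypothetical stand-ins. It shows that the cache is pruned by height, while in-flight state is pruned by round.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a cached block: only the fields pruning reads.
#[derive(Clone, Copy)]
struct CachedSketch {
    round: u64,
    height: u64,
}

/// Prunes `cache` by height and `state_rounds` by round, relative to `through`.
fn prune_sketch(
    cache: &mut HashMap<u32, CachedSketch>,
    state_rounds: &mut HashMap<u32, u64>,
    through: u32,
) {
    // Cached blocks survive only if they are strictly above `through`'s height.
    let cached = cache.get(&through).copied();
    if let Some(entry) = cached {
        cache.retain(|_, e| e.height > entry.height);
    }

    // The round comes from in-flight state first, then from the cache.
    let state_round = state_rounds.remove(&through);
    let Some(round) = state_round.or(cached.map(|e| e.round)) else {
        return;
    };

    // In-flight state survives only if it belongs to a strictly later round.
    state_rounds.retain(|_, r| *r > round);
}
```

If `through` is neither cached nor in flight, no round can be determined and nothing else is pruned, which matches the early return in `prune`.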
1199
1200/// Erasure coded block reconstruction state machine.
1201enum ReconstructionState<P, C, H>
1202where
1203    P: PublicKey,
1204    C: CodingScheme,
1205    H: Hasher,
1206{
1207    /// Stage 1: accumulate shards. The leader's shard for our index is
1208    /// verified immediately; all other shards are buffered until enough
1209    /// are available for batch verification.
1210    AwaitingQuorum(AwaitingQuorumState<P, C, H>),
1211    /// Stage 2: batch validation passed; checked shards are available for
1212    /// reconstruction.
1213    Ready(ReadyState<P, C, H>),
1214}
1215
1216/// Action to take once assigned shard verification has been established.
1217///
1218/// Participants broadcast the shard to all peers, while non-participants
1219/// only notify local subscribers.
1220enum AssignedShardVerifiedAction<C: CodingScheme, H: Hasher> {
1221    /// Broadcast the shard to all peers and notify local subscribers.
1222    Broadcast(Shard<C, H>),
1223    /// Only notify local subscribers (non-participant validated the leader's shard).
1224    NotifyOnly,
1225}
1226
1227/// A coding shard paired with its participant index.
1228struct IndexedShard<C: CodingScheme> {
1229    index: u16,
1230    data: C::Shard,
1231}
1232
1233/// State shared across all reconstruction phases.
1234struct CommonState<P, C, H>
1235where
1236    P: PublicKey,
1237    C: CodingScheme,
1238    H: Hasher,
1239{
1240    /// The leader associated with this reconstruction state, if consensus has
1241    /// provided it.
1242    leader: Option<P>,
1243    /// Our validated shard and the action to take with it.
1244    pending_action: Option<AssignedShardVerifiedAction<C, H>>,
1245    /// Shards that have been verified and are ready to contribute to reconstruction.
1246    checked_shards: Vec<C::CheckedShard>,
1247    /// Bitmap tracking which participant indices have contributed a shard.
1248    contributed: BitMap,
1249    /// The round for which this commitment was externally proposed.
1250    round: Round,
1251    /// Raw shard data received per index, retained for equivocation detection.
1252    /// Keyed by shard index.
1253    received_shards: BTreeMap<u16, C::Shard>,
1254    /// Whether the leader's shard for our assigned index has been verified.
1255    assigned_shard_verified: bool,
1256}
1257
1258/// Phase data for `ReconstructionState::AwaitingQuorum`.
1259///
1260/// In this phase, the leader may be unknown. Sender-indexed shards can still be
1261/// buffered until enough are available to attempt batch validation. Once the
1262/// leader is known, the leader's shard for our index is verified eagerly via
1263/// `C::check`.
1264struct AwaitingQuorumState<P, C, H>
1265where
1266    P: PublicKey,
1267    C: CodingScheme,
1268    H: Hasher,
1269{
1270    common: CommonState<P, C, H>,
1271    /// Shards pending batch validation, keyed by sender.
1272    pending_shards: BTreeMap<P, IndexedShard<C>>,
1273}
1274
1275/// Phase data for `ReconstructionState::Ready`.
1276///
1277/// Batch validation has passed. Checked shards are available for
1278/// reconstruction.
1279struct ReadyState<P, C, H>
1280where
1281    P: PublicKey,
1282    C: CodingScheme,
1283    H: Hasher,
1284{
1285    common: CommonState<P, C, H>,
1286}
1287
1288impl<P, C, H> CommonState<P, C, H>
1289where
1290    P: PublicKey,
1291    C: CodingScheme,
1292    H: Hasher,
1293{
1294    /// Create a new empty common state for the provided leader and round.
1295    fn new(leader: Option<P>, round: Round, participants_len: u64) -> Self {
1296        Self {
1297            leader,
1298            pending_action: None,
1299            checked_shards: Vec::new(),
1300            contributed: BitMap::zeroes(participants_len),
1301            round,
1302            received_shards: BTreeMap::new(),
1303            assigned_shard_verified: false,
1304        }
1305    }
1306}
1307
1308impl<P, C, H> CommonState<P, C, H>
1309where
1310    P: PublicKey,
1311    C: CodingScheme,
1312    H: Hasher,
1313{
1314    /// Verify the leader's shard for our index and store it.
1315    ///
1316    /// When `is_participant` is true, the validated shard is stored for
1317    /// broadcasting to peers. When false (non-participant), only subscriber
1318    /// notification is scheduled.
1319    ///
1320    /// Returns `false` if verification fails (sender is blocked), `true` on
1321    /// success.
1322    fn verify_assigned_shard(
1323        &mut self,
1324        sender: P,
1325        commitment: Commitment,
1326        shard: IndexedShard<C>,
1327        is_participant: bool,
1328        blocker: &mut impl Blocker<PublicKey = P>,
1329    ) -> bool {
1330        // Store data for equivocation detection first (move), then borrow
1331        // the stored copy for the check. The only clone left is the one
1332        // for the broadcast shard, and only when we are a participant.
1333        self.received_shards.insert(shard.index, shard.data);
1334        let data = self.received_shards.get(&shard.index).unwrap();
1335        let Ok(checked) = C::check(&commitment.config(), &commitment.root(), shard.index, data)
1336        else {
1337            self.received_shards.remove(&shard.index);
1338            commonware_p2p::block!(blocker, sender, "invalid shard received from leader");
1339            return false;
1340        };
1341
1342        self.contributed.set(u64::from(shard.index), true);
1343        self.checked_shards.push(checked);
1344        self.assigned_shard_verified = true;
1345        self.pending_action = Some(if is_participant {
1346            AssignedShardVerifiedAction::Broadcast(Shard::new(
1347                commitment,
1348                shard.index,
1349                data.clone(),
1350            ))
1351        } else {
1352            AssignedShardVerifiedAction::NotifyOnly
1353        });
1354        true
1355    }
1356}
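
The "store first, then borrow for the check" ordering used by `verify_assigned_shard` can be shown on its own. This is a minimal sketch under assumptions: `store_then_check`, the `Vec<u8>` payload, and the `check` closure are hypothetical stand-ins for the shard data and `C::check`.

```rust
use std::collections::BTreeMap;

/// Moves `data` into `received`, checks the stored copy by reference, and
/// removes it again if the check fails. No clone of `data` is needed.
fn store_then_check(
    received: &mut BTreeMap<u16, Vec<u8>>,
    index: u16,
    data: Vec<u8>,
    check: impl Fn(&[u8]) -> bool,
) -> bool {
    // Move the payload into the map so it is retained for later comparison.
    received.insert(index, data);

    // Borrow the stored payload for verification instead of cloning it.
    let stored = received.get(&index).expect("entry was just inserted");
    if !check(stored) {
        // A failed check must not leave the invalid payload behind.
        received.remove(&index);
        return false;
    }
    true
}
```

Inserting before checking means the map owns the only copy. The cost is the explicit removal on the failure path.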
1357
1358impl<P, C, H> AwaitingQuorumState<P, C, H>
1359where
1360    P: PublicKey,
1361    C: CodingScheme,
1362    H: Hasher,
1363{
1364    /// Check whether quorum is met and, if so, batch-validate all pending
1365    /// shards in parallel. Returns `Some(ReadyState)` on successful transition.
1366    fn try_transition(
1367        &mut self,
1368        commitment: Commitment,
1369        participants_len: u64,
1370        strategy: &impl Strategy,
1371        blocker: &mut impl Blocker<PublicKey = P>,
1372    ) -> Option<ReadyState<P, C, H>> {
1373        let minimum = usize::from(commitment.config().minimum_shards.get());
1374        if self.common.checked_shards.len() + self.pending_shards.len() < minimum {
1375            return None;
1376        }
1377
1378        // Batch-validate all pending shards in parallel.
1379        let pending = std::mem::take(&mut self.pending_shards);
1380        let (new_checked, to_block) =
1381            strategy.map_partition_collect_vec(pending, |(peer, shard)| {
1382                let checked = C::check(
1383                    &commitment.config(),
1384                    &commitment.root(),
1385                    shard.index,
1386                    &shard.data,
1387                );
1388                (peer, checked.ok())
1389            });
1390
1391        for peer in to_block {
1392            commonware_p2p::block!(blocker, peer, "invalid shard received");
1393        }
1394        for checked in new_checked {
1395            self.common.checked_shards.push(checked);
1396        }
1397
1398        // After validation, some may have failed; recheck threshold.
1399        if self.common.checked_shards.len() < minimum {
1400            return None;
1401        }
1402
1403        // Transition to Ready.
1404        let round = self.common.round;
1405        let leader = self.common.leader.clone();
1406        let common = std::mem::replace(
1407            &mut self.common,
1408            CommonState::new(leader, round, participants_len),
1409        );
1410        Some(ReadyState { common })
1411    }
1412}
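
The double threshold check in `try_transition` can be sketched without the coding scheme. This is a simplified, sequential illustration, not the parallel implementation above: `PendingSketch`, `try_transition_sketch`, and the `valid` flag (standing in for the result of `C::check`) are hypothetical.

```rust
/// Hypothetical pending shard: sender id, shard index, and whether a check
/// of its data would succeed (standing in for the real verification).
struct PendingSketch {
    sender: u32,
    index: u16,
    valid: bool,
}

/// Returns `None` while the optimistic count is below `minimum`. Otherwise
/// validates everything pending and returns the senders to block, plus
/// whether enough checked shards remain to reconstruct.
fn try_transition_sketch(
    checked: &mut Vec<u16>,
    pending: &mut Vec<PendingSketch>,
    minimum: usize,
) -> Option<(Vec<u32>, bool)> {
    // First check: even if every pending shard were valid, is there a quorum?
    if checked.len() + pending.len() < minimum {
        return None;
    }

    // Validate the whole batch, splitting it into checked shards and offenders.
    let mut to_block = Vec::new();
    for shard in std::mem::take(pending) {
        if shard.valid {
            checked.push(shard.index);
        } else {
            to_block.push(shard.sender);
        }
    }

    // Second check: invalid shards may have pulled the count below quorum.
    Some((to_block, checked.len() >= minimum))
}
```

The first check avoids spending verification work before a quorum is even possible. The second is needed because the first counts unverified shards.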
1413
1414/// Context required for processing incoming network shards.
1415struct InsertCtx<'a, Sch, S>
1416where
1417    Sch: CertificateScheme,
1418    S: Strategy,
1419{
1420    scheme: &'a Sch,
1421    strategy: &'a S,
1422    participants_len: u64,
1423}
1424
1425impl<Sch: CertificateScheme, S: Strategy> Clone for InsertCtx<'_, Sch, S> {
1426    fn clone(&self) -> Self {
1427        *self
1428    }
1429}
1430
1431impl<Sch: CertificateScheme, S: Strategy> Copy for InsertCtx<'_, Sch, S> {}
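
One plausible reason for writing `Clone` and `Copy` by hand here (an assumption; the source does not say) is that `#[derive(Clone, Copy)]` on a generic struct adds `Clone`/`Copy` bounds on every type parameter, even when the struct only stores references to them. A minimal sketch with hypothetical names (`NotClone`, `CtxSketch`, `read_twice`):

```rust
/// A type that is deliberately neither `Clone` nor `Copy`.
struct NotClone(u32);

/// Hypothetical context that holds only a shared reference to `T`.
struct CtxSketch<'a, T> {
    inner: &'a T,
}

// Manual impls carry no `T: Clone` or `T: Copy` bound, because copying the
// context only copies the reference.
impl<T> Clone for CtxSketch<'_, T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<T> Copy for CtxSketch<'_, T> {}

/// Uses the context by value twice, which requires `CtxSketch` to be `Copy`.
fn read_twice(ctx: CtxSketch<'_, NotClone>) -> u32 {
    let first = ctx;
    let second = ctx;
    first.inner.0 + second.inner.0
}
```

With derived impls, `CtxSketch<'_, NotClone>` would not be `Copy`, and the second use of `ctx` in `read_twice` would be rejected as a use after move.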
1432
1433impl<'a, Sch: CertificateScheme, S: Strategy> InsertCtx<'a, Sch, S> {
1434    fn new(scheme: &'a Sch, strategy: &'a S) -> Self {
1435        let participants_len = u64::try_from(scheme.participants().len())
1436            .expect("participant count impossibly out of bounds");
1437        Self {
1438            scheme,
1439            strategy,
1440            participants_len,
1441        }
1442    }
1443}
1444
1445impl<P, C, H> ReconstructionState<P, C, H>
1446where
1447    P: PublicKey,
1448    C: CodingScheme,
1449    H: Hasher,
1450{
1451    /// Create an initial reconstruction state for a commitment.
1452    fn new(leader: Option<P>, round: Round, participants_len: u64) -> Self {
1453        Self::AwaitingQuorum(AwaitingQuorumState {
1454            common: CommonState::new(leader, round, participants_len),
1455            pending_shards: BTreeMap::new(),
1456        })
1457    }
1458
1459    /// Access common state shared across all phases.
1460    const fn common(&self) -> &CommonState<P, C, H> {
1461        match self {
1462            Self::AwaitingQuorum(state) => &state.common,
1463            Self::Ready(state) => &state.common,
1464        }
1465    }
1466
1467    /// Mutably access common state shared across all phases.
1468    const fn common_mut(&mut self) -> &mut CommonState<P, C, H> {
1469        match self {
1470            Self::AwaitingQuorum(state) => &mut state.common,
1471            Self::Ready(state) => &mut state.common,
1472        }
1473    }
1474
1475    /// Return the leader associated with this state.
1476    const fn leader(&self) -> Option<&P> {
1477        self.common().leader.as_ref()
1478    }
1479
1480    /// Set the leader for this state if it has not already been set.
1481    fn set_leader(&mut self, leader: P) -> Result<(), P> {
1482        if self.common().leader.is_some() {
1483            return Err(leader);
1484        }
1485        self.common_mut().leader = Some(leader);
1486        Ok(())
1487    }
1488
1489    /// Returns whether the leader's shard for our index has been verified.
1490    const fn is_assigned_shard_verified(&self) -> bool {
1491        self.common().assigned_shard_verified
1492    }
1493
1494    /// Return the proposal round associated with this state.
1495    const fn round(&self) -> Round {
1496        self.common().round
1497    }
1498
1499    /// Returns all verified shards accumulated for reconstruction.
1500    const fn checked_shards(&self) -> &[C::CheckedShard] {
1501        self.common().checked_shards.as_slice()
1502    }
1503
1504    /// Takes the pending action for this commitment's validated shard.
1505    ///
1506    /// Returns [`None`] if the leader's shard hasn't been validated yet.
1507    const fn take_pending_action(&mut self) -> Option<AssignedShardVerifiedAction<C, H>> {
1508        self.common_mut().pending_action.take()
1509    }
1510
1511    /// Handle an incoming network shard.
1512    ///
    /// Returns `true` only when the shard caused state progress (buffered,
    /// validated, or transitioned), and `false` when it was rejected,
    /// blocked, or silently discarded.
    ///
    /// ## Peer Blocking Rules
    ///
    /// The `sender` may be blocked via the provided [`Blocker`] if any of
    /// the following rules is violated:
    ///
    /// - MUST be sent by a participant in the current epoch. Non-participant
    ///   senders are blocked.
    /// - If the sender is the leader: the shard index MUST match the
    ///   recipient's own participant index (when the recipient is a
    ///   participant) or the leader's participant index (when the recipient
    ///   is a non-participant).
    /// - If the sender is not the leader: the shard index MUST match the
    ///   sender's participant index. Each non-leader participant may only
    ///   gossip their own shard.
    /// - Once the leader is known, a mismatched shard index results in
    ///   blocking the sender.
    /// - Each shard index may only contribute ONE shard per commitment.
    ///   Sending a second shard for the same index with different data
    ///   (equivocation) results in blocking the sender.
    /// - The leader's shard is verified eagerly via [`CodingScheme::check`].
    ///   If verification fails, the leader is blocked.
    /// - Non-leader shards are buffered in `pending_shards` and
    ///   batch-validated when quorum is reached. Invalid shards discovered
    ///   during batch validation result in blocking their respective
    ///   senders.
    ///
    /// ## Silent Discard Rules
    ///
    /// The following conditions cause a shard to be silently ignored
    /// without blocking the sender:
    ///
    /// - A mismatched shard index that arrives before the leader is known,
    ///   since the sender's role cannot be classified yet.
    /// - Exact duplicate of a previously received shard for the same index.
    /// - The index has already been marked as contributed (via the bitmap,
    ///   e.g. after batch validation).
    /// - Non-leader shards that arrive after the state has transitioned to
    ///   [`ReconstructionState::Ready`] (i.e., batch validation has already
    ///   passed). The leader's shard for our index is still accepted in
    ///   `Ready` state to ensure we verify and re-broadcast it.
    ///
    /// Before a reconstruction state exists, shards are buffered at the
    /// engine level in bounded per-peer queues until [`Mailbox::discovered`]
    /// or [`Mailbox::notarized`] creates state for this commitment.
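    ///
    /// ## Example
    ///
    /// The index rules above can be sketched as a standalone function. This
    /// is a minimal illustration under simplifying assumptions, not the
    /// engine's implementation: `Verdict` and `classify_index` are
    /// hypothetical names, and peers and shards are both identified by plain
    /// `u16` participant indices rather than public keys.
    ///
    /// ```rust
    /// // Hypothetical outcome of the index check (not part of the engine API).
    /// #[derive(Debug, PartialEq, Eq)]
    /// enum Verdict {
    ///     // Index matches; duplicate and equivocation checks would follow.
    ///     Accept,
    ///     // Mismatch before the leader is known: drop without blocking.
    ///     Ignore,
    ///     // Mismatch once the leader is known: block the sender.
    ///     Block,
    /// }
    ///
    /// // `leader` is the leader's participant index, if known. `me` is the
    /// // recipient's participant index, or `None` for a non-participant.
    /// fn classify_index(
    ///     sender: u16,
    ///     leader: Option<u16>,
    ///     me: Option<u16>,
    ///     shard_index: u16,
    /// ) -> Verdict {
    ///     let from_leader = leader == Some(sender);
    ///     // The leader sends each recipient the recipient's own shard; a
    ///     // non-participant recipient expects the leader's own shard instead.
    ///     let expected = if from_leader { me.unwrap_or(sender) } else { sender };
    ///     if shard_index == expected {
    ///         Verdict::Accept
    ///     } else if leader.is_some() {
    ///         Verdict::Block
    ///     } else {
    ///         Verdict::Ignore
    ///     }
    /// }
    /// ```
    ///
    /// Under this sketch, `classify_index(1, Some(0), Some(2), 3)` yields
    /// `Verdict::Block`, while the same call with `leader = None` yields
    /// `Verdict::Ignore`.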
    fn on_network_shard<Sch, S, X>(
        &mut self,
        sender: P,
        shard: Shard<C, H>,
        ctx: InsertCtx<'_, Sch, S>,
        blocker: &mut X,
    ) -> bool
    where
        Sch: CertificateScheme<PublicKey = P>,
        S: Strategy,
        X: Blocker<PublicKey = P>,
    {
        let Some(sender_index) = ctx.scheme.participants().index(&sender) else {
            commonware_p2p::block!(blocker, sender, "shard sent by non-participant");
            return false;
        };
        let commitment = shard.commitment();
        let indexed = IndexedShard {
            index: shard.index(),
            data: shard.into_inner(),
        };

        // Determine expected index based on sender role. Before the leader is
        // known, only sender-indexed gossip shards are actionable; mismatched
        // shards cannot be classified without the leader and do not satisfy
        // assigned shard verification.
        let leader = self.common().leader.as_ref();
        let is_from_leader = leader.is_some_and(|leader| leader == &sender);
        let expected_participant = if is_from_leader {
            ctx.scheme.me().unwrap_or(sender_index)
        } else {
            sender_index
        };
        let expected_index: u16 = expected_participant
            .get()
            .try_into()
            .expect("participant index impossibly out of bounds");
        if indexed.index != expected_index {
            if leader.is_some() {
                commonware_p2p::block!(
                    blocker,
                    sender,
                    shard_index = indexed.index,
                    expected_index,
                    "shard index does not match expected index"
                );
            }
            return false;
        }

        // Equivocation/duplicate check.
        if let Some(existing) = self.common().received_shards.get(&indexed.index) {
            if existing != &indexed.data {
                commonware_p2p::block!(blocker, sender, "shard equivocation");
            }
            return false;
        }

        // Check if this index already contributed (via batch validation).
        if self.common().contributed.get(u64::from(indexed.index)) {
            return false;
        }

        // Leader's shard for our index is always verified eagerly,
        // even after transitioning to Ready. This ensures we broadcast
        // our own shard to help slower peers reach quorum.
        if is_from_leader && !self.common().assigned_shard_verified {
            let progressed = self.common_mut().verify_assigned_shard(
                sender,
                commitment,
                indexed,
                ctx.scheme.me().is_some(),
                blocker,
            );

            if progressed {
                if let Self::AwaitingQuorum(state) = self {
                    if let Some(ready) = state.try_transition(
                        commitment,
                        ctx.participants_len,
                        ctx.strategy,
                        blocker,
                    ) {
                        *self = Self::Ready(ready);
                    }
                }
            }
            return progressed;
        }

        // Non-leader shards are only accepted while awaiting quorum.
        let Self::AwaitingQuorum(state) = self else {
            return false;
        };

        // Buffer for batch validation.
        state
            .common
            .received_shards
            .insert(indexed.index, indexed.data.clone());
        state.common.contributed.set(u64::from(indexed.index), true);
        state.pending_shards.insert(sender, indexed);
        if let Some(ready) =
            state.try_transition(commitment, ctx.participants_len, ctx.strategy, blocker)
        {
            *self = Self::Ready(ready);
        }

        true
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        marshal::{
            coding::types::coding_config_for_participants, mocks::block::Block as MockBlock,
        },
        types::{Epoch, Height, View},
    };
    use bytes::Bytes;
    use commonware_codec::Encode;
    use commonware_coding::{
        CodecConfig, Config as CodingConfig, PhasedAsScheme, ReedSolomon, Zoda,
    };
    use commonware_cryptography::{
        certificate::Subject,
        ed25519::{PrivateKey, PublicKey},
        impl_certificate_ed25519,
        sha256::Digest as Sha256Digest,
        Committable, Digest, Sha256, Signer,
    };
    use commonware_macros::{select, test_traced};
    use commonware_p2p::{
        simulated::{self, Control, Link, Oracle},
        Manager as _, TrackedPeers,
    };
    use commonware_parallel::Sequential;
    use commonware_runtime::{deterministic, Quota, Runner, Supervisor as _};
    use commonware_utils::{
        channel::oneshot::error::TryRecvError, ordered::Set, NZUsize, Participant,
    };
    use std::{
        future::Future,
        marker::PhantomData,
        num::NonZeroU32,
        sync::{
            atomic::{AtomicIsize, Ordering},
            Arc,
        },
        time::Duration,
    };

    #[derive(Clone, Debug)]
    pub struct TestSubject {
        pub message: Bytes,
    }

    impl Subject for TestSubject {
        type Namespace = Vec<u8>;

        fn namespace<'a>(&self, derived: &'a Self::Namespace) -> &'a [u8] {
            derived
        }

        fn message(&self) -> Bytes {
            self.message.clone()
        }
    }

    impl_certificate_ed25519!(TestSubject, Vec<u8>);

    const SCHEME_NAMESPACE: &[u8] = b"_COMMONWARE_SHARD_ENGINE_TEST";

    /// The max size of a shard sent over the wire.
    const MAX_SHARD_SIZE: usize = 1024 * 1024; // 1 MiB

    /// The default link configuration for tests.
    const DEFAULT_LINK: Link = Link {
        latency: Duration::from_millis(50),
        jitter: Duration::ZERO,
        success_rate: 1.0,
    };

    /// Rate limit quota for tests (effectively unlimited).
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);

    /// The parallelization strategy used for tests.
    const STRATEGY: Sequential = Sequential;

    /// A scheme provider that maps each epoch to a potentially different scheme.
    ///
    /// For most tests only epoch 0 is registered, matching the previous
    /// `ConstantProvider` behaviour. Cross-epoch tests register additional
    /// epochs with different participant sets.
    #[derive(Clone)]
    struct MultiEpochProvider {
        schemes: BTreeMap<Epoch, Arc<Scheme>>,
    }

    impl MultiEpochProvider {
        fn single(scheme: Scheme) -> Self {
            let mut schemes = BTreeMap::new();
            schemes.insert(Epoch::zero(), Arc::new(scheme));
            Self { schemes }
        }

        fn with_epoch(mut self, epoch: Epoch, scheme: Scheme) -> Self {
            self.schemes.insert(epoch, Arc::new(scheme));
            self
        }
    }

    impl Provider for MultiEpochProvider {
        type Scope = Epoch;
        type Scheme = Scheme;

        fn scoped(&self, scope: Epoch) -> Option<Arc<Scheme>> {
            self.schemes.get(&scope).cloned()
        }
    }

    /// A one-epoch scheme provider that churns to `None` after a fixed number
    /// of successful scope lookups.
    #[derive(Clone)]
    struct ChurningProvider {
        scheme: Arc<Scheme>,
        remaining_successes: Arc<AtomicIsize>,
    }

    impl ChurningProvider {
        fn new(scheme: Scheme, successes: isize) -> Self {
            Self {
                scheme: Arc::new(scheme),
                remaining_successes: Arc::new(AtomicIsize::new(successes)),
            }
        }
    }

    impl Provider for ChurningProvider {
        type Scope = Epoch;
        type Scheme = Scheme;

        fn scoped(&self, scope: Epoch) -> Option<Arc<Scheme>> {
            if scope != Epoch::zero() {
                return None;
            }
            if self.remaining_successes.fetch_sub(1, Ordering::AcqRel) <= 0 {
                return None;
            }
            Some(Arc::clone(&self.scheme))
        }
    }

    // Type aliases for test convenience.
    type B = MockBlock<Sha256Digest, ()>;
    type H = Sha256;
    type P = PublicKey;
    type C = ReedSolomon<H>;
    type X = Control<P, deterministic::Context>;
    type O = Oracle<P, deterministic::Context>;
    type Prov = MultiEpochProvider;
    type NetworkSender = simulated::Sender<P, deterministic::Context>;
    type D = simulated::Manager<P, deterministic::Context>;
    type ShardEngine<S> = Engine<deterministic::Context, Prov, X, D, S, H, B, P, Sequential>;
    type ChurningShardEngine<S> =
        Engine<deterministic::Context, ChurningProvider, X, D, S, H, B, P, Sequential>;

    async fn assert_blocked(oracle: &O, blocker: &P, blocked: &P) {
        let blocked_peers = oracle.blocked().await.unwrap();
        let is_blocked = blocked_peers
            .iter()
            .any(|(a, b)| a == blocker && b == blocked);
        assert!(is_blocked, "expected {blocker} to have blocked {blocked}");
    }

    /// A participant in the test network with its engine mailbox and blocker.
    struct Peer<S: CodingScheme = C> {
        /// The peer's public key.
        public_key: PublicKey,
        /// The peer's index in the participant set.
        index: Participant,
        /// The mailbox for sending messages to the peer's shard engine.
        mailbox: Mailbox<B, S, H, P>,
        /// Raw network sender for injecting messages (e.g., byzantine behavior).
        sender: NetworkSender,
    }

    /// A non-participant in the test network with its engine mailbox.
    #[allow(dead_code)]
    struct NonParticipant<S: CodingScheme = C> {
        /// The peer's public key.
        public_key: PublicKey,
        /// The mailbox for sending messages to the peer's shard engine.
        mailbox: Mailbox<B, S, H, P>,
        /// Raw network sender for injecting messages.
        sender: NetworkSender,
    }

    /// Test fixture for setting up multiple participants with shard engines.
    struct Fixture<S: CodingScheme = C> {
        /// Number of peers in the test network.
        num_peers: usize,
        /// Number of non-participant peers in the test network.
        num_non_participants: usize,
        /// Network link configuration.
        link: Link,
        /// Marker for the coding scheme type parameter.
        _marker: PhantomData<S>,
    }

    impl<S: CodingScheme> Default for Fixture<S> {
        fn default() -> Self {
            Self {
                num_peers: 4,
                num_non_participants: 0,
                link: DEFAULT_LINK,
                _marker: PhantomData,
            }
        }
    }

    impl<S: CodingScheme> Fixture<S> {
        pub fn start<F: Future<Output = ()>>(
            self,
            f: impl FnOnce(
                Self,
                deterministic::Context,
                O,
                Vec<Peer<S>>,
                Vec<NonParticipant<S>>,
                CodingConfig,
            ) -> F,
        ) {
            let executor = deterministic::Runner::default();
            executor.start(|context| async move {
                let mut private_keys = (0..self.num_peers)
                    .map(|i| PrivateKey::from_seed(i as u64))
                    .collect::<Vec<_>>();
                private_keys.sort_by_key(|s| s.public_key());
                let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();

                let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());

                let mut np_private_keys = (0..self.num_non_participants)
                    .map(|i| PrivateKey::from_seed((self.num_peers + i) as u64))
                    .collect::<Vec<_>>();
                np_private_keys.sort_by_key(|s| s.public_key());
                let np_keys: Vec<P> = np_private_keys.iter().map(|k| k.public_key()).collect();

                let (network, oracle) =
                    simulated::Network::<deterministic::Context, P>::new_with_split_peers(
                        context.child("network"),
                        simulated::Config {
                            max_size: MAX_SHARD_SIZE as u32,
                            disconnect_on_block: true,
                            tracked_peer_sets: NZUsize!(1),
                        },
                        peer_keys.clone(),
                        np_keys.clone(),
                    )
                    .await;
                network.start();

                let all_keys: Vec<P> = peer_keys.iter().chain(np_keys.iter()).cloned().collect();

                let mut registrations = BTreeMap::new();
                for key in all_keys.iter() {
                    let control = oracle.control(key.clone());
                    let (sender, receiver) = control
                        .register(0, TEST_QUOTA)
                        .await
                        .expect("registration should succeed");
                    registrations.insert(key.clone(), (control, sender, receiver));
                }
                for p1 in all_keys.iter() {
                    for p2 in all_keys.iter() {
                        if p2 == p1 {
                            continue;
                        }
                        oracle
                            .add_link(p1.clone(), p2.clone(), self.link.clone())
                            .await
                            .expect("link should be added");
                    }
                }

                let coding_config =
                    coding_config_for_participants(u16::try_from(self.num_peers).unwrap());

                let mut peers = Vec::with_capacity(self.num_peers);
                for (idx, peer_key) in peer_keys.iter().enumerate() {
                    let (control, sender, receiver) = registrations
                        .remove(peer_key)
                        .expect("peer should be registered");

                    let participant = Participant::new(idx as u32);
                    let engine_context = context.child("peer").with_attribute("index", idx);

                    let scheme = Scheme::signer(
                        SCHEME_NAMESPACE,
                        participants.clone(),
                        private_keys[idx].clone(),
                    )
                    .expect("signer scheme should be created");
                    let scheme_provider: Prov = MultiEpochProvider::single(scheme);

                    let config = Config {
                        scheme_provider,
                        blocker: control.clone(),
                        shard_codec_cfg: CodecConfig {
                            maximum_shard_size: MAX_SHARD_SIZE,
                        },
                        block_codec_cfg: (),
                        strategy: STRATEGY,
                        mailbox_size: NZUsize!(1024),
                        peer_buffer_size: NZUsize!(64),
                        background_channel_capacity: NZUsize!(1024),
                        peer_provider: oracle.manager(),
                    };

                    let (engine, mailbox) = ShardEngine::new(engine_context, config);
                    let sender_clone = sender.clone();
                    engine.start((sender, receiver));

                    peers.push(Peer {
                        public_key: peer_key.clone(),
                        index: participant,
                        mailbox,
                        sender: sender_clone,
                    });
                }

                let mut non_participants = Vec::with_capacity(self.num_non_participants);
                for (idx, np_key) in np_keys.iter().enumerate() {
                    let (control, sender, receiver) = registrations
                        .remove(np_key)
                        .expect("non-participant should be registered");

                    let engine_context = context
                        .child("non_participant")
                        .with_attribute("index", idx);

                    let scheme = Scheme::verifier(SCHEME_NAMESPACE, participants.clone());
                    let scheme_provider: Prov = MultiEpochProvider::single(scheme);

                    let config = Config {
                        scheme_provider,
                        blocker: control.clone(),
                        shard_codec_cfg: CodecConfig {
                            maximum_shard_size: MAX_SHARD_SIZE,
                        },
                        block_codec_cfg: (),
                        strategy: STRATEGY,
                        mailbox_size: NZUsize!(1024),
                        peer_buffer_size: NZUsize!(64),
                        background_channel_capacity: NZUsize!(1024),
                        peer_provider: oracle.manager(),
                    };

                    let (engine, mailbox) = ShardEngine::new(engine_context, config);
                    let sender_clone = sender.clone();
                    engine.start((sender, receiver));

                    non_participants.push(NonParticipant {
                        public_key: np_key.clone(),
                        mailbox,
                        sender: sender_clone,
                    });
                }

                f(
                    self,
                    context,
                    oracle,
                    peers,
                    non_participants,
                    coding_config,
                )
                .await;
            });
        }
    }

    #[test_traced]
    fn test_e2e_broadcast_and_reconstruction() {
        let fixture = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, _, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                let leader = peers[0].public_key.clone();
                let round = Round::new(Epoch::zero(), View::new(1));
                peers[0].mailbox.proposed(round, coded_block.clone());

                // Inform all peers of the leader so shards are processed.
                for peer in peers[1..].iter_mut() {
                    peer.mailbox.discovered(commitment, leader.clone(), round);
                }
                context.sleep(config.link.latency).await;

                for peer in peers.iter_mut() {
                    peer.mailbox
                        .subscribe_assigned_shard_verified(commitment)
                        .await
                        .expect("shard subscription should complete");
                }
                context.sleep(config.link.latency).await;

                for peer in peers.iter_mut() {
                    let reconstructed = peer
                        .mailbox
                        .get(commitment)
                        .await
                        .expect("block should be reconstructed");
                    assert_eq!(reconstructed.commitment(), commitment);
                    assert_eq!(reconstructed.height(), coded_block.height());
                }
            },
        );
    }

    #[test_traced]
    fn test_e2e_broadcast_and_reconstruction_zoda() {
        let fixture = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, _, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, PhasedAsScheme<Zoda<H>>, H>::new(
                    inner,
                    coding_config,
                    &STRATEGY,
                );
                let commitment = coded_block.commitment();

                let leader = peers[0].public_key.clone();
                let round = Round::new(Epoch::zero(), View::new(1));
                peers[0].mailbox.proposed(round, coded_block.clone());

                // Inform all peers of the leader so shards are processed.
                for peer in peers[1..].iter_mut() {
                    peer.mailbox.discovered(commitment, leader.clone(), round);
                }
                context.sleep(config.link.latency).await;

                for peer in peers.iter_mut() {
                    peer.mailbox
                        .subscribe_assigned_shard_verified(commitment)
                        .await
                        .expect("shard subscription should complete");
                }
                context.sleep(config.link.latency).await;

                for peer in peers.iter_mut() {
                    let reconstructed = peer
                        .mailbox
                        .get(commitment)
                        .await
                        .expect("block should be reconstructed");
                    assert_eq!(reconstructed.commitment(), commitment);
                    assert_eq!(reconstructed.height(), coded_block.height());
                }
            },
        );
    }

    #[test_traced]
    fn test_block_subscriptions() {
        let fixture = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, _, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();
                let digest = coded_block.digest();

                let leader = peers[0].public_key.clone();
                let round = Round::new(Epoch::zero(), View::new(1));

                // Subscribe before broadcasting.
                let commitment_sub = peers[1].mailbox.subscribe(commitment);
                let digest_sub = peers[2].mailbox.subscribe_by_digest(digest);

                peers[0].mailbox.proposed(round, coded_block.clone());

                // Inform all peers of the leader so shards are processed.
                for peer in peers[1..].iter_mut() {
                    peer.mailbox.discovered(commitment, leader.clone(), round);
                }
                context.sleep(config.link.latency * 2).await;

                for peer in peers.iter_mut() {
                    peer.mailbox
                        .subscribe_assigned_shard_verified(commitment)
                        .await
                        .expect("shard subscription should complete");
                }
                context.sleep(config.link.latency).await;

                let block_by_commitment =
                    commitment_sub.await.expect("subscription should resolve");
                assert_eq!(block_by_commitment.commitment(), commitment);
                assert_eq!(block_by_commitment.height(), coded_block.height());

                let block_by_digest = digest_sub.await.expect("subscription should resolve");
                assert_eq!(block_by_digest.commitment(), commitment);
                assert_eq!(block_by_digest.height(), coded_block.height());
            },
        );
    }

    #[test_traced]
    fn test_proposer_preproposal_subscriptions_resolve_after_local_cache() {
        let fixture = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(|config, context, _, peers, _, coding_config| async move {
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();
            let digest = coded_block.digest();
            let round = Round::new(Epoch::zero(), View::new(1));

            // Subscribe on the proposer before it caches the locally proposed block.
            let shard_sub = peers[0].mailbox.subscribe_assigned_shard_verified(commitment);
            let commitment_sub = peers[0].mailbox.subscribe(commitment);
            let digest_sub = peers[0].mailbox.subscribe_by_digest(digest);

            peers[0].mailbox.proposed(round, coded_block.clone());
            context.sleep(config.link.latency).await;

            select! {
                result = shard_sub => {
                    result.expect("shard subscription should resolve");
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("shard subscription did not resolve after local proposal cache");
                }
            }

            let block_by_commitment = select! {
                result = commitment_sub => {
                    result.expect("block subscription by commitment should resolve")
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("block subscription by commitment did not resolve after local proposal cache");
                }
            };
            assert_eq!(block_by_commitment.commitment(), commitment);
            assert_eq!(block_by_commitment.height(), coded_block.height());

            let block_by_digest = select! {
                result = digest_sub => {
                    result.expect("block subscription by digest should resolve")
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("block subscription by digest did not resolve after local proposal cache");
                }
            };
            assert_eq!(block_by_digest.commitment(), commitment);
            assert_eq!(block_by_digest.height(), coded_block.height());
        });
    }

    #[test_traced]
    fn test_shard_subscription_rejects_invalid_shard() {
        let fixture = Fixture::<C>::default();
        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                // peers[0] = byzantine
                // peers[1] = honest proposer
                // peers[2] = receiver

                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();
                let receiver_index = peers[2].index.get() as u16;

                let valid_shard = coded_block.shard(receiver_index).expect("missing shard");

                // Corrupt the shard's index to one that doesn't match
                // peers[0]'s participant index, triggering a block.
                let mut invalid_shard = valid_shard.clone();
                invalid_shard.index = peers[3].index.get() as u16;

                // Receiver learns the leader, then subscribes to its assigned shard.
                let receiver_pk = peers[2].public_key.clone();
                let leader = peers[1].public_key.clone();
                peers[2].mailbox.discovered(
                    commitment,
                    leader,
                    Round::new(Epoch::zero(), View::new(1)),
                );
                let mut shard_sub = peers[2]
                    .mailbox
                    .subscribe_assigned_shard_verified(commitment);

                // Byzantine peer sends the invalid shard.
                let invalid_bytes = invalid_shard.encode();
                peers[0]
                    .sender
                    .send(Recipients::One(receiver_pk.clone()), invalid_bytes, true);

                context.sleep(config.link.latency * 2).await;

                assert!(
                    matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
                    "subscription should not resolve from invalid shard"
                );
                assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;

                // Honest proposer sends the valid shard.
                let valid_bytes = valid_shard.encode();
                peers[1]
                    .sender
                    .send(Recipients::One(receiver_pk), valid_bytes, true);
                context.sleep(config.link.latency * 2).await;

                // Subscription should now resolve.
                select! {
                    _ = shard_sub => {},
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("subscription did not complete after valid shard arrival");
                    },
                };
            },
        );
    }

    #[test_traced]
2304    fn test_durable_prunes_reconstructed_blocks() {
2305        let fixture = Fixture::<C>::default();
2306        fixture.start(|_, context, _, mut peers, _, coding_config| async move {
2307            // Create 3 blocks at heights 1, 2, 3.
2308            let block1 = CodedBlock::<B, C, H>::new(
2309                B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
2310                coding_config,
2311                &STRATEGY,
2312            );
2313            let block2 = CodedBlock::<B, C, H>::new(
2314                B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 100),
2315                coding_config,
2316                &STRATEGY,
2317            );
2318            let block3 = CodedBlock::<B, C, H>::new(
2319                B::new::<H>((), Sha256Digest::EMPTY, Height::new(3), 100),
2320                coding_config,
2321                &STRATEGY,
2322            );
2323            let commitment1 = block1.commitment();
2324            let commitment2 = block2.commitment();
2325            let commitment3 = block3.commitment();
2326
2327            // Cache all blocks via `proposed`.
2328            let peer = &mut peers[0];
2329            let round = Round::new(Epoch::zero(), View::new(1));
2330            peer.mailbox.proposed(round, block1);
2331            peer.mailbox.proposed(round, block2);
2332            peer.mailbox.proposed(round, block3);
2333            context.sleep(Duration::from_millis(10)).await;
2334
2335            // Verify all blocks are in the cache.
2336            assert!(
2337                peer.mailbox.get(commitment1).await.is_some(),
2338                "block1 should be cached"
2339            );
2340            assert!(
2341                peer.mailbox.get(commitment2).await.is_some(),
2342                "block2 should be cached"
2343            );
2344            assert!(
2345                peer.mailbox.get(commitment3).await.is_some(),
2346                "block3 should be cached"
2347            );
2348
2349            // Prune through block2's commitment (blocks with height <= 2 should be removed).
2350            peer.mailbox.prune(commitment2);
2351            context.sleep(Duration::from_millis(10)).await;
2352
2353            // Blocks at heights 1 and 2 should be pruned.
2354            assert!(
2355                peer.mailbox.get(commitment1).await.is_none(),
2356                "block1 should be pruned"
2357            );
2358            assert!(
2359                peer.mailbox.get(commitment2).await.is_none(),
2360                "block2 should be pruned"
2361            );
2362
2363            // Block at height 3 should still be cached.
2364            assert!(
2365                peer.mailbox.get(commitment3).await.is_some(),
2366                "block3 should still be cached"
2367            );
2368        });
2369    }
2370
2371    #[test_traced]
2372    fn test_duplicate_leader_shard_ignored() {
2373        let fixture = Fixture::<C>::default();
2374        fixture.start(
2375            |config, context, oracle, mut peers, _, coding_config| async move {
2376                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2377                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2378                let commitment = coded_block.commitment();
2379
2380                // Get peer 2's own-index shard (the one the leader sends them).
2381                let peer2_index = peers[2].index.get() as u16;
2382                let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2383                let shard_bytes = peer2_shard.encode();
2384
2385                let peer2_pk = peers[2].public_key.clone();
2386                let leader = peers[0].public_key.clone();
2387
2388                // Inform peer 2 that peer 0 is the leader.
2389                peers[2].mailbox.discovered(
2390                    commitment,
2391                    leader,
2392                    Round::new(Epoch::zero(), View::new(1)),
2393                );
2394
2395                // Send peer 2 their shard from peer 0 (leader, first time - should succeed).
2396                peers[0]
2397                    .sender
2398                    .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true);
2399                context.sleep(config.link.latency * 2).await;
2400
2401                // Send the same shard again from peer 0 (leader duplicate - ignored).
2402                peers[0]
2403                    .sender
2404                    .send(Recipients::One(peer2_pk), shard_bytes, true);
2405                context.sleep(config.link.latency * 2).await;
2406
2407                // The leader should NOT be blocked for sending an identical duplicate.
2408                let blocked_peers = oracle.blocked().await.unwrap();
2409                let is_blocked = blocked_peers
2410                    .iter()
2411                    .any(|(a, b)| a == &peers[2].public_key && b == &peers[0].public_key);
2412                assert!(
2413                    !is_blocked,
2414                    "leader should not be blocked for duplicate shard"
2415                );
2416            },
2417        );
2418    }
2419
2420    #[test_traced]
2421    fn test_equivocating_leader_shard_blocks_peer() {
2422        let fixture = Fixture::<C>::default();
2423        fixture.start(
2424            |config, context, oracle, mut peers, _, coding_config| async move {
2425                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2426                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
2427                let commitment = coded_block1.commitment();
2428
2429                // Create a second block with different payload to get different shard data.
2430                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 200);
2431                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
2432
2433                // Get peer 2's shard from both blocks.
2434                let peer2_index = peers[2].index.get() as u16;
2435                let shard_bytes1 = coded_block1
2436                    .shard(peer2_index)
2437                    .expect("missing shard")
2438                    .encode();
2439                let mut equivocating_shard =
2440                    coded_block2.shard(peer2_index).expect("missing shard");
2441                // Override the commitment so it targets the same reconstruction state.
2442                equivocating_shard.commitment = commitment;
2443                let shard_bytes2 = equivocating_shard.encode();
2444
2445                let peer2_pk = peers[2].public_key.clone();
2446                let leader = peers[0].public_key.clone();
2447
2448                // Inform peer 2 that peer 0 is the leader.
2449                peers[2].mailbox.discovered(
2450                    commitment,
2451                    leader,
2452                    Round::new(Epoch::zero(), View::new(1)),
2453                );
2454
2455                // Send peer 2 their shard from the leader (first time - succeeds).
2456                peers[0]
2457                    .sender
2458                    .send(Recipients::One(peer2_pk.clone()), shard_bytes1, true);
2459                context.sleep(config.link.latency * 2).await;
2460
2461                // Send a different shard from the leader (equivocation - should block).
2462                peers[0]
2463                    .sender
2464                    .send(Recipients::One(peer2_pk), shard_bytes2, true);
2465                context.sleep(config.link.latency * 2).await;
2466
2467                // Peer 2 should have blocked the leader for equivocation.
2468                assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;
2469            },
2470        );
2471    }
2472
2473    #[test_traced]
2474    fn test_non_leader_wrong_index_shard_blocked() {
2475        // Test that a non-leader sending a shard with the wrong index is blocked.
2476        // Non-leaders must send shards at their own participant index.
2477        let fixture = Fixture::<C>::default();
2478        fixture.start(
2479            |config, context, oracle, mut peers, _, coding_config| async move {
2480                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2481                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2482                let commitment = coded_block.commitment();
2483
2484                // Get peer 2's own-index shard.
2485                let peer2_index = peers[2].index.get() as u16;
2486                let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2487                let shard_bytes = peer2_shard.encode();
2488
2489                let peer2_pk = peers[2].public_key.clone();
2490                let leader = peers[0].public_key.clone();
2491
2492                // Inform peer 2 that peer 0 is the leader.
2493                peers[2].mailbox.discovered(
2494                    commitment,
2495                    leader,
2496                    Round::new(Epoch::zero(), View::new(1)),
2497                );
2498
2499                // Peer 1 (not the leader) sends peer 2 a shard with peer 2's index
2500                // (wrong: non-leaders must use their own index).
2501                peers[1]
2502                    .sender
2503                    .send(Recipients::One(peer2_pk), shard_bytes, true);
2504                context.sleep(config.link.latency * 2).await;
2505
2506                // Peer 1 should be blocked by peer 2 for wrong shard index.
2507                assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2508            },
2509        );
2510    }
2511
2512    #[test_traced]
2513    fn test_buffered_wrong_index_shard_blocked_on_leader_arrival() {
2514        // Test that when a non-leader's shard with the wrong index is buffered
2515        // (leader unknown) and the leader is then discovered, the sender is blocked.
2516        let fixture = Fixture::<C>::default();
2517        fixture.start(
2518            |config, context, oracle, mut peers, _, coding_config| async move {
2519                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2520                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2521                let commitment = coded_block.commitment();
2522
2523                // Get peer 2's own-index shard.
2524                let peer2_index = peers[2].index.get() as u16;
2525                let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2526                let shard_bytes = peer2_shard.encode();
2527
2528                let peer2_pk = peers[2].public_key.clone();
2529
2530                // Peer 1 sends a shard with peer 2's index before the leader is known (buffered).
2531                // This is wrong: non-leaders must send at their own index.
2532                peers[1]
2533                    .sender
2534                    .send(Recipients::One(peer2_pk), shard_bytes, true);
2535                context.sleep(config.link.latency * 2).await;
2536
2537                // Nobody should be blocked yet (shard is buffered, leader unknown).
2538                let blocked = oracle.blocked().await.unwrap();
2539                assert!(
2540                    blocked.is_empty(),
2541                    "no peers should be blocked while leader is unknown"
2542                );
2543
2544                // Now inform peer 2 that peer 0 is the leader.
2545                // This drains the buffer: peer 1's shard carries peer 2's index, but
2546                // peer 1 is not the leader, so the expected index is peer 1's own.
2547                let leader = peers[0].public_key.clone();
2548                peers[2].mailbox.discovered(
2549                    commitment,
2550                    leader,
2551                    Round::new(Epoch::zero(), View::new(1)),
2552                );
2553                context.sleep(Duration::from_millis(10)).await;
2554
2555                assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2556            },
2557        );
2558    }
2559
2560    #[test_traced]
2561    fn test_conflicting_external_proposed_ignored() {
2562        let fixture = Fixture::<C>::default();
2563        fixture.start(
2564            |config, context, oracle, mut peers, _, coding_config| async move {
2565                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2566                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2567                let commitment = coded_block.commitment();
2568
2569                // Get the shard the leader would send to peer 2 (at peer 2's index).
2570                let peer2_index = peers[2].index.get() as u16;
2571                let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2572                let shard_bytes = peer2_shard.encode();
2573
2574                let peer2_pk = peers[2].public_key.clone();
2575                let leader_a = peers[0].public_key.clone();
2576                let leader_b = peers[1].public_key.clone();
2577
2578                // Subscribe before shards arrive so we can verify acceptance.
2579                let shard_sub = peers[2]
2580                    .mailbox
2581                    .subscribe_assigned_shard_verified(commitment);
2582
2583                // First leader update should stick.
2584                peers[2].mailbox.discovered(
2585                    commitment,
2586                    leader_a.clone(),
2587                    Round::new(Epoch::zero(), View::new(1)),
2588                );
2589
2590                // Conflicting update should be ignored.
2591                peers[2].mailbox.discovered(
2592                    commitment,
2593                    leader_b,
2594                    Round::new(Epoch::zero(), View::new(1)),
2595                );
2596
2597                // Original leader sends shard; this should still be accepted.
2598                peers[0]
2599                    .sender
2600                    .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true);
2601                context.sleep(config.link.latency * 2).await;
2602
2603                // Subscription should resolve from accepted leader shard.
2604                select! {
2605                    _ = shard_sub => {},
2606                    _ = context.sleep(Duration::from_secs(5)) => {
2607                        panic!("subscription did not complete after shard from original leader");
2608                    },
2609                };
2610
2611                // Peer 1's leader update was ignored, so its wrong-index shard gets it blocked.
2612                peers[1]
2613                    .sender
2614                    .send(Recipients::One(peer2_pk), shard_bytes, true);
2615                context.sleep(config.link.latency * 2).await;
2616
2617                assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
2618
2619                // Original leader should not be blocked.
2620                let blocked_peers = oracle.blocked().await.unwrap();
2621                let leader_a_blocked = blocked_peers
2622                    .iter()
2623                    .any(|(a, b)| a == &peers[2].public_key && b == &leader_a);
2624                assert!(
2625                    !leader_a_blocked,
2626                    "original leader should not be blocked after conflicting leader update"
2627                );
2628            },
2629        );
2630    }
2631
2632    #[test_traced]
2633    fn test_non_participant_external_proposed_ignored() {
2634        let fixture = Fixture::<C>::default();
2635        fixture.start(
2636            |config, context, oracle, mut peers, _, coding_config| async move {
2637                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2638                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2639                let commitment = coded_block.commitment();
2640
2641                // Get the shard the leader would send to peer 2 (at peer 2's index).
2642                let peer2_index = peers[2].index.get() as u16;
2643                let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2644                let shard_bytes = peer2_shard.encode();
2645
2646                let peer2_pk = peers[2].public_key.clone();
2647                let leader = peers[0].public_key.clone();
2648                let non_participant_leader = PrivateKey::from_seed(10_000).public_key();
2649
2650                // Subscribe before shards arrive.
2651                let shard_sub = peers[2]
2652                    .mailbox
2653                    .subscribe_assigned_shard_verified(commitment);
2654
2655                // A non-participant leader update should be ignored.
2656                peers[2].mailbox.discovered(
2657                    commitment,
2658                    non_participant_leader,
2659                    Round::new(Epoch::zero(), View::new(1)),
2660                );
2661
2662                // Leader unknown path: this shard should be buffered, not blocked.
2663                peers[0]
2664                    .sender
2665                    .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true);
2666                context.sleep(config.link.latency * 2).await;
2667
2668                let blocked = oracle.blocked().await.unwrap();
2669                let leader_blocked = blocked
2670                    .iter()
2671                    .any(|(a, b)| a == &peers[2].public_key && b == &leader);
2672                assert!(
2673                    !leader_blocked,
2674                    "leader should not be blocked when non-participant update is ignored"
2675                );
2676
2677                // A valid leader update should then process buffered shards and resolve subscription.
2678                peers[2].mailbox.discovered(
2679                    commitment,
2680                    leader,
2681                    Round::new(Epoch::zero(), View::new(1)),
2682                );
2683                context.sleep(config.link.latency * 2).await;
2684
2685                select! {
2686                    _ = shard_sub => {},
2687                    _ = context.sleep(Duration::from_secs(5)) => {
2688                        panic!("subscription did not complete after valid leader update");
2689                    },
2690                };
2691            },
2692        );
2693    }
2694
2695    #[test_traced]
2696    fn test_shard_from_non_participant_blocks_peer() {
2697        let fixture = Fixture::<C>::default();
2698        fixture.start(
2699            |config, context, oracle, peers, _, coding_config| async move {
2700                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2701                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2702                let commitment = coded_block.commitment();
2703
2704                let leader = peers[0].public_key.clone();
2705                let receiver_pk = peers[2].public_key.clone();
2706
2707                let non_participant_key = PrivateKey::from_seed(10_000);
2708                let non_participant_pk = non_participant_key.public_key();
2709
2710                let non_participant_control = oracle.control(non_participant_pk.clone());
2711                let (mut non_participant_sender, _non_participant_receiver) =
2712                    non_participant_control
2713                        .register(0, TEST_QUOTA)
2714                        .await
2715                        .expect("registration should succeed");
2716                oracle
2717                    .add_link(
2718                        non_participant_pk.clone(),
2719                        receiver_pk.clone(),
2720                        DEFAULT_LINK,
2721                    )
2722                    .await
2723                    .expect("link should be added");
2724                oracle.manager().track(
2725                    2,
2726                    TrackedPeers::new(
2727                        Set::from_iter_dedup(peers.iter().map(|peer| peer.public_key.clone())),
2728                        Set::from_iter_dedup([non_participant_pk.clone()]),
2729                    ),
2730                );
2731                context.sleep(Duration::from_millis(10)).await;
2732                // Inform peer 2 that peer 0 is the leader.
2733                peers[2].mailbox.discovered(
2734                    commitment,
2735                    leader,
2736                    Round::new(Epoch::zero(), View::new(1)),
2737                );
2738
2739                let peer2_index = peers[2].index.get() as u16;
2740                let shard = coded_block.shard(peer2_index).expect("missing shard");
2741                let shard_bytes = shard.encode();
2742                // The non-participant sends peer 2 a shard once the leader is known.
2743                non_participant_sender.send(Recipients::One(receiver_pk), shard_bytes, true);
2744                context.sleep(config.link.latency * 2).await;
2745                // Peer 2 should block the non-participant.
2746                assert_blocked(&oracle, &peers[2].public_key, &non_participant_pk).await;
2747            },
2748        );
2749    }
2750
2751    #[test_traced]
2752    fn test_preleader_shard_from_non_participant_is_not_buffered() {
2753        let fixture = Fixture::<C>::default();
2754        fixture.start(
2755            |config, context, oracle, peers, _, coding_config| async move {
2756                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2757                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2758                let commitment = coded_block.commitment();
2759
2760                let leader = peers[0].public_key.clone();
2761                let receiver_pk = peers[2].public_key.clone();
2762
2763                let non_participant_key = PrivateKey::from_seed(10_000);
2764                let non_participant_pk = non_participant_key.public_key();
2765
2766                let non_participant_control = oracle.control(non_participant_pk.clone());
2767                let (mut non_participant_sender, _non_participant_receiver) =
2768                    non_participant_control
2769                        .register(0, TEST_QUOTA)
2770                        .await
2771                        .expect("registration should succeed");
2772                oracle
2773                    .add_link(
2774                        non_participant_pk.clone(),
2775                        receiver_pk.clone(),
2776                        DEFAULT_LINK,
2777                    )
2778                    .await
2779                    .expect("link should be added");
2780                oracle.manager().track(
2781                    2,
2782                    TrackedPeers::new(
2783                        Set::from_iter_dedup(peers.iter().map(|peer| peer.public_key.clone())),
2784                        Set::from_iter_dedup([non_participant_pk.clone()]),
2785                    ),
2786                );
2787                context.sleep(Duration::from_millis(10)).await;
2788
2789                let peer2_index = peers[2].index.get() as u16;
2790                let shard = coded_block.shard(peer2_index).expect("missing shard");
2791                let shard_bytes = shard.encode();
2792                let mut shard_sub = peers[2]
2793                    .mailbox
2794                    .subscribe_assigned_shard_verified(commitment);
2795                // The non-participant sends a shard before the leader is known.
2796                non_participant_sender.send(Recipients::One(receiver_pk), shard_bytes, true);
2797                context.sleep(config.link.latency * 2).await;
2798                // Discover the leader; a buffered shard would be processed at this point.
2799                peers[2].mailbox.discovered(
2800                    commitment,
2801                    leader,
2802                    Round::new(Epoch::zero(), View::new(1)),
2803                );
2804                context.sleep(config.link.latency * 2).await;
2805                // The shard was dropped: nobody is blocked and the subscription stays pending.
2806                let blocked = oracle.blocked().await.unwrap();
2807                let non_participant_blocked = blocked
2808                    .iter()
2809                    .any(|(a, b)| a == &peers[2].public_key && b == &non_participant_pk);
2810                assert!(
2811                    !non_participant_blocked,
2812                    "non-participant should not be blocked when its pre-leader shard is ignored"
2813                );
2814                assert!(
2815                    matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
2816                    "pre-leader shard from non-participant should not be buffered"
2817                );
2818            },
2819        );
2820    }
2821
2822    #[test_traced]
2823    fn test_duplicate_shard_ignored() {
2824        // Use 10 peers so minimum_shards=4, giving us time to send a duplicate before reconstruction.
2825        let fixture: Fixture<C> = Fixture {
2826            num_peers: 10,
2827            ..Default::default()
2828        };
2829
2830        fixture.start(
2831            |config, context, oracle, mut peers, _, coding_config| async move {
2832                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2833                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
2834
2835                // Get peer 2's own-index shard (the one the leader sends them).
2836                let peer2_index = peers[2].index.get() as u16;
2837                let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
2838
2839                // Get peer 1's shard.
2840                let peer1_index = peers[1].index.get() as u16;
2841                let peer1_shard = coded_block.shard(peer1_index).expect("missing shard");
2842
2843                let peer2_pk = peers[2].public_key.clone();
2844                let leader = peers[0].public_key.clone();
2845
2846                // Inform peer 2 of the leader.
2847                peers[2].mailbox.discovered(
2848                    coded_block.commitment(),
2849                    leader,
2850                    Round::new(Epoch::zero(), View::new(1)),
2851                );
2852
2853                // Send peer 2 their shard from the leader (1 checked shard).
2854                let leader_shard_bytes = peer2_shard.encode();
2855                peers[0]
2856                    .sender
2857                    .send(Recipients::One(peer2_pk.clone()), leader_shard_bytes, true);
2858                context.sleep(config.link.latency * 2).await;
2859
2860                // Send peer 1's shard to peer 2 (first time - should succeed, 2 checked shards).
2861                let peer1_shard_bytes = peer1_shard.encode();
2862                peers[1].sender.send(
2863                    Recipients::One(peer2_pk.clone()),
2864                    peer1_shard_bytes.clone(),
2865                    true,
2866                );
2867                context.sleep(config.link.latency * 2).await;
2868
2869                // Send the same shard again (exact duplicate - should be ignored, not blocked).
2870                // With 10 peers, minimum_shards=4, so we haven't reconstructed yet.
2871                peers[1]
2872                    .sender
2873                    .send(Recipients::One(peer2_pk), peer1_shard_bytes, true);
2874                context.sleep(config.link.latency * 2).await;
2875
2876                // Peer 1 should NOT be blocked for sending an identical duplicate.
2877                let blocked_peers = oracle.blocked().await.unwrap();
2878                let is_blocked = blocked_peers
2879                    .iter()
2880                    .any(|(a, b)| a == &peers[2].public_key && b == &peers[1].public_key);
2881                assert!(
2882                    !is_blocked,
2883                    "peer should not be blocked for exact duplicate shard"
2884                );
2885            },
2886        );
2887    }
2888
2889    #[test_traced]
2890    fn test_equivocating_shard_blocks_peer() {
2891        // Use 10 peers so minimum_shards=4, giving us time to send an equivocating shard.
2892        let fixture: Fixture<C> = Fixture {
2893            num_peers: 10,
2894            ..Default::default()
2895        };
2896
2897        fixture.start(
2898            |config, context, oracle, mut peers, _, coding_config| async move {
2899                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
2900                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
2901
2902                // Create a second block with different payload to get different shard data.
2903                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 200);
2904                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
2905
2906                // Get peer 1's shard from block 1.
2907                let peer1_index = peers[1].index.get() as u16;
2908                let peer1_shard = coded_block1.shard(peer1_index).expect("missing shard");
2909
2910                // Get peer 1's shard from block 2 (different data, same index).
2911                let mut peer1_equivocating_shard =
2912                    coded_block2.shard(peer1_index).expect("missing shard");
2913                // Override the commitment to match block 1 so the shard targets
                // the same reconstruction state.
                peer1_equivocating_shard.commitment = coded_block1.commitment();

                let peer2_pk = peers[2].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Inform peer 2 of the leader.
                peers[2].mailbox.discovered(
                    coded_block1.commitment(),
                    leader,
                    Round::new(Epoch::zero(), View::new(1)),
                );

                // Send peer 2 the leader's shard (verified immediately).
                let peer2_index = peers[2].index.get() as u16;
                let leader_shard = coded_block1.shard(peer2_index).expect("missing shard");
                let leader_shard_bytes = leader_shard.encode();
                peers[0]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), leader_shard_bytes, true);
                context.sleep(config.link.latency * 2).await;

                // Send peer 1's valid shard to peer 2 (first time - succeeds).
                let shard_bytes = peer1_shard.encode();
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), shard_bytes, true);
                context.sleep(config.link.latency * 2).await;

                // Send a different shard from peer 1 (equivocation - should block).
                let equivocating_bytes = peer1_equivocating_shard.encode();
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk), equivocating_bytes, true);
                context.sleep(config.link.latency * 2).await;

                // Peer 2 should have blocked peer 1 for equivocation.
                assert_blocked(&oracle, &peers[2].public_key, &peers[1].public_key).await;
            },
        );
    }

    #[test_traced]
    fn test_reconstruction_states_pruned_at_or_below_reconstructed_view() {
        // Use 10 peers so minimum_shards=4.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                // Commitment A at lower view (1).
                let block_a = CodedBlock::<B, C, H>::new(
                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
                    coding_config,
                    &STRATEGY,
                );
                let commitment_a = block_a.commitment();

                // Commitment B at higher view (2), which we will reconstruct.
                let block_b = CodedBlock::<B, C, H>::new(
                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200),
                    coding_config,
                    &STRATEGY,
                );
                let commitment_b = block_b.commitment();

                let peer2_pk = peers[2].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Create state for A and ingest one shard from peer 1.
                peers[2].mailbox.discovered(
                    commitment_a,
                    leader.clone(),
                    Round::new(Epoch::zero(), View::new(1)),
                );
                let shard_a = block_a
                    .shard(peers[1].index.get() as u16)
                    .expect("missing shard")
                    .encode();
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), shard_a.clone(), true);
                context.sleep(config.link.latency * 2).await;

                // Create state for B at the higher view, then reconstruct it.
                peers[2].mailbox.discovered(
                    commitment_b,
                    leader,
                    Round::new(Epoch::zero(), View::new(2)),
                );
                // Leader's shard for peer 2.
                let leader_shard_b = block_b
                    .shard(peers[2].index.get() as u16)
                    .expect("missing shard")
                    .encode();
                peers[0]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), leader_shard_b, true);

                // Three more shards to reach the minimum threshold (4 total with the leader's).
                for i in [1usize, 3usize, 4usize] {
                    let shard = block_b
                        .shard(peers[i].index.get() as u16)
                        .expect("missing shard")
                        .encode();
                    peers[i]
                        .sender
                        .send(Recipients::One(peer2_pk.clone()), shard, true);
                }
                context.sleep(config.link.latency * 4).await;

                // B should reconstruct.
                let reconstructed = peers[2]
                    .mailbox
                    .get(commitment_b)
                    .await
                    .expect("block B should reconstruct");
                assert_eq!(reconstructed.commitment(), commitment_b);

                // State for A should be pruned (at/below the reconstructed view). Sending
                // the same shard for A again should NOT be treated as a duplicate.
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk), shard_a, true);
                context.sleep(config.link.latency * 2).await;

                let blocked = oracle.blocked().await.unwrap();
                let blocked_peer1 = blocked
                    .iter()
                    .any(|(a, b)| a == &peers[2].public_key && b == &peers[1].public_key);
                assert!(
                    !blocked_peer1,
                    "peer1 should not be blocked after lower-view state was pruned"
                );
            },
        );
    }
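
    // Illustrative sketch (not the engine's implementation): one way to drop
    // reconstruction state at or below a reconstructed view, which is the
    // behavior the test above expects. `SketchStates` and its methods are
    // hypothetical names, views are modeled as plain `u64` values rather than
    // `View`, and the per-view state is reduced to a shard counter.
    #[allow(dead_code)]
    struct SketchStates {
        // Number of shards ingested per view (placeholder for real state).
        by_view: std::collections::BTreeMap<u64, usize>,
    }

    #[allow(dead_code)]
    impl SketchStates {
        // Records one ingested shard for `view`.
        fn ingest(&mut self, view: u64) {
            *self.by_view.entry(view).or_insert(0) += 1;
        }

        // Keeps only state strictly above `reconstructed_view`.
        fn prune_at_or_below(&mut self, reconstructed_view: u64) {
            self.by_view.retain(|view, _| *view > reconstructed_view);
        }
    }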

    #[test_traced]
    fn test_local_proposal_prune_clears_older_reconstruction_state() {
        // Test that pruning at a locally proposed block's commitment also
        // clears reconstruction state that was created for an older round.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let block_a = CodedBlock::<B, C, H>::new(
                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
                    coding_config,
                    &STRATEGY,
                );
                let commitment_a = block_a.commitment();

                let block_b = CodedBlock::<B, C, H>::new(
                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200),
                    coding_config,
                    &STRATEGY,
                );
                let commitment_b = block_b.commitment();

                let peer2_pk = peers[2].public_key.clone();
                let leader = peers[0].public_key.clone();
                let round_a = Round::new(Epoch::zero(), View::new(1));
                let round_b = Round::new(Epoch::zero(), View::new(2));

                // Create state for A at the older round and subscribe to its block.
                peers[2].mailbox.discovered(commitment_a, leader, round_a);
                let block_sub = peers[2].mailbox.subscribe(commitment_a);

                let peer1_index = peers[1].index.get() as u16;
                let shard_a = block_a.shard(peer1_index).expect("missing shard");

                // Build a conflicting shard for the same index, relabeled with A's commitment.
                let block_a_equivocating = CodedBlock::<B, C, H>::new(
                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 300),
                    coding_config,
                    &STRATEGY,
                );
                let mut equivocating_shard = block_a_equivocating
                    .shard(peer1_index)
                    .expect("missing shard");
                equivocating_shard.commitment = commitment_a;

                // Peer 1 sends its valid shard for A to peer 2.
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), shard_a.encode(), true);
                context.sleep(config.link.latency * 2).await;

                // Peer 2 proposes B locally at the newer round, then prunes at B.
                peers[2].mailbox.proposed(round_b, block_b);
                assert!(
                    peers[2].mailbox.get(commitment_b).await.is_some(),
                    "local proposal should be cached before pruning"
                );
                peers[2].mailbox.prune(commitment_b);

                // The subscription for A should close rather than stay pending.
                select! {
                    result = block_sub => {
                        assert!(
                            result.is_err(),
                            "older block subscription should close after local-proposal prune"
                        );
                    },
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("older block subscription remained open after local-proposal prune");
                    },
                }

                // With A's state gone, the conflicting shard should not be seen as equivocation.
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk), equivocating_shard.encode(), true);
                context.sleep(config.link.latency * 2).await;

                let blocked = oracle.blocked().await.unwrap();
                let blocked_peer1 = blocked
                    .iter()
                    .any(|(a, b)| a == &peers[2].public_key && b == &peers[1].public_key);
                assert!(
                    !blocked_peer1,
                    "peer1 should not be blocked after older state was pruned"
                );
            },
        );
    }

    #[test_traced]
    fn test_pending_shards_batch_validated_at_quorum() {
        // Test that shards buffered in pending_shards are batch-validated once
        // the minimum shard threshold is met, enabling reconstruction.
        //
        // With 10 peers: minimum_shards = (10-1)/3 + 1 = 4
        // The leader (peer 0) sends peer 3 their own-index shard (verified
        // immediately). Peers 1, 2, 4 send their own shards (buffered in
        // pending_shards). Once the leader's shard + 3 pending shards >= 4,
        // batch validation fires and reconstruction succeeds.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                let peer3_pk = peers[3].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Inform peer 3 that peer 0 is the leader.
                peers[3].mailbox.discovered(
                    commitment,
                    leader,
                    Round::new(Epoch::zero(), View::new(1)),
                );

                // Send shards from peers 1, 2, 4 (their own indices).
                // These are buffered in pending_shards for batch validation.
                for &sender_idx in &[1, 2, 4] {
                    let shard = coded_block
                        .shard(peers[sender_idx].index.get() as u16)
                        .expect("missing shard");
                    let shard_bytes = shard.encode();
                    peers[sender_idx].sender.send(
                        Recipients::One(peer3_pk.clone()),
                        shard_bytes,
                        true,
                    );
                }

                context.sleep(config.link.latency * 2).await;

                // Block should not be reconstructed yet (no leader shard verified).
                let block = peers[3].mailbox.get(commitment).await;
                assert!(block.is_none(), "block should not be reconstructed yet");

                // Now the leader (peer 0) sends peer 3's own-index shard.
                // This is verified immediately, and with the 3 pending shards
                // we reach minimum_shards=4 -> batch validation + reconstruction.
                let peer3_index = peers[3].index.get() as u16;
                let leader_shard = coded_block.shard(peer3_index).expect("missing shard");
                let leader_shard_bytes = leader_shard.encode();
                peers[0]
                    .sender
                    .send(Recipients::One(peer3_pk), leader_shard_bytes, true);

                context.sleep(config.link.latency * 2).await;

                // No peers should be blocked (all shards were valid).
                let blocked = oracle.blocked().await.unwrap();
                assert!(
                    blocked.is_empty(),
                    "no peers should be blocked for valid pending shards"
                );

                // Block should now be reconstructed (4 checked shards >= minimum_shards).
                let block = peers[3].mailbox.get(commitment).await;
                assert!(
                    block.is_some(),
                    "block should be reconstructed after batch validation"
                );

                // Verify the reconstructed block has the correct commitment.
                let reconstructed = block.unwrap();
                assert_eq!(
                    reconstructed.commitment(),
                    commitment,
                    "reconstructed block should have correct commitment"
                );
            },
        );
    }
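
    // Illustrative sketch (not part of the engine): the threshold arithmetic
    // quoted in the test above, `minimum_shards = (n - 1) / 3 + 1`. The helper
    // name `sketch_minimum_shards` is hypothetical; the real threshold comes
    // from the coding configuration, which may compute it differently.
    #[allow(dead_code)]
    fn sketch_minimum_shards(num_peers: u32) -> u32 {
        // Assumed fault bound f = (n - 1) / 3 (integer division), with f + 1
        // shards taken as the reconstruction threshold.
        num_peers.saturating_sub(1) / 3 + 1
    }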

    #[test_traced]
    fn test_peer_shards_buffered_until_external_proposed() {
        // Test that shards received before leader announcement do not progress
        // reconstruction until Discovered is delivered.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                let receiver_idx = 3usize;
                let receiver_pk = peers[receiver_idx].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Subscribe before any shards arrive.
                let mut shard_sub = peers[receiver_idx]
                    .mailbox
                    .subscribe_assigned_shard_verified(commitment);

                // Send the leader's shard (for the receiver's index) and three
                // peer shards, all before the leader announcement.
                let leader_shard = coded_block
                    .shard(peers[receiver_idx].index.get() as u16)
                    .expect("missing shard")
                    .encode();
                peers[0]
                    .sender
                    .send(Recipients::One(receiver_pk.clone()), leader_shard, true);

                for i in [1usize, 2usize, 4usize] {
                    let shard = coded_block
                        .shard(peers[i].index.get() as u16)
                        .expect("missing shard")
                        .encode();
                    peers[i]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), shard, true);
                }

                context.sleep(config.link.latency * 2).await;

                // No leader yet: shard subscription should still be pending and block unavailable.
                assert!(
                    matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
                    "shard subscription should not resolve before leader announcement"
                );
                assert!(
                    peers[receiver_idx].mailbox.get(commitment).await.is_none(),
                    "block should not reconstruct before leader announcement"
                );

                // Announce the leader, which drains the buffered shards so progress
                // should resume immediately.
                peers[receiver_idx].mailbox.discovered(
                    commitment,
                    leader,
                    Round::new(Epoch::zero(), View::new(1)),
                );

                select! {
                    _ = shard_sub => {},
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("shard subscription did not resolve after leader announcement");
                    },
                }

                context.sleep(config.link.latency * 2).await;
                assert!(
                    peers[receiver_idx].mailbox.get(commitment).await.is_some(),
                    "block should reconstruct after buffered shards are ingested"
                );

                // All shards were valid and from participants.
                assert!(
                    oracle.blocked().await.unwrap().is_empty(),
                    "no peers should be blocked for valid buffered shards"
                );
            },
        );
    }
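
    // Illustrative sketch (not the engine's implementation): a toy model of
    // "buffer shards until the leader is discovered", the behavior the test
    // above exercises. `SketchBuffer` is a hypothetical name; shards are
    // modeled as bare `u16` indices and all validation is omitted.
    #[allow(dead_code)]
    #[derive(Default)]
    struct SketchBuffer {
        leader_known: bool,
        buffered: Vec<u16>,
        processed: Vec<u16>,
    }

    #[allow(dead_code)]
    impl SketchBuffer {
        // Before discovery, shards are only buffered; afterwards they are
        // processed as soon as they arrive.
        fn on_shard(&mut self, index: u16) {
            if self.leader_known {
                self.processed.push(index);
            } else {
                self.buffered.push(index);
            }
        }

        // Discovery drains everything buffered so far, in arrival order.
        fn on_discovered(&mut self) {
            self.leader_known = true;
            self.processed.append(&mut self.buffered);
        }
    }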

    #[test_traced]
    fn test_notarized_commitment_reconstructs_from_buffered_peer_shards_without_leader() {
        // Test that a notarized commitment reconstructs from buffered peer
        // shards even though no leader has been discovered, and that this does
        // not count as the receiver's assigned shard being verified.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();
                let round = Round::new(Epoch::zero(), View::new(1));

                let receiver_idx = 3usize;
                let receiver = peers[receiver_idx].public_key.clone();

                let block_sub = peers[receiver_idx].mailbox.subscribe(commitment);

                // Four sender-indexed shards are enough to reconstruct without
                // classifying any sender as the leader.
                for sender_idx in [1usize, 2, 4, 5] {
                    let shard = coded_block
                        .shard(peers[sender_idx].index.get() as u16)
                        .expect("missing shard")
                        .encode();
                    peers[sender_idx].sender.send(
                        Recipients::One(receiver.clone()),
                        shard,
                        true,
                    );
                }
                context.sleep(config.link.latency * 2).await;

                assert!(
                    peers[receiver_idx].mailbox.get(commitment).await.is_none(),
                    "block should not reconstruct before the commitment is notarized"
                );

                // Notarization signals interest in the commitment and should
                // trigger reconstruction from the buffered shards.
                peers[receiver_idx].mailbox.notarized(commitment, round);

                select! {
                    _ = block_sub => {},
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("block subscription did not resolve after notarized reconstruction interest");
                    },
                }

                let reconstructed = peers[receiver_idx]
                    .mailbox
                    .get(commitment)
                    .await
                    .expect("block should reconstruct from buffered peer shards");
                assert_eq!(reconstructed.commitment(), commitment);

                // The assigned-shard subscription must stay pending until the
                // leader's shard is verified.
                let mut assigned = peers[receiver_idx]
                    .mailbox
                    .subscribe_assigned_shard_verified(commitment);
                assert!(
                    matches!(assigned.try_recv(), Err(TryRecvError::Empty)),
                    "leaderless reconstruction must not satisfy assigned shard readiness"
                );

                // Discover the leader and deliver the receiver's assigned shard.
                let leader = peers[0].public_key.clone();
                peers[receiver_idx]
                    .mailbox
                    .discovered(commitment, leader, round);
                let leader_shard = coded_block
                    .shard(peers[receiver_idx].index.get() as u16)
                    .expect("missing leader shard")
                    .encode();
                peers[0].sender.send(
                    Recipients::One(receiver),
                    leader_shard,
                    true,
                );

                select! {
                    _ = assigned => {},
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("assigned shard subscription did not resolve after leader discovery");
                    },
                }

                assert!(
                    oracle.blocked().await.unwrap().is_empty(),
                    "valid sender-indexed shards should not block peers"
                );
            },
        );
    }

    #[test_traced]
    fn test_leader_shard_after_notarized_is_buffered_until_discovered() {
        // Test that a leader shard arriving after notarization, but before the
        // leader is discovered, is buffered and then verified once Discovered
        // is delivered.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();
                let round = Round::new(Epoch::zero(), View::new(1));

                let leader_idx = 0usize;
                let receiver_idx = 3usize;
                let leader = peers[leader_idx].public_key.clone();
                let receiver = peers[receiver_idx].public_key.clone();

                // Notarize first, so the leader is still unknown when the shard arrives.
                peers[receiver_idx].mailbox.notarized(commitment, round);
                let assigned = peers[receiver_idx]
                    .mailbox
                    .subscribe_assigned_shard_verified(commitment);

                let leader_shard = coded_block
                    .shard(peers[receiver_idx].index.get() as u16)
                    .expect("missing receiver shard")
                    .encode();
                peers[leader_idx]
                    .sender
                    .send(Recipients::One(receiver), leader_shard, true);

                // Announce the leader only after the shard has been delivered.
                context.sleep(config.link.latency * 2).await;
                peers[receiver_idx]
                    .mailbox
                    .discovered(commitment, leader, round);

                assigned
                    .await
                    .expect("assigned shard should resolve after leader discovery");
                assert!(
                    oracle.blocked().await.unwrap().is_empty(),
                    "valid leader shard should not block peers"
                );
            },
        );
    }

    #[test_traced]
    fn test_post_leader_shards_processed_immediately() {
        // Test that shards arriving after leader announcement are processed
        // without waiting for any extra trigger.
        let fixture: Fixture<C> = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();

                let receiver_idx = 3usize;
                let receiver_pk = peers[receiver_idx].public_key.clone();
                let leader = peers[0].public_key.clone();

                let shard_sub = peers[receiver_idx]
                    .mailbox
                    .subscribe_assigned_shard_verified(commitment);
                peers[receiver_idx].mailbox.discovered(
                    commitment,
                    leader.clone(),
                    Round::new(Epoch::zero(), View::new(1)),
                );

                // Send leader's shard (for receiver's index) after leader is known.
                let leader_shard = coded_block
                    .shard(peers[receiver_idx].index.get() as u16)
                    .expect("missing shard")
                    .encode();
                peers[0]
                    .sender
                    .send(Recipients::One(receiver_pk.clone()), leader_shard, true);

                // Subscription should resolve from the leader's shard.
                select! {
                    _ = shard_sub => {},
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("shard subscription did not resolve after post-leader shard");
                    },
                }

                // Send enough shards after leader known to reconstruct.
                for i in [1usize, 2usize, 4usize] {
                    let shard = coded_block
                        .shard(peers[i].index.get() as u16)
                        .expect("missing shard")
                        .encode();
                    peers[i]
                        .sender
                        .send(Recipients::One(receiver_pk.clone()), shard, true);
                }

                context.sleep(config.link.latency * 2).await;
                let reconstructed = peers[receiver_idx]
                    .mailbox
                    .get(commitment)
                    .await
                    .expect("block should reconstruct from post-leader shards");
                assert_eq!(reconstructed.commitment(), commitment);

                assert!(
                    oracle.blocked().await.unwrap().is_empty(),
                    "no peers should be blocked for valid post-leader shards"
                );
            },
        );
    }

    #[test_traced]
    fn test_invalid_shard_codec_blocks_peer() {
        // Test that receiving an invalid shard (codec failure) blocks the sender.
        let fixture: Fixture<C> = Fixture {
            num_peers: 4,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, _coding_config| async move {
                let peer0_pk = peers[0].public_key.clone();
                let peer1_pk = peers[1].public_key.clone();

                // Send garbage bytes that will fail codec decoding.
                let garbage = Bytes::from(vec![0xFF, 0xFE, 0xFD, 0xFC, 0xFB]);
                peers[1]
                    .sender
                    .send(Recipients::One(peer0_pk.clone()), garbage, true);

                context.sleep(config.link.latency * 2).await;

                // Peer 1 should be blocked by peer 0 for sending an invalid shard.
                assert_blocked(&oracle, &peer0_pk, &peer1_pk).await;
            },
        );
    }

    #[test_traced]
    fn test_duplicate_buffered_shard_does_not_block_before_leader() {
        // Test that duplicate shards before leader announcement are
        // buffered and do not immediately block the sender.
        let fixture: Fixture<C> = Fixture {
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);

                // Get peer 2's shard.
                let peer2_index = peers[2].index.get() as u16;
                let peer2_shard = coded_block.shard(peer2_index).expect("missing shard");
                let shard_bytes = peer2_shard.encode();

                let peer2_pk = peers[2].public_key.clone();

                // Do NOT set a leader, so shards should be buffered.

                // Peer 1 sends the shard to peer 2 (buffered, leader unknown).
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk.clone()), shard_bytes.clone(), true);
                context.sleep(config.link.latency * 2).await;

                // No one should be blocked yet.
                let blocked = oracle.blocked().await.unwrap();
                assert!(blocked.is_empty(), "no peers should be blocked yet");

                // Peer 1 sends the same shard AGAIN (duplicate while leader unknown).
                peers[1]
                    .sender
                    .send(Recipients::One(peer2_pk), shard_bytes, true);
                context.sleep(config.link.latency * 2).await;

                // Still no blocking before a leader is known.
                let blocked = oracle.blocked().await.unwrap();
                assert!(
                    blocked.is_empty(),
                    "no peers should be blocked before leader"
                );
            },
        );
    }

    #[test_traced]
    fn test_invalid_leader_shard_crypto_blocks_leader() {
        // Test that a leader shard failing cryptographic verification
        // results in the leader being blocked.
        let fixture: Fixture<C> = Fixture {
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                // Create two different blocks; a shard from block2 won't verify
                // against the commitment from block1.
                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
                let commitment1 = coded_block1.commitment();

                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);

                // Get peer 2's shard from block2, but re-wrap it with
                // block1's commitment so it fails verification.
                let peer2_index = peers[2].index.get() as u16;
                let mut wrong_shard = coded_block2.shard(peer2_index).expect("missing shard");
                wrong_shard.commitment = commitment1;
                let wrong_bytes = wrong_shard.encode();

                let peer2_pk = peers[2].public_key.clone();
                let leader = peers[0].public_key.clone();

                // Inform peer 2 that peer 0 is the leader.
                peers[2].mailbox.discovered(
                    commitment1,
                    leader,
                    Round::new(Epoch::zero(), View::new(1)),
                );

                // Leader (peer 0) sends the invalid shard.
                peers[0]
                    .sender
                    .send(Recipients::One(peer2_pk), wrong_bytes, true);
                context.sleep(config.link.latency * 2).await;

                // Peer 0 (leader) should be blocked for invalid crypto.
                assert_blocked(&oracle, &peers[2].public_key, &peers[0].public_key).await;
            },
        );
    }
3646
3647    #[test_traced]
3648    fn test_shard_index_mismatch_blocks_peer() {
3649        // Test that a shard whose index doesn't match the sender's
3650        // participant index results in blocking the sender.
3651        let fixture: Fixture<C> = Fixture {
3652            num_peers: 10,
3653            ..Default::default()
3654        };
3655
3656        fixture.start(
3657            |config, context, oracle, mut peers, _, coding_config| async move {
3658                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3659                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
3660                let commitment = coded_block.commitment();
3661
3662                // Get peer 3's leader shard so peer 3 can validate shards.
3663                let peer3_index = peers[3].index.get() as u16;
3664                let leader_shard = coded_block.shard(peer3_index).expect("missing shard");
3665
3666                // Get peer 1's valid shard, then change the index to peer 4's index.
3667                let peer1_index = peers[1].index.get() as u16;
3668                let mut wrong_index_shard = coded_block.shard(peer1_index).expect("missing shard");
3669                // Mutate the index so it doesn't match sender (peer 1).
3670                wrong_index_shard.index = peers[4].index.get() as u16;
3671                let wrong_bytes = wrong_index_shard.encode();
3672
3673                let peer3_pk = peers[3].public_key.clone();
3674                let leader = peers[0].public_key.clone();
3675
3676                // Inform peer 3 of the leader and send them the leader shard.
3677                peers[3].mailbox.discovered(
3678                    commitment,
3679                    leader,
3680                    Round::new(Epoch::zero(), View::new(1)),
3681                );
3682                let shard_bytes = leader_shard.encode();
3683                peers[0]
3684                    .sender
3685                    .send(Recipients::One(peer3_pk.clone()), shard_bytes, true);
3686                context.sleep(config.link.latency * 2).await;
3687
3688                // Peer 1 sends a shard with a mismatched index to peer 3.
3689                peers[1]
3690                    .sender
3691                    .send(Recipients::One(peer3_pk), wrong_bytes, true);
3692                context.sleep(config.link.latency * 2).await;
3693
3694                // Peer 1 should be blocked for shard index mismatch.
3695                assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
3696            },
3697        );
3698    }
3699
3700    #[test_traced]
3701    fn test_invalid_shard_crypto_blocks_peer() {
3702        // Test that a shard failing cryptographic verification
3703        // results in blocking the sender once batch validation fires at quorum.
3704        let fixture: Fixture<C> = Fixture {
3705            num_peers: 10,
3706            ..Default::default()
3707        };
3708
3709        fixture.start(
3710            |config, context, oracle, mut peers, _, coding_config| async move {
3711                // Create two different blocks.
3712                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3713                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3714                let commitment1 = coded_block1.commitment();
3715
3716                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3717                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3718
3719                // Get peer 3's leader shard from block1 (valid).
3720                let peer3_index = peers[3].index.get() as u16;
3721                let leader_shard = coded_block1.shard(peer3_index).expect("missing shard");
3722
3723                // Get peer 1's shard from block2, but re-wrap with block1's
3724                // commitment so verification fails.
3725                let peer1_index = peers[1].index.get() as u16;
3726                let mut wrong_shard = coded_block2.shard(peer1_index).expect("missing shard");
3727                wrong_shard.commitment = commitment1;
3728                let wrong_bytes = wrong_shard.encode();
3729
3730                let peer3_pk = peers[3].public_key.clone();
3731                let leader = peers[0].public_key.clone();
3732
3733                // Inform peer 3 of the leader and send the valid leader shard.
3734                peers[3].mailbox.discovered(
3735                    commitment1,
3736                    leader,
3737                    Round::new(Epoch::zero(), View::new(1)),
3738                );
3739                let shard_bytes = leader_shard.encode();
3740                peers[0]
3741                    .sender
3742                    .send(Recipients::One(peer3_pk.clone()), shard_bytes, true);
3743                context.sleep(config.link.latency * 2).await;
3744
3745                // Peer 1 sends the invalid shard.
3746                peers[1]
3747                    .sender
3748                    .send(Recipients::One(peer3_pk.clone()), wrong_bytes, true);
3749                context.sleep(config.link.latency * 2).await;
3750
3751                // Peer 1 is not blocked yet: batch validation is deferred until quorum.
3752                // Send valid shards from peers 2 and 4 to reach quorum
3753                // (minimum_shards = 4: 1 leader + 3 pending).
3754                for &idx in &[2, 4] {
3755                    let peer_index = peers[idx].index.get() as u16;
3756                    let shard = coded_block1.shard(peer_index).expect("missing shard");
3757                    let bytes = shard.encode();
3758                    peers[idx]
3759                        .sender
3760                        .send(Recipients::One(peer3_pk.clone()), bytes, true);
3761                }
3762                context.sleep(config.link.latency * 2).await;
3763
3764                // Peer 1 should be blocked for invalid shard crypto.
3765                assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
3766            },
3767        );
3768    }
3769
3770    #[test_traced]
3771    fn test_reconstruction_recovers_after_quorum_with_one_invalid_shard() {
3772        // With 10 peers, minimum_shards=4.
3773        // Contribute exactly 4 shards first (1 leader + 3 pending), with one invalid:
3774        // quorum is reached, but checked_shards stays at 3 after batch validation.
3775        // Then send one more valid shard to meet reconstruction threshold.
3776        let fixture: Fixture<C> = Fixture {
3777            num_peers: 10,
3778            ..Default::default()
3779        };
3780
3781        fixture.start(
3782            |config, context, oracle, mut peers, _, coding_config| async move {
3783                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3784                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3785                let commitment1 = coded_block1.commitment();
3786
3787                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3788                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3789
3790                let receiver_idx = 3usize;
3791                let receiver_pk = peers[receiver_idx].public_key.clone();
3792
3793                // Prepare one invalid shard: shard data from block2, commitment from block1.
3794                let peer1_index = peers[1].index.get() as u16;
3795                let mut invalid_shard = coded_block2.shard(peer1_index).expect("missing shard");
3796                invalid_shard.commitment = commitment1;
3797
3798                // Announce leader and deliver receiver's leader shard.
3799                let leader = peers[0].public_key.clone();
3800                peers[receiver_idx].mailbox.discovered(
3801                    commitment1,
3802                    leader,
3803                    Round::new(Epoch::zero(), View::new(1)),
3804                );
3805                let leader_shard = coded_block1
3806                    .shard(peers[receiver_idx].index.get() as u16)
3807                    .expect("missing shard")
3808                    .encode();
3809                peers[0]
3810                    .sender
3811                    .send(Recipients::One(receiver_pk.clone()), leader_shard, true);
3812
3813                // Together with the leader shard, contribute exactly minimum_shards total:
3814                // - invalid shard from peer1
3815                // - valid shard from peer2
3816                // - valid shard from peer4
3817                peers[1].sender.send(
3818                    Recipients::One(receiver_pk.clone()),
3819                    invalid_shard.encode(),
3820                    true,
3821                );
3822                for idx in [2usize, 4usize] {
3823                    let shard = coded_block1
3824                        .shard(peers[idx].index.get() as u16)
3825                        .expect("missing shard")
3826                        .encode();
3827                    peers[idx]
3828                        .sender
3829                        .send(Recipients::One(receiver_pk.clone()), shard, true);
3830                }
3831
3832                context.sleep(config.link.latency * 2).await;
3833
3834                // Peer 1 should be blocked for its invalid shard; reconstruction should not happen yet.
3835                assert_blocked(
3836                    &oracle,
3837                    &peers[receiver_idx].public_key,
3838                    &peers[1].public_key,
3839                )
3840                .await;
3841                assert!(
3842                    peers[receiver_idx].mailbox.get(commitment1).await.is_none(),
3843                    "block should not reconstruct with only 3 checked shards"
3844                );
3845
3846                // Send one additional valid shard; this should now satisfy checked threshold.
3847                let extra_shard = coded_block1
3848                    .shard(peers[5].index.get() as u16)
3849                    .expect("missing shard")
3850                    .encode();
3851                peers[5]
3852                    .sender
3853                    .send(Recipients::One(receiver_pk), extra_shard, true);
3854
3855                context.sleep(config.link.latency * 2).await;
3856
3857                let reconstructed = peers[receiver_idx]
3858                    .mailbox
3859                    .get(commitment1)
3860                    .await
3861                    .expect("block should reconstruct after additional valid shard");
3862                assert_eq!(reconstructed.commitment(), commitment1);
3863            },
3864        );
3865    }
3866
3867    #[test_traced]
3868    fn test_invalid_pending_shard_blocked_on_drain() {
3869        // Test that the sender of a shard buffered in pending shards (before checking
3870        // data) is blocked when batch validation runs at quorum and verification fails.
3871        let fixture: Fixture<C> = Fixture {
3872            num_peers: 10,
3873            ..Default::default()
3874        };
3875
3876        fixture.start(
3877            |config, context, oracle, mut peers, _, coding_config| async move {
3878                // Create two different blocks.
3879                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
3880                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
3881                let commitment1 = coded_block1.commitment();
3882
3883                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
3884                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
3885
3886                // Get peer 1's shard from block2, but wrap with block1's commitment.
3887                let peer1_index = peers[1].index.get() as u16;
3888                let mut wrong_shard = coded_block2.shard(peer1_index).expect("missing shard");
3889                wrong_shard.commitment = commitment1;
3890                let wrong_bytes = wrong_shard.encode();
3891
3892                let peer3_pk = peers[3].public_key.clone();
3893
3894                // Send the invalid shard BEFORE the leader shard (no checking data yet,
3895                // so it gets buffered in pending shards).
3896                peers[1]
3897                    .sender
3898                    .send(Recipients::One(peer3_pk.clone()), wrong_bytes, true);
3899                context.sleep(config.link.latency * 2).await;
3900
3901                // No one should be blocked yet (shard is buffered).
3902                let blocked = oracle.blocked().await.unwrap();
3903                assert!(blocked.is_empty(), "no peers should be blocked yet");
3904
3905                // Send valid shards from peers 2 and 4 so the pending count
3906                // reaches quorum once the leader shard arrives
3907                // (minimum_shards = 4: 1 leader + 3 pending).
3908                for &idx in &[2, 4] {
3909                    let peer_index = peers[idx].index.get() as u16;
3910                    let shard = coded_block1.shard(peer_index).expect("missing shard");
3911                    let bytes = shard.encode();
3912                    peers[idx]
3913                        .sender
3914                        .send(Recipients::One(peer3_pk.clone()), bytes, true);
3915                }
3916                context.sleep(config.link.latency * 2).await;
3917
3918                // No one should be blocked yet (all shards are buffered pending leader).
3919                let blocked = oracle.blocked().await.unwrap();
3920                assert!(blocked.is_empty(), "no peers should be blocked yet");
3921
3922                // Now inform peer 3 of the leader and send the valid leader shard.
3923                let leader = peers[0].public_key.clone();
3924                peers[3].mailbox.discovered(
3925                    commitment1,
3926                    leader,
3927                    Round::new(Epoch::zero(), View::new(1)),
3928                );
3929                let peer3_index = peers[3].index.get() as u16;
3930                let leader_shard = coded_block1.shard(peer3_index).expect("missing shard");
3931                let shard_bytes = leader_shard.encode();
3932                peers[0]
3933                    .sender
3934                    .send(Recipients::One(peer3_pk), shard_bytes, true);
3935                context.sleep(config.link.latency * 2).await;
3936
3937                // Peer 1 should be blocked once batch validation checks and
3938                // rejects their invalid shard.
3939                assert_blocked(&oracle, &peers[3].public_key, &peers[1].public_key).await;
3940            },
3941        );
3942    }
3943
3944    #[test_traced]
3945    fn test_cross_epoch_buffered_shard_not_blocked() {
3946        let executor = deterministic::Runner::default();
3947        executor.start(|context| async move {
3948            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
3949                context.child("network"),
3950                simulated::Config {
3951                    max_size: MAX_SHARD_SIZE as u32,
3952                    disconnect_on_block: true,
3953                    tracked_peer_sets: NZUsize!(1),
3954                },
3955            );
3956            network.start();
3957
3958            // Epoch 0 participants: keys from seeds 0..4, sorted by public key.
3959            // Epoch 1 participants: the first three epoch 0 keys (in sorted order) + the seed 4 key.
3960            let mut epoch0_keys: Vec<PrivateKey> = (0..4).map(PrivateKey::from_seed).collect();
3961            epoch0_keys.sort_by_key(|s| s.public_key());
3962            let epoch0_pks: Vec<P> = epoch0_keys.iter().map(|c| c.public_key()).collect();
3963            let epoch0_set: Set<P> = Set::from_iter_dedup(epoch0_pks.clone());
3964
3965            let future_peer_key = PrivateKey::from_seed(4);
3966            let future_peer_pk = future_peer_key.public_key();
3967            let mut epoch1_pks: Vec<P> = epoch0_pks[..3]
3968                .iter()
3969                .cloned()
3970                .chain(std::iter::once(future_peer_pk.clone()))
3971                .collect();
3972            epoch1_pks.sort();
3973            let epoch1_set: Set<P> = Set::from_iter_dedup(epoch1_pks);
3974
3975            let receiver_idx_in_epoch0 = epoch0_set
3976                .index(&epoch0_pks[0])
3977                .expect("receiver must be in epoch 0")
3978                .get() as usize;
3979            let receiver_key = epoch0_keys[receiver_idx_in_epoch0].clone();
3980            let receiver_pk = receiver_key.public_key();
3981
3982            let receiver_control = oracle.control(receiver_pk.clone());
3983            let (sender_handle, receiver_handle) = receiver_control
3984                .register(0, TEST_QUOTA)
3985                .await
3986                .expect("registration should succeed");
3987
3988            let future_peer_control = oracle.control(future_peer_pk.clone());
3989            let (mut future_peer_sender, _future_peer_receiver) = future_peer_control
3990                .register(0, TEST_QUOTA)
3991                .await
3992                .expect("registration should succeed");
3993            oracle
3994                .add_link(future_peer_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
3995                .await
3996                .expect("link should be added");
3997            oracle.manager().track(
3998                0,
3999                Set::from_iter_dedup([receiver_pk.clone(), future_peer_pk.clone()]),
4000            );
4001            context.sleep(Duration::from_millis(10)).await;
4002
4003            // Set up the receiver's engine with a multi-epoch provider.
4004            let scheme_epoch0 =
4005                Scheme::signer(SCHEME_NAMESPACE, epoch0_set.clone(), receiver_key.clone())
4006                    .expect("signer scheme should be created");
4007            let scheme_epoch1 =
4008                Scheme::signer(SCHEME_NAMESPACE, epoch1_set.clone(), receiver_key.clone())
4009                    .expect("signer scheme should be created");
4010            let scheme_provider =
4011                MultiEpochProvider::single(scheme_epoch0).with_epoch(Epoch::new(1), scheme_epoch1);
4012
4013            let config: Config<_, _, _, _, C, _, _, _> = Config {
4014                scheme_provider,
4015                blocker: receiver_control.clone(),
4016                shard_codec_cfg: CodecConfig {
4017                    maximum_shard_size: MAX_SHARD_SIZE,
4018                },
4019                block_codec_cfg: (),
4020                strategy: STRATEGY,
4021                mailbox_size: NZUsize!(1024),
4022                peer_buffer_size: NZUsize!(64),
4023                background_channel_capacity: NZUsize!(1024),
4024                peer_provider: oracle.manager(),
4025            };
4026
4027            let (engine, mailbox) = ShardEngine::new(context.child("receiver"), config);
4028            engine.start((sender_handle, receiver_handle));
4029
4030            // Build a coded block using epoch 1's participant set.
4031            let coding_config = coding_config_for_participants(epoch1_set.len() as u16);
4032            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4033            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4034            let commitment = coded_block.commitment();
4035
4036            // The future peer creates a shard at their epoch 1 index.
4037            let future_peer_index = epoch1_set
4038                .index(&future_peer_pk)
4039                .expect("future peer must be in epoch 1");
4040            let future_shard = coded_block
4041                .shard(future_peer_index.get() as u16)
4042                .expect("missing shard");
4043            let shard_bytes = future_shard.encode();
4044
4045            // Send the shard BEFORE the leader is announced via `discovered` (goes to pre-leader buffer).
4046            future_peer_sender.send(Recipients::One(receiver_pk.clone()), shard_bytes, true);
4047            context.sleep(DEFAULT_LINK.latency * 2).await;
4048
4049            // No one should be blocked yet (shard is buffered, leader unknown).
4050            let blocked = oracle.blocked().await.unwrap();
4051            assert!(
4052                blocked.is_empty(),
4053                "no peers should be blocked while shard is buffered"
4054            );
4055
4056            // Announce the leader with an epoch 1 round.
4057            let leader = epoch0_pks[1].clone();
4058            mailbox.discovered(commitment, leader, Round::new(Epoch::new(1), View::new(1)));
4059            context.sleep(DEFAULT_LINK.latency * 2).await;
4060
4061            // The future peer is a valid participant in epoch 1, so they must NOT
4062            // be blocked after their buffered shard is ingested.
4063            let blocked = oracle.blocked().await.unwrap();
4064            assert!(
4065                blocked.is_empty(),
4066                "future-epoch participant should not be blocked: {blocked:?}"
4067            );
4068        });
4069    }
4070
4071    #[test_traced]
4072    fn test_shard_broadcast_survives_provider_churn() {
4073        let executor = deterministic::Runner::default();
4074        executor.start(|context| async move {
4075            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
4076                context.child("network"),
4077                simulated::Config {
4078                    max_size: MAX_SHARD_SIZE as u32,
4079                    disconnect_on_block: true,
4080                    tracked_peer_sets: NZUsize!(1),
4081                },
4082            );
4083            network.start();
4084
4085            let mut private_keys: Vec<PrivateKey> = (0..4).map(PrivateKey::from_seed).collect();
4086            private_keys.sort_by_key(|s| s.public_key());
4087            let peer_keys: Vec<P> = private_keys.iter().map(|k| k.public_key()).collect();
4088            let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());
4089
4090            let leader_idx = 0usize;
4091            let broadcaster_idx = 1usize;
4092            let receiver_idx = 2usize;
4093
4094            let leader_pk = peer_keys[leader_idx].clone();
4095            let broadcaster_pk = peer_keys[broadcaster_idx].clone();
4096            let receiver_pk = peer_keys[receiver_idx].clone();
4097
4098            let mut registrations = BTreeMap::new();
4099            for key in &peer_keys {
4100                let control = oracle.control(key.clone());
4101                let (sender, receiver) = control
4102                    .register(0, TEST_QUOTA)
4103                    .await
4104                    .expect("registration should succeed");
4105                registrations.insert(key.clone(), (control, sender, receiver));
4106            }
4107
4108            for src in &peer_keys {
4109                for dst in &peer_keys {
4110                    if src == dst {
4111                        continue;
4112                    }
4113                    oracle
4114                        .add_link(src.clone(), dst.clone(), DEFAULT_LINK)
4115                        .await
4116                        .expect("link should be added");
4117                }
4118            }
4119            oracle.manager().track(0, participants.clone());
4120            context.sleep(Duration::from_millis(10)).await;
4121
4122            let (_leader_control, mut leader_sender, _leader_receiver) = registrations
4123                .remove(&leader_pk)
4124                .expect("leader should be registered");
4125            let (broadcaster_control, broadcaster_sender, broadcaster_receiver) = registrations
4126                .remove(&broadcaster_pk)
4127                .expect("broadcaster should be registered");
4128            let (receiver_control, receiver_sender, receiver_receiver) = registrations
4129                .remove(&receiver_pk)
4130                .expect("receiver should be registered");
4131
4132            let broadcaster_scheme = Scheme::signer(
4133                SCHEME_NAMESPACE,
4134                participants.clone(),
4135                private_keys[broadcaster_idx].clone(),
4136            )
4137            .expect("signer scheme should be created");
4138            // `discovered` performs two scoped lookups (`handle_external_proposal`
4139            // and `ingest_buffered_shards`). Leader-shard validation is the third.
4140            // Any additional lookup for epoch 0 churns to `None`.
4141            let broadcaster_provider = ChurningProvider::new(broadcaster_scheme, 3);
4142            let broadcaster_config: Config<_, _, _, _, C, _, _, _> = Config {
4143                scheme_provider: broadcaster_provider,
4144                blocker: broadcaster_control.clone(),
4145                shard_codec_cfg: CodecConfig {
4146                    maximum_shard_size: MAX_SHARD_SIZE,
4147                },
4148                block_codec_cfg: (),
4149                strategy: STRATEGY,
4150                mailbox_size: NZUsize!(1024),
4151                peer_buffer_size: NZUsize!(64),
4152                background_channel_capacity: NZUsize!(1024),
4153                peer_provider: oracle.manager(),
4154            };
4155            let (broadcaster_engine, broadcaster_mailbox) =
4156                ChurningShardEngine::new(context.child("broadcaster"), broadcaster_config);
4157            broadcaster_engine.start((broadcaster_sender, broadcaster_receiver));
4158
4159            let receiver_scheme = Scheme::signer(
4160                SCHEME_NAMESPACE,
4161                participants.clone(),
4162                private_keys[receiver_idx].clone(),
4163            )
4164            .expect("signer scheme should be created");
4165            let receiver_config: Config<_, _, _, _, C, _, _, _> = Config {
4166                scheme_provider: MultiEpochProvider::single(receiver_scheme),
4167                blocker: receiver_control.clone(),
4168                shard_codec_cfg: CodecConfig {
4169                    maximum_shard_size: MAX_SHARD_SIZE,
4170                },
4171                block_codec_cfg: (),
4172                strategy: STRATEGY,
4173                mailbox_size: NZUsize!(1024),
4174                peer_buffer_size: NZUsize!(64),
4175                background_channel_capacity: NZUsize!(1024),
4176                peer_provider: oracle.manager(),
4177            };
4178            let (receiver_engine, receiver_mailbox) =
4179                ShardEngine::new(context.child("receiver"), receiver_config);
4180            receiver_engine.start((receiver_sender, receiver_receiver));
4181
4182            let coding_config = coding_config_for_participants(peer_keys.len() as u16);
4183            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4184            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4185            let commitment = coded_block.commitment();
4186            let round = Round::new(Epoch::zero(), View::new(1));
4187
4188            broadcaster_mailbox.discovered(commitment, leader_pk.clone(), round);
4189            receiver_mailbox.discovered(commitment, leader_pk.clone(), round);
4190            context.sleep(DEFAULT_LINK.latency).await;
4191
4192            let broadcaster_index = participants
4193                .index(&broadcaster_pk)
4194                .expect("broadcaster must be a participant")
4195                .get() as u16;
4196            let broadcaster_shard = coded_block
4197                .shard(broadcaster_index)
4198                .expect("missing shard")
4199                .encode();
4200            leader_sender.send(Recipients::One(broadcaster_pk), broadcaster_shard, true);
4201
4202            let receiver_index = participants
4203                .index(&receiver_pk)
4204                .expect("receiver must be a participant")
4205                .get() as u16;
4206            let receiver_shard = coded_block
4207                .shard(receiver_index)
4208                .expect("missing shard")
4209                .encode();
4210            leader_sender.send(Recipients::One(receiver_pk.clone()), receiver_shard, true);
4211
4212            context.sleep(DEFAULT_LINK.latency * 3).await;
4213
4214            let reconstructed = receiver_mailbox.get(commitment).await;
4215            assert!(
4216                reconstructed.is_some(),
4217                "receiver should reconstruct after broadcaster validates and broadcasts shard"
4218            );
4219        });
4220    }
4221
4222    #[test_traced]
4223    fn test_failed_reconstruction_digest_mismatch_then_recovery() {
4224        // Byzantine scenario: all shards pass coding verification (correct root) but the
4225        // decoded blob has a different digest than what the commitment claims. This triggers
4226        // Error::DigestMismatch in try_reconstruct. Verify that:
4227        //   1. The failed commitment's state is cleaned up
4228        //   2. Subscriptions for the failed commitment are closed without resolving
4229        //   3. A subsequent valid commitment reconstructs successfully
4230        let fixture: Fixture<C> = Fixture {
4231            num_peers: 10,
4232            ..Default::default()
4233        };
4234
4235        fixture.start(
4236            |config, context, _oracle, mut peers, _, coding_config| async move {
4237                // Block 1: the "claimed" block (its digest goes in the fake commitment).
4238                let inner1 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4239                let coded_block1 = CodedBlock::<B, C, H>::new(inner1, coding_config, &STRATEGY);
4240
4241                // Block 2: the actual data behind the shards.
4242                let inner2 = B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200);
4243                let coded_block2 = CodedBlock::<B, C, H>::new(inner2, coding_config, &STRATEGY);
4244                let real_commitment2 = coded_block2.commitment();
4245
4246                // Build a fake commitment: block1's digest + block2's coding root/context/config.
4247                // Shards from block2 will verify against block2's root (present in the fake
4248                // commitment), but try_reconstruct will decode block2 and find its digest != block1's digest.
4249                let fake_commitment = Commitment::from((
4250                    coded_block1.digest(),
4251                    real_commitment2.root::<Sha256Digest>(),
4252                    real_commitment2.context::<Sha256Digest>(),
4253                    coding_config,
4254                ));
4255
4256                let receiver_idx = 3usize;
4257                let receiver_pk = peers[receiver_idx].public_key.clone();
4258                let leader = peers[0].public_key.clone();
4259                let round = Round::new(Epoch::zero(), View::new(1));
4260
4261                // Discover the fake commitment.
4262                peers[receiver_idx]
4263                    .mailbox
4264                    .discovered(fake_commitment, leader.clone(), round);
4265
4266                // Open block and digest subscriptions before sending shards.
4267                let mut block_sub = peers[receiver_idx].mailbox.subscribe(fake_commitment);
4268                let mut digest_sub = peers[receiver_idx]
4269                    .mailbox
4270                    .subscribe_by_digest(coded_block1.digest());
4271
4272                // Send the receiver's shard (from block2, with fake commitment).
4273                let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
4274                let mut leader_shard = coded_block2
4275                    .shard(receiver_shard_idx)
4276                    .expect("missing shard");
4277                leader_shard.commitment = fake_commitment;
4278                peers[0].sender.send(
4279                    Recipients::One(receiver_pk.clone()),
4280                    leader_shard.encode(),
4281                    true,
4282                );
4283
4284                // Send enough shards to reach minimum_shards (4 for 10 peers).
4285                // Need 3 more shards after the leader's shard.
4286                for &idx in &[1usize, 2, 4] {
4287                    let peer_shard_idx = peers[idx].index.get() as u16;
4288                    let mut shard = coded_block2.shard(peer_shard_idx).expect("missing shard");
4289                    shard.commitment = fake_commitment;
4290                    peers[idx].sender.send(
4291                        Recipients::One(receiver_pk.clone()),
4292                        shard.encode(),
4293                        true,
4294                    );
4295                }
4296
4297                context.sleep(config.link.latency * 2).await;
4298
4299                // Reconstruction should have failed with DigestMismatch.
4300                // State for fake_commitment should be removed (engine.rs:792).
4301                assert!(
4302                    peers[receiver_idx]
4303                        .mailbox
4304                        .get(fake_commitment)
4305                        .await
4306                        .is_none(),
4307                    "block should not be available after DigestMismatch"
4308                );
4309
4310                // Block subscription should be closed after failed reconstruction cleanup.
4311                assert!(
4312                    matches!(block_sub.try_recv(), Err(TryRecvError::Closed)),
4313                    "subscription should close for failed reconstruction"
4314                );
4315                assert!(
4316                    matches!(digest_sub.try_recv(), Err(TryRecvError::Closed)),
4317                    "digest subscription should close after failed reconstruction"
4318                );
4319
4320                // Now verify the engine is not stuck: send valid shards for block1's real
4321                // commitment and confirm reconstruction succeeds.
4322                let real_commitment1 = coded_block1.commitment();
4323                let round2 = Round::new(Epoch::zero(), View::new(2));
4324                peers[receiver_idx]
4325                    .mailbox
4326                    .discovered(real_commitment1, leader.clone(), round2);
4327
4328                let leader_shard1 = coded_block1
4329                    .shard(receiver_shard_idx)
4330                    .expect("missing shard");
4331                peers[0].sender.send(
4332                    Recipients::One(receiver_pk.clone()),
4333                    leader_shard1.encode(),
4334                    true,
4335                );
4336
4337                for &idx in &[1usize, 2, 4] {
4338                    let peer_shard_idx = peers[idx].index.get() as u16;
4339                    let shard = coded_block1.shard(peer_shard_idx).expect("missing shard");
4340                    peers[idx].sender.send(
4341                        Recipients::One(receiver_pk.clone()),
4342                        shard.encode(),
4343                        true,
4344                    );
4345                }
4346
4347                context.sleep(config.link.latency * 2).await;
4348
4349                let reconstructed = peers[receiver_idx]
4350                    .mailbox
4351                    .get(real_commitment1)
4352                    .await
4353                    .expect("valid block should reconstruct after prior failure");
4354                assert_eq!(reconstructed.commitment(), real_commitment1);
4355            },
4356        );
4357    }
4358
4359    #[test_traced]
4360    fn test_failed_reconstruction_context_mismatch_then_recovery() {
4361        // Byzantine scenario: shards decode to a block whose digest and coding root/config
4362        // match the commitment, but the commitment carries a mismatched context digest.
4363        // The engine must reject reconstruction and close the block subscription.
4364        let fixture: Fixture<C> = Fixture {
4365            num_peers: 10,
4366            ..Default::default()
4367        };
4368
4369        fixture.start(
4370            |config, context, _oracle, mut peers, _, coding_config| async move {
4371                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4372                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4373                let real_commitment = coded_block.commitment();
4374
4375                let wrong_context_digest = Sha256::hash(b"wrong_context");
4376                assert_ne!(
4377                    real_commitment.context::<Sha256Digest>(),
4378                    wrong_context_digest,
4379                    "test requires a distinct context digest"
4380                );
4381                let fake_commitment = Commitment::from((
4382                    coded_block.digest(),
4383                    real_commitment.root::<Sha256Digest>(),
4384                    wrong_context_digest,
4385                    coding_config,
4386                ));
4387
4388                let receiver_idx = 3usize;
4389                let receiver_pk = peers[receiver_idx].public_key.clone();
4390                let leader = peers[0].public_key.clone();
4391                let round = Round::new(Epoch::zero(), View::new(1));
4392
4393                peers[receiver_idx]
4394                    .mailbox
4395                    .discovered(fake_commitment, leader.clone(), round);
4396                let mut block_sub = peers[receiver_idx].mailbox.subscribe(fake_commitment);
4397
4398                let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
4399                let mut leader_shard = coded_block
4400                    .shard(receiver_shard_idx)
4401                    .expect("missing shard");
4402                leader_shard.commitment = fake_commitment;
4403                peers[0].sender.send(
4404                    Recipients::One(receiver_pk.clone()),
4405                    leader_shard.encode(),
4406                    true,
4407                );
4408
4409                for &idx in &[1usize, 2, 4] {
4410                    let peer_shard_idx = peers[idx].index.get() as u16;
4411                    let mut shard = coded_block.shard(peer_shard_idx).expect("missing shard");
4412                    shard.commitment = fake_commitment;
4413                    peers[idx].sender.send(
4414                        Recipients::One(receiver_pk.clone()),
4415                        shard.encode(),
4416                        true,
4417                    );
4418                }
4419
4420                context.sleep(config.link.latency * 2).await;
4421
4422                assert!(
4423                    peers[receiver_idx]
4424                        .mailbox
4425                        .get(fake_commitment)
4426                        .await
4427                        .is_none(),
4428                    "block should not be available after ContextMismatch"
4429                );
4430                assert!(
4431                    matches!(block_sub.try_recv(), Err(TryRecvError::Closed)),
4432                    "subscription should close for context-mismatched commitment"
4433                );
4434
4435                // Verify the receiver still reconstructs valid commitments afterward.
4436                let round2 = Round::new(Epoch::zero(), View::new(2));
4437                peers[receiver_idx]
4438                    .mailbox
4439                    .discovered(real_commitment, leader.clone(), round2);
4440
4441                let real_leader_shard = coded_block
4442                    .shard(receiver_shard_idx)
4443                    .expect("missing shard");
4444                peers[0].sender.send(
4445                    Recipients::One(receiver_pk.clone()),
4446                    real_leader_shard.encode(),
4447                    true,
4448                );
4449
4450                for &idx in &[1usize, 2, 4] {
4451                    let peer_shard_idx = peers[idx].index.get() as u16;
4452                    let shard = coded_block.shard(peer_shard_idx).expect("missing shard");
4453                    peers[idx].sender.send(
4454                        Recipients::One(receiver_pk.clone()),
4455                        shard.encode(),
4456                        true,
4457                    );
4458                }
4459
4460                context.sleep(config.link.latency * 2).await;
4461
4462                let reconstructed = peers[receiver_idx]
4463                    .mailbox
4464                    .get(real_commitment)
4465                    .await
4466                    .expect("valid block should reconstruct after prior context mismatch");
4467                assert_eq!(reconstructed.commitment(), real_commitment);
4468            },
4469        );
4470    }
4471
4472    #[test_traced]
4473    fn test_same_round_equivocation_preserves_certifiable_recovery() {
4474        // Regression coverage for same-round leader equivocation:
4475        // - leader equivocates across two commitments in the same round
4476        // - we receive a shard for commitment B (the certifiable one)
4477        // - commitment A reconstructs first
4478        // - commitment B must still remain recoverable
4479        let fixture: Fixture<C> = Fixture {
4480            num_peers: 10,
4481            ..Default::default()
4482        };
4483
4484        fixture.start(
4485            |config, context, _oracle, mut peers, _, coding_config| async move {
4486                let receiver_idx = 3usize;
4487                let receiver_pk = peers[receiver_idx].public_key.clone();
4488                let receiver_shard_idx = peers[receiver_idx].index.get() as u16;
4489
4490                let leader = peers[0].public_key.clone();
4491                let round = Round::new(Epoch::zero(), View::new(7));
4492
4493                // Two different commitments in the same round (equivocation scenario).
4494                let block_a = CodedBlock::<B, C, H>::new(
4495                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 111),
4496                    coding_config,
4497                    &STRATEGY,
4498                );
4499                let commitment_a = block_a.commitment();
4500                let block_b = CodedBlock::<B, C, H>::new(
4501                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 222),
4502                    coding_config,
4503                    &STRATEGY,
4504                );
4505                let commitment_b = block_b.commitment();
4506
4507                // Receiver learns both commitments in the same round.
4508                peers[receiver_idx]
4509                    .mailbox
4510                    .discovered(commitment_a, leader.clone(), round);
4511                peers[receiver_idx]
4512                    .mailbox
4513                    .discovered(commitment_b, leader.clone(), round);
4514
4515                // Subscribe to the certifiable commitment before any reconstruction.
4516                let certifiable_sub = peers[receiver_idx].mailbox.subscribe(commitment_b);
4517
4518                // We receive our shard for commitment B from the equivocating leader.
4519                let shard_b = block_b
4520                    .shard(receiver_shard_idx)
4521                    .expect("missing shard")
4522                    .encode();
4523                peers[0]
4524                    .sender
4525                    .send(Recipients::One(receiver_pk.clone()), shard_b, true);
4526
4527                // Reconstruct conflicting commitment A first.
4528                let shard_a = block_a
4529                    .shard(receiver_shard_idx)
4530                    .expect("missing shard")
4531                    .encode();
4532                peers[0]
4533                    .sender
4534                    .send(Recipients::One(receiver_pk.clone()), shard_a, true);
4535                for i in [1usize, 2usize, 4usize] {
4536                    let shard_a = block_a
4537                        .shard(peers[i].index.get() as u16)
4538                        .expect("missing shard")
4539                        .encode();
4540                    peers[i]
4541                        .sender
4542                        .send(Recipients::One(receiver_pk.clone()), shard_a, true);
4543                }
4544                context.sleep(config.link.latency * 4).await;
4545                let reconstructed_a = peers[receiver_idx]
4546                    .mailbox
4547                    .get(commitment_a)
4548                    .await
4549                    .expect("conflicting commitment should reconstruct first");
4550                assert_eq!(reconstructed_a.commitment(), commitment_a);
4551
4552                // Commitment B should still be recoverable after A reconstructed.
4553                for i in [1usize, 2usize, 4usize] {
4554                    let shard_b = block_b
4555                        .shard(peers[i].index.get() as u16)
4556                        .expect("missing shard")
4557                        .encode();
4558                    peers[i]
4559                        .sender
4560                        .send(Recipients::One(receiver_pk.clone()), shard_b, true);
4561                }
4562
4563                select! {
4564                    result = certifiable_sub => {
4565                        let reconstructed_b =
4566                            result.expect("certifiable commitment should remain recoverable");
4567                        assert_eq!(reconstructed_b.commitment(), commitment_b);
4568                    },
4569                    _ = context.sleep(Duration::from_secs(5)) => {
4570                        panic!("certifiable commitment was not recoverable after same-round equivocation");
4571                    },
4572                }
4573            },
4574        );
4575    }
4576
4577    #[test_traced]
4578    fn test_leader_unrelated_shard_blocks_peer() {
4579        // Regression test: if the leader sends an unrelated/invalid shard
4580        // (i.e. a shard for a different participant index), the receiver must
4581        // block the leader.
4582        let fixture: Fixture<C> = Fixture {
4583            num_peers: 10,
4584            ..Default::default()
4585        };
4586
4587        fixture.start(
4588            |config, context, oracle, mut peers, _, coding_config| async move {
4589                // Commitment being tracked by the receiver.
4590                let tracked_block = CodedBlock::<B, C, H>::new(
4591                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100),
4592                    coding_config,
4593                    &STRATEGY,
4594                );
4595                let tracked_commitment = tracked_block.commitment();
4596
4597                // Separate block used to source "unrelated" shard data.
4598                let unrelated_block = CodedBlock::<B, C, H>::new(
4599                    B::new::<H>((), Sha256Digest::EMPTY, Height::new(2), 200),
4600                    coding_config,
4601                    &STRATEGY,
4602                );
4603
4604                let receiver_idx = 3usize;
4605                let receiver_pk = peers[receiver_idx].public_key.clone();
4606                let leader_idx = 0usize;
4607                let leader_pk = peers[leader_idx].public_key.clone();
4608
4609                // Receiver tracks the commitment with peer0 as leader.
4610                peers[receiver_idx].mailbox.discovered(
4611                    tracked_commitment,
4612                    leader_pk.clone(),
4613                    Round::new(Epoch::zero(), View::new(1)),
4614                );
4615
4616                // Construct an unrelated shard from peer1's slot and retarget
4617                // its commitment to the tracked commitment so it hits active state.
4618                let mut unrelated_shard = unrelated_block
4619                    .shard(peers[1].index.get() as u16)
4620                    .expect("missing shard");
4621                unrelated_shard.commitment = tracked_commitment;
4622
4623                // Leader sends this unrelated/invalid shard to receiver.
4624                // The shard index no longer matches the sender's participant index,
4625                // so the leader must be blocked.
4626                peers[leader_idx].sender.send(
4627                    Recipients::One(receiver_pk),
4628                    unrelated_shard.encode(),
4629                    true,
4630                );
4631                context.sleep(config.link.latency * 2).await;
4632
4633                assert_blocked(&oracle, &peers[receiver_idx].public_key, &leader_pk).await;
4634            },
4635        );
4636    }
4637
4638    #[test_traced]
4639    fn test_withholding_leader_victim_reconstructs_via_gossip() {
4640        // A Byzantine leader withholds the shard destined for one participant.
4641        // That participant should still reconstruct the block from shards
4642        // gossiped by other participants (sent via Recipients::All) without
4643        // any backfill mechanism.
4644        let fixture = Fixture {
4645            num_peers: 10,
4646            ..Default::default()
4647        };
4648
4649        fixture.start(
4650            |config, context, oracle, mut peers, _, coding_config| async move {
4651                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4652                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4653                let commitment = coded_block.commitment();
4654                let round = Round::new(Epoch::zero(), View::new(1));
4655
4656                let leader = peers[0].public_key.clone();
4657                let victim = peers[1].public_key.clone();
4658
4659                // Sever the link from leader to victim so the leader's
4660                // direct shard never arrives.
4661                oracle
4662                    .remove_link(leader.clone(), victim.clone())
4663                    .await
4664                    .expect("remove_link should succeed");
4665
4666                // Leader proposes. The victim will not receive a direct shard
4667                // because the link is severed.
4668                peers[0].mailbox.proposed(round, coded_block.clone());
4669
4670                // Inform all non-leader peers of the leader so they validate
4671                // and re-broadcast their shards via Recipients::All.
4672                for peer in peers[1..].iter_mut() {
4673                    peer.mailbox.discovered(commitment, leader.clone(), round);
4674                }
4675                context.sleep(config.link.latency * 2).await;
4676
4677                // The victim should reconstruct via gossiped shards from other
4678                // participants even though the leader withheld.
4679                let block_sub = peers[1].mailbox.subscribe(commitment);
4680                select! {
4681                    result = block_sub => {
4682                        let reconstructed = result.expect("block subscription should resolve");
4683                        assert_eq!(reconstructed.commitment(), commitment);
4684                        assert_eq!(reconstructed.height(), coded_block.height());
4685                    },
4686                    _ = context.sleep(Duration::from_secs(5)) => {
4687                        panic!("victim did not reconstruct block despite withholding leader");
4688                    },
4689                }
4690
4691                // All other participants should also have reconstructed.
4692                for peer in peers[2..].iter_mut() {
4693                    let reconstructed = peer
4694                        .mailbox
4695                        .get(commitment)
4696                        .await
4697                        .expect("block should be reconstructed");
4698                    assert_eq!(reconstructed.commitment(), commitment);
4699                }
4700
4701                // No peer should be blocked — withholding is not detectable.
4702                let blocked = oracle.blocked().await.unwrap();
4703                assert!(
4704                    blocked.is_empty(),
4705                    "no peer should be blocked in withholding leader test"
4706                );
4707            },
4708        );
4709    }
4710
4711    /// When the leader withholds its shard from a participant, the block
4712    /// can still be reconstructed from gossiped shards. However, the shard
4713    /// subscription must NOT resolve because the participant's own shard was
4714    /// never verified. Voting requires own-shard verification to ensure the
4715    /// participant re-broadcasts its shard and helps slower peers reach quorum.
4716    #[test_traced]
4717    fn test_shard_subscription_pending_after_reconstruction_without_leader_shard() {
4718        let fixture = Fixture {
4719            num_peers: 10,
4720            ..Default::default()
4721        };
4722
4723        fixture.start(
4724            |config, context, oracle, mut peers, _, coding_config| async move {
4725                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4726                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4727                let commitment = coded_block.commitment();
4728                let round = Round::new(Epoch::zero(), View::new(1));
4729
4730                let leader = peers[0].public_key.clone();
4731                let victim = peers[1].public_key.clone();
4732
4733                // Remove the link from leader to victim so the leader's shard
4734                // never reaches the victim directly.
4735                oracle
4736                    .remove_link(leader.clone(), victim.clone())
4737                    .await
4738                    .expect("remove_link should succeed");
4739
4740                // Subscribe to the shard and block BEFORE any broadcasting.
4741                let mut shard_sub = peers[1]
4742                    .mailbox
4743                    .subscribe_assigned_shard_verified(commitment);
4744                let block_sub = peers[1].mailbox.subscribe(commitment);
4745
4746                // Leader broadcasts.
4747                peers[0].mailbox.proposed(round, coded_block.clone());
4748
4749                // All non-leader peers discover the leader.
4750                for peer in peers[1..].iter_mut() {
4751                    peer.mailbox.discovered(commitment, leader.clone(), round);
4752                }
4753
4754                // Wait for gossip to propagate.
4755                context.sleep(config.link.latency * 4).await;
4756
4757                // Block subscription should resolve (victim reconstructs from
4758                // gossiped shards).
4759                let reconstructed = block_sub.await.expect("block subscription should resolve");
4760                assert_eq!(reconstructed.commitment(), commitment);
4761
4762                // Shard subscription must NOT resolve because the leader
4763                // never sent the victim its own shard.
4764                assert!(
4765                    matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
4766                    "shard subscription must not resolve without own shard verification"
4767                );
4768            },
4769        );
4770    }
4771
4772    #[test_traced]
4773    fn test_broadcast_routes_participant_and_non_participant_shards() {
4774        let fixture = Fixture {
4775            num_non_participants: 1,
4776            ..Default::default()
4777        };
4778
4779        fixture.start(
4780            |config, context, oracle, mut peers, non_participants, coding_config| async move {
4781                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4782                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4783                let commitment = coded_block.commitment();
4784
4785                let leader = peers[0].public_key.clone();
4786                let round = Round::new(Epoch::zero(), View::new(1));
4787                peers[0].mailbox.proposed(round, coded_block.clone());
4788
4789                for peer in peers[1..].iter_mut() {
4790                    peer.mailbox.discovered(commitment, leader.clone(), round);
4791                }
4792                for np in non_participants.iter() {
4793                    np.mailbox.discovered(commitment, leader.clone(), round);
4794                }
4795                context.sleep(config.link.latency * 2).await;
4796
4797                // Participants should receive and validate their own shards.
4798                for peer in peers.iter_mut() {
4799                    peer.mailbox
4800                        .subscribe_assigned_shard_verified(commitment)
4801                        .await
4802                        .expect("participant shard subscription should complete");
4803                }
4804
4805                // Non-participant should receive and validate the leader's shard.
4806                for np in non_participants.iter() {
4807                    np.mailbox
4808                        .subscribe_assigned_shard_verified(commitment)
4809                        .await
4810                        .expect("non-participant shard subscription should complete");
4811                }
4812                context.sleep(config.link.latency).await;
4813
4814                // Non-participant should reconstruct the block from received shards.
4815                for np in non_participants.iter() {
4816                    let reconstructed = np
4817                        .mailbox
4818                        .get(commitment)
4819                        .await
4820                        .expect("non-participant should reconstruct block");
4821                    assert_eq!(reconstructed.commitment(), commitment);
4822                }
4823
4824                let blocked = oracle.blocked().await.unwrap();
4825                assert!(
4826                    blocked.is_empty(),
4827                    "no peer should be blocked in participant/non-participant shard routing test"
4828                );
4829            },
4830        );
4831    }
4832
4833    #[test_traced]
4834    fn test_non_participant_reconstructs_after_discovered() {
4835        let fixture = Fixture {
4836            num_non_participants: 1,
4837            ..Default::default()
4838        };
4839
4840        fixture.start(
4841            |config, context, oracle, mut peers, non_participants, coding_config| async move {
4842                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
4843                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
4844                let commitment = coded_block.commitment();
4845                let round = Round::new(Epoch::zero(), View::new(1));
4846
4847                let leader = peers[0].public_key.clone();
4848                peers[0].mailbox.proposed(round, coded_block.clone());
4849
4850                // Inform participants of the leader so they validate and re-broadcast
4851                // shards.
4852                for peer in peers[1..].iter_mut() {
4853                    peer.mailbox.discovered(commitment, leader.clone(), round);
4854                }
4855                context.sleep(config.link.latency).await;
4856
4857                // Non-participant discovers the leader after shards are already
4858                // propagating through the network.
4859                let np = &non_participants[0];
4860                let block_sub = np.mailbox.subscribe(commitment);
4861                np.mailbox.discovered(commitment, leader.clone(), round);
4862
4863                // Wait for enough shards (leader's shard + shards from
4864                // participants) to arrive and reconstruct.
4865                select! {
4866                    result = block_sub => {
4867                        let reconstructed = result.expect("block subscription should resolve");
4868                        assert_eq!(reconstructed.commitment(), commitment);
4869                        assert_eq!(reconstructed.height(), coded_block.height());
4870                    },
4871                    _ = context.sleep(Duration::from_secs(5)) => {
4872                        panic!("non-participant block subscription did not resolve");
4873                    },
4874                }
4875
4876                let blocked = oracle.blocked().await.unwrap();
4877                assert!(
4878                    blocked.is_empty(),
4879                    "no peer should be blocked in non-participant reconstruction test"
4880                );
4881            },
4882        );
4883    }
4884
4885    #[test_traced]
4886    fn test_peer_set_update_evicts_peer_buffers() {
4887        // Shards buffered before leader announcement should be evicted when
4888        // the sender leaves latest.primary. Even if the overlap window keeps
4889        // the sender connected, fresh pre-leader shards from that peer must
4890        // not recreate the buffer.
4891        let executor = deterministic::Runner::default();
4892        executor.start(|context| async move {
4893            let num_peers = 10usize;
4894            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
4895                context.child("network"),
4896                simulated::Config {
4897                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(2),
                },
            );
            network.start();

            let mut private_keys = (0..num_peers)
                .map(|i| PrivateKey::from_seed(i as u64))
                .collect::<Vec<_>>();
            private_keys.sort_by_key(|s| s.public_key());
            let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
            let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());

            // Test from the perspective of a single receiver (peer 3).
            let receiver_idx = 3usize;
            let receiver_pk = peer_keys[receiver_idx].clone();
            let leader_pk = peer_keys[0].clone();

            let receiver_control = oracle.control(receiver_pk.clone());
            let (sender_handle, receiver_handle) = receiver_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            // Register the leader so it can send shards.
            let leader_control = oracle.control(leader_pk.clone());
            let (mut leader_sender, _leader_receiver) = leader_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");
            oracle
                .add_link(leader_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
                .await
                .expect("link should be added");

            // Track the full participant set so the engine sees all peers.
            oracle.manager().track(0, participants.clone());
            context.sleep(Duration::from_millis(10)).await;

            let scheme = Scheme::signer(
                SCHEME_NAMESPACE,
                participants.clone(),
                private_keys[receiver_idx].clone(),
            )
            .expect("signer scheme should be created");

            let config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider: MultiEpochProvider::single(scheme),
                blocker: receiver_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: NZUsize!(1024),
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: NZUsize!(1024),
                peer_provider: oracle.manager(),
            };

            let (engine, mailbox) = ShardEngine::new(context.child("receiver"), config);
            engine.start((sender_handle, receiver_handle));

            // Build a coded block and extract the shard destined for the receiver.
            let coding_config = coding_config_for_participants(num_peers as u16);
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();

            let receiver_participant = participants
                .index(&receiver_pk)
                .expect("receiver must be a participant");
            let leader_shard = coded_block
                .shard(receiver_participant.get() as u16)
                .expect("missing shard");
            let shard_bytes = leader_shard.encode();

            // Send the shard BEFORE the leader announcement (it gets buffered).
            leader_sender.send(
                Recipients::One(receiver_pk.clone()),
                shard_bytes.clone(),
                true,
            );
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // Now send a peer set update that excludes the leader.
            let remaining: Set<P> =
                Set::from_iter_dedup(peer_keys.iter().filter(|pk| **pk != leader_pk).cloned());
            oracle.manager().track(1, remaining);
            context.sleep(Duration::from_millis(10)).await;

            // The retained overlap window still lets the leader reach the receiver,
            // but this fresh pre-leader shard must not be buffered again.
            leader_sender.send(Recipients::One(receiver_pk.clone()), shard_bytes, true);
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // Announce the leader. Buffered shards from the leader should have been
            // evicted, so the shard will NOT be ingested.
            let mut shard_sub = mailbox.subscribe_assigned_shard_verified(commitment);
            mailbox.discovered(
                commitment,
                leader_pk.clone(),
                Round::new(Epoch::zero(), View::new(1)),
            );
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // The shard subscription should still be pending (no shard was ingested).
            assert!(
                matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
                "shard subscription should not resolve once the leader's buffer has been evicted"
            );
            assert!(
                mailbox.get(commitment).await.is_none(),
                "block should not reconstruct from evicted buffers"
            );
        });
    }

    #[test_traced]
    fn test_peer_buffer_lifetime_tracks_latest_primary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                context.child("network"),
                simulated::Config {
                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(1),
                },
            );
            network.start();

            let mut private_keys = (0..4)
                .map(|i| PrivateKey::from_seed(i as u64))
                .collect::<Vec<_>>();
            private_keys.sort_by_key(|s| s.public_key());
            let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
            let receiver_pk = peer_keys[0].clone();
            let sender_pk = peer_keys[1].clone();
            let participants: Set<P> = Set::from_iter_dedup(peer_keys);

            let receiver_control = oracle.control(receiver_pk);
            let scheme = Scheme::signer(
                SCHEME_NAMESPACE,
                participants.clone(),
                private_keys[0].clone(),
            )
            .expect("signer scheme should be created");

            let config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider: MultiEpochProvider::single(scheme),
                blocker: receiver_control,
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: NZUsize!(16),
                peer_buffer_size: NZUsize!(4),
                background_channel_capacity: NZUsize!(16),
                peer_provider: oracle.manager(),
            };

            let (mut engine, _mailbox) = ShardEngine::new(context.child("engine"), config);

            // Only `sender_pk` is in `latest.primary`, so only that peer may retain a pre-leader
            // buffer row (`buffer_peer_shard` / `peer_buffers`).
            engine.update_latest_primary_peers(Set::from_iter_dedup([sender_pk.clone()]));

            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(
                inner,
                coding_config_for_participants(participants.len() as u16),
                &STRATEGY,
            );
            let shard = coded_block.shard(0).expect("missing shard");

            // Pre-leader path: buffer one shard before any leader or notarized interest arrives.
            engine.buffer_peer_shard(sender_pk.clone(), shard);
            assert_eq!(
                engine.peer_buffers.get(&sender_pk).map(VecDeque::len),
                Some(1),
                "peer buffer should contain the buffered shard"
            );

            // Empty primary: no peer may retain buffers; `update_latest_primary_peers` drops the
            // staged shard and the deque entry for `sender_pk`.
            engine.update_latest_primary_peers(Set::default());
            assert!(
                !engine.peer_buffers.contains_key(&sender_pk),
                "peer buffer should be evicted once sender leaves latest.primary"
            );
        });
    }

    #[test_traced]
    fn test_old_epoch_buffered_shards_are_dropped_after_cutover() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let num_peers = 6usize;
            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                context.child("network"),
                simulated::Config {
                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(2),
                },
            );
            network.start();

            let mut private_keys = (0..num_peers)
                .map(|i| PrivateKey::from_seed(i as u64))
                .collect::<Vec<_>>();
            private_keys.sort_by_key(|s| s.public_key());
            let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();

            // Epoch 0: first five peers. Epoch 1: swap out `peer_keys[0]` for `peer_keys[5]` so the
            // cutover changes who is in `latest.primary` while `tracked_peer_sets` retains overlap.
            let epoch0_set: Set<P> = Set::from_iter_dedup(peer_keys[..5].iter().cloned());
            let epoch1_set: Set<P> = Set::from_iter_dedup([
                peer_keys[1].clone(),
                peer_keys[2].clone(),
                peer_keys[3].clone(),
                peer_keys[4].clone(),
                peer_keys[5].clone(),
            ]);

            let receiver_idx = 3usize;
            let receiver_pk = peer_keys[receiver_idx].clone();
            let receiver_key = private_keys[receiver_idx].clone();
            let leader_pk = peer_keys[0].clone();

            let receiver_control = oracle.control(receiver_pk.clone());
            let (sender_handle, receiver_handle) = receiver_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            let leader_control = oracle.control(leader_pk.clone());
            let (mut leader_sender, _leader_receiver) = leader_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");
            oracle
                .add_link(leader_pk.clone(), receiver_pk.clone(), DEFAULT_LINK)
                .await
                .expect("link should be added");

            // Peer-set id 0: epoch 0 primaries before any cutover.
            oracle.manager().track(0, epoch0_set.clone());
            context.sleep(Duration::from_millis(10)).await;

            let scheme_epoch0 =
                Scheme::signer(SCHEME_NAMESPACE, epoch0_set.clone(), receiver_key.clone())
                    .expect("epoch 0 signer scheme should be created");
            let scheme_epoch1 =
                Scheme::signer(SCHEME_NAMESPACE, epoch1_set.clone(), receiver_key.clone())
                    .expect("epoch 1 signer scheme should be created");

            let config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider: MultiEpochProvider::single(scheme_epoch0)
                    .with_epoch(Epoch::new(1), scheme_epoch1),
                blocker: receiver_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: NZUsize!(1024),
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: NZUsize!(1024),
                peer_provider: oracle.manager(),
            };

            // Receiver engine: schemes for both epochs so post-cutover validation can run if needed.
            let (engine, mailbox) = ShardEngine::new(context.child("receiver"), config);
            engine.start((sender_handle, receiver_handle));

            let coding_config = coding_config_for_participants(epoch0_set.len() as u16);
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();

            let receiver_participant = epoch0_set
                .index(&receiver_pk)
                .expect("receiver must be an epoch 0 participant");
            let leader_shard = coded_block
                .shard(receiver_participant.get() as u16)
                .expect("missing shard");

            // Inbound: epoch-0 leader shard arrives before `Discovered` (pre-leader buffer path).
            leader_sender.send(
                Recipients::One(receiver_pk.clone()),
                leader_shard.encode(),
                true,
            );
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // Cutover to epoch 1 primaries before `Discovered`: `leader_pk` (epoch-0-only) is no
            // longer in `latest.primary`, so overlap-buffered shards for that sender must not feed
            // reconstruction.
            oracle.manager().track(1, epoch1_set);
            context.sleep(Duration::from_millis(10)).await;

            // Leader announcement for the old commitment: should not complete reconstruction from
            // dropped pre-cutover buffers.
            let mut shard_sub = mailbox.subscribe_assigned_shard_verified(commitment);
            mailbox.discovered(
                commitment,
                leader_pk,
                Round::new(Epoch::zero(), View::new(1)),
            );
            context.sleep(DEFAULT_LINK.latency * 2).await;

            assert!(
                matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
                "old-epoch shard subscription should stay pending after cutover"
            );
            assert!(
                mailbox.get(commitment).await.is_none(),
                "old-epoch commitment should not reconstruct from overlap-only buffered shards"
            );
        });
    }

    /// If a node is evicted from the
    /// [`commonware_p2p::PeerSetUpdate::latest`] primary set, it must still
    /// reconstruct once the leader is discovered, as long as enough buffered
    /// shards came from peers that remain in `latest.primary`.
    ///
    /// This does not rely on a self-buffered shard or a leader-delivered shard:
    /// reconstruction should succeed from the remaining buffered peer shards
    /// alone.
    #[test_traced]
    fn test_evicted_node_still_reconstructs_from_buffered_peer_shards() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let num_peers = 10usize;
            let (network, oracle) = simulated::Network::<deterministic::Context, P>::new(
                context.child("network"),
                simulated::Config {
                    max_size: MAX_SHARD_SIZE as u32,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(2),
                },
            );
            network.start();

            let mut private_keys = (0..num_peers)
                .map(|i| PrivateKey::from_seed(i as u64))
                .collect::<Vec<_>>();
            private_keys.sort_by_key(|s| s.public_key());
            let peer_keys: Vec<P> = private_keys.iter().map(|c| c.public_key()).collect();
            let participants: Set<P> = Set::from_iter_dedup(peer_keys.clone());

            // Receiver (`peer_keys[1]`) is evicted from `latest.primary` after shards are buffered.
            // The leader (`peer_keys[0]`) has no link to the receiver, so reconstruction cannot use a
            // leader-delivered shard or a self-buffered shard; it must use gossip from peers 2/4/5/6 only.
            let receiver_idx = 1usize;
            let receiver_pk = peer_keys[receiver_idx].clone();
            let leader_pk = peer_keys[0].clone();
            let peer2_pk = peer_keys[2].clone();
            let peer4_pk = peer_keys[4].clone();
            let peer5_pk = peer_keys[5].clone();
            let peer6_pk = peer_keys[6].clone();

            let receiver_control = oracle.control(receiver_pk.clone());
            let (evicted_sender, evicted_receiver) = receiver_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            let peer2_control = oracle.control(peer2_pk.clone());
            let (mut peer2_sender, _peer2_receiver) = peer2_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            let peer4_control = oracle.control(peer4_pk.clone());
            let (mut peer4_sender, _peer4_receiver) = peer4_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            let peer5_control = oracle.control(peer5_pk.clone());
            let (mut peer5_sender, _peer5_receiver) = peer5_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            let peer6_control = oracle.control(peer6_pk.clone());
            let (mut peer6_sender, _peer6_receiver) = peer6_control
                .register(0, TEST_QUOTA)
                .await
                .expect("registration should succeed");

            // Only the peers that will forward shards are linked to the receiver; the leader is not.
            for sender in [&peer2_pk, &peer4_pk, &peer5_pk, &peer6_pk] {
                oracle
                    .add_link(sender.clone(), receiver_pk.clone(), DEFAULT_LINK)
                    .await
                    .expect("link should be added");
            }

            // Start with the full committee so the receiver's signer scheme matches the coded block.
            oracle.manager().track(0, participants.clone());
            context.sleep(Duration::from_millis(10)).await;

            let scheme = Scheme::signer(
                SCHEME_NAMESPACE,
                participants.clone(),
                private_keys[receiver_idx].clone(),
            )
            .expect("signer scheme should be created");

            let config: Config<_, _, _, _, C, _, _, _> = Config {
                scheme_provider: MultiEpochProvider::single(scheme),
                blocker: receiver_control.clone(),
                shard_codec_cfg: CodecConfig {
                    maximum_shard_size: MAX_SHARD_SIZE,
                },
                block_codec_cfg: (),
                strategy: STRATEGY,
                mailbox_size: NZUsize!(1024),
                peer_buffer_size: NZUsize!(64),
                background_channel_capacity: NZUsize!(1024),
                peer_provider: oracle.manager(),
            };

            let (engine, mailbox) = ShardEngine::new(context.child("evicted"), config);
            engine.start((evicted_sender, evicted_receiver));

            let coding_config = coding_config_for_participants(num_peers as u16);
            let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
            let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
            let commitment = coded_block.commitment();

            let peer2_shard = coded_block.shard(2).expect("missing shard 2").encode();
            let peer4_shard = coded_block.shard(4).expect("missing shard 4").encode();
            let peer5_shard = coded_block.shard(5).expect("missing shard 5").encode();
            let peer6_shard = coded_block.shard(6).expect("missing shard 6").encode();

            let block_sub = mailbox.subscribe(commitment);

            // Pre-`Discovered` path: four shards from peers that will still be in `latest.primary` after
            // the receiver is evicted (indices 2, 4, 5, 6). Together they are enough to reconstruct.
            peer2_sender.send(Recipients::One(receiver_pk.clone()), peer2_shard, true);
            peer4_sender.send(Recipients::One(receiver_pk.clone()), peer4_shard, true);
            peer5_sender.send(Recipients::One(receiver_pk.clone()), peer5_shard, true);
            peer6_sender.send(Recipients::One(receiver_pk.clone()), peer6_shard, true);
            context.sleep(DEFAULT_LINK.latency * 2).await;

            // Evict the receiver from `latest.primary`: buffered shards from remaining primaries must
            // still count toward reconstruction once the leader is known.
            let latest_primary: Set<P> =
                Set::from_iter_dedup(peer_keys.iter().filter(|pk| **pk != receiver_pk).cloned());
            oracle.manager().track(1, latest_primary);
            context.sleep(Duration::from_millis(10)).await;

            // Leader announcement drains overlap-buffered peer shards; the evicted receiver should
            // still reach quorum without ever receiving the leader's direct shard.
            mailbox.discovered(
                commitment,
                leader_pk.clone(),
                Round::new(Epoch::zero(), View::new(1)),
            );

            select! {
                _ = block_sub => {},
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("block subscription did not resolve after leader discovery");
                },
            }

            context.sleep(DEFAULT_LINK.latency * 2).await;
            let block = mailbox.get(commitment).await;
            assert!(
                block.is_some(),
                "evicted node should reconstruct from buffered shards sent by remaining latest.primary peers"
            );
            assert_eq!(block.unwrap().commitment(), commitment);

            assert!(
                oracle.blocked().await.unwrap().is_empty(),
                "no peer should be blocked when overlapping shards are valid"
            );
        });
    }

    /// When peer gossip shards arrive before the leader's direct shard,
    /// the state may transition to Ready before the leader shard is
    /// processed. The late leader shard must still be accepted, verified,
    /// and broadcast so that slower peers can reach quorum.
    #[test_traced]
    fn test_late_leader_shard_accepted_after_quorum_transition() {
        let fixture = Fixture {
            num_peers: 10,
            ..Default::default()
        };

        fixture.start(
            |config, context, oracle, mut peers, _, coding_config| async move {
                let inner = B::new::<H>((), Sha256Digest::EMPTY, Height::new(1), 100);
                let coded_block = CodedBlock::<B, C, H>::new(inner, coding_config, &STRATEGY);
                let commitment = coded_block.commitment();
                let round = Round::new(Epoch::zero(), View::new(1));

                let leader_idx = 0usize;
                let victim_idx = 1usize;
                let leader = peers[leader_idx].public_key.clone();
                let victim = peers[victim_idx].public_key.clone();

                // Sever the link from leader to victim so the leader's
                // direct shard does not arrive initially.
                oracle
                    .remove_link(leader.clone(), victim.clone())
                    .await
                    .expect("remove_link should succeed");

                // Leader proposes. All peers except the victim get their
                // shard from the leader, verify it, and gossip it.
                peers[leader_idx].mailbox.proposed(round, coded_block.clone());

                // Inform all non-leader peers of the leader.
                for peer in peers[1..].iter_mut() {
                    peer.mailbox.discovered(commitment, leader.clone(), round);
                }

                // Wait for gossip to propagate. The victim should
                // reconstruct the block from gossiped peer shards,
                // transitioning to Ready without its own shard.
                context.sleep(config.link.latency * 4).await;

                let block_sub = peers[victim_idx].mailbox.subscribe(commitment);
                select! {
                    result = block_sub => {
                        let reconstructed = result.expect("block subscription should resolve");
                        assert_eq!(reconstructed.commitment(), commitment);
                    },
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("victim did not reconstruct block from gossip");
                    },
                }

                // The shard subscription should NOT have resolved yet
                // because the victim has not verified its own shard.
                let mut shard_sub = peers[victim_idx]
                    .mailbox
                    .subscribe_assigned_shard_verified(commitment);
                assert!(
                    matches!(shard_sub.try_recv(), Err(TryRecvError::Empty)),
                    "shard subscription must not resolve before own shard is verified"
                );

                // Now restore the link so the leader's shard arrives late.
                oracle
                    .add_link(leader.clone(), victim.clone(), DEFAULT_LINK)
                    .await
                    .expect("add_link should succeed");

                // Re-send the leader's shard manually via the leader's
                // network sender (the engine already broadcast it earlier,
                // but the link was down).
                let leader_shard = coded_block
                    .shard(peers[victim_idx].index.get() as u16)
                    .expect("missing victim shard");
                peers[leader_idx].sender.send(
                    Recipients::One(victim.clone()),
                    leader_shard.encode(),
                    true,
                );
                context.sleep(config.link.latency * 2).await;

                // The shard subscription should now resolve because the
                // late leader shard was accepted and verified.
                select! {
                    _ = shard_sub => {},
                    _ = context.sleep(Duration::from_secs(5)) => {
                        panic!("shard subscription did not resolve after late leader shard");
                    },
                }

                // No peer should be blocked.
                let blocked = oracle.blocked().await.unwrap();
                assert!(
                    blocked.is_empty(),
                    "no peer should be blocked in late leader shard test"
                );

                // After both reconstruction and assigned shard readiness,
                // additional gossip shards should be silently ignored.
                let extra_sender_idx = 2usize;
                let extra_shard = coded_block
                    .shard(peers[extra_sender_idx].index.get() as u16)
                    .expect("missing shard");
                peers[extra_sender_idx].sender.send(
                    Recipients::One(victim.clone()),
                    extra_shard.encode(),
                    true,
                );
                context.sleep(config.link.latency * 2).await;

                // The gossip shard should be silently dropped (not blocked).
                let blocked = oracle.blocked().await.unwrap();
                assert!(
                    blocked.is_empty(),
                    "gossip shard after full reconstruction should be silently ignored"
                );
            },
        );
    }
}