
commonware_consensus/marshal/coding/
marshaled.rs

//! Wrapper for consensus applications that handles epochs, erasure coding, and block dissemination.
//!
//! # Overview
//!
//! [`Marshaled`] is an adapter that wraps any [`VerifyingApplication`] implementation to handle
//! epoch transitions and erasure coded broadcast automatically. It intercepts consensus
//! operations (propose, verify, certify) and ensures blocks are only produced within valid epoch boundaries.
//!
//! # Epoch Boundaries
//!
//! An epoch is a fixed number of blocks (the `epoch_length`). When the last block in an epoch
//! is reached, this wrapper prevents new blocks from being built and proposed until the next epoch begins.
//! Instead, it re-proposes the boundary block to avoid producing blocks that would be pruned
//! by the epoch transition.
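//!
//! A sketch of the boundary check (mirroring the logic in `propose` below): if the parent
//! already sits at the epoch's last height, the parent itself is re-proposed instead of
//! building a new block.
//!
//! ```rust,ignore
//! // Illustrative fragment; `epocher`, `consensus_context`, and `parent` are in scope.
//! let last_in_epoch = epocher
//!     .last(consensus_context.epoch())
//!     .expect("current epoch should exist");
//! if parent.height() == last_in_epoch {
//!     // Re-propose `parent` rather than building a new block.
//! }
//! ```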
//!
//! # Erasure Coding
//!
//! This wrapper integrates with a variant of marshal that supports erasure coded broadcast. When a leader
//! proposes a new block, it is automatically erasure encoded and its shards are broadcast to active
//! participants. When verifying a proposed block (the precondition for notarization), the wrapper
//! ensures the commitment's context digest matches the consensus context and waits for validation of
//! the shard assigned to this participant by the proposer. If that shard is valid, the assigned shard is
//! relayed to all other participants to aid in block reconstruction.
//!
//! A participant may still reconstruct the full block from gossiped shards before its designated
//! leader-delivered shard arrives. That is sufficient for later certification and repair flows, but it
//! is not treated as notarization readiness: a participant only helps form a notarization once it has
//! validated the shard it is supposed to echo.
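//!
//! In code, a leader's encode-and-cache step looks like the fragment below (drawn from
//! `propose`); the coding configuration is derived from the active participant set.
//!
//! ```rust,ignore
//! // Illustrative fragment; `scheme`, `strategy`, and `built_block` are in scope.
//! let n_participants =
//!     u16::try_from(scheme.participants().len()).expect("too many participants");
//! let coding_config = coding_config_for_participants(n_participants);
//! let coded_block = CodedBlock::<B, C, H>::new(built_block, coding_config, &strategy);
//! let commitment = coded_block.commitment();
//! ```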
//!
//! During certification (the phase between notarization and finalization), the wrapper subscribes to
//! block reconstruction and validates epoch boundaries, parent commitment, height contiguity, and
//! that the block's embedded context matches the consensus context before allowing the block to be
//! certified. If certification fails, the voter can still emit a nullify vote to advance the view.
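//!
//! These checks are centralized in `validate_block` (invoked in `deferred_verify`), roughly:
//!
//! ```rust,ignore
//! // Illustrative fragment; all bindings are in scope inside `deferred_verify`.
//! if let Err(err) = validate_block::<H, _, _>(
//!     &epocher,
//!     &block,
//!     &parent,
//!     &consensus_context,
//!     commitment,
//!     parent_commitment,
//! ) {
//!     // Reject: the block violates a coded invariant.
//! }
//! ```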
//!
//! # Usage
//!
//! Wrap your [`VerifyingApplication`] implementation with [`Marshaled::new`] and provide it to your
//! consensus engine as the [`Automaton`] and [`Relay`]. The wrapper handles all epoch logic transparently.
//!
//! ```rust,ignore
//! let cfg = MarshaledConfig {
//!     application: my_application,
//!     marshal: marshal_mailbox,
//!     shards: shard_mailbox,
//!     scheme_provider,
//!     epocher,
//!     strategy,
//! };
//! let application = Marshaled::new(context, cfg);
//! ```
//!
//! # Implementation Notes
//!
//! - Genesis blocks are handled specially: epoch 0 returns the application's genesis block,
//!   while subsequent epochs use the last block of the previous epoch as genesis
//! - Blocks are automatically verified to be within the current epoch
//!
//! # Notarization and Data Availability
//!
//! In rare crash cases, it is possible for a notarization certificate to exist without a block being
//! available to the honest parties (e.g., if the whole network crashed before receiving `f+1` shards
//! and the proposer went permanently offline). In this case, `certify` will be unable to fetch the
//! block before the timeout, resulting in a nullification.
//!
//! For this reason, not every notarized payload should be expected to be certifiable: the backing
//! block may simply be unavailable. However, if even one honest and online party has the block,
//! it will attempt to forward it to others via marshal's resolver. This case already arises when
//! a block is proposed with an invalid codec; marshal will not be able to reconstruct
//! the block, and therefore won't serve it.
//!
//! ```text
//!                                      ┌───────────────────────────────────────────────────┐
//!                                      ▼                                                   │
//! ┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐
//! │          B1         │◀──│          B2         │◀──│          B3         │XXX│          B4         │
//! └─────────────────────┘   └─────────────────────┘   └──────────┬──────────┘   └─────────────────────┘
//!                                                                │
//!                                                          Failed Certify
//! ```

use crate::{
    marshal::{
        ancestry::AncestorStream,
        application::{
            validation::{
                is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, LastBuilt,
            },
            verification_tasks::VerificationTasks,
        },
        coding::{
            shards,
            types::{coding_config_for_participants, hash_context, CodedBlock},
            validation::{validate_block, validate_proposal, ProposalError},
            Coding,
        },
        core, Update,
    },
    simplex::{scheme::Scheme, types::Context, Plan},
    types::{coding::Commitment, Epoch, Epocher, Round},
    Application, Automaton, Block, CertifiableAutomaton, CertifiableBlock, Epochable, Heightable,
    Relay, Reporter, VerifyingApplication,
};
use commonware_coding::{Config as CodingConfig, Scheme as CodingScheme};
use commonware_cryptography::{
    certificate::{Provider, Scheme as CertificateScheme},
    Committable, Digestible, Hasher,
};
use commonware_macros::select;
use commonware_parallel::Strategy;
use commonware_runtime::{
    telemetry::metrics::histogram::{Buckets, Timed},
    Clock, Metrics, Spawner, Storage,
};
use commonware_utils::{
    channel::{
        fallible::OneshotExt,
        oneshot::{self, error::RecvError},
    },
    sync::Mutex,
    NZU16,
};
use futures::future::{ready, try_join, Either, Ready};
use prometheus_client::metrics::histogram::Histogram;
use rand::Rng;
use std::sync::{Arc, OnceLock};
use tracing::{debug, warn};

/// The [`CodingConfig`] used for genesis blocks. These blocks are never broadcast in
/// the proposal phase, and thus the configuration is irrelevant.
const GENESIS_CODING_CONFIG: CodingConfig = CodingConfig {
    minimum_shards: NZU16!(1),
    extra_shards: NZU16!(1),
};

/// Configuration for initializing [`Marshaled`].
#[allow(clippy::type_complexity)]
pub struct MarshaledConfig<A, B, C, H, Z, S, ES>
where
    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// The underlying application to wrap.
    pub application: A,
    /// Mailbox for communicating with the marshal engine.
    pub marshal:
        core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
    /// Mailbox for communicating with the shards engine.
    pub shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
    /// Provider for signing schemes scoped by epoch.
    pub scheme_provider: Z,
    /// Strategy for parallel operations.
    pub strategy: S,
    /// Strategy for determining epoch boundaries.
    pub epocher: ES,
}

/// An [`Application`] adapter that handles epoch transitions and erasure coded broadcast.
///
/// This wrapper intercepts consensus operations to enforce epoch boundaries. It prevents
/// blocks from being produced outside their valid epoch and handles the special case of
/// re-proposing boundary blocks during epoch transitions.
#[derive(Clone)]
#[allow(clippy::type_complexity)]
pub struct Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: Application<E>,
    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    context: E,
    application: A,
    marshal: core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
    shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
    scheme_provider: Z,
    epocher: ES,
    strategy: S,
    last_built: LastBuilt<CodedBlock<B, C, H>>,
    verification_tasks: VerificationTasks<Commitment>,
    cached_genesis: Arc<OnceLock<(Commitment, CodedBlock<B, C, H>)>>,

    build_duration: Timed<E>,
    verify_duration: Timed<E>,
    proposal_parent_fetch_duration: Timed<E>,
    erasure_encode_duration: Timed<E>,
}

impl<E, A, B, C, H, Z, S, ES> Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = Z::Scheme,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// Creates a new [`Marshaled`] wrapper.
    ///
    /// # Panics
    ///
    /// Panics if the marshal metadata store cannot be initialized.
    pub fn new(context: E, cfg: MarshaledConfig<A, B, C, H, Z, S, ES>) -> Self {
        let MarshaledConfig {
            application,
            marshal,
            shards,
            scheme_provider,
            strategy,
            epocher,
        } = cfg;

        let clock = Arc::new(context.clone());

        let build_histogram = Histogram::new(Buckets::LOCAL);
        context.register(
            "build_duration",
            "Histogram of time taken for the application to build a new block, in seconds",
            build_histogram.clone(),
        );
        let build_duration = Timed::new(build_histogram, clock.clone());

        let verify_histogram = Histogram::new(Buckets::LOCAL);
        context.register(
            "verify_duration",
            "Histogram of time taken for the application to verify a block, in seconds",
            verify_histogram.clone(),
        );
        let verify_duration = Timed::new(verify_histogram, clock.clone());

        let parent_fetch_histogram = Histogram::new(Buckets::LOCAL);
        context.register(
            "parent_fetch_duration",
            "Histogram of time taken to fetch a parent block in proposal, in seconds",
            parent_fetch_histogram.clone(),
        );
        let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram, clock.clone());

        let erasure_histogram = Histogram::new(Buckets::LOCAL);
        context.register(
            "erasure_encode_duration",
            "Histogram of time taken to erasure encode a block, in seconds",
            erasure_histogram.clone(),
        );
        let erasure_encode_duration = Timed::new(erasure_histogram, clock);

        Self {
            context,
            application,
            marshal,
            shards,
            scheme_provider,
            strategy,
            epocher,
            last_built: Arc::new(Mutex::new(None)),
            verification_tasks: VerificationTasks::new(),
            cached_genesis: Arc::new(OnceLock::new()),

            build_duration,
            verify_duration,
            proposal_parent_fetch_duration,
            erasure_encode_duration,
        }
    }

    /// Verifies a proposed block within epoch boundaries.
    ///
    /// This method validates that:
    /// 1. The block is within the current epoch (unless it's a boundary block re-proposal)
    /// 2. Re-proposals are only allowed for the last block in an epoch
    /// 3. The block's parent digest matches the consensus context's expected parent
    /// 4. The block's height is exactly one greater than the parent's height
    /// 5. The block's embedded context digest matches the commitment
    /// 6. The block's embedded context matches the consensus context
    /// 7. The underlying application's verification logic passes
    ///
    /// Verification is spawned in a background task and returns a receiver that will contain
    /// the verification result.
    ///
    /// If `prefetched_block` is provided, it will be used directly instead of fetching from
    /// the marshal. This is useful in `certify` when we've already fetched the block to
    /// extract its embedded context.
    fn deferred_verify(
        &mut self,
        consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
        commitment: Commitment,
        prefetched_block: Option<CodedBlock<B, C, H>>,
    ) -> oneshot::Receiver<bool> {
        let mut marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let epocher = self.epocher.clone();
        let verify_duration = self.verify_duration.clone();
        let cached_genesis = self.cached_genesis.clone();

        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("deferred_verify")
            .with_attribute("round", consensus_context.round)
            .spawn(move |runtime_context| async move {
                let round = consensus_context.round;

                // Fetch parent block
                let (parent_view, parent_commitment) = consensus_context.parent;
                let parent_request = fetch_parent(
                    parent_commitment,
                    // We are guaranteed that the parent round for any `consensus_context` is
                    // in the same epoch (recall, the boundary block of the previous epoch
                    // is the genesis block of the current epoch).
                    Some(Round::new(consensus_context.epoch(), parent_view)),
                    &mut application,
                    &mut marshal,
                    cached_genesis,
                )
                .await;

                // Get block either from prefetched or by subscribing
                let (parent, block) = if let Some(block) = prefetched_block {
                    // We have a prefetched block, just fetch the parent
                    let parent = select! {
                        _ = tx.closed() => {
                            debug!(
                                reason = "consensus dropped receiver",
                                "skipping verification"
                            );
                            return;
                        },
                        result = parent_request => match result {
                            Ok(parent) => parent,
                            Err(_) => {
                                debug!(reason = "failed to fetch parent", "skipping verification");
                                return;
                            }
                        },
                    };
                    (parent, block)
                } else {
                    // No prefetched block, fetch both parent and block
                    let block_request = marshal
                        .subscribe_by_commitment(Some(round), commitment)
                        .await;
                    let block_requests = try_join(parent_request, block_request);

                    select! {
                        _ = tx.closed() => {
                            debug!(
                                reason = "consensus dropped receiver",
                                "skipping verification"
                            );
                            return;
                        },
                        result = block_requests => match result {
                            Ok(results) => results,
                            Err(_) => {
                                debug!(
                                    reason = "failed to fetch parent or block",
                                    "skipping verification"
                                );
                                return;
                            }
                        },
                    }
                };

                if let Err(err) = validate_block::<H, _, _>(
                    &epocher,
                    &block,
                    &parent,
                    &consensus_context,
                    commitment,
                    parent_commitment,
                ) {
                    debug!(
                        ?err,
                        expected_commitment = %commitment,
                        block_commitment = %block.commitment(),
                        expected_parent_commitment = %parent_commitment,
                        parent_commitment = %parent.commitment(),
                        expected_parent = %parent.digest(),
                        block_parent = %block.parent(),
                        parent_height = %parent.height(),
                        block_height = %block.height(),
                        "block failed coded invariant validation"
                    );
                    tx.send_lossy(false);
                    return;
                }

                let ancestry_stream = AncestorStream::new(
                    marshal.clone(),
                    [block.clone().into_inner(), parent.into_inner()],
                );
                let validity_request = application.verify(
                    (
                        runtime_context.with_label("app_verify"),
                        consensus_context.clone(),
                    ),
                    ancestry_stream,
                );

                // If consensus drops the receiver, we can stop work early.
                let mut timer = verify_duration.timer();
                let application_valid = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping verification"
                        );
                        return;
                    },
                    is_valid = validity_request => is_valid,
                };
                timer.observe();
                if application_valid {
                    // The block is only persisted at this point.
                    marshal.verified(round, block).await;
                }
                tx.send_lossy(application_valid);
            });

        rx
    }
}

impl<E, A, B, C, H, Z, S, ES> Automaton for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = Z::Scheme,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    type Digest = Commitment;
    type Context = Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>;

    /// Returns the genesis digest for a given epoch.
    ///
    /// For epoch 0, this returns the application's genesis block digest. For subsequent
    /// epochs, it returns the digest of the last block from the previous epoch, which
    /// serves as the genesis block for the new epoch.
    ///
    /// # Panics
    ///
    /// Panics if a non-zero epoch is requested but the previous epoch's final block is not
    /// available in storage. This indicates a critical error in the consensus engine startup
    /// sequence, as engines must always have the genesis block before starting.
    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
        let Some(previous_epoch) = epoch.previous() else {
            let genesis_block = self.application.genesis().await;
            return genesis_coding_commitment::<H, _>(&genesis_block);
        };

        let last_height = self
            .epocher
            .last(previous_epoch)
            .expect("previous epoch should exist");
        let Some(block) = self.marshal.get_block(last_height).await else {
            // A new consensus engine will never be started without having the genesis block
            // of the new epoch (the last block of the previous epoch) already stored.
            unreachable!("missing starting epoch block at height {last_height}");
        };
        block.commitment()
    }

    /// Proposes a new block or re-proposes the epoch boundary block.
    ///
    /// This method builds a new block from the underlying application unless the parent block
    /// is the last block in the current epoch. When at an epoch boundary, it re-proposes the
    /// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
    ///
    /// The proposal operation is spawned in a background task and returns a receiver that will
    /// contain the proposed block's digest when ready. The built block is cached for later
    /// broadcasting.
    async fn propose(
        &mut self,
        consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    ) -> oneshot::Receiver<Self::Digest> {
        let mut marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let last_built = self.last_built.clone();
        let epocher = self.epocher.clone();
        let strategy = self.strategy.clone();
        let cached_genesis = self.cached_genesis.clone();

        // If there's no scheme for the current epoch, we cannot verify the proposal.
        // Send back a receiver with a dropped sender.
        let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
            debug!(
                round = %consensus_context.round,
                "no scheme for epoch, skipping propose"
            );
            let (_, rx) = oneshot::channel();
            return rx;
        };

        let n_participants =
            u16::try_from(scheme.participants().len()).expect("too many participants");
        let coding_config = coding_config_for_participants(n_participants);

        // Metrics
        let build_duration = self.build_duration.clone();
        let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone();
        let erasure_encode_duration = self.erasure_encode_duration.clone();

        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("propose")
            .with_attribute("round", consensus_context.round)
            .spawn(move |runtime_context| async move {
                let (parent_view, parent_commitment) = consensus_context.parent;
                let parent_request = fetch_parent(
                    parent_commitment,
                    // We are guaranteed that the parent round for any `consensus_context` is
                    // in the same epoch (recall, the boundary block of the previous epoch
                    // is the genesis block of the current epoch).
                    Some(Round::new(consensus_context.epoch(), parent_view)),
                    &mut application,
                    &mut marshal,
                    cached_genesis,
                )
                .await;

                let mut parent_timer = proposal_parent_fetch_duration.timer();
                let parent = select! {
                    _ = tx.closed() => {
                        debug!(reason = "consensus dropped receiver", "skipping proposal");
                        return;
                    },
                    result = parent_request => match result {
                        Ok(parent) => parent,
                        Err(_) => {
                            debug!(
                                ?parent_commitment,
                                reason = "failed to fetch parent block",
                                "skipping proposal"
                            );
                            return;
                        }
                    },
                };
                parent_timer.observe();

                // Special case: If the parent block is the last block in the epoch,
                // re-propose it so as not to produce any blocks that will be cut out
                // by the epoch transition.
                let last_in_epoch = epocher
                    .last(consensus_context.epoch())
                    .expect("current epoch should exist");
                if parent.height() == last_in_epoch {
                    let commitment = parent.commitment();
                    {
                        let mut lock = last_built.lock();
                        *lock = Some((consensus_context.round, parent));
                    }

                    let success = tx.send_lossy(commitment);
                    debug!(
                        round = ?consensus_context.round,
                        ?commitment,
                        success,
                        "re-proposed parent block at epoch boundary"
                    );
                    return;
                }

                let ancestor_stream = AncestorStream::new(marshal.clone(), [parent.into_inner()]);
                let build_request = application.propose(
                    (
                        runtime_context.with_label("app_propose"),
                        consensus_context.clone(),
                    ),
                    ancestor_stream,
                );

                let mut build_timer = build_duration.timer();
                let built_block = select! {
                    _ = tx.closed() => {
                        debug!(reason = "consensus dropped receiver", "skipping proposal");
                        return;
                    },
                    result = build_request => match result {
                        Some(block) => block,
                        None => {
                            debug!(
                                ?parent_commitment,
                                reason = "block building failed",
                                "skipping proposal"
                            );
                            return;
                        }
                    },
                };
                build_timer.observe();

                let mut erasure_timer = erasure_encode_duration.timer();
                let coded_block = CodedBlock::<B, C, H>::new(built_block, coding_config, &strategy);
                erasure_timer.observe();

                let commitment = coded_block.commitment();
                {
                    let mut lock = last_built.lock();
                    *lock = Some((consensus_context.round, coded_block));
                }

                let success = tx.send_lossy(commitment);
                debug!(
                    round = ?consensus_context.round,
                    ?commitment,
                    success,
                    "proposed new block"
                );
            });
        rx
    }
636
637    /// Verifies a received shard for a given round.
638    ///
639    /// This method validates that:
640    /// 1. The coding configuration matches the expected configuration for the current scheme.
641    /// 2. The commitment's context digest matches the consensus context (unless this is a re-proposal).
642    /// 3. The shard is contained within the consensus commitment.
643    ///
644    /// Verification is spawned in a background task and returns a receiver that will contain
645    /// the verification result. Additionally, this method kicks off deferred verification to
646    /// start block verification early (hidden behind shard validity and network latency).
647    async fn verify(
648        &mut self,
649        consensus_context: Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>,
650        payload: Self::Digest,
651    ) -> oneshot::Receiver<bool> {
652        // If there's no scheme for the current epoch, we cannot vote on the proposal.
653        // Send back a receiver with a dropped sender.
654        let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
655            debug!(
656                round = %consensus_context.round,
657                "no scheme for epoch, skipping verify"
658            );
659            let (_, rx) = oneshot::channel();
660            return rx;
661        };
662
663        let n_participants =
664            u16::try_from(scheme.participants().len()).expect("too many participants");
665        let coding_config = coding_config_for_participants(n_participants);
666        let is_reproposal = payload == consensus_context.parent.1;
667
668        // Validate proposal-level invariants:
669        // - coding config must match active participant set
670        // - context digest must match unless this is a re-proposal
671        let proposal_context = (!is_reproposal).then_some(&consensus_context);
672        if let Err(err) = validate_proposal::<H, _>(payload, coding_config, proposal_context) {
673            match err {
674                ProposalError::CodingConfig => {
675                    warn!(
676                        round = %consensus_context.round,
677                        got = ?payload.config(),
678                        expected = ?coding_config,
679                        "rejected proposal with unexpected coding configuration"
680                    );
681                }
682                ProposalError::ContextDigest => {
683                    let expected = hash_context::<H, _>(&consensus_context);
684                    let got = payload.context::<H::Digest>();
685                    warn!(
686                        round = %consensus_context.round,
687                        expected = ?expected,
688                        got = ?got,
689                        "rejected proposal with mismatched context digest"
690                    );
691                }
692            }
693
694            let (tx, rx) = oneshot::channel();
695            tx.send_lossy(false);
696            return rx;
697        }
698
        // Re-proposals skip context-digest validation because the consensus context will point
        // at the prior epoch-boundary block while the embedded block context is from the
        // original proposal view.
        //
        // Re-proposals also skip shard-validity and deferred verification because:
        // 1. The block was already verified when originally proposed
        // 2. The parent-child height check would fail (parent IS the block)
        // 3. Waiting for shards could stall if the leader doesn't rebroadcast
        if is_reproposal {
            // Fetch the block to verify it's at the epoch boundary.
            // This should be fast since the parent block is typically already cached.
            let block_rx = self
                .marshal
                .subscribe_by_commitment(Some(consensus_context.round), payload)
                .await;
            let marshal = self.marshal.clone();
            let epocher = self.epocher.clone();
            let round = consensus_context.round;
            let verification_tasks = self.verification_tasks.clone();

            // Register a verification task synchronously before spawning work so
            // `certify` can always find it (no race with task startup).
            let (task_tx, task_rx) = oneshot::channel();
            verification_tasks.insert(round, payload, task_rx);

            let (mut tx, rx) = oneshot::channel();
            self.context
                .with_label("verify_reproposal")
                .spawn(move |_| async move {
                    let block = select! {
                        _ = tx.closed() => {
                            debug!(
                                reason = "consensus dropped receiver",
                                "skipping re-proposal verification"
                            );
                            return;
                        },
                        block = block_rx => match block {
                            Ok(block) => block,
                            Err(_) => {
                                debug!(
                                    ?payload,
                                    reason = "failed to fetch block for re-proposal verification",
                                    "skipping re-proposal verification"
                                );
                                // Fetch failure is an availability issue, not an explicit
                                // invalidity proof. Do not synthesize `false` here.
                                return;
                            }
                        },
                    };

                    if !is_valid_reproposal_at_verify(&epocher, block.height(), round.epoch()) {
                        debug!(
                            height = %block.height(),
                            "re-proposal is not at epoch boundary"
                        );
                        task_tx.send_lossy(false);
                        tx.send_lossy(false);
                        return;
                    }

                    // Valid re-proposal. Notify the marshal and complete the
                    // verification task for `certify`.
                    marshal.verified(round, block).await;
                    task_tx.send_lossy(true);
                    tx.send_lossy(true);
                });
            return rx;
        }

        // Inform the shard engine of an externally proposed commitment.
        self.shards
            .discovered(
                payload,
                consensus_context.leader.clone(),
                consensus_context.round,
            )
            .await;

        // Kick off deferred verification early to hide verification latency behind
        // shard validity checks and network latency for collecting votes.
        let round = consensus_context.round;
        let task = self.deferred_verify(consensus_context, payload, None);
        self.verification_tasks.insert(round, payload, task);

        match scheme.me() {
            Some(_) => {
                // Subscribe to assigned shard verification. For participants, this
                // only completes once the leader-delivered shard for our
                // assigned index has been verified. Reconstructing the block
                // from peer gossip is useful for certification later, but is
                // not enough to emit a notarize vote.
                let validity_rx = self.shards.subscribe_assigned_shard_verified(payload).await;
                let (tx, rx) = oneshot::channel();
                self.context
                    .with_label("shard_validity_wait")
                    .spawn(|_| async move {
                        if validity_rx.await.is_ok() {
                            tx.send_lossy(true);
                        }
                    });
                rx
            }
            None => {
                // If we are not participating, there's no shard to verify; just accept the proposal.
                //
                // Later, when certifying, we will wait to receive the block from the network.
                let (tx, rx) = oneshot::channel();
                tx.send_lossy(true);
                rx
            }
        }
    }
}

impl<E, A, B, C, H, Z, S, ES> CertifiableAutomaton for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = Z::Scheme,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    async fn certify(&mut self, round: Round, payload: Self::Digest) -> oneshot::Receiver<bool> {
        // First, check for an in-progress verification task from `verify()`.
        let task = self.verification_tasks.take(round, payload);
        if let Some(task) = task {
            return task;
        }

        // No in-progress task means we never verified this proposal locally.
        // In that case, fall back to the block's embedded context. This is safe: if a
        // Byzantine proposer embedded a malicious context, the at least f+1 honest
        // validators in any notarizing quorum will verify against the true consensus
        // context and reject the mismatch, preventing a 2f+1 finalization quorum.
        //
        // Subscribe to the block and verify using its embedded context once available.
        debug!(
            ?round,
            ?payload,
            "subscribing to block for certification using embedded context"
        );
        let block_rx = self
            .marshal
            .subscribe_by_commitment(Some(round), payload)
            .await;
        let mut marshaled = self.clone();
        let shards = self.shards.clone();
        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("certify")
            .with_attribute("round", round)
            .spawn(move |_| async move {
                let block = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping certification"
                        );
                        return;
                    },
                    result = block_rx => match result {
                        Ok(block) => block,
                        Err(_) => {
                            debug!(
                                ?payload,
                                reason = "failed to fetch block for certification",
                                "skipping certification"
                            );
                            return;
                        }
                    },
                };

                // Re-proposal detection for the certify path: we don't have the consensus
                // context, only the block's embedded context from the original proposal.
                // Infer re-proposal from:
                // 1. Block is at epoch boundary (only boundary blocks can be re-proposed)
                // 2. Certification round's view > embedded context's view (re-proposals
                //    retain their original embedded context, so a later view indicates
                //    the block was re-proposed)
                // 3. Same epoch (re-proposals don't cross epoch boundaries)
                let embedded_context = block.context();
                let is_reproposal = is_inferred_reproposal_at_certify(
                    &marshaled.epocher,
                    block.height(),
                    embedded_context.round,
                    round,
                );
                if is_reproposal {
                    // NOTE: It is possible that, during crash recovery, we call
                    // `marshal.verified` twice for the same block. That function is
                    // idempotent, so this is safe.
                    marshaled.marshal.verified(round, block).await;
                    tx.send_lossy(true);
                    return;
                }

                // Inform the shard engine of an externally proposed commitment.
                shards
                    .discovered(
                        payload,
                        embedded_context.leader.clone(),
                        embedded_context.round,
                    )
                    .await;

                // Use the block's embedded context for verification, passing the
                // prefetched block to avoid fetching it again inside deferred_verify.
                let verify_rx = marshaled.deferred_verify(embedded_context, payload, Some(block));
                if let Ok(result) = verify_rx.await {
                    tx.send_lossy(result);
                }
            });
        rx
    }
}

impl<E, A, B, C, H, Z, S, ES> Relay for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: Application<
        E,
        Block = B,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    type Digest = Commitment;
    type PublicKey = <Z::Scheme as CertificateScheme>::PublicKey;
    type Plan = Plan<Self::PublicKey>;

    async fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) {
        match plan {
            Plan::Propose => {
                let Some((round, block)) = self.last_built.lock().take() else {
                    warn!("missing block to broadcast");
                    return;
                };
                if block.commitment() != commitment {
                    warn!(
                        round = %round,
                        commitment = %block.commitment(),
                        height = %block.height(),
                        "skipping requested broadcast of block with mismatched commitment"
                    );
                    return;
                }
                debug!(
                    round = %round,
                    commitment = %block.commitment(),
                    height = %block.height(),
                    "requested broadcast of built block"
                );
                self.shards.proposed(round, block).await;
            }
            Plan::Forward { .. } => {
                // Coding variant does not support targeted forwarding;
                // peers reconstruct blocks from erasure-coded shards.
                //
                // TODO(#3389): Support checked data forwarding for PhasedScheme.
            }
        }
    }
}

impl<E, A, B, C, H, Z, S, ES> Reporter for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: Application<
            E,
            Block = B,
            Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
        > + Reporter<Activity = Update<B>>,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    type Activity = A::Activity;

    /// Relays a report to the underlying [`Application`] and cleans up old verification data.
    async fn report(&mut self, update: Self::Activity) {
        // Clean up verification tasks and contexts for rounds <= the finalized round.
        if let Update::Tip(round, _, _) = &update {
            self.verification_tasks.retain_after(round);
        }
        self.application.report(update).await
    }
}

/// Fetches the parent block given its commitment and optional round.
///
/// This is a helper function used during proposal and verification to retrieve the parent
/// block. If the parent commitment matches the genesis block's commitment, it returns the
/// genesis block directly without querying the marshal. Otherwise, it subscribes to the
/// marshal to await the parent block's availability.
///
/// `parent_round` is an optional resolver hint. Callers should only provide a hint when
/// the source context is trusted/validated. Untrusted paths should pass `None`.
///
/// Returns an error if the marshal subscription is cancelled.
#[allow(clippy::type_complexity)]
async fn fetch_parent<E, S, A, B, C, H>(
    parent_commitment: Commitment,
    parent_round: Option<Round>,
    application: &mut A,
    marshal: &mut core::Mailbox<S, Coding<B, C, H, S::PublicKey>>,
    cached_genesis: Arc<OnceLock<(Commitment, CodedBlock<B, C, H>)>>,
) -> Either<Ready<Result<CodedBlock<B, C, H>, RecvError>>, oneshot::Receiver<CodedBlock<B, C, H>>>
where
    E: Rng + Spawner + Metrics + Clock,
    S: CertificateScheme,
    A: Application<E, Block = B, Context = Context<Commitment, S::PublicKey>>,
    B: CertifiableBlock<Context = Context<Commitment, S::PublicKey>>,
    C: CodingScheme,
    H: Hasher,
{
    if cached_genesis.get().is_none() {
        let genesis = application.genesis().await;
        let genesis_coding_commitment = genesis_coding_commitment::<H, _>(&genesis);
        let coded_genesis = CodedBlock::<B, C, H>::new_trusted(genesis, genesis_coding_commitment);
        let _ = cached_genesis.set((genesis_coding_commitment, coded_genesis));
    }

    let (genesis_commitment, coded_genesis) = cached_genesis
        .get()
        .expect("genesis cache should be initialized");
    if parent_commitment == *genesis_commitment {
        Either::Left(ready(Ok(coded_genesis.clone())))
    } else {
        Either::Right(
            marshal
                .subscribe_by_commitment(parent_round, parent_commitment)
                .await,
        )
    }
}

/// Constructs the [`Commitment`] for the genesis block.
fn genesis_coding_commitment<H: Hasher, B: CertifiableBlock>(block: &B) -> Commitment {
    Commitment::from((
        block.digest(),
        block.digest(),
        hash_context::<H, _>(&block.context()),
        GENESIS_CODING_CONFIG,
    ))
}