Skip to main content

commonware_consensus/marshal/coding/
marshaled.rs

//! Wrapper for consensus applications that handles epochs, erasure coding, and block dissemination.
//!
//! # Overview
//!
//! [`Marshaled`] is an adapter that wraps any [`VerifyingApplication`] implementation to handle
//! epoch transitions and erasure coded broadcast automatically. It intercepts consensus
//! operations (propose, verify, certify) and ensures blocks are only produced within valid epoch boundaries.
//!
//! # Epoch Boundaries
//!
//! An epoch is a fixed number of blocks (the `epoch_length`). When the last block in an epoch
//! is reached, this wrapper prevents new blocks from being built & proposed until the next epoch begins.
//! Instead, it re-proposes the boundary block to avoid producing blocks that would be pruned
//! by the epoch transition.
//!
//! # Erasure Coding
//!
//! This wrapper integrates with a variant of marshal that supports erasure coded broadcast. When a leader
//! proposes a new block, it is automatically erasure encoded and its shards are broadcasted to active
//! participants. When verifying a proposed block (the precondition for notarization), the wrapper
//! ensures the commitment's context digest matches the consensus context and subscribes to shard validity
//! for the shard received from the proposer. If the shard is valid, the local shard is relayed to all
//! other participants to aid in block reconstruction.
//!
//! During certification (the phase between notarization and finalization), the wrapper subscribes to
//! block reconstruction and validates epoch boundaries, parent commitment, height contiguity, and
//! that the block's embedded context matches the consensus context before allowing the block to be
//! certified. If certification fails, the voter can still emit a nullify vote to advance the view.
//!
//! # Usage
//!
//! Wrap your [`VerifyingApplication`] implementation with [`Marshaled::new`] and provide it to your
//! consensus engine for the [`Automaton`] and [`Relay`]. The wrapper handles all epoch logic transparently.
//!
//! ```rust,ignore
//! let cfg = MarshaledConfig {
//!     application: my_application,
//!     marshal: marshal_mailbox,
//!     shards: shard_mailbox,
//!     scheme_provider,
//!     epocher,
//!     strategy,
//! };
//! let application = Marshaled::new(context, cfg);
//! ```
//!
//! # Implementation Notes
//!
//! - Genesis blocks are handled specially: epoch 0 returns the application's genesis block,
//!   while subsequent epochs use the last block of the previous epoch as genesis
//! - Blocks are automatically verified to be within the current epoch
//!
//! # Notarization and Data Availability
//!
//! In rare crash cases, it is possible for a notarization certificate to exist without a block being
//! available to the honest parties (e.g., if the whole network crashed before receiving `f+1` shards
//! and the proposer went permanently offline). In this case, `certify` will be unable to fetch the
//! block before timeout and result in a nullification.
//!
//! For this reason, it should not be expected that every notarized payload will be certifiable due
//! to the lack of an available block. However, if even one honest and online party has the block,
//! they will attempt to forward it to others via marshal's resolver. This case is already present
//! in the event of a block that was proposed with an invalid codec; Marshal will not be able to
//! reconstruct the block, and therefore won't serve it.
//!
//! ```text
//!                                      ┌───────────────────────────────────────────────────┐
//!                                      ▼                                                   │
//! ┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐
//! │          B1         │◀──│          B2         │◀──│          B3         │XXX│          B4         │
//! └─────────────────────┘   └─────────────────────┘   └──────────┬──────────┘   └─────────────────────┘
//!                                                                │
//!                                                          Failed Certify
//! ```

76use crate::{
77    marshal::{
78        ancestry::AncestorStream,
79        application::{
80            validation::{
81                is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, LastBuilt,
82            },
83            verification_tasks::VerificationTasks,
84        },
85        coding::{
86            shards,
87            types::{coding_config_for_participants, hash_context, CodedBlock},
88            validation::{validate_block, validate_proposal, ProposalError},
89            Coding,
90        },
91        core, Update,
92    },
93    simplex::{scheme::Scheme, types::Context},
94    types::{coding::Commitment, Epoch, Epocher, Round},
95    Application, Automaton, Block, CertifiableAutomaton, CertifiableBlock, Epochable, Heightable,
96    Relay, Reporter, VerifyingApplication,
97};
98use commonware_coding::{Config as CodingConfig, Scheme as CodingScheme};
99use commonware_cryptography::{
100    certificate::{Provider, Scheme as CertificateScheme},
101    Committable, Digestible, Hasher,
102};
103use commonware_macros::select;
104use commonware_parallel::Strategy;
105use commonware_runtime::{
106    telemetry::metrics::histogram::{Buckets, Timed},
107    Clock, Metrics, Spawner, Storage,
108};
109use commonware_utils::{
110    channel::{
111        fallible::OneshotExt,
112        oneshot::{self, error::RecvError},
113    },
114    sync::Mutex,
115    NZU16,
116};
117use futures::future::{ready, try_join, Either, Ready};
118use prometheus_client::metrics::histogram::Histogram;
119use rand::Rng;
120use std::sync::{Arc, OnceLock};
121use tracing::{debug, warn};
122
/// The [`CodingConfig`] used for genesis blocks.
///
/// Genesis blocks are never broadcasted in the proposal phase, so this
/// configuration is never used for actual encoding; the values here are
/// arbitrary placeholders (the smallest values the `NZU16!` macro permits).
const GENESIS_CODING_CONFIG: CodingConfig = CodingConfig {
    minimum_shards: NZU16!(1),
    extra_shards: NZU16!(1),
};
129
/// Configuration for initializing [`Marshaled`].
///
/// All fields are consumed by [`Marshaled::new`], which destructures this
/// struct and takes ownership of each component.
#[allow(clippy::type_complexity)]
pub struct MarshaledConfig<A, B, C, H, Z, S, ES>
where
    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// The underlying application to wrap.
    pub application: A,
    /// Mailbox for communicating with the marshal engine (used to fetch,
    /// subscribe to, and persist verified blocks).
    pub marshal:
        core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
    /// Mailbox for communicating with the shards engine (erasure coded shard
    /// dissemination).
    pub shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
    /// Provider for signing schemes scoped by epoch. Also determines the
    /// active participant set (and thus the coding configuration) per epoch.
    pub scheme_provider: Z,
    /// Strategy for parallel operations (e.g. erasure encoding).
    pub strategy: S,
    /// Strategy for determining epoch boundaries.
    pub epocher: ES,
}
155
/// An [`Application`] adapter that handles epoch transitions and erasure coded broadcast.
///
/// This wrapper intercepts consensus operations to enforce epoch boundaries. It prevents
/// blocks from being produced outside their valid epoch and handles the special case of
/// re-proposing boundary blocks during epoch transitions.
///
/// Cloning is cheap by design: every shared field is a mailbox handle or an
/// `Arc`-wrapped value, so clones observe the same underlying state.
#[derive(Clone)]
#[allow(clippy::type_complexity)]
pub struct Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: Application<E>,
    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// Runtime handle used to spawn background tasks and register metrics.
    context: E,
    /// The wrapped application that builds and verifies blocks.
    application: A,
    /// Mailbox to the marshal engine for fetching, subscribing to, and persisting blocks.
    marshal: core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
    /// Mailbox to the shards engine for erasure coded shard dissemination.
    shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
    /// Provider of signing schemes scoped by epoch (also determines the
    /// active participant set, and thus the coding configuration).
    scheme_provider: Z,
    /// Determines epoch boundaries (e.g. the last height of an epoch).
    epocher: ES,
    /// Strategy for parallel operations (e.g. erasure encoding).
    strategy: S,
    /// The most recently built (or re-proposed) block with its round, cached
    /// for later broadcast.
    last_built: LastBuilt<CodedBlock<B, C, H>>,
    /// In-flight verification tasks, registered so `certify` can find them.
    verification_tasks: VerificationTasks<Commitment>,
    /// Lazily-computed genesis block and its commitment (set at most once).
    cached_genesis: Arc<OnceLock<(Commitment, CodedBlock<B, C, H>)>>,

    /// Time for the application to build a new block.
    build_duration: Timed<E>,
    /// Time for the application to verify a block.
    verify_duration: Timed<E>,
    /// Time to fetch a parent block during proposal.
    proposal_parent_fetch_duration: Timed<E>,
    /// Time to erasure encode a built block.
    erasure_encode_duration: Timed<E>,
}
190
impl<E, A, B, C, H, Z, S, ES> Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = Z::Scheme,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// Creates a new [`Marshaled`] wrapper.
    ///
    /// Registers four duration histograms (`build_duration`, `verify_duration`,
    /// `parent_fetch_duration`, and `erasure_encode_duration`) against the
    /// runtime's metrics registry before assembling the wrapper.
    ///
    /// # Panics
    ///
    /// Panics if the marshal metadata store cannot be initialized.
    pub fn new(context: E, cfg: MarshaledConfig<A, B, C, H, Z, S, ES>) -> Self {
        let MarshaledConfig {
            application,
            marshal,
            shards,
            scheme_provider,
            strategy,
            epocher,
        } = cfg;

        // All histograms share a single clock handle for timing observations.
        let clock = Arc::new(context.clone());

        let build_histogram = Histogram::new(Buckets::LOCAL);
        context.register(
            "build_duration",
            "Histogram of time taken for the application to build a new block, in seconds",
            build_histogram.clone(),
        );
        let build_duration = Timed::new(build_histogram, clock.clone());

        let verify_histogram = Histogram::new(Buckets::LOCAL);
        context.register(
            "verify_duration",
            "Histogram of time taken for the application to verify a block, in seconds",
            verify_histogram.clone(),
        );
        let verify_duration = Timed::new(verify_histogram, clock.clone());

        let parent_fetch_histogram = Histogram::new(Buckets::LOCAL);
        context.register(
            "parent_fetch_duration",
            "Histogram of time taken to fetch a parent block in proposal, in seconds",
            parent_fetch_histogram.clone(),
        );
        let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram, clock.clone());

        let erasure_histogram = Histogram::new(Buckets::LOCAL);
        context.register(
            "erasure_encode_duration",
            "Histogram of time taken to erasure encode a block, in seconds",
            erasure_histogram.clone(),
        );
        // Last consumer of `clock`: move it instead of cloning.
        let erasure_encode_duration = Timed::new(erasure_histogram, clock);

        Self {
            context,
            application,
            marshal,
            shards,
            scheme_provider,
            strategy,
            epocher,
            // Nothing has been built yet; `propose` populates this.
            last_built: Arc::new(Mutex::new(None)),
            verification_tasks: VerificationTasks::new(),
            cached_genesis: Arc::new(OnceLock::new()),

            build_duration,
            verify_duration,
            proposal_parent_fetch_duration,
            erasure_encode_duration,
        }
    }

    /// Verifies a proposed block within epoch boundaries.
    ///
    /// This method validates that:
    /// 1. The block is within the current epoch (unless it's a boundary block re-proposal)
    /// 2. Re-proposals are only allowed for the last block in an epoch
    /// 3. The block's parent digest matches the consensus context's expected parent
    /// 4. The block's height is exactly one greater than the parent's height
    /// 5. The block's embedded context digest matches the commitment
    /// 6. The block's embedded context matches the consensus context
    /// 7. The underlying application's verification logic passes
    ///
    /// Verification is spawned in a background task and returns a receiver that will contain
    /// the verification result. If consensus drops the receiver at any await point, the task
    /// abandons its work early without sending a result.
    ///
    /// If `prefetched_block` is provided, it will be used directly instead of fetching from
    /// the marshal. This is useful in `certify` when we've already fetched the block to
    /// extract its embedded context.
    fn deferred_verify(
        &mut self,
        consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
        commitment: Commitment,
        prefetched_block: Option<CodedBlock<B, C, H>>,
    ) -> oneshot::Receiver<bool> {
        // Clone handles up front so the spawned task owns everything it needs.
        let mut marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let epocher = self.epocher.clone();
        let verify_duration = self.verify_duration.clone();
        let cached_genesis = self.cached_genesis.clone();

        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("deferred_verify")
            .with_attribute("round", consensus_context.round)
            .spawn(move |runtime_context| async move {
                let round = consensus_context.round;

                // Fetch parent block
                let (parent_view, parent_commitment) = consensus_context.parent;
                let parent_request = fetch_parent(
                    parent_commitment,
                    // We are guaranteed that the parent round for any `consensus_context` is
                    // in the same epoch (recall, the boundary block of the previous epoch
                    // is the genesis block of the current epoch).
                    Some(Round::new(consensus_context.epoch(), parent_view)),
                    &mut application,
                    &mut marshal,
                    cached_genesis,
                )
                .await;

                // Get block either from prefetched or by subscribing
                let (parent, block) = if let Some(block) = prefetched_block {
                    // We have a prefetched block, just fetch the parent
                    let parent = select! {
                        _ = tx.closed() => {
                            debug!(
                                reason = "consensus dropped receiver",
                                "skipping verification"
                            );
                            return;
                        },
                        result = parent_request => match result {
                            Ok(parent) => parent,
                            Err(_) => {
                                debug!(reason = "failed to fetch parent", "skipping verification");
                                return;
                            }
                        },
                    };
                    (parent, block)
                } else {
                    // No prefetched block, fetch both parent and block
                    let block_request = marshal
                        .subscribe_by_commitment(Some(round), commitment)
                        .await;
                    let block_requests = try_join(parent_request, block_request);

                    select! {
                        _ = tx.closed() => {
                            debug!(
                                reason = "consensus dropped receiver",
                                "skipping verification"
                            );
                            return;
                        },
                        result = block_requests => match result {
                            Ok(results) => results,
                            Err(_) => {
                                debug!(
                                    reason = "failed to fetch parent or block",
                                    "skipping verification"
                                );
                                return;
                            }
                        },
                    }
                };

                // Check coded-block invariants (epoch bounds, parent linkage,
                // height contiguity, context digest) before involving the
                // application. A violation here is definitive invalidity, so
                // we report `false` rather than silently dropping the sender.
                if let Err(err) = validate_block::<H, _, _>(
                    &epocher,
                    &block,
                    &parent,
                    &consensus_context,
                    commitment,
                    parent_commitment,
                ) {
                    debug!(
                        ?err,
                        expected_commitment = %commitment,
                        block_commitment = %block.commitment(),
                        expected_parent_commitment = %parent_commitment,
                        parent_commitment = %parent.commitment(),
                        expected_parent = %parent.digest(),
                        block_parent = %block.parent(),
                        parent_height = %parent.height(),
                        block_height = %block.height(),
                        "block failed coded invariant validation"
                    );
                    tx.send_lossy(false);
                    return;
                }

                // Run the application's own verification over the block's
                // ancestry (block first, then parent; older ancestors are
                // streamed from the marshal on demand).
                let ancestry_stream = AncestorStream::new(
                    marshal.clone(),
                    [block.clone().into_inner(), parent.into_inner()],
                );
                let validity_request = application.verify(
                    (
                        runtime_context.with_label("app_verify"),
                        consensus_context.clone(),
                    ),
                    ancestry_stream,
                );

                // If consensus drops the receiver, we can stop work early.
                let mut timer = verify_duration.timer();
                let application_valid = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping verification"
                        );
                        return;
                    },
                    is_valid = validity_request => is_valid,
                };
                timer.observe();
                if application_valid {
                    // The block is only persisted at this point.
                    marshal.verified(round, block).await;
                }
                tx.send_lossy(application_valid);
            });

        rx
    }
}
432
433impl<E, A, B, C, H, Z, S, ES> Automaton for Marshaled<E, A, B, C, H, Z, S, ES>
434where
435    E: Rng + Storage + Spawner + Metrics + Clock,
436    A: VerifyingApplication<
437        E,
438        Block = B,
439        SigningScheme = Z::Scheme,
440        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
441    >,
442    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
443    C: CodingScheme,
444    H: Hasher,
445    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
446    S: Strategy,
447    ES: Epocher,
448{
449    type Digest = Commitment;
450    type Context = Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>;
451
452    /// Returns the genesis digest for a given epoch.
453    ///
454    /// For epoch 0, this returns the application's genesis block digest. For subsequent
455    /// epochs, it returns the digest of the last block from the previous epoch, which
456    /// serves as the genesis block for the new epoch.
457    ///
458    /// # Panics
459    ///
460    /// Panics if a non-zero epoch is requested but the previous epoch's final block is not
461    /// available in storage. This indicates a critical error in the consensus engine startup
462    /// sequence, as engines must always have the genesis block before starting.
463    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
464        let Some(previous_epoch) = epoch.previous() else {
465            let genesis_block = self.application.genesis().await;
466            return genesis_coding_commitment::<H, _>(&genesis_block);
467        };
468
469        let last_height = self
470            .epocher
471            .last(previous_epoch)
472            .expect("previous epoch should exist");
473        let Some(block) = self.marshal.get_block(last_height).await else {
474            // A new consensus engine will never be started without having the genesis block
475            // of the new epoch (the last block of the previous epoch) already stored.
476            unreachable!("missing starting epoch block at height {last_height}");
477        };
478        block.commitment()
479    }
480
481    /// Proposes a new block or re-proposes the epoch boundary block.
482    ///
483    /// This method builds a new block from the underlying application unless the parent block
484    /// is the last block in the current epoch. When at an epoch boundary, it re-proposes the
485    /// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
486    ///
487    /// The proposal operation is spawned in a background task and returns a receiver that will
488    /// contain the proposed block's digest when ready. The built block is cached for later
489    /// broadcasting.
490    async fn propose(
491        &mut self,
492        consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
493    ) -> oneshot::Receiver<Self::Digest> {
494        let mut marshal = self.marshal.clone();
495        let mut application = self.application.clone();
496        let last_built = self.last_built.clone();
497        let epocher = self.epocher.clone();
498        let strategy = self.strategy.clone();
499        let cached_genesis = self.cached_genesis.clone();
500
501        // If there's no scheme for the current epoch, we cannot verify the proposal.
502        // Send back a receiver with a dropped sender.
503        let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
504            let (_, rx) = oneshot::channel();
505            return rx;
506        };
507
508        let n_participants =
509            u16::try_from(scheme.participants().len()).expect("too many participants");
510        let coding_config = coding_config_for_participants(n_participants);
511
512        // Metrics
513        let build_duration = self.build_duration.clone();
514        let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone();
515        let erasure_encode_duration = self.erasure_encode_duration.clone();
516
517        let (mut tx, rx) = oneshot::channel();
518        self.context
519            .with_label("propose")
520            .with_attribute("round", consensus_context.round)
521            .spawn(move |runtime_context| async move {
522                let (parent_view, parent_commitment) = consensus_context.parent;
523                let parent_request = fetch_parent(
524                    parent_commitment,
525                    // We are guaranteed that the parent round for any `consensus_context` is
526                    // in the same epoch (recall, the boundary block of the previous epoch
527                    // is the genesis block of the current epoch).
528                    Some(Round::new(consensus_context.epoch(), parent_view)),
529                    &mut application,
530                    &mut marshal,
531                    cached_genesis,
532                )
533                .await;
534
535                let mut parent_timer = proposal_parent_fetch_duration.timer();
536                let parent = select! {
537                    _ = tx.closed() => {
538                        debug!(reason = "consensus dropped receiver", "skipping proposal");
539                        return;
540                    },
541                    result = parent_request => match result {
542                        Ok(parent) => parent,
543                        Err(_) => {
544                            debug!(
545                                ?parent_commitment,
546                                reason = "failed to fetch parent block",
547                                "skipping proposal"
548                            );
549                            return;
550                        }
551                    },
552                };
553                parent_timer.observe();
554
555                // Special case: If the parent block is the last block in the epoch,
556                // re-propose it as to not produce any blocks that will be cut out
557                // by the epoch transition.
558                let last_in_epoch = epocher
559                    .last(consensus_context.epoch())
560                    .expect("current epoch should exist");
561                if parent.height() == last_in_epoch {
562                    let commitment = parent.commitment();
563                    {
564                        let mut lock = last_built.lock();
565                        *lock = Some((consensus_context.round, parent));
566                    }
567
568                    let success = tx.send_lossy(commitment);
569                    debug!(
570                        round = ?consensus_context.round,
571                        ?commitment,
572                        success,
573                        "re-proposed parent block at epoch boundary"
574                    );
575                    return;
576                }
577
578                let ancestor_stream = AncestorStream::new(marshal.clone(), [parent.into_inner()]);
579                let build_request = application.propose(
580                    (
581                        runtime_context.with_label("app_propose"),
582                        consensus_context.clone(),
583                    ),
584                    ancestor_stream,
585                );
586
587                let mut build_timer = build_duration.timer();
588                let built_block = select! {
589                    _ = tx.closed() => {
590                        debug!(reason = "consensus dropped receiver", "skipping proposal");
591                        return;
592                    },
593                    result = build_request => match result {
594                        Some(block) => block,
595                        None => {
596                            debug!(
597                                ?parent_commitment,
598                                reason = "block building failed",
599                                "skipping proposal"
600                            );
601                            return;
602                        }
603                    },
604                };
605                build_timer.observe();
606
607                let mut erasure_timer = erasure_encode_duration.timer();
608                let coded_block = CodedBlock::<B, C, H>::new(built_block, coding_config, &strategy);
609                erasure_timer.observe();
610
611                let commitment = coded_block.commitment();
612                {
613                    let mut lock = last_built.lock();
614                    *lock = Some((consensus_context.round, coded_block));
615                }
616
617                let success = tx.send_lossy(commitment);
618                debug!(
619                    round = ?consensus_context.round,
620                    ?commitment,
621                    success,
622                    "proposed new block"
623                );
624            });
625        rx
626    }
627
628    /// Verifies a received shard for a given round.
629    ///
630    /// This method validates that:
631    /// 1. The coding configuration matches the expected configuration for the current scheme.
632    /// 2. The commitment's context digest matches the consensus context (unless this is a re-proposal).
633    /// 3. The shard is contained within the consensus commitment.
634    ///
635    /// Verification is spawned in a background task and returns a receiver that will contain
636    /// the verification result. Additionally, this method kicks off deferred verification to
637    /// start block verification early (hidden behind shard validity and network latency).
638    async fn verify(
639        &mut self,
640        consensus_context: Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>,
641        payload: Self::Digest,
642    ) -> oneshot::Receiver<bool> {
643        // If there's no scheme for the current epoch, we cannot vote on the proposal.
644        // Send back a receiver with a dropped sender.
645        let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
646            let (_, rx) = oneshot::channel();
647            return rx;
648        };
649
650        let n_participants =
651            u16::try_from(scheme.participants().len()).expect("too many participants");
652        let coding_config = coding_config_for_participants(n_participants);
653        let is_reproposal = payload == consensus_context.parent.1;
654
655        // Validate proposal-level invariants:
656        // - coding config must match active participant set
657        // - context digest must match unless this is a re-proposal
658        let proposal_context = (!is_reproposal).then_some(&consensus_context);
659        if let Err(err) = validate_proposal::<H, _>(payload, coding_config, proposal_context) {
660            match err {
661                ProposalError::CodingConfig => {
662                    warn!(
663                        round = %consensus_context.round,
664                        got = ?payload.config(),
665                        expected = ?coding_config,
666                        "rejected proposal with unexpected coding configuration"
667                    );
668                }
669                ProposalError::ContextDigest => {
670                    let expected = hash_context::<H, _>(&consensus_context);
671                    let got = payload.context::<H::Digest>();
672                    warn!(
673                        round = %consensus_context.round,
674                        expected = ?expected,
675                        got = ?got,
676                        "rejected proposal with mismatched context digest"
677                    );
678                }
679            }
680
681            let (tx, rx) = oneshot::channel();
682            tx.send_lossy(false);
683            return rx;
684        }
685
686        // Re-proposals skip context-digest validation because the consensus context will point
687        // at the prior epoch-boundary block while the embedded block context is from the
688        // original proposal view.
689        //
690        // Re-proposals also skip shard-validity and deferred verification because:
691        // 1. The block was already verified when originally proposed
692        // 2. The parent-child height check would fail (parent IS the block)
693        // 3. Waiting for shards could stall if the leader doesn't rebroadcast
694        if is_reproposal {
695            // Fetch the block to verify it's at the epoch boundary.
696            // This should be fast since the parent block is typically already cached.
697            let block_rx = self
698                .marshal
699                .subscribe_by_commitment(Some(consensus_context.round), payload)
700                .await;
701            let marshal = self.marshal.clone();
702            let epocher = self.epocher.clone();
703            let round = consensus_context.round;
704            let verification_tasks = self.verification_tasks.clone();
705
706            // Register a verification task synchronously before spawning work so
707            // `certify` can always find it (no race with task startup).
708            let (task_tx, task_rx) = oneshot::channel();
709            verification_tasks.insert(round, payload, task_rx);
710
711            let (mut tx, rx) = oneshot::channel();
712            self.context
713                .with_label("verify_reproposal")
714                .spawn(move |_| async move {
715                    let block = select! {
716                        _ = tx.closed() => {
717                            debug!(
718                                reason = "consensus dropped receiver",
719                                "skipping re-proposal verification"
720                            );
721                            return;
722                        },
723                        block = block_rx => match block {
724                            Ok(block) => block,
725                            Err(_) => {
726                                debug!(
727                                    ?payload,
728                                    reason = "failed to fetch block for re-proposal verification",
729                                    "skipping re-proposal verification"
730                                );
731                                // Fetch failure is an availability issue, not an explicit
732                                // invalidity proof. Do not synthesize `false` here.
733                                return;
734                            }
735                        },
736                    };
737
738                    if !is_valid_reproposal_at_verify(&epocher, block.height(), round.epoch()) {
739                        debug!(
740                            height = %block.height(),
741                            "re-proposal is not at epoch boundary"
742                        );
743                        task_tx.send_lossy(false);
744                        tx.send_lossy(false);
745                        return;
746                    }
747
748                    // Valid re-proposal. Notify the marshal and complete the
749                    // verification task for `certify`.
750                    marshal.verified(round, block).await;
751                    task_tx.send_lossy(true);
752                    tx.send_lossy(true);
753                });
754            return rx;
755        }
756
757        // Inform the shard engine of an externally proposed commitment.
758        self.shards
759            .discovered(
760                payload,
761                consensus_context.leader.clone(),
762                consensus_context.round,
763            )
764            .await;
765
766        // Kick off deferred verification early to hide verification latency behind
767        // shard validity checks and network latency for collecting votes.
768        let round = consensus_context.round;
769        let task = self.deferred_verify(consensus_context, payload, None);
770        self.verification_tasks.insert(round, payload, task);
771
772        match scheme.me() {
773            Some(_) => {
774                // Subscribe to shard validity. The subscription completes when a valid shard arrives.
775                let validity_rx = self.shards.subscribe_shard(payload).await;
776                let (tx, rx) = oneshot::channel();
777                self.context
778                    .with_label("shard_validity_wait")
779                    .spawn(|_| async move {
780                        if validity_rx.await.is_ok() {
781                            tx.send_lossy(true);
782                        }
783                    });
784                rx
785            }
786            None => {
787                // If we are not participating, there's no shard to verify; just accept the proposal.
788                //
789                // Later, when certifying, we will wait to receive the block from the network.
790                let (tx, rx) = oneshot::channel();
791                tx.send_lossy(true);
792                rx
793            }
794        }
795    }
796}
797
impl<E, A, B, C, H, Z, S, ES> CertifiableAutomaton for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = Z::Scheme,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// Certifies a notarized payload before finalization.
    ///
    /// Fast path: if `verify()` already registered a verification task for this
    /// `(round, payload)` pair, that task's receiver is returned directly.
    ///
    /// Slow path: the payload was never verified locally, so this subscribes to the full
    /// block via the marshal and verifies it against the block's embedded context once it
    /// becomes available. The returned receiver resolves to the certification verdict; it
    /// is dropped without a verdict if the block cannot be fetched (availability failure,
    /// not proof of invalidity).
    async fn certify(&mut self, round: Round, payload: Self::Digest) -> oneshot::Receiver<bool> {
        // First, check for an in-progress verification task from `verify()`.
        let task = self.verification_tasks.take(round, payload);
        if let Some(task) = task {
            return task;
        }

        // No in-progress task means we never verified this proposal locally.
        // We can use the block's embedded context to move to the next view. If a Byzantine
        // proposer embedded a malicious context, the f+1 honest validators from the notarizing quorum
        // will verify against the proper context and reject the mismatch, preventing a 2f+1
        // finalization quorum.
        //
        // Subscribe to the block and verify using its embedded context once available.
        debug!(
            ?round,
            ?payload,
            "subscribing to block for certification using embedded context"
        );
        let block_rx = self
            .marshal
            .subscribe_by_commitment(Some(round), payload)
            .await;
        let mut marshaled = self.clone();
        let shards = self.shards.clone();
        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("certify")
            .with_attribute("round", round)
            .spawn(move |_| async move {
                // Wait for either the block or the consensus side abandoning the request.
                let block = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping certification"
                        );
                        return;
                    },
                    result = block_rx => match result {
                        Ok(block) => block,
                        Err(_) => {
                            debug!(
                                ?payload,
                                reason = "failed to fetch block for certification",
                                "skipping certification"
                            );
                            return;
                        }
                    },
                };

                // Re-proposal detection for certify path: we don't have the consensus
                // context, only the block's embedded context from original proposal.
                // Infer re-proposal from:
                // 1. Block is at epoch boundary (only boundary blocks can be re-proposed)
                // 2. Certification round's view > embedded context's view (re-proposals
                //    retain their original embedded context, so a later view indicates
                //    the block was re-proposed)
                // 3. Same epoch (re-proposals don't cross epoch boundaries)
                let embedded_context = block.context();
                let is_reproposal = is_inferred_reproposal_at_certify(
                    &marshaled.epocher,
                    block.height(),
                    embedded_context.round,
                    round,
                );
                if is_reproposal {
                    // NOTE: It is possible that, during crash recovery, we call
                    // `marshal.verified` twice for the same block. That function is
                    // idempotent, so this is safe.
                    marshaled.marshal.verified(round, block).await;
                    tx.send_lossy(true);
                    return;
                }

                // Inform the shard engine of an externally proposed commitment.
                shards
                    .discovered(
                        payload,
                        embedded_context.leader.clone(),
                        embedded_context.round,
                    )
                    .await;

                // Use the block's embedded context for verification, passing the
                // prefetched block to avoid fetching it again inside deferred_verify.
                let verify_rx = marshaled.deferred_verify(embedded_context, payload, Some(block));
                if let Ok(result) = verify_rx.await {
                    tx.send_lossy(result);
                }
            });
        rx
    }
}
908
909impl<E, A, B, C, H, Z, S, ES> Relay for Marshaled<E, A, B, C, H, Z, S, ES>
910where
911    E: Rng + Storage + Spawner + Metrics + Clock,
912    A: Application<
913        E,
914        Block = B,
915        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
916    >,
917    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
918    C: CodingScheme,
919    H: Hasher,
920    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
921    S: Strategy,
922    ES: Epocher,
923{
924    type Digest = Commitment;
925
926    /// Broadcasts a previously built block to the network.
927    ///
928    /// This uses the cached block from the last proposal operation. If no block was built or
929    /// the digest does not match the cached block, the broadcast is skipped with a warning.
930    async fn broadcast(&mut self, commitment: Self::Digest) {
931        let Some((round, block)) = self.last_built.lock().take() else {
932            warn!("missing block to broadcast");
933            return;
934        };
935
936        if block.commitment() != commitment {
937            warn!(
938                round = %round,
939                commitment = %block.commitment(),
940                height = %block.height(),
941                "skipping requested broadcast of block with mismatched commitment"
942            );
943            return;
944        }
945
946        debug!(
947            round = %round,
948            commitment = %block.commitment(),
949            height = %block.height(),
950            "requested broadcast of built block"
951        );
952
953        self.shards.proposed(round, block).await;
954    }
955}
956
957impl<E, A, B, C, H, Z, S, ES> Reporter for Marshaled<E, A, B, C, H, Z, S, ES>
958where
959    E: Rng + Storage + Spawner + Metrics + Clock,
960    A: Application<
961            E,
962            Block = B,
963            Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
964        > + Reporter<Activity = Update<B>>,
965    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
966    C: CodingScheme,
967    H: Hasher,
968    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
969    S: Strategy,
970    ES: Epocher,
971{
972    type Activity = A::Activity;
973
974    /// Relays a report to the underlying [`Application`] and cleans up old verification data.
975    async fn report(&mut self, update: Self::Activity) {
976        // Clean up verification tasks and contexts for rounds <= the finalized round.
977        if let Update::Tip(round, _, _) = &update {
978            self.verification_tasks.retain_after(round);
979        }
980        self.application.report(update).await
981    }
982}
983
984/// Fetches the parent block given its digest and optional round.
985///
986/// This is a helper function used during proposal and verification to retrieve the parent
987/// block. If the parent digest matches the genesis block, it returns the genesis block
988/// directly without querying the marshal. Otherwise, it subscribes to the marshal to await
989/// the parent block's availability.
990///
991/// `parent_round` is an optional resolver hint. Callers should only provide a hint when
992/// the source context is trusted/validated. Untrusted paths should pass `None`.
993///
994/// Returns an error if the marshal subscription is cancelled.
995#[allow(clippy::type_complexity)]
996async fn fetch_parent<E, S, A, B, C, H>(
997    parent_commitment: Commitment,
998    parent_round: Option<Round>,
999    application: &mut A,
1000    marshal: &mut core::Mailbox<S, Coding<B, C, H, S::PublicKey>>,
1001    cached_genesis: Arc<OnceLock<(Commitment, CodedBlock<B, C, H>)>>,
1002) -> Either<Ready<Result<CodedBlock<B, C, H>, RecvError>>, oneshot::Receiver<CodedBlock<B, C, H>>>
1003where
1004    E: Rng + Spawner + Metrics + Clock,
1005    S: CertificateScheme,
1006    A: Application<E, Block = B, Context = Context<Commitment, S::PublicKey>>,
1007    B: CertifiableBlock<Context = Context<Commitment, S::PublicKey>>,
1008    C: CodingScheme,
1009    H: Hasher,
1010{
1011    if cached_genesis.get().is_none() {
1012        let genesis = application.genesis().await;
1013        let genesis_coding_commitment = genesis_coding_commitment::<H, _>(&genesis);
1014        let coded_genesis = CodedBlock::<B, C, H>::new_trusted(genesis, genesis_coding_commitment);
1015        let _ = cached_genesis.set((genesis_coding_commitment, coded_genesis));
1016    }
1017
1018    let (genesis_commitment, coded_genesis) = cached_genesis
1019        .get()
1020        .expect("genesis cache should be initialized");
1021    if parent_commitment == *genesis_commitment {
1022        Either::Left(ready(Ok(coded_genesis.clone())))
1023    } else {
1024        Either::Right(
1025            marshal
1026                .subscribe_by_commitment(parent_round, parent_commitment)
1027                .await,
1028        )
1029    }
1030}
1031
1032/// Constructs the [`Commitment`] for the genesis block.
1033fn genesis_coding_commitment<H: Hasher, B: CertifiableBlock>(block: &B) -> Commitment {
1034    Commitment::from((
1035        block.digest(),
1036        block.digest(),
1037        hash_context::<H, _>(&block.context()),
1038        GENESIS_CODING_CONFIG,
1039    ))
1040}