Skip to main content

commonware_consensus/marshal/coding/
marshaled.rs

1//! Wrapper for consensus applications that handles epochs, erasure coding, and block dissemination.
2//!
3//! # Overview
4//!
5//! [`Marshaled`] is an adapter that wraps any [`Application`] implementation to handle
6//! epoch transitions and erasure coded broadcast automatically. It intercepts consensus
7//! operations (propose, verify, certify) and ensures blocks are only produced within valid epoch boundaries.
8//!
9//! # Epoch Boundaries
10//!
11//! An epoch is a fixed number of blocks (the `epoch_length`). When the last block in an epoch
12//! is reached, this wrapper prevents new blocks from being built & proposed until the next epoch begins.
13//! Instead, it re-proposes the boundary block to avoid producing blocks that would be pruned
14//! by the epoch transition.
15//!
16//! # Erasure Coding
17//!
18//! This wrapper integrates with a variant of marshal that supports erasure coded broadcast. When a leader
19//! proposes a new block, it is automatically erasure encoded and its shards are broadcasted to active
20//! participants. When verifying a proposed block (the precondition for notarization), the wrapper
21//! ensures the commitment's context digest matches the consensus context and waits for validation of
22//! the shard assigned to this participant by the proposer. If that shard is valid, the assigned shard is
23//! relayed to all other participants to aid in block reconstruction.
24//!
25//! A participant may still reconstruct the full block from gossiped shards before its designated
26//! leader-delivered shard arrives. That is sufficient for later certification and repair flows, but it
27//! is not treated as notarization readiness: a participant only helps form a notarization once it has
28//! validated the shard it is supposed to echo.
29//!
30//! During certification (the phase between notarization and finalization), the wrapper subscribes to
31//! block reconstruction and validates epoch boundaries, parent commitment, height contiguity, and
32//! that the block's embedded context matches the consensus context before allowing the block to be
33//! certified. If certification fails, the voter can still emit a nullify vote to advance the view.
34//!
35//! # Usage
36//!
37//! Wrap your [`Application`] implementation with [`Marshaled::new`] and provide it to your
38//! consensus engine for the [`Automaton`] and [`Relay`]. The wrapper handles all epoch logic transparently.
39//!
40//! ```rust,ignore
41//! let cfg = MarshaledConfig {
42//!     application: my_application,
43//!     marshal: marshal_mailbox,
44//!     shards: shard_mailbox,
45//!     scheme_provider,
46//!     epocher,
47//!     strategy,
48//! };
49//! let application = Marshaled::new(context, cfg);
50//! ```
51//!
52//! # Implementation Notes
53//!
54//! - Genesis blocks are handled specially: epoch 0 returns the application's genesis block,
55//!   while subsequent epochs use the last block of the previous epoch as genesis
56//! - Blocks are automatically verified to be within the current epoch
57//!
58//! # Notarization and Data Availability
59//!
60//! In rare crash cases, it is possible for a notarization certificate to exist without a block being
61//! available to the honest parties (e.g., if the whole network crashed before receiving `f+1` shards
62//! and the proposer went permanently offline). In this case, `certify` will be unable to fetch the
63//! block before timeout and result in a nullification.
64//!
65//! For this reason, it should not be expected that every notarized payload will be certifiable due
66//! to the lack of an available block. However, if even one honest and online party has the block,
67//! they will attempt to forward it to others via marshal's resolver. This case is already present
68//! in the event of a block that was proposed with invalid codec; Marshal will not be able to reconstruct
69//! the block, and therefore won't serve it.
70//!
71//! ```text
72//!                                      ┌───────────────────────────────────────────────────┐
73//!                                      ▼                                                   │
74//! ┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐
75//! │          B1         │◀──│          B2         │◀──│          B3         │XXX│          B4         │
76//! └─────────────────────┘   └─────────────────────┘   └──────────┬──────────┘   └─────────────────────┘
77//!                                                                │
78//!                                                          Failed Certify
79//! ```
80
81use crate::{
82    marshal::{
83        application::{
84            validation::{is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, Stage},
85            verification_tasks::VerificationTasks,
86        },
87        coding::{
88            shards,
89            types::{coding_config_for_participants, hash_context, CodedBlock},
90            validation::{validate_block, validate_proposal, ProposalError},
91            Coding,
92        },
93        core, Update,
94    },
95    simplex::{scheme::Scheme, types::Context, Plan},
96    types::{coding::Commitment, Epoch, Epocher, Round},
97    Application, Automaton, Block, CertifiableAutomaton, CertifiableBlock, Epochable, Heightable,
98    Relay, Reporter,
99};
100use commonware_actor::Feedback;
101use commonware_coding::Scheme as CodingScheme;
102use commonware_cryptography::{
103    certificate::{Provider, Scheme as CertificateScheme},
104    Committable, Digestible, Hasher,
105};
106use commonware_macros::select;
107use commonware_p2p::Recipients;
108use commonware_parallel::Strategy;
109use commonware_runtime::{
110    telemetry::metrics::{
111        histogram::{Buckets, Timed},
112        MetricsExt as _,
113    },
114    Clock, Metrics, Spawner, Storage,
115};
116use commonware_utils::{
117    channel::{fallible::OneshotExt, oneshot},
118    sync::AsyncMutex,
119};
120use rand::Rng;
121use std::sync::Arc;
122use tracing::{debug, warn};
123
/// Configuration for initializing [`Marshaled`].
///
/// Every field is moved into the wrapper by [`Marshaled::new`].
///
/// Type parameters:
/// - `A`: the application being wrapped.
/// - `B`: the block type; its context must be the simplex [`Context`] over [`Commitment`]
///   and the certificate scheme's public key.
/// - `C`: the erasure coding scheme.
/// - `H`: the hasher.
/// - `Z`: the epoch-scoped provider of certificate schemes.
/// - `S`: the strategy for parallel operations.
/// - `ES`: the epocher.
// The mailbox field types spell out long associated-type paths, which is what
// `clippy::type_complexity` would otherwise flag.
#[allow(clippy::type_complexity)]
pub struct MarshaledConfig<A, B, C, H, Z, S, ES>
where
    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// The underlying application to wrap.
    pub application: A,
    /// Mailbox for communicating with the marshal engine.
    pub marshal:
        core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
    /// Mailbox for communicating with the shards engine.
    pub shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
    /// Provider for signing schemes scoped by epoch.
    pub scheme_provider: Z,
    /// Strategy for parallel operations.
    pub strategy: S,
    /// Strategy for determining epoch boundaries.
    pub epocher: ES,
}
149
/// An [`Application`] adapter that handles epoch transitions and erasure coded broadcast.
///
/// This wrapper intercepts consensus operations to enforce epoch boundaries. It prevents
/// blocks from being produced outside their valid epoch and handles the special case of
/// re-proposing boundary blocks during epoch transitions.
///
/// The runtime context is held behind an [`Arc`], so clones of the wrapper share a single
/// context rather than each owning one.
// The mailbox field types spell out long associated-type paths, which is what
// `clippy::type_complexity` would otherwise flag.
#[allow(clippy::type_complexity)]
pub struct Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: Application<E>,
    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// Shared runtime context. It is locked to derive a labelled child context each time
    /// a background task (propose / verify / certify) is spawned.
    context: Arc<AsyncMutex<E>>,
    /// The wrapped application that builds and verifies blocks.
    application: A,
    /// Mailbox for communicating with the marshal engine.
    marshal: core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
    /// Mailbox for communicating with the shards engine.
    shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
    /// Provider for signing schemes scoped by epoch.
    scheme_provider: Z,
    /// Strategy for determining epoch boundaries.
    epocher: ES,
    /// Strategy for parallel operations.
    strategy: S,
    /// Verification bookkeeping keyed by commitment.
    // NOTE(review): presumably holds the receivers of in-flight `verify` tasks so that
    // `certify` can reuse them; the code that reads/writes it is not in this part of the
    // file — confirm.
    verification_tasks: VerificationTasks<Commitment>,

    /// Time taken for the application to build a new block.
    build_duration: Timed,
    /// Time taken for the application to verify a block.
    verify_duration: Timed,
    /// Time taken to fetch a parent block in proposal (registered under the metric name
    /// `parent_fetch_duration`).
    proposal_parent_fetch_duration: Timed,
    /// Time taken to fetch a block via the ancestry stream.
    ancestor_fetch_duration: Timed,
    /// Time taken to erasure encode a block.
    erasure_encode_duration: Timed,
}
182
183impl<E, A, B, C, H, Z, S, ES> Clone for Marshaled<E, A, B, C, H, Z, S, ES>
184where
185    E: Rng + Storage + Spawner + Metrics + Clock,
186    A: Application<E>,
187    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
188    C: CodingScheme,
189    H: Hasher,
190    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
191    S: Strategy,
192    ES: Epocher,
193{
194    fn clone(&self) -> Self {
195        Self {
196            context: self.context.clone(),
197            application: self.application.clone(),
198            marshal: self.marshal.clone(),
199            shards: self.shards.clone(),
200            scheme_provider: self.scheme_provider.clone(),
201            epocher: self.epocher.clone(),
202            strategy: self.strategy.clone(),
203            verification_tasks: self.verification_tasks.clone(),
204            build_duration: self.build_duration.clone(),
205            verify_duration: self.verify_duration.clone(),
206            proposal_parent_fetch_duration: self.proposal_parent_fetch_duration.clone(),
207            ancestor_fetch_duration: self.ancestor_fetch_duration.clone(),
208            erasure_encode_duration: self.erasure_encode_duration.clone(),
209        }
210    }
211}
212
213impl<E, A, B, C, H, Z, S, ES> Marshaled<E, A, B, C, H, Z, S, ES>
214where
215    E: Rng + Storage + Spawner + Metrics + Clock,
216    A: Application<
217        E,
218        Block = B,
219        SigningScheme = Z::Scheme,
220        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
221    >,
222    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
223    C: CodingScheme,
224    H: Hasher,
225    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
226    S: Strategy,
227    ES: Epocher,
228{
229    /// Creates a new [`Marshaled`] wrapper.
230    ///
231    /// # Panics
232    ///
233    /// Panics if the marshal metadata store cannot be initialized.
234    pub fn new(context: E, cfg: MarshaledConfig<A, B, C, H, Z, S, ES>) -> Self {
235        let MarshaledConfig {
236            application,
237            marshal,
238            shards,
239            scheme_provider,
240            strategy,
241            epocher,
242        } = cfg;
243
244        let build_histogram = context.histogram(
245            "build_duration",
246            "Histogram of time taken for the application to build a new block, in seconds",
247            Buckets::LOCAL,
248        );
249        let build_duration = Timed::new(build_histogram);
250
251        let verify_histogram = context.histogram(
252            "verify_duration",
253            "Histogram of time taken for the application to verify a block, in seconds",
254            Buckets::LOCAL,
255        );
256        let verify_duration = Timed::new(verify_histogram);
257
258        let parent_fetch_histogram = context.histogram(
259            "parent_fetch_duration",
260            "Histogram of time taken to fetch a parent block in proposal, in seconds",
261            Buckets::LOCAL,
262        );
263        let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram);
264
265        let ancestor_fetch_histogram = context.histogram(
266            "ancestor_fetch_duration",
267            "Histogram of time taken to fetch a block via the ancestry stream, in seconds",
268            Buckets::LOCAL,
269        );
270        let ancestor_fetch_duration = Timed::new(ancestor_fetch_histogram);
271
272        let erasure_histogram = context.histogram(
273            "erasure_encode_duration",
274            "Histogram of time taken to erasure encode a block, in seconds",
275            Buckets::LOCAL,
276        );
277        let erasure_encode_duration = Timed::new(erasure_histogram);
278
279        Self {
280            context: Arc::new(AsyncMutex::new(context)),
281            application,
282            marshal,
283            shards,
284            scheme_provider,
285            strategy,
286            epocher,
287            verification_tasks: VerificationTasks::new(),
288
289            build_duration,
290            verify_duration,
291            proposal_parent_fetch_duration,
292            ancestor_fetch_duration,
293            erasure_encode_duration,
294        }
295    }
296
297    /// Verifies a proposed block within epoch boundaries.
298    ///
299    /// This method validates that:
300    /// 1. The block is within the current epoch (unless it's a boundary block re-proposal)
301    /// 2. Re-proposals are only allowed for the last block in an epoch
302    /// 3. The block's parent digest matches the consensus context's expected parent
303    /// 4. The block's height is exactly one greater than the parent's height
304    /// 5. The block's embedded context digest matches the commitment
305    /// 6. The block's embedded context matches the consensus context
306    /// 7. The underlying application's verification logic passes
307    ///
308    /// Verification is spawned in a background task and returns a receiver that will contain
309    /// the verification result.
310    ///
311    /// If `prefetched_block` is provided, it will be used directly instead of fetching from
312    /// the marshal. This is useful in `certify` when we've already fetched the block to
313    /// extract its embedded context.
314    async fn deferred_verify(
315        &mut self,
316        consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
317        commitment: Commitment,
318        prefetched_block: Option<CodedBlock<B, C, H>>,
319        stage: Stage,
320    ) -> oneshot::Receiver<bool> {
321        let mut marshal = self.marshal.clone();
322        let mut application = self.application.clone();
323        let epocher = self.epocher.clone();
324        let verify_duration = self.verify_duration.clone();
325        let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
326
327        let (mut tx, rx) = oneshot::channel();
328        let context = self
329            .context
330            .lock()
331            .await
332            .child("deferred_verify")
333            .with_attribute("round", consensus_context.round);
334        context.spawn(move |runtime_context| async move {
335            let round = consensus_context.round;
336            let (parent_view, parent_commitment) = consensus_context.parent;
337
338            // Get the candidate block either from the caller or by waiting for
339            // local reconstruction. Candidate data remains local-only: a
340            // notarization is not sufficient reason to request it from peers.
341            let block = if let Some(block) = prefetched_block {
342                block
343            } else {
344                let block_request =
345                    marshal.subscribe_by_commitment(commitment, core::CommitmentFallback::Wait);
346                select! {
347                    _ = tx.closed() => {
348                        debug!(
349                            reason = "consensus dropped receiver",
350                            "skipping verification"
351                        );
352                        return;
353                    },
354                    result = block_request => match result {
355                        Ok(block) => block,
356                        Err(_) => {
357                            debug!(
358                                reason = "block unavailable",
359                                "skipping verification"
360                            );
361                            return;
362                        }
363                    },
364                }
365            };
366
367            // The context supplies the certified parent round. Do not derive a
368            // height from the unverified child block for this lookup.
369            let fallback = core::CommitmentFallback::FetchByRound {
370                round: Round::new(consensus_context.epoch(), parent_view),
371            };
372            let parent_request = marshal.subscribe_by_commitment(parent_commitment, fallback);
373            let parent = select! {
374                _ = tx.closed() => {
375                    debug!(
376                        reason = "consensus dropped receiver",
377                        "skipping verification"
378                    );
379                    return;
380                },
381                result = parent_request => match result {
382                    Ok(parent) => parent,
383                    Err(_) => {
384                        debug!(reason = "failed to fetch parent", "skipping verification");
385                        return;
386                    }
387                },
388            };
389
390            if let Err(err) = validate_block::<H, _, _>(
391                &epocher,
392                &block,
393                &parent,
394                &consensus_context,
395                commitment,
396                parent_commitment,
397            ) {
398                debug!(
399                    ?err,
400                    expected_commitment = %commitment,
401                    block_commitment = %block.commitment(),
402                    expected_parent_commitment = %parent_commitment,
403                    parent_commitment = %parent.commitment(),
404                    expected_parent = %parent.digest(),
405                    block_parent = %block.parent(),
406                    parent_height = %parent.height(),
407                    block_height = %block.height(),
408                    "block failed coded invariant validation"
409                );
410                tx.send_lossy(false);
411                return;
412            }
413
414            let ancestry_stream = marshal.ancestor_stream(
415                Arc::new(runtime_context.child("ancestor_stream")),
416                [block.clone(), parent],
417                ancestor_fetch_duration,
418            );
419            let validity_request = application.verify(
420                (
421                    runtime_context.child("app_verify"),
422                    consensus_context.clone(),
423                ),
424                ancestry_stream,
425            );
426
427            // If consensus drops the receiver, we can stop work early.
428            let timer = verify_duration.timer(&runtime_context);
429            let application_valid = select! {
430                _ = tx.closed() => {
431                    debug!(
432                        reason = "consensus dropped receiver",
433                        "skipping verification"
434                    );
435                    return;
436                },
437                is_valid = validity_request => is_valid,
438            };
439            timer.observe(&runtime_context);
440            if application_valid && !stage.store(&mut marshal, round, block).await {
441                debug!(?round, "marshal unable to accept block");
442                return;
443            }
444            tx.send_lossy(application_valid);
445        });
446
447        rx
448    }
449
450    async fn certify_from_embedded_context(
451        &mut self,
452        round: Round,
453        payload: Commitment,
454    ) -> oneshot::Receiver<bool> {
455        // Certify may be reached without an earlier `verify`, so the shard
456        // engine may not know the leader yet. A notarized commitment is still
457        // enough to start reconstruction from sender-indexed gossip shards
458        // already buffered for the commitment.
459        self.shards.notarized(payload, round);
460
461        // No in-progress task means we never verified this proposal locally.
462        // We can use the block's embedded context to move to the next view. If a Byzantine
463        // proposer embedded a malicious context, the f+1 honest validators from the notarizing quorum
464        // will verify against the proper context and reject the mismatch, preventing a 2f+1
465        // finalization quorum.
466        //
467        // We must fetch here rather than only wait for local reconstruction. A Byzantine
468        // leader can send enough shards to just f+1 honest validators, collect enough honest
469        // notarize votes to form a notarization, and leave the remaining honest validators
470        // unable to reconstruct the block. Those validators need the notarized round to
471        // recover and certify; otherwise they can remain stuck if the Byzantine validators
472        // stop participating in the next view.
473        //
474        // Subscribe to the block and verify using its embedded context once available.
475        debug!(
476            ?round,
477            ?payload,
478            "subscribing to block for certification using embedded context"
479        );
480        let block_rx = self
481            .marshal
482            .subscribe_by_commitment(payload, core::CommitmentFallback::FetchByRound { round });
483        let mut marshaled = self.clone();
484        let shards = self.shards.clone();
485        let (mut tx, rx) = oneshot::channel();
486        let context = self
487            .context
488            .lock()
489            .await
490            .child("certify")
491            .with_attribute("round", round);
492        context.spawn(move |_| async move {
493            let block = select! {
494                _ = tx.closed() => {
495                    debug!(
496                        reason = "consensus dropped receiver",
497                        "skipping certification"
498                    );
499                    return;
500                },
501                result = block_rx => match result {
502                    Ok(block) => block,
503                    Err(_) => {
504                        debug!(
505                            ?payload,
506                            reason = "failed to fetch block for certification",
507                            "skipping certification"
508                        );
509                        return;
510                    }
511                },
512            };
513
514            // Re-proposal detection for certify path: we don't have the consensus
515            // context, only the block's embedded context from original proposal.
516            // Infer re-proposal from:
517            // 1. Block is at epoch boundary (only boundary blocks can be re-proposed)
518            // 2. Certification round's view > embedded context's view (re-proposals
519            //    retain their original embedded context, so a later view indicates
520            //    the block was re-proposed)
521            // 3. Same epoch (re-proposals don't cross epoch boundaries)
522            let embedded_context = block.context();
523            let is_reproposal = is_inferred_reproposal_at_certify(
524                &marshaled.epocher,
525                block.height(),
526                embedded_context.round,
527                round,
528            );
529            if is_reproposal {
530                // Certifier holds a notarization for this block, so route
531                // the write to the notarized cache. `certified` is
532                // idempotent, so crash-recovery double-invocation is safe.
533                if !marshaled.marshal.certified(round, block).await {
534                    debug!(?round, "marshal unable to accept block");
535                    return;
536                }
537                tx.send_lossy(true);
538                return;
539            }
540
541            // Inform the shard engine of an externally proposed commitment.
542            shards.discovered(
543                payload,
544                embedded_context.leader.clone(),
545                embedded_context.round,
546            );
547
548            // Use the block's embedded context for verification, passing the
549            // prefetched block to avoid fetching it again inside deferred_verify.
550            let verify_rx = marshaled
551                .deferred_verify(embedded_context, payload, Some(block), Stage::Certified)
552                .await;
553            if let Ok(result) = verify_rx.await {
554                tx.send_lossy(result);
555            }
556        });
557        rx
558    }
559
560    async fn certify_from_existing_task(
561        &mut self,
562        round: Round,
563        payload: Commitment,
564        task: oneshot::Receiver<bool>,
565    ) -> oneshot::Receiver<bool> {
566        // `verify()` intentionally waits only for local candidate data. Once
567        // certification starts, a notarization exists and the same pending
568        // verifier must be unblocked by round-bound recovery if local
569        // reconstruction never completes.
570        self.shards.notarized(payload, round);
571        self.marshal.hint_notarized(round, payload);
572
573        let mut marshaled = self.clone();
574        let (mut tx, rx) = oneshot::channel();
575        let context = self
576            .context
577            .lock()
578            .await
579            .child("certify_existing")
580            .with_attribute("round", round);
581        context.spawn(move |_| async move {
582            let result = select! {
583                _ = tx.closed() => {
584                    debug!(
585                        reason = "consensus dropped receiver",
586                        "skipping certification"
587                    );
588                    return;
589                },
590                result = task => result,
591            };
592            match result {
593                Ok(result) => {
594                    tx.send_lossy(result);
595                }
596                Err(_) => {
597                    debug!(
598                        ?round,
599                        ?payload,
600                        "verification task closed before certification, falling back to embedded context"
601                    );
602                    let fallback = marshaled.certify_from_embedded_context(round, payload).await;
603                    let result = select! {
604                        _ = tx.closed() => {
605                            debug!(
606                                reason = "consensus dropped receiver",
607                                "skipping certification"
608                            );
609                            return;
610                        },
611                        result = fallback => result,
612                    };
613                    if let Ok(result) = result {
614                        tx.send_lossy(result);
615                    }
616                }
617            }
618        });
619        rx
620    }
621}
622
623impl<E, A, B, C, H, Z, S, ES> Automaton for Marshaled<E, A, B, C, H, Z, S, ES>
624where
625    E: Rng + Storage + Spawner + Metrics + Clock,
626    A: Application<
627        E,
628        Block = B,
629        SigningScheme = Z::Scheme,
630        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
631    >,
632    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
633    C: CodingScheme,
634    H: Hasher,
635    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
636    S: Strategy,
637    ES: Epocher,
638{
639    type Digest = Commitment;
640    type Context = Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>;
641
642    /// Proposes a new block or re-proposes the epoch boundary block.
643    ///
644    /// This method builds a new block from the underlying application unless the parent block
645    /// is the last block in the current epoch. When at an epoch boundary, it re-proposes the
646    /// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
647    ///
648    /// The proposal operation is spawned in a background task and returns a receiver that will
649    /// contain the proposed block's commitment when ready. The built block is persisted via
650    /// [`core::Mailbox::verified`] before the commitment is delivered, so consensus can rely
651    /// on the block surviving restart.
652    async fn propose(
653        &mut self,
654        consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
655    ) -> oneshot::Receiver<Self::Digest> {
656        let marshal = self.marshal.clone();
657        let mut application = self.application.clone();
658        let epocher = self.epocher.clone();
659        let strategy = self.strategy.clone();
660
661        // If there's no scheme for the current epoch, we cannot verify the proposal.
662        // Send back a receiver with a dropped sender.
663        let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
664            debug!(
665                round = %consensus_context.round,
666                "no scheme for epoch, skipping propose"
667            );
668            let (_, rx) = oneshot::channel();
669            return rx;
670        };
671
672        let n_participants =
673            u16::try_from(scheme.participants().len()).expect("too many participants");
674        let coding_config = coding_config_for_participants(n_participants);
675
676        // Metrics
677        let build_duration = self.build_duration.clone();
678        let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone();
679        let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
680        let erasure_encode_duration = self.erasure_encode_duration.clone();
681
682        let (mut tx, rx) = oneshot::channel();
683        let context = self
684            .context
685            .lock()
686            .await
687            .child("propose")
688            .with_attribute("round", consensus_context.round);
689        context.spawn(move |runtime_context| async move {
690            // On leader recovery, marshal may already hold a verified block
691            // for this round (persisted before voting in consensus).
692            //
693            // Building a fresh block would land on the same prunable
694            // archive index and be silently dropped, so the stored block
695            // is the only proposal we can broadcast for this round.
696            //
697            // The recovered block is safe to reuse only if its embedded
698            // context matches the context simplex just recovered.
699            // Otherwise the cached block was built against a different
700            // parent and cannot be broadcast under the current header, so
701            // drop the receiver and let the voter nullify the view via
702            // timeout.
703            if let Some(block) = marshal.get_verified(consensus_context.round).await {
704                let block_context = block.context();
705                if block_context != consensus_context {
706                    debug!(
707                        round = ?consensus_context.round,
708                        ?consensus_context,
709                        ?block_context,
710                        "skipping proposal: cached verified block context no longer matches"
711                    );
712                    return;
713                }
714                let commitment = block.commitment();
715                let round = consensus_context.round;
716                let success = tx.send_lossy(commitment);
717                debug!(
718                    ?round,
719                    ?commitment,
720                    success,
721                    "reused verified block from marshal on leader recovery"
722                );
723                return;
724            }
725
726            // The parent for any consensus context is in the same epoch: the
727            // boundary block of the previous epoch is the genesis block of the
728            // current epoch.
729            //
730            // Proposal context carries the certified parent view/commitment but
731            // not the parent height. The parent may be certified above the
732            // finalized tip, so this must stay round-bound until the block is
733            // returned.
734            let (parent_view, parent_commitment) = consensus_context.parent;
735            let parent_request = marshal.subscribe_by_commitment(
736                parent_commitment,
737                core::CommitmentFallback::FetchByRound {
738                    round: Round::new(consensus_context.epoch(), parent_view),
739                },
740            );
741
742            let parent_timer = proposal_parent_fetch_duration.timer(&runtime_context);
743            let parent = select! {
744                _ = tx.closed() => {
745                    debug!(reason = "consensus dropped receiver", "skipping proposal");
746                    return;
747                },
748                result = parent_request => match result {
749                    Ok(parent) => parent,
750                    Err(_) => {
751                        debug!(
752                            ?parent_commitment,
753                            reason = "failed to fetch parent block",
754                            "skipping proposal"
755                        );
756                        return;
757                    }
758                },
759            };
760            parent_timer.observe(&runtime_context);
761
762            // Special case: If the parent block is the last block in the epoch,
763            // re-propose it as to not produce any blocks that will be cut out
764            // by the epoch transition.
765            let last_in_epoch = epocher
766                .last(consensus_context.epoch())
767                .expect("current epoch should exist");
768            if parent.height() == last_in_epoch {
769                let commitment = parent.commitment();
770                let round = consensus_context.round;
771                if !marshal.verified(round, parent).await {
772                    debug!(
773                        ?round,
774                        ?commitment,
775                        "marshal rejected re-proposed boundary block"
776                    );
777                    return;
778                }
779                let success = tx.send_lossy(commitment);
780                debug!(
781                    ?round,
782                    ?commitment,
783                    success,
784                    "re-proposed parent block at epoch boundary"
785                );
786                return;
787            }
788
789            let ancestor_stream = marshal.ancestor_stream(
790                Arc::new(runtime_context.child("ancestor_stream")),
791                [parent],
792                ancestor_fetch_duration,
793            );
794            let build_request = application.propose(
795                (
796                    runtime_context.child("app_propose"),
797                    consensus_context.clone(),
798                ),
799                ancestor_stream,
800            );
801
802            let build_timer = build_duration.timer(&runtime_context);
803            let built_block = select! {
804                _ = tx.closed() => {
805                    debug!(reason = "consensus dropped receiver", "skipping proposal");
806                    return;
807                },
808                result = build_request => match result {
809                    Some(block) => block,
810                    None => {
811                        debug!(
812                            ?parent_commitment,
813                            reason = "block building failed",
814                            "skipping proposal"
815                        );
816                        return;
817                    }
818                },
819            };
820            build_timer.observe(&runtime_context);
821
822            let erasure_timer = erasure_encode_duration.timer(&runtime_context);
823            let coded_block = CodedBlock::<B, C, H>::new(built_block, coding_config, &strategy);
824            erasure_timer.observe(&runtime_context);
825
826            let commitment = coded_block.commitment();
827            let round = consensus_context.round;
828            if !marshal.proposed(round, coded_block).await {
829                debug!(?round, ?commitment, "marshal rejected proposed block");
830                return;
831            }
832            let success = tx.send_lossy(commitment);
833            debug!(?round, ?commitment, success, "proposed new block");
834        });
835        rx
836    }
837
838    /// Verifies a received shard for a given round.
839    ///
840    /// This method validates that:
841    /// 1. The coding configuration matches the expected configuration for the current scheme.
842    /// 2. The commitment's context digest matches the consensus context (unless this is a re-proposal).
843    /// 3. The shard is contained within the consensus commitment.
844    ///
845    /// Verification is spawned in a background task and returns a receiver that will contain
846    /// the verification result. Additionally, this method kicks off deferred verification to
847    /// start block verification early (hidden behind shard validity and network latency).
848    async fn verify(
849        &mut self,
850        consensus_context: Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>,
851        payload: Self::Digest,
852    ) -> oneshot::Receiver<bool> {
853        // If there's no scheme for the current epoch, we cannot vote on the proposal.
854        // Send back a receiver with a dropped sender.
855        let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
856            debug!(
857                round = %consensus_context.round,
858                "no scheme for epoch, skipping verify"
859            );
860            let (_, rx) = oneshot::channel();
861            return rx;
862        };
863
864        let n_participants =
865            u16::try_from(scheme.participants().len()).expect("too many participants");
866        let coding_config = coding_config_for_participants(n_participants);
867        let is_reproposal = payload == consensus_context.parent.1;
868
869        // Validate proposal-level invariants:
870        // - coding config must match active participant set
871        // - context digest must match unless this is a re-proposal
872        let proposal_context = (!is_reproposal).then_some(&consensus_context);
873        if let Err(err) = validate_proposal::<H, _>(payload, coding_config, proposal_context) {
874            match err {
875                ProposalError::CodingConfig => {
876                    warn!(
877                        round = %consensus_context.round,
878                        got = ?payload.config(),
879                        expected = ?coding_config,
880                        "rejected proposal with unexpected coding configuration"
881                    );
882                }
883                ProposalError::ContextDigest => {
884                    let expected = hash_context::<H, _>(&consensus_context);
885                    let got = payload.context::<H::Digest>();
886                    warn!(
887                        round = %consensus_context.round,
888                        expected = ?expected,
889                        got = ?got,
890                        "rejected proposal with mismatched context digest"
891                    );
892                }
893            }
894
895            let (tx, rx) = oneshot::channel();
896            tx.send_lossy(false);
897            return rx;
898        }
899
900        // Re-proposals skip context-digest validation because the consensus context will point
901        // at the prior epoch-boundary block while the embedded block context is from the
902        // original proposal view.
903        //
904        // Re-proposals also skip shard-validity and deferred verification because:
905        // 1. The block was already verified when originally proposed
906        // 2. The parent-child height check would fail (parent IS the block)
907        // 3. Waiting for shards could stall if the leader doesn't rebroadcast
908        if is_reproposal {
909            // Fetch the block to verify it's at the epoch boundary.
910            // This should be fast since the parent block is typically already cached.
911            let block_rx = self
912                .marshal
913                .subscribe_by_commitment(payload, core::CommitmentFallback::Wait);
914            let marshal = self.marshal.clone();
915            let epocher = self.epocher.clone();
916            let round = consensus_context.round;
917            let verification_tasks = self.verification_tasks.clone();
918
919            // Register a verification task synchronously before spawning work so
920            // `certify` can always find it (no race with task startup).
921            let (task_tx, task_rx) = oneshot::channel();
922            verification_tasks.insert(round, payload, task_rx);
923
924            let (mut tx, rx) = oneshot::channel();
925            let context = self
926                .context
927                .lock()
928                .await
929                .child("verify_reproposal")
930                .with_attribute("round", round);
931            context.spawn(move |_| {
932                async move {
933                    let block = select! {
934                        _ = tx.closed() => {
935                            debug!(
936                                reason = "consensus dropped receiver",
937                                "skipping re-proposal verification"
938                            );
939                            return;
940                        },
941                        block = block_rx => match block {
942                            Ok(block) => block,
943                            Err(_) => {
944                                debug!(
945                                    ?payload,
946                                    reason = "failed to fetch block for re-proposal verification",
947                                    "skipping re-proposal verification"
948                                );
949                                // Fetch failure is an availability issue, not an explicit
950                                // invalidity proof. Do not synthesize `false` here.
951                                return;
952                            }
953                        },
954                    };
955
956                    if !is_valid_reproposal_at_verify(&epocher, block.height(), round.epoch()) {
957                        debug!(
958                            height = %block.height(),
959                            "re-proposal is not at epoch boundary"
960                        );
961                        task_tx.send_lossy(false);
962                        tx.send_lossy(false);
963                        return;
964                    }
965
966                    // Valid re-proposal: notify the marshal and complete the
967                    // verification task for `certify`.
968                    if !marshal.verified(round, block).await {
969                        debug!(?round, "marshal unable to accept block");
970                        return;
971                    }
972                    task_tx.send_lossy(true);
973                    tx.send_lossy(true);
974                }
975            });
976            return rx;
977        }
978
979        // Inform the shard engine of an externally proposed commitment.
980        self.shards.discovered(
981            payload,
982            consensus_context.leader.clone(),
983            consensus_context.round,
984        );
985
986        // Kick off deferred verification early to hide verification latency behind
987        // shard validity checks and network latency for collecting votes.
988        let round = consensus_context.round;
989        let task = self
990            .deferred_verify(consensus_context, payload, None, Stage::Verified)
991            .await;
992        self.verification_tasks.insert(round, payload, task);
993
994        match scheme.me() {
995            Some(_) => {
996                // Subscribe to assigned shard verification. For participants, this
997                // only completes once the leader-delivered shard for our
998                // assigned index has been verified. Reconstructing the block
999                // from peer gossip is useful for certification later, but is
1000                // not enough to emit a notarize vote.
1001                let validity_rx = self.shards.subscribe_assigned_shard_verified(payload);
1002                let (tx, rx) = oneshot::channel();
1003                let context = self
1004                    .context
1005                    .lock()
1006                    .await
1007                    .child("shard_validity_wait")
1008                    .with_attribute("round", round);
1009                context.spawn(|_| async move {
1010                    if validity_rx.await.is_ok() {
1011                        tx.send_lossy(true);
1012                    }
1013                });
1014                rx
1015            }
1016            None => {
1017                // If we are not participating, there's no shard to verify; just accept the proposal.
1018                //
1019                // Later, when certifying, we will wait to receive the block from the network.
1020                let (tx, rx) = oneshot::channel();
1021                tx.send_lossy(true);
1022                rx
1023            }
1024        }
1025    }
1026}
1027
1028impl<E, A, B, C, H, Z, S, ES> CertifiableAutomaton for Marshaled<E, A, B, C, H, Z, S, ES>
1029where
1030    E: Rng + Storage + Spawner + Metrics + Clock,
1031    A: Application<
1032        E,
1033        Block = B,
1034        SigningScheme = Z::Scheme,
1035        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
1036    >,
1037    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
1038    C: CodingScheme,
1039    H: Hasher,
1040    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
1041    S: Strategy,
1042    ES: Epocher,
1043{
1044    async fn certify(&mut self, round: Round, payload: Self::Digest) -> oneshot::Receiver<bool> {
1045        // First, check for an in-progress verification task from `verify()`.
1046        let task = self.verification_tasks.take(round, payload);
1047        if let Some(task) = task {
1048            return self.certify_from_existing_task(round, payload, task).await;
1049        }
1050
1051        self.certify_from_embedded_context(round, payload).await
1052    }
1053}
1054
1055impl<E, A, B, C, H, Z, S, ES> Relay for Marshaled<E, A, B, C, H, Z, S, ES>
1056where
1057    E: Rng + Storage + Spawner + Metrics + Clock,
1058    A: Application<
1059        E,
1060        Block = B,
1061        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
1062    >,
1063    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
1064    C: CodingScheme,
1065    H: Hasher,
1066    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
1067    S: Strategy,
1068    ES: Epocher,
1069{
1070    type Digest = Commitment;
1071    type PublicKey = <Z::Scheme as CertificateScheme>::PublicKey;
1072    type Plan = Plan<Self::PublicKey>;
1073
1074    fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) -> Feedback {
1075        // Coding variant does not support targeted forwarding;
1076        // peers reconstruct blocks from erasure-coded shards.
1077        //
1078        // TODO(#3389): Support checked data forwarding for PhasedScheme.
1079        let Plan::Propose { round } = plan else {
1080            return Feedback::Ok;
1081        };
1082        self.marshal.forward(round, commitment, Recipients::All)
1083    }
1084}
1085
1086impl<E, A, B, C, H, Z, S, ES> Reporter for Marshaled<E, A, B, C, H, Z, S, ES>
1087where
1088    E: Rng + Storage + Spawner + Metrics + Clock,
1089    A: Application<
1090            E,
1091            Block = B,
1092            Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
1093        > + Reporter<Activity = Update<B>>,
1094    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
1095    C: CodingScheme,
1096    H: Hasher,
1097    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
1098    S: Strategy,
1099    ES: Epocher,
1100{
1101    type Activity = A::Activity;
1102
1103    /// Relays a report to the underlying [`Application`] and cleans up old verification data.
1104    fn report(&mut self, update: Self::Activity) -> Feedback {
1105        // Clean up verification tasks and contexts for rounds <= the finalized round.
1106        if let Update::Tip(round, _, _) = &update {
1107            self.verification_tasks.retain_after(round);
1108        }
1109        self.application.report(update)
1110    }
1111}