commonware_consensus/marshal/coding/marshaled.rs
1//! Wrapper for consensus applications that handles epochs, erasure coding, and block dissemination.
2//!
3//! # Overview
4//!
5//! [`Marshaled`] is an adapter that wraps any [`VerifyingApplication`] implementation to handle
6//! epoch transitions and erasure coded broadcast automatically. It intercepts consensus
7//! operations (propose, verify, certify) and ensures blocks are only produced within valid epoch boundaries.
8//!
9//! # Epoch Boundaries
10//!
11//! An epoch is a fixed number of blocks (the `epoch_length`). When the last block in an epoch
12//! is reached, this wrapper prevents new blocks from being built & proposed until the next epoch begins.
13//! Instead, it re-proposes the boundary block to avoid producing blocks that would be pruned
14//! by the epoch transition.
15//!
16//! # Erasure Coding
17//!
18//! This wrapper integrates with a variant of marshal that supports erasure coded broadcast. When a leader
19//! proposes a new block, it is automatically erasure encoded and its shards are broadcasted to active
20//! participants. When verifying a proposed block (the precondition for notarization), the wrapper
21//! ensures the commitment's context digest matches the consensus context and waits for validation of
22//! the shard assigned to this participant by the proposer. If that shard is valid, the assigned shard is
23//! relayed to all other participants to aid in block reconstruction.
24//!
25//! A participant may still reconstruct the full block from gossiped shards before its designated
26//! leader-delivered shard arrives. That is sufficient for later certification and repair flows, but it
27//! is not treated as notarization readiness: a participant only helps form a notarization once it has
28//! validated the shard it is supposed to echo.
29//!
30//! During certification (the phase between notarization and finalization), the wrapper subscribes to
31//! block reconstruction and validates epoch boundaries, parent commitment, height contiguity, and
32//! that the block's embedded context matches the consensus context before allowing the block to be
33//! certified. If certification fails, the voter can still emit a nullify vote to advance the view.
34//!
35//! # Usage
36//!
37//! Wrap your [`VerifyingApplication`] implementation with [`Marshaled::new`] and provide it to your
38//! consensus engine for the [`Automaton`] and [`Relay`]. The wrapper handles all epoch logic transparently.
39//!
40//! ```rust,ignore
41//! let cfg = MarshaledConfig {
42//! application: my_application,
43//! marshal: marshal_mailbox,
44//! shards: shard_mailbox,
45//! scheme_provider,
46//! epocher,
47//! strategy,
48//! };
49//! let application = Marshaled::new(context, cfg);
50//! ```
51//!
52//! # Implementation Notes
53//!
54//! - Genesis blocks are handled specially: epoch 0 returns the application's genesis block,
55//! while subsequent epochs use the last block of the previous epoch as genesis
56//! - Blocks are automatically verified to be within the current epoch
57//!
58//! # Notarization and Data Availability
59//!
60//! In rare crash cases, it is possible for a notarization certificate to exist without a block being
61//! available to the honest parties (e.g., if the whole network crashed before receiving `f+1` shards
62//! and the proposer went permanently offline). In this case, `certify` will be unable to fetch the
63//! block before timeout and result in a nullification.
64//!
65//! For this reason, it should not be expected that every notarized payload will be certifiable due
66//! to the lack of an available block. However, if even one honest and online party has the block,
67//! they will attempt to forward it to others via marshal's resolver. This case is already present
//! in the event of a block that was proposed with an invalid codec; Marshal will not be able to reconstruct
69//! the block, and therefore won't serve it.
70//!
71//! ```text
72//! ┌───────────────────────────────────────────────────┐
73//! ▼ │
74//! ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
75//! │ B1 │◀──│ B2 │◀──│ B3 │XXX│ B4 │
76//! └─────────────────────┘ └─────────────────────┘ └──────────┬──────────┘ └─────────────────────┘
77//! │
78//! Failed Certify
79//! ```
80
81use crate::{
82 marshal::{
83 ancestry::AncestorStream,
84 application::{
85 validation::{
86 is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, LastBuilt,
87 },
88 verification_tasks::VerificationTasks,
89 },
90 coding::{
91 shards,
92 types::{coding_config_for_participants, hash_context, CodedBlock},
93 validation::{validate_block, validate_proposal, ProposalError},
94 Coding,
95 },
96 core, Update,
97 },
98 simplex::{scheme::Scheme, types::Context, Plan},
99 types::{coding::Commitment, Epoch, Epocher, Round},
100 Application, Automaton, Block, CertifiableAutomaton, CertifiableBlock, Epochable, Heightable,
101 Relay, Reporter, VerifyingApplication,
102};
103use commonware_coding::{Config as CodingConfig, Scheme as CodingScheme};
104use commonware_cryptography::{
105 certificate::{Provider, Scheme as CertificateScheme},
106 Committable, Digestible, Hasher,
107};
108use commonware_macros::select;
109use commonware_parallel::Strategy;
110use commonware_runtime::{
111 telemetry::metrics::histogram::{Buckets, Timed},
112 Clock, Metrics, Spawner, Storage,
113};
114use commonware_utils::{
115 channel::{
116 fallible::OneshotExt,
117 oneshot::{self, error::RecvError},
118 },
119 sync::Mutex,
120 NZU16,
121};
122use futures::future::{ready, try_join, Either, Ready};
123use prometheus_client::metrics::histogram::Histogram;
124use rand::Rng;
125use std::sync::{Arc, OnceLock};
126use tracing::{debug, warn};
127
/// The [`CodingConfig`] used for genesis blocks. These blocks are never broadcasted in
/// the proposal phase, and thus the configuration is irrelevant.
///
/// The values are arbitrary placeholders; they only need to satisfy the non-zero
/// invariants of [`CodingConfig`] and are never used to encode a genesis block.
const GENESIS_CODING_CONFIG: CodingConfig = CodingConfig {
    minimum_shards: NZU16!(1),
    extra_shards: NZU16!(1),
};
134
/// Configuration for initializing [`Marshaled`].
///
/// Type parameters mirror those of [`Marshaled`]: `A` is the wrapped application, `B` the
/// block type, `C` the erasure coding scheme, `H` the hasher, `Z` the epoch-scoped signing
/// scheme provider, `S` the parallelism strategy, and `ES` the epoch boundary strategy.
#[allow(clippy::type_complexity)]
pub struct MarshaledConfig<A, B, C, H, Z, S, ES>
where
    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// The underlying application to wrap.
    pub application: A,
    /// Mailbox for communicating with the marshal engine.
    pub marshal:
        core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
    /// Mailbox for communicating with the shards engine.
    pub shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
    /// Provider for signing schemes scoped by epoch.
    pub scheme_provider: Z,
    /// Strategy for parallel operations.
    pub strategy: S,
    /// Strategy for determining epoch boundaries.
    pub epocher: ES,
}
160
/// An [`Application`] adapter that handles epoch transitions and erasure coded broadcast.
///
/// This wrapper intercepts consensus operations to enforce epoch boundaries. It prevents
/// blocks from being produced outside their valid epoch and handles the special case of
/// re-proposing boundary blocks during epoch transitions.
#[derive(Clone)]
#[allow(clippy::type_complexity)]
pub struct Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: Application<E>,
    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// Runtime handle used to spawn background tasks and register metrics.
    context: E,
    /// The wrapped application.
    application: A,
    /// Mailbox for communicating with the marshal engine.
    marshal: core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
    /// Mailbox for communicating with the shards engine.
    shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
    /// Provider for signing schemes scoped by epoch.
    scheme_provider: Z,
    /// Strategy for determining epoch boundaries.
    epocher: ES,
    /// Strategy for parallel operations (used when erasure encoding built blocks).
    strategy: S,
    /// The most recently built (or re-proposed) block, cached in `propose` and consumed
    /// by `broadcast`.
    last_built: LastBuilt<CodedBlock<B, C, H>>,
    /// In-flight verification results keyed by round and commitment; inserted by `verify`
    /// and taken by `certify`.
    verification_tasks: VerificationTasks<Commitment>,
    /// Lazily-populated genesis commitment/block pair handed to parent fetches
    /// (presumably the epoch-0 genesis — see `fetch_parent`, defined elsewhere in this file).
    cached_genesis: Arc<OnceLock<(Commitment, CodedBlock<B, C, H>)>>,

    /// Time taken by the application to build a new block.
    build_duration: Timed<E>,
    /// Time taken by the application to verify a block.
    verify_duration: Timed<E>,
    /// Time taken to fetch the parent block during proposal.
    proposal_parent_fetch_duration: Timed<E>,
    /// Time taken to erasure encode a built block.
    erasure_encode_duration: Timed<E>,
}
195
196impl<E, A, B, C, H, Z, S, ES> Marshaled<E, A, B, C, H, Z, S, ES>
197where
198 E: Rng + Storage + Spawner + Metrics + Clock,
199 A: VerifyingApplication<
200 E,
201 Block = B,
202 SigningScheme = Z::Scheme,
203 Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
204 >,
205 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
206 C: CodingScheme,
207 H: Hasher,
208 Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
209 S: Strategy,
210 ES: Epocher,
211{
212 /// Creates a new [`Marshaled`] wrapper.
213 ///
214 /// # Panics
215 ///
216 /// Panics if the marshal metadata store cannot be initialized.
217 pub fn new(context: E, cfg: MarshaledConfig<A, B, C, H, Z, S, ES>) -> Self {
218 let MarshaledConfig {
219 application,
220 marshal,
221 shards,
222 scheme_provider,
223 strategy,
224 epocher,
225 } = cfg;
226
227 let clock = Arc::new(context.clone());
228
229 let build_histogram = Histogram::new(Buckets::LOCAL);
230 context.register(
231 "build_duration",
232 "Histogram of time taken for the application to build a new block, in seconds",
233 build_histogram.clone(),
234 );
235 let build_duration = Timed::new(build_histogram, clock.clone());
236
237 let verify_histogram = Histogram::new(Buckets::LOCAL);
238 context.register(
239 "verify_duration",
240 "Histogram of time taken for the application to verify a block, in seconds",
241 verify_histogram.clone(),
242 );
243 let verify_duration = Timed::new(verify_histogram, clock.clone());
244
245 let parent_fetch_histogram = Histogram::new(Buckets::LOCAL);
246 context.register(
247 "parent_fetch_duration",
248 "Histogram of time taken to fetch a parent block in proposal, in seconds",
249 parent_fetch_histogram.clone(),
250 );
251 let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram, clock.clone());
252
253 let erasure_histogram = Histogram::new(Buckets::LOCAL);
254 context.register(
255 "erasure_encode_duration",
256 "Histogram of time taken to erasure encode a block, in seconds",
257 erasure_histogram.clone(),
258 );
259 let erasure_encode_duration = Timed::new(erasure_histogram, clock);
260
261 Self {
262 context,
263 application,
264 marshal,
265 shards,
266 scheme_provider,
267 strategy,
268 epocher,
269 last_built: Arc::new(Mutex::new(None)),
270 verification_tasks: VerificationTasks::new(),
271 cached_genesis: Arc::new(OnceLock::new()),
272
273 build_duration,
274 verify_duration,
275 proposal_parent_fetch_duration,
276 erasure_encode_duration,
277 }
278 }
279
280 /// Verifies a proposed block within epoch boundaries.
281 ///
282 /// This method validates that:
283 /// 1. The block is within the current epoch (unless it's a boundary block re-proposal)
284 /// 2. Re-proposals are only allowed for the last block in an epoch
285 /// 3. The block's parent digest matches the consensus context's expected parent
286 /// 4. The block's height is exactly one greater than the parent's height
287 /// 5. The block's embedded context digest matches the commitment
288 /// 6. The block's embedded context matches the consensus context
289 /// 7. The underlying application's verification logic passes
290 ///
291 /// Verification is spawned in a background task and returns a receiver that will contain
292 /// the verification result.
293 ///
294 /// If `prefetched_block` is provided, it will be used directly instead of fetching from
295 /// the marshal. This is useful in `certify` when we've already fetched the block to
296 /// extract its embedded context.
297 fn deferred_verify(
298 &mut self,
299 consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
300 commitment: Commitment,
301 prefetched_block: Option<CodedBlock<B, C, H>>,
302 ) -> oneshot::Receiver<bool> {
303 let mut marshal = self.marshal.clone();
304 let mut application = self.application.clone();
305 let epocher = self.epocher.clone();
306 let verify_duration = self.verify_duration.clone();
307 let cached_genesis = self.cached_genesis.clone();
308
309 let (mut tx, rx) = oneshot::channel();
310 self.context
311 .with_label("deferred_verify")
312 .with_attribute("round", consensus_context.round)
313 .spawn(move |runtime_context| async move {
314 let round = consensus_context.round;
315
316 // Fetch parent block
317 let (parent_view, parent_commitment) = consensus_context.parent;
318 let parent_request = fetch_parent(
319 parent_commitment,
320 // We are guaranteed that the parent round for any `consensus_context` is
321 // in the same epoch (recall, the boundary block of the previous epoch
322 // is the genesis block of the current epoch).
323 Some(Round::new(consensus_context.epoch(), parent_view)),
324 &mut application,
325 &mut marshal,
326 cached_genesis,
327 )
328 .await;
329
330 // Get block either from prefetched or by subscribing
331 let (parent, block) = if let Some(block) = prefetched_block {
332 // We have a prefetched block, just fetch the parent
333 let parent = select! {
334 _ = tx.closed() => {
335 debug!(
336 reason = "consensus dropped receiver",
337 "skipping verification"
338 );
339 return;
340 },
341 result = parent_request => match result {
342 Ok(parent) => parent,
343 Err(_) => {
344 debug!(reason = "failed to fetch parent", "skipping verification");
345 return;
346 }
347 },
348 };
349 (parent, block)
350 } else {
351 // No prefetched block, fetch both parent and block
352 let block_request = marshal
353 .subscribe_by_commitment(Some(round), commitment)
354 .await;
355 let block_requests = try_join(parent_request, block_request);
356
357 select! {
358 _ = tx.closed() => {
359 debug!(
360 reason = "consensus dropped receiver",
361 "skipping verification"
362 );
363 return;
364 },
365 result = block_requests => match result {
366 Ok(results) => results,
367 Err(_) => {
368 debug!(
369 reason = "failed to fetch parent or block",
370 "skipping verification"
371 );
372 return;
373 }
374 },
375 }
376 };
377
378 if let Err(err) = validate_block::<H, _, _>(
379 &epocher,
380 &block,
381 &parent,
382 &consensus_context,
383 commitment,
384 parent_commitment,
385 ) {
386 debug!(
387 ?err,
388 expected_commitment = %commitment,
389 block_commitment = %block.commitment(),
390 expected_parent_commitment = %parent_commitment,
391 parent_commitment = %parent.commitment(),
392 expected_parent = %parent.digest(),
393 block_parent = %block.parent(),
394 parent_height = %parent.height(),
395 block_height = %block.height(),
396 "block failed coded invariant validation"
397 );
398 tx.send_lossy(false);
399 return;
400 }
401
402 let ancestry_stream = AncestorStream::new(
403 marshal.clone(),
404 [block.clone().into_inner(), parent.into_inner()],
405 );
406 let validity_request = application.verify(
407 (
408 runtime_context.with_label("app_verify"),
409 consensus_context.clone(),
410 ),
411 ancestry_stream,
412 );
413
414 // If consensus drops the receiver, we can stop work early.
415 let mut timer = verify_duration.timer();
416 let application_valid = select! {
417 _ = tx.closed() => {
418 debug!(
419 reason = "consensus dropped receiver",
420 "skipping verification"
421 );
422 return;
423 },
424 is_valid = validity_request => is_valid,
425 };
426 timer.observe();
427 if application_valid {
428 // The block is only persisted at this point.
429 marshal.verified(round, block).await;
430 }
431 tx.send_lossy(application_valid);
432 });
433
434 rx
435 }
436}
437
impl<E, A, B, C, H, Z, S, ES> Automaton for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = Z::Scheme,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    type Digest = Commitment;
    type Context = Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>;

    /// Returns the genesis digest for a given epoch.
    ///
    /// For epoch 0, this returns the application's genesis block digest. For subsequent
    /// epochs, it returns the digest of the last block from the previous epoch, which
    /// serves as the genesis block for the new epoch.
    ///
    /// # Panics
    ///
    /// Panics if a non-zero epoch is requested but the previous epoch's final block is not
    /// available in storage. This indicates a critical error in the consensus engine startup
    /// sequence, as engines must always have the genesis block before starting.
    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
        // Epoch 0 has no previous epoch: the application's genesis block is the
        // epoch genesis.
        let Some(previous_epoch) = epoch.previous() else {
            let genesis_block = self.application.genesis().await;
            return genesis_coding_commitment::<H, _>(&genesis_block);
        };

        let last_height = self
            .epocher
            .last(previous_epoch)
            .expect("previous epoch should exist");
        let Some(block) = self.marshal.get_block(last_height).await else {
            // A new consensus engine will never be started without having the genesis block
            // of the new epoch (the last block of the previous epoch) already stored.
            unreachable!("missing starting epoch block at height {last_height}");
        };
        block.commitment()
    }

    /// Proposes a new block or re-proposes the epoch boundary block.
    ///
    /// This method builds a new block from the underlying application unless the parent block
    /// is the last block in the current epoch. When at an epoch boundary, it re-proposes the
    /// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
    ///
    /// The proposal operation is spawned in a background task and returns a receiver that will
    /// contain the proposed block's digest when ready. The built block is cached for later
    /// broadcasting.
    async fn propose(
        &mut self,
        consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    ) -> oneshot::Receiver<Self::Digest> {
        let mut marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let last_built = self.last_built.clone();
        let epocher = self.epocher.clone();
        let strategy = self.strategy.clone();
        let cached_genesis = self.cached_genesis.clone();

        // If there's no scheme for the current epoch, we cannot verify the proposal.
        // Send back a receiver with a dropped sender.
        let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
            debug!(
                round = %consensus_context.round,
                "no scheme for epoch, skipping propose"
            );
            let (_, rx) = oneshot::channel();
            return rx;
        };

        // The coding configuration is derived from the size of the active participant set.
        let n_participants =
            u16::try_from(scheme.participants().len()).expect("too many participants");
        let coding_config = coding_config_for_participants(n_participants);

        // Metrics
        let build_duration = self.build_duration.clone();
        let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone();
        let erasure_encode_duration = self.erasure_encode_duration.clone();

        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("propose")
            .with_attribute("round", consensus_context.round)
            .spawn(move |runtime_context| async move {
                let (parent_view, parent_commitment) = consensus_context.parent;
                let parent_request = fetch_parent(
                    parent_commitment,
                    // We are guaranteed that the parent round for any `consensus_context` is
                    // in the same epoch (recall, the boundary block of the previous epoch
                    // is the genesis block of the current epoch).
                    Some(Round::new(consensus_context.epoch(), parent_view)),
                    &mut application,
                    &mut marshal,
                    cached_genesis,
                )
                .await;

                let mut parent_timer = proposal_parent_fetch_duration.timer();
                let parent = select! {
                    _ = tx.closed() => {
                        debug!(reason = "consensus dropped receiver", "skipping proposal");
                        return;
                    },
                    result = parent_request => match result {
                        Ok(parent) => parent,
                        Err(_) => {
                            debug!(
                                ?parent_commitment,
                                reason = "failed to fetch parent block",
                                "skipping proposal"
                            );
                            return;
                        }
                    },
                };
                parent_timer.observe();

                // Special case: If the parent block is the last block in the epoch,
                // re-propose it as to not produce any blocks that will be cut out
                // by the epoch transition.
                let last_in_epoch = epocher
                    .last(consensus_context.epoch())
                    .expect("current epoch should exist");
                if parent.height() == last_in_epoch {
                    let commitment = parent.commitment();
                    {
                        let mut lock = last_built.lock();
                        *lock = Some((consensus_context.round, parent));
                    }

                    let success = tx.send_lossy(commitment);
                    debug!(
                        round = ?consensus_context.round,
                        ?commitment,
                        success,
                        "re-proposed parent block at epoch boundary"
                    );
                    return;
                }

                let ancestor_stream = AncestorStream::new(marshal.clone(), [parent.into_inner()]);
                let build_request = application.propose(
                    (
                        runtime_context.with_label("app_propose"),
                        consensus_context.clone(),
                    ),
                    ancestor_stream,
                );

                let mut build_timer = build_duration.timer();
                let built_block = select! {
                    _ = tx.closed() => {
                        debug!(reason = "consensus dropped receiver", "skipping proposal");
                        return;
                    },
                    result = build_request => match result {
                        Some(block) => block,
                        None => {
                            debug!(
                                ?parent_commitment,
                                reason = "block building failed",
                                "skipping proposal"
                            );
                            return;
                        }
                    },
                };
                build_timer.observe();

                // Erasure encode the built block so its shards can be broadcast
                // when consensus requests it (see `Relay::broadcast`).
                let mut erasure_timer = erasure_encode_duration.timer();
                let coded_block = CodedBlock::<B, C, H>::new(built_block, coding_config, &strategy);
                erasure_timer.observe();

                let commitment = coded_block.commitment();
                {
                    let mut lock = last_built.lock();
                    *lock = Some((consensus_context.round, coded_block));
                }

                let success = tx.send_lossy(commitment);
                debug!(
                    round = ?consensus_context.round,
                    ?commitment,
                    success,
                    "proposed new block"
                );
            });
        rx
    }

    /// Verifies a received shard for a given round.
    ///
    /// This method validates that:
    /// 1. The coding configuration matches the expected configuration for the current scheme.
    /// 2. The commitment's context digest matches the consensus context (unless this is a re-proposal).
    /// 3. The shard is contained within the consensus commitment.
    ///
    /// Verification is spawned in a background task and returns a receiver that will contain
    /// the verification result. Additionally, this method kicks off deferred verification to
    /// start block verification early (hidden behind shard validity and network latency).
    async fn verify(
        &mut self,
        consensus_context: Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>,
        payload: Self::Digest,
    ) -> oneshot::Receiver<bool> {
        // If there's no scheme for the current epoch, we cannot vote on the proposal.
        // Send back a receiver with a dropped sender.
        let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
            debug!(
                round = %consensus_context.round,
                "no scheme for epoch, skipping verify"
            );
            let (_, rx) = oneshot::channel();
            return rx;
        };

        let n_participants =
            u16::try_from(scheme.participants().len()).expect("too many participants");
        let coding_config = coding_config_for_participants(n_participants);
        // A payload equal to the context's parent commitment is a re-proposal of the parent.
        let is_reproposal = payload == consensus_context.parent.1;

        // Validate proposal-level invariants:
        // - coding config must match active participant set
        // - context digest must match unless this is a re-proposal
        let proposal_context = (!is_reproposal).then_some(&consensus_context);
        if let Err(err) = validate_proposal::<H, _>(payload, coding_config, proposal_context) {
            match err {
                ProposalError::CodingConfig => {
                    warn!(
                        round = %consensus_context.round,
                        got = ?payload.config(),
                        expected = ?coding_config,
                        "rejected proposal with unexpected coding configuration"
                    );
                }
                ProposalError::ContextDigest => {
                    let expected = hash_context::<H, _>(&consensus_context);
                    let got = payload.context::<H::Digest>();
                    warn!(
                        round = %consensus_context.round,
                        expected = ?expected,
                        got = ?got,
                        "rejected proposal with mismatched context digest"
                    );
                }
            }

            let (tx, rx) = oneshot::channel();
            tx.send_lossy(false);
            return rx;
        }

        // Re-proposals skip context-digest validation because the consensus context will point
        // at the prior epoch-boundary block while the embedded block context is from the
        // original proposal view.
        //
        // Re-proposals also skip shard-validity and deferred verification because:
        // 1. The block was already verified when originally proposed
        // 2. The parent-child height check would fail (parent IS the block)
        // 3. Waiting for shards could stall if the leader doesn't rebroadcast
        if is_reproposal {
            // Fetch the block to verify it's at the epoch boundary.
            // This should be fast since the parent block is typically already cached.
            let block_rx = self
                .marshal
                .subscribe_by_commitment(Some(consensus_context.round), payload)
                .await;
            let marshal = self.marshal.clone();
            let epocher = self.epocher.clone();
            let round = consensus_context.round;
            let verification_tasks = self.verification_tasks.clone();

            // Register a verification task synchronously before spawning work so
            // `certify` can always find it (no race with task startup).
            let (task_tx, task_rx) = oneshot::channel();
            verification_tasks.insert(round, payload, task_rx);

            let (mut tx, rx) = oneshot::channel();
            self.context
                .with_label("verify_reproposal")
                .spawn(move |_| async move {
                    let block = select! {
                        _ = tx.closed() => {
                            debug!(
                                reason = "consensus dropped receiver",
                                "skipping re-proposal verification"
                            );
                            return;
                        },
                        block = block_rx => match block {
                            Ok(block) => block,
                            Err(_) => {
                                debug!(
                                    ?payload,
                                    reason = "failed to fetch block for re-proposal verification",
                                    "skipping re-proposal verification"
                                );
                                // Fetch failure is an availability issue, not an explicit
                                // invalidity proof. Do not synthesize `false` here.
                                return;
                            }
                        },
                    };

                    if !is_valid_reproposal_at_verify(&epocher, block.height(), round.epoch()) {
                        debug!(
                            height = %block.height(),
                            "re-proposal is not at epoch boundary"
                        );
                        task_tx.send_lossy(false);
                        tx.send_lossy(false);
                        return;
                    }

                    // Valid re-proposal. Notify the marshal and complete the
                    // verification task for `certify`.
                    marshal.verified(round, block).await;
                    task_tx.send_lossy(true);
                    tx.send_lossy(true);
                });
            return rx;
        }

        // Inform the shard engine of an externally proposed commitment.
        self.shards
            .discovered(
                payload,
                consensus_context.leader.clone(),
                consensus_context.round,
            )
            .await;

        // Kick off deferred verification early to hide verification latency behind
        // shard validity checks and network latency for collecting votes.
        let round = consensus_context.round;
        let task = self.deferred_verify(consensus_context, payload, None);
        self.verification_tasks.insert(round, payload, task);

        match scheme.me() {
            Some(_) => {
                // Subscribe to assigned shard verification. For participants, this
                // only completes once the leader-delivered shard for our
                // assigned index has been verified. Reconstructing the block
                // from peer gossip is useful for certification later, but is
                // not enough to emit a notarize vote.
                let validity_rx = self.shards.subscribe_assigned_shard_verified(payload).await;
                let (tx, rx) = oneshot::channel();
                self.context
                    .with_label("shard_validity_wait")
                    .spawn(|_| async move {
                        if validity_rx.await.is_ok() {
                            tx.send_lossy(true);
                        }
                    });
                rx
            }
            None => {
                // If we are not participating, there's no shard to verify; just accept the proposal.
                //
                // Later, when certifying, we will wait to receive the block from the network.
                let (tx, rx) = oneshot::channel();
                tx.send_lossy(true);
                rx
            }
        }
    }
}
814
impl<E, A, B, C, H, Z, S, ES> CertifiableAutomaton for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = Z::Scheme,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// Certifies a notarized payload before finalization.
    ///
    /// If `verify()` already registered a verification task for this `(round, payload)`
    /// pair, that task's receiver is returned directly. Otherwise (e.g. this node never
    /// verified the proposal locally), the block is fetched from the marshal and verified
    /// against its embedded context; blocks inferred to be re-proposals of epoch-boundary
    /// blocks are accepted without re-running application verification.
    async fn certify(&mut self, round: Round, payload: Self::Digest) -> oneshot::Receiver<bool> {
        // First, check for an in-progress verification task from `verify()`.
        let task = self.verification_tasks.take(round, payload);
        if let Some(task) = task {
            return task;
        }

        // No in-progress task means we never verified this proposal locally.
        // We can use the block's embedded context to move to the next view. If a Byzantine
        // proposer embedded a malicious context, the f+1 honest validators from the notarizing quorum
        // will verify against the proper context and reject the mismatch, preventing a 2f+1
        // finalization quorum.
        //
        // Subscribe to the block and verify using its embedded context once available.
        debug!(
            ?round,
            ?payload,
            "subscribing to block for certification using embedded context"
        );
        let block_rx = self
            .marshal
            .subscribe_by_commitment(Some(round), payload)
            .await;
        let mut marshaled = self.clone();
        let shards = self.shards.clone();
        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("certify")
            .with_attribute("round", round)
            .spawn(move |_| async move {
                let block = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping certification"
                        );
                        return;
                    },
                    result = block_rx => match result {
                        Ok(block) => block,
                        Err(_) => {
                            debug!(
                                ?payload,
                                reason = "failed to fetch block for certification",
                                "skipping certification"
                            );
                            return;
                        }
                    },
                };

                // Re-proposal detection for certify path: we don't have the consensus
                // context, only the block's embedded context from original proposal.
                // Infer re-proposal from:
                // 1. Block is at epoch boundary (only boundary blocks can be re-proposed)
                // 2. Certification round's view > embedded context's view (re-proposals
                //    retain their original embedded context, so a later view indicates
                //    the block was re-proposed)
                // 3. Same epoch (re-proposals don't cross epoch boundaries)
                let embedded_context = block.context();
                let is_reproposal = is_inferred_reproposal_at_certify(
                    &marshaled.epocher,
                    block.height(),
                    embedded_context.round,
                    round,
                );
                if is_reproposal {
                    // NOTE: It is possible that, during crash recovery, we call
                    // `marshal.verified` twice for the same block. That function is
                    // idempotent, so this is safe.
                    marshaled.marshal.verified(round, block).await;
                    tx.send_lossy(true);
                    return;
                }

                // Inform the shard engine of an externally proposed commitment.
                shards
                    .discovered(
                        payload,
                        embedded_context.leader.clone(),
                        embedded_context.round,
                    )
                    .await;

                // Use the block's embedded context for verification, passing the
                // prefetched block to avoid fetching it again inside deferred_verify.
                let verify_rx = marshaled.deferred_verify(embedded_context, payload, Some(block));
                if let Ok(result) = verify_rx.await {
                    tx.send_lossy(result);
                }
            });
        rx
    }
}
925
926impl<E, A, B, C, H, Z, S, ES> Relay for Marshaled<E, A, B, C, H, Z, S, ES>
927where
928 E: Rng + Storage + Spawner + Metrics + Clock,
929 A: Application<
930 E,
931 Block = B,
932 Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
933 >,
934 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
935 C: CodingScheme,
936 H: Hasher,
937 Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
938 S: Strategy,
939 ES: Epocher,
940{
941 type Digest = Commitment;
942 type PublicKey = <Z::Scheme as CertificateScheme>::PublicKey;
943 type Plan = Plan<Self::PublicKey>;
944
945 async fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) {
946 match plan {
947 Plan::Propose => {
948 let Some((round, block)) = self.last_built.lock().take() else {
949 warn!("missing block to broadcast");
950 return;
951 };
952 if block.commitment() != commitment {
953 warn!(
954 round = %round,
955 commitment = %block.commitment(),
956 height = %block.height(),
957 "skipping requested broadcast of block with mismatched commitment"
958 );
959 return;
960 }
961 debug!(
962 round = %round,
963 commitment = %block.commitment(),
964 height = %block.height(),
965 "requested broadcast of built block"
966 );
967 self.shards.proposed(round, block).await;
968 }
969 Plan::Forward { .. } => {
970 // Coding variant does not support targeted forwarding;
971 // peers reconstruct blocks from erasure-coded shards.
972 //
973 // TODO(#3389): Support checked data forwarding for PhasedScheme.
974 }
975 }
976 }
977}
978
979impl<E, A, B, C, H, Z, S, ES> Reporter for Marshaled<E, A, B, C, H, Z, S, ES>
980where
981 E: Rng + Storage + Spawner + Metrics + Clock,
982 A: Application<
983 E,
984 Block = B,
985 Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
986 > + Reporter<Activity = Update<B>>,
987 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
988 C: CodingScheme,
989 H: Hasher,
990 Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
991 S: Strategy,
992 ES: Epocher,
993{
994 type Activity = A::Activity;
995
996 /// Relays a report to the underlying [`Application`] and cleans up old verification data.
997 async fn report(&mut self, update: Self::Activity) {
998 // Clean up verification tasks and contexts for rounds <= the finalized round.
999 if let Update::Tip(round, _, _) = &update {
1000 self.verification_tasks.retain_after(round);
1001 }
1002 self.application.report(update).await
1003 }
1004}
1005
1006/// Fetches the parent block given its digest and optional round.
1007///
1008/// This is a helper function used during proposal and verification to retrieve the parent
1009/// block. If the parent digest matches the genesis block, it returns the genesis block
1010/// directly without querying the marshal. Otherwise, it subscribes to the marshal to await
1011/// the parent block's availability.
1012///
1013/// `parent_round` is an optional resolver hint. Callers should only provide a hint when
1014/// the source context is trusted/validated. Untrusted paths should pass `None`.
1015///
1016/// Returns an error if the marshal subscription is cancelled.
1017#[allow(clippy::type_complexity)]
1018async fn fetch_parent<E, S, A, B, C, H>(
1019 parent_commitment: Commitment,
1020 parent_round: Option<Round>,
1021 application: &mut A,
1022 marshal: &mut core::Mailbox<S, Coding<B, C, H, S::PublicKey>>,
1023 cached_genesis: Arc<OnceLock<(Commitment, CodedBlock<B, C, H>)>>,
1024) -> Either<Ready<Result<CodedBlock<B, C, H>, RecvError>>, oneshot::Receiver<CodedBlock<B, C, H>>>
1025where
1026 E: Rng + Spawner + Metrics + Clock,
1027 S: CertificateScheme,
1028 A: Application<E, Block = B, Context = Context<Commitment, S::PublicKey>>,
1029 B: CertifiableBlock<Context = Context<Commitment, S::PublicKey>>,
1030 C: CodingScheme,
1031 H: Hasher,
1032{
1033 if cached_genesis.get().is_none() {
1034 let genesis = application.genesis().await;
1035 let genesis_coding_commitment = genesis_coding_commitment::<H, _>(&genesis);
1036 let coded_genesis = CodedBlock::<B, C, H>::new_trusted(genesis, genesis_coding_commitment);
1037 let _ = cached_genesis.set((genesis_coding_commitment, coded_genesis));
1038 }
1039
1040 let (genesis_commitment, coded_genesis) = cached_genesis
1041 .get()
1042 .expect("genesis cache should be initialized");
1043 if parent_commitment == *genesis_commitment {
1044 Either::Left(ready(Ok(coded_genesis.clone())))
1045 } else {
1046 Either::Right(
1047 marshal
1048 .subscribe_by_commitment(parent_round, parent_commitment)
1049 .await,
1050 )
1051 }
1052}
1053
1054/// Constructs the [`Commitment`] for the genesis block.
1055fn genesis_coding_commitment<H: Hasher, B: CertifiableBlock>(block: &B) -> Commitment {
1056 Commitment::from((
1057 block.digest(),
1058 block.digest(),
1059 hash_context::<H, _>(&block.context()),
1060 GENESIS_CODING_CONFIG,
1061 ))
1062}