commonware_consensus/marshal/coding/marshaled.rs
//! Wrapper for consensus applications that handles epochs, erasure coding, and block dissemination.
//!
//! # Overview
//!
//! [`Marshaled`] is an adapter that wraps any [`VerifyingApplication`] implementation to handle
//! epoch transitions and erasure coded broadcast automatically. It intercepts consensus
//! operations (propose, verify, certify) and ensures blocks are only produced within valid
//! epoch boundaries.
//!
//! # Epoch Boundaries
//!
//! An epoch is a fixed number of blocks (the `epoch_length`). When the last block in an epoch
//! is reached, this wrapper prevents new blocks from being built & proposed until the next
//! epoch begins. Instead, it re-proposes the boundary block to avoid producing blocks that
//! would be pruned by the epoch transition.
//!
//! # Erasure Coding
//!
//! This wrapper integrates with a variant of marshal that supports erasure coded broadcast.
//! When a leader proposes a new block, it is automatically erasure encoded and its shards are
//! broadcast to active participants. When verifying a proposed block (the precondition for
//! notarization), the wrapper ensures the commitment's context digest matches the consensus
//! context and subscribes to shard validity for the shard received by the proposer. If the
//! shard is valid, the local shard is relayed to all other participants to aid in block
//! reconstruction.
//!
//! During certification (the phase between notarization and finalization), the wrapper
//! subscribes to block reconstruction and validates epoch boundaries, parent commitment,
//! height contiguity, and that the block's embedded context matches the consensus context
//! before allowing the block to be certified. If certification fails, the voter can still emit
//! a nullify vote to advance the view.
//!
//! # Usage
//!
//! Wrap your [`VerifyingApplication`] implementation with [`Marshaled::new`] and provide it to
//! your consensus engine for the [`Automaton`] and [`Relay`]. The wrapper handles all epoch
//! logic transparently.
//!
//! ```rust,ignore
//! let cfg = MarshaledConfig {
//!     application: my_application,
//!     marshal: marshal_mailbox,
//!     shards: shard_mailbox,
//!     scheme_provider,
//!     epocher,
//!     strategy,
//! };
//! let application = Marshaled::new(context, cfg);
//! ```
//!
//! # Implementation Notes
//!
//! - Genesis blocks are handled specially: epoch 0 returns the application's genesis block,
//!   while subsequent epochs use the last block of the previous epoch as genesis
//! - Blocks are automatically verified to be within the current epoch
//!
//! # Notarization and Data Availability
//!
//! In rare crash cases, it is possible for a notarization certificate to exist without a block
//! being available to the honest parties (e.g., if the whole network crashed before receiving
//! `f+1` shards and the proposer went permanently offline). In this case, `certify` will be
//! unable to fetch the block before timeout and result in a nullification.
//!
//! For this reason, it should not be expected that every notarized payload will be certifiable
//! due to the lack of an available block. However, if even one honest and online party has the
//! block, they will attempt to forward it to others via marshal's resolver. This case is
//! already present in the event of a block that was proposed with an invalid codec; Marshal
//! will not be able to reconstruct the block, and therefore won't serve it.
//!
//! ```text
//!                    ┌───────────────────────────┐
//!                    ▼                           │
//! ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌────┴────┐
//! │   B1    │◀──│   B2    │◀──│   B3    │XXX│   B4    │
//! └─────────┘   └─────────┘   └────┬────┘   └─────────┘
//!                                  │
//!                           Failed Certify
//! ```
75
76use crate::{
77 marshal::{
78 ancestry::AncestorStream,
79 application::{
80 validation::{
81 is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, LastBuilt,
82 },
83 verification_tasks::VerificationTasks,
84 },
85 coding::{
86 shards,
87 types::{coding_config_for_participants, hash_context, CodedBlock},
88 validation::{validate_block, validate_proposal, ProposalError},
89 Coding,
90 },
91 core, Update,
92 },
93 simplex::{scheme::Scheme, types::Context},
94 types::{coding::Commitment, Epoch, Epocher, Round},
95 Application, Automaton, Block, CertifiableAutomaton, CertifiableBlock, Epochable, Heightable,
96 Relay, Reporter, VerifyingApplication,
97};
98use commonware_coding::{Config as CodingConfig, Scheme as CodingScheme};
99use commonware_cryptography::{
100 certificate::{Provider, Scheme as CertificateScheme},
101 Committable, Digestible, Hasher,
102};
103use commonware_macros::select;
104use commonware_parallel::Strategy;
105use commonware_runtime::{
106 telemetry::metrics::histogram::{Buckets, Timed},
107 Clock, Metrics, Spawner, Storage,
108};
109use commonware_utils::{
110 channel::{
111 fallible::OneshotExt,
112 oneshot::{self, error::RecvError},
113 },
114 sync::Mutex,
115 NZU16,
116};
117use futures::future::{ready, try_join, Either, Ready};
118use prometheus_client::metrics::histogram::Histogram;
119use rand::Rng;
120use std::sync::{Arc, OnceLock};
121use tracing::{debug, warn};
122
/// The [`CodingConfig`] used for genesis blocks. These blocks are never broadcasted in
/// the proposal phase, and thus the configuration is irrelevant.
///
/// The values below are the smallest a [`CodingConfig`] allows; they are placeholders
/// and are never used to actually encode shards for dissemination.
const GENESIS_CODING_CONFIG: CodingConfig = CodingConfig {
    minimum_shards: NZU16!(1),
    extra_shards: NZU16!(1),
};
129
130/// Configuration for initializing [`Marshaled`].
131#[allow(clippy::type_complexity)]
132pub struct MarshaledConfig<A, B, C, H, Z, S, ES>
133where
134 B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
135 C: CodingScheme,
136 H: Hasher,
137 Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
138 S: Strategy,
139 ES: Epocher,
140{
141 /// The underlying application to wrap.
142 pub application: A,
143 /// Mailbox for communicating with the marshal engine.
144 pub marshal:
145 core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
146 /// Mailbox for communicating with the shards engine.
147 pub shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
148 /// Provider for signing schemes scoped by epoch.
149 pub scheme_provider: Z,
150 /// Strategy for parallel operations.
151 pub strategy: S,
152 /// Strategy for determining epoch boundaries.
153 pub epocher: ES,
154}
155
156/// An [`Application`] adapter that handles epoch transitions and erasure coded broadcast.
157///
158/// This wrapper intercepts consensus operations to enforce epoch boundaries. It prevents
159/// blocks from being produced outside their valid epoch and handles the special case of
160/// re-proposing boundary blocks during epoch transitions.
161#[derive(Clone)]
162#[allow(clippy::type_complexity)]
163pub struct Marshaled<E, A, B, C, H, Z, S, ES>
164where
165 E: Rng + Storage + Spawner + Metrics + Clock,
166 A: Application<E>,
167 B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
168 C: CodingScheme,
169 H: Hasher,
170 Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
171 S: Strategy,
172 ES: Epocher,
173{
174 context: E,
175 application: A,
176 marshal: core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
177 shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
178 scheme_provider: Z,
179 epocher: ES,
180 strategy: S,
181 last_built: LastBuilt<CodedBlock<B, C, H>>,
182 verification_tasks: VerificationTasks<Commitment>,
183 cached_genesis: Arc<OnceLock<(Commitment, CodedBlock<B, C, H>)>>,
184
185 build_duration: Timed<E>,
186 verify_duration: Timed<E>,
187 proposal_parent_fetch_duration: Timed<E>,
188 erasure_encode_duration: Timed<E>,
189}
190
191impl<E, A, B, C, H, Z, S, ES> Marshaled<E, A, B, C, H, Z, S, ES>
192where
193 E: Rng + Storage + Spawner + Metrics + Clock,
194 A: VerifyingApplication<
195 E,
196 Block = B,
197 SigningScheme = Z::Scheme,
198 Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
199 >,
200 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
201 C: CodingScheme,
202 H: Hasher,
203 Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
204 S: Strategy,
205 ES: Epocher,
206{
207 /// Creates a new [`Marshaled`] wrapper.
208 ///
209 /// # Panics
210 ///
211 /// Panics if the marshal metadata store cannot be initialized.
212 pub fn new(context: E, cfg: MarshaledConfig<A, B, C, H, Z, S, ES>) -> Self {
213 let MarshaledConfig {
214 application,
215 marshal,
216 shards,
217 scheme_provider,
218 strategy,
219 epocher,
220 } = cfg;
221
222 let clock = Arc::new(context.clone());
223
224 let build_histogram = Histogram::new(Buckets::LOCAL);
225 context.register(
226 "build_duration",
227 "Histogram of time taken for the application to build a new block, in seconds",
228 build_histogram.clone(),
229 );
230 let build_duration = Timed::new(build_histogram, clock.clone());
231
232 let verify_histogram = Histogram::new(Buckets::LOCAL);
233 context.register(
234 "verify_duration",
235 "Histogram of time taken for the application to verify a block, in seconds",
236 verify_histogram.clone(),
237 );
238 let verify_duration = Timed::new(verify_histogram, clock.clone());
239
240 let parent_fetch_histogram = Histogram::new(Buckets::LOCAL);
241 context.register(
242 "parent_fetch_duration",
243 "Histogram of time taken to fetch a parent block in proposal, in seconds",
244 parent_fetch_histogram.clone(),
245 );
246 let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram, clock.clone());
247
248 let erasure_histogram = Histogram::new(Buckets::LOCAL);
249 context.register(
250 "erasure_encode_duration",
251 "Histogram of time taken to erasure encode a block, in seconds",
252 erasure_histogram.clone(),
253 );
254 let erasure_encode_duration = Timed::new(erasure_histogram, clock);
255
256 Self {
257 context,
258 application,
259 marshal,
260 shards,
261 scheme_provider,
262 strategy,
263 epocher,
264 last_built: Arc::new(Mutex::new(None)),
265 verification_tasks: VerificationTasks::new(),
266 cached_genesis: Arc::new(OnceLock::new()),
267
268 build_duration,
269 verify_duration,
270 proposal_parent_fetch_duration,
271 erasure_encode_duration,
272 }
273 }
274
275 /// Verifies a proposed block within epoch boundaries.
276 ///
277 /// This method validates that:
278 /// 1. The block is within the current epoch (unless it's a boundary block re-proposal)
279 /// 2. Re-proposals are only allowed for the last block in an epoch
280 /// 3. The block's parent digest matches the consensus context's expected parent
281 /// 4. The block's height is exactly one greater than the parent's height
282 /// 5. The block's embedded context digest matches the commitment
283 /// 6. The block's embedded context matches the consensus context
284 /// 7. The underlying application's verification logic passes
285 ///
286 /// Verification is spawned in a background task and returns a receiver that will contain
287 /// the verification result.
288 ///
289 /// If `prefetched_block` is provided, it will be used directly instead of fetching from
290 /// the marshal. This is useful in `certify` when we've already fetched the block to
291 /// extract its embedded context.
292 fn deferred_verify(
293 &mut self,
294 consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
295 commitment: Commitment,
296 prefetched_block: Option<CodedBlock<B, C, H>>,
297 ) -> oneshot::Receiver<bool> {
298 let mut marshal = self.marshal.clone();
299 let mut application = self.application.clone();
300 let epocher = self.epocher.clone();
301 let verify_duration = self.verify_duration.clone();
302 let cached_genesis = self.cached_genesis.clone();
303
304 let (mut tx, rx) = oneshot::channel();
305 self.context
306 .with_label("deferred_verify")
307 .with_attribute("round", consensus_context.round)
308 .spawn(move |runtime_context| async move {
309 let round = consensus_context.round;
310
311 // Fetch parent block
312 let (parent_view, parent_commitment) = consensus_context.parent;
313 let parent_request = fetch_parent(
314 parent_commitment,
315 // We are guaranteed that the parent round for any `consensus_context` is
316 // in the same epoch (recall, the boundary block of the previous epoch
317 // is the genesis block of the current epoch).
318 Some(Round::new(consensus_context.epoch(), parent_view)),
319 &mut application,
320 &mut marshal,
321 cached_genesis,
322 )
323 .await;
324
325 // Get block either from prefetched or by subscribing
326 let (parent, block) = if let Some(block) = prefetched_block {
327 // We have a prefetched block, just fetch the parent
328 let parent = select! {
329 _ = tx.closed() => {
330 debug!(
331 reason = "consensus dropped receiver",
332 "skipping verification"
333 );
334 return;
335 },
336 result = parent_request => match result {
337 Ok(parent) => parent,
338 Err(_) => {
339 debug!(reason = "failed to fetch parent", "skipping verification");
340 return;
341 }
342 },
343 };
344 (parent, block)
345 } else {
346 // No prefetched block, fetch both parent and block
347 let block_request = marshal
348 .subscribe_by_commitment(Some(round), commitment)
349 .await;
350 let block_requests = try_join(parent_request, block_request);
351
352 select! {
353 _ = tx.closed() => {
354 debug!(
355 reason = "consensus dropped receiver",
356 "skipping verification"
357 );
358 return;
359 },
360 result = block_requests => match result {
361 Ok(results) => results,
362 Err(_) => {
363 debug!(
364 reason = "failed to fetch parent or block",
365 "skipping verification"
366 );
367 return;
368 }
369 },
370 }
371 };
372
373 if let Err(err) = validate_block::<H, _, _>(
374 &epocher,
375 &block,
376 &parent,
377 &consensus_context,
378 commitment,
379 parent_commitment,
380 ) {
381 debug!(
382 ?err,
383 expected_commitment = %commitment,
384 block_commitment = %block.commitment(),
385 expected_parent_commitment = %parent_commitment,
386 parent_commitment = %parent.commitment(),
387 expected_parent = %parent.digest(),
388 block_parent = %block.parent(),
389 parent_height = %parent.height(),
390 block_height = %block.height(),
391 "block failed coded invariant validation"
392 );
393 tx.send_lossy(false);
394 return;
395 }
396
397 let ancestry_stream = AncestorStream::new(
398 marshal.clone(),
399 [block.clone().into_inner(), parent.into_inner()],
400 );
401 let validity_request = application.verify(
402 (
403 runtime_context.with_label("app_verify"),
404 consensus_context.clone(),
405 ),
406 ancestry_stream,
407 );
408
409 // If consensus drops the receiver, we can stop work early.
410 let mut timer = verify_duration.timer();
411 let application_valid = select! {
412 _ = tx.closed() => {
413 debug!(
414 reason = "consensus dropped receiver",
415 "skipping verification"
416 );
417 return;
418 },
419 is_valid = validity_request => is_valid,
420 };
421 timer.observe();
422 if application_valid {
423 // The block is only persisted at this point.
424 marshal.verified(round, block).await;
425 }
426 tx.send_lossy(application_valid);
427 });
428
429 rx
430 }
431}
432
impl<E, A, B, C, H, Z, S, ES> Automaton for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = Z::Scheme,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    type Digest = Commitment;
    type Context = Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>;

    /// Returns the genesis digest for a given epoch.
    ///
    /// For epoch 0, this returns the application's genesis block digest. For subsequent
    /// epochs, it returns the digest of the last block from the previous epoch, which
    /// serves as the genesis block for the new epoch.
    ///
    /// # Panics
    ///
    /// Panics if a non-zero epoch is requested but the previous epoch's final block is not
    /// available in storage. This indicates a critical error in the consensus engine startup
    /// sequence, as engines must always have the genesis block before starting.
    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
        // Epoch 0 has no previous epoch: derive the commitment from the application's
        // genesis block directly.
        let Some(previous_epoch) = epoch.previous() else {
            let genesis_block = self.application.genesis().await;
            return genesis_coding_commitment::<H, _>(&genesis_block);
        };

        let last_height = self
            .epocher
            .last(previous_epoch)
            .expect("previous epoch should exist");
        let Some(block) = self.marshal.get_block(last_height).await else {
            // A new consensus engine will never be started without having the genesis block
            // of the new epoch (the last block of the previous epoch) already stored.
            unreachable!("missing starting epoch block at height {last_height}");
        };
        block.commitment()
    }

    /// Proposes a new block or re-proposes the epoch boundary block.
    ///
    /// This method builds a new block from the underlying application unless the parent block
    /// is the last block in the current epoch. When at an epoch boundary, it re-proposes the
    /// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
    ///
    /// The proposal operation is spawned in a background task and returns a receiver that will
    /// contain the proposed block's digest when ready. The built block is cached for later
    /// broadcasting.
    async fn propose(
        &mut self,
        consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    ) -> oneshot::Receiver<Self::Digest> {
        let mut marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let last_built = self.last_built.clone();
        let epocher = self.epocher.clone();
        let strategy = self.strategy.clone();
        let cached_genesis = self.cached_genesis.clone();

        // If there's no scheme for the current epoch, we cannot verify the proposal.
        // Send back a receiver with a dropped sender.
        let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
            let (_, rx) = oneshot::channel();
            return rx;
        };

        // The coding configuration is derived from the size of the active participant set.
        let n_participants =
            u16::try_from(scheme.participants().len()).expect("too many participants");
        let coding_config = coding_config_for_participants(n_participants);

        // Metrics
        let build_duration = self.build_duration.clone();
        let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone();
        let erasure_encode_duration = self.erasure_encode_duration.clone();

        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("propose")
            .with_attribute("round", consensus_context.round)
            .spawn(move |runtime_context| async move {
                let (parent_view, parent_commitment) = consensus_context.parent;
                let parent_request = fetch_parent(
                    parent_commitment,
                    // We are guaranteed that the parent round for any `consensus_context` is
                    // in the same epoch (recall, the boundary block of the previous epoch
                    // is the genesis block of the current epoch).
                    Some(Round::new(consensus_context.epoch(), parent_view)),
                    &mut application,
                    &mut marshal,
                    cached_genesis,
                )
                .await;

                let mut parent_timer = proposal_parent_fetch_duration.timer();
                let parent = select! {
                    _ = tx.closed() => {
                        debug!(reason = "consensus dropped receiver", "skipping proposal");
                        return;
                    },
                    result = parent_request => match result {
                        Ok(parent) => parent,
                        Err(_) => {
                            debug!(
                                ?parent_commitment,
                                reason = "failed to fetch parent block",
                                "skipping proposal"
                            );
                            return;
                        }
                    },
                };
                parent_timer.observe();

                // Special case: If the parent block is the last block in the epoch,
                // re-propose it as to not produce any blocks that will be cut out
                // by the epoch transition.
                let last_in_epoch = epocher
                    .last(consensus_context.epoch())
                    .expect("current epoch should exist");
                if parent.height() == last_in_epoch {
                    let commitment = parent.commitment();
                    // Cache the re-proposed block so `broadcast` can find it.
                    {
                        let mut lock = last_built.lock();
                        *lock = Some((consensus_context.round, parent));
                    }

                    let success = tx.send_lossy(commitment);
                    debug!(
                        round = ?consensus_context.round,
                        ?commitment,
                        success,
                        "re-proposed parent block at epoch boundary"
                    );
                    return;
                }

                let ancestor_stream = AncestorStream::new(marshal.clone(), [parent.into_inner()]);
                let build_request = application.propose(
                    (
                        runtime_context.with_label("app_propose"),
                        consensus_context.clone(),
                    ),
                    ancestor_stream,
                );

                let mut build_timer = build_duration.timer();
                let built_block = select! {
                    _ = tx.closed() => {
                        debug!(reason = "consensus dropped receiver", "skipping proposal");
                        return;
                    },
                    result = build_request => match result {
                        Some(block) => block,
                        None => {
                            debug!(
                                ?parent_commitment,
                                reason = "block building failed",
                                "skipping proposal"
                            );
                            return;
                        }
                    },
                };
                build_timer.observe();

                // Erasure encode the freshly built block (parallelized via `strategy`).
                let mut erasure_timer = erasure_encode_duration.timer();
                let coded_block = CodedBlock::<B, C, H>::new(built_block, coding_config, &strategy);
                erasure_timer.observe();

                let commitment = coded_block.commitment();
                // Cache the built block so `broadcast` can find it.
                {
                    let mut lock = last_built.lock();
                    *lock = Some((consensus_context.round, coded_block));
                }

                let success = tx.send_lossy(commitment);
                debug!(
                    round = ?consensus_context.round,
                    ?commitment,
                    success,
                    "proposed new block"
                );
            });
        rx
    }

    /// Verifies a received shard for a given round.
    ///
    /// This method validates that:
    /// 1. The coding configuration matches the expected configuration for the current scheme.
    /// 2. The commitment's context digest matches the consensus context (unless this is a re-proposal).
    /// 3. The shard is contained within the consensus commitment.
    ///
    /// Verification is spawned in a background task and returns a receiver that will contain
    /// the verification result. Additionally, this method kicks off deferred verification to
    /// start block verification early (hidden behind shard validity and network latency).
    async fn verify(
        &mut self,
        consensus_context: Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>,
        payload: Self::Digest,
    ) -> oneshot::Receiver<bool> {
        // If there's no scheme for the current epoch, we cannot vote on the proposal.
        // Send back a receiver with a dropped sender.
        let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
            let (_, rx) = oneshot::channel();
            return rx;
        };

        let n_participants =
            u16::try_from(scheme.participants().len()).expect("too many participants");
        let coding_config = coding_config_for_participants(n_participants);
        // A payload equal to the parent's commitment is a re-proposal of the boundary block.
        let is_reproposal = payload == consensus_context.parent.1;

        // Validate proposal-level invariants:
        // - coding config must match active participant set
        // - context digest must match unless this is a re-proposal
        let proposal_context = (!is_reproposal).then_some(&consensus_context);
        if let Err(err) = validate_proposal::<H, _>(payload, coding_config, proposal_context) {
            match err {
                ProposalError::CodingConfig => {
                    warn!(
                        round = %consensus_context.round,
                        got = ?payload.config(),
                        expected = ?coding_config,
                        "rejected proposal with unexpected coding configuration"
                    );
                }
                ProposalError::ContextDigest => {
                    let expected = hash_context::<H, _>(&consensus_context);
                    let got = payload.context::<H::Digest>();
                    warn!(
                        round = %consensus_context.round,
                        expected = ?expected,
                        got = ?got,
                        "rejected proposal with mismatched context digest"
                    );
                }
            }

            // An invalidity proof: explicitly answer `false`.
            let (tx, rx) = oneshot::channel();
            tx.send_lossy(false);
            return rx;
        }

        // Re-proposals skip context-digest validation because the consensus context will point
        // at the prior epoch-boundary block while the embedded block context is from the
        // original proposal view.
        //
        // Re-proposals also skip shard-validity and deferred verification because:
        // 1. The block was already verified when originally proposed
        // 2. The parent-child height check would fail (parent IS the block)
        // 3. Waiting for shards could stall if the leader doesn't rebroadcast
        if is_reproposal {
            // Fetch the block to verify it's at the epoch boundary.
            // This should be fast since the parent block is typically already cached.
            let block_rx = self
                .marshal
                .subscribe_by_commitment(Some(consensus_context.round), payload)
                .await;
            let marshal = self.marshal.clone();
            let epocher = self.epocher.clone();
            let round = consensus_context.round;
            let verification_tasks = self.verification_tasks.clone();

            // Register a verification task synchronously before spawning work so
            // `certify` can always find it (no race with task startup).
            let (task_tx, task_rx) = oneshot::channel();
            verification_tasks.insert(round, payload, task_rx);

            let (mut tx, rx) = oneshot::channel();
            self.context
                .with_label("verify_reproposal")
                .spawn(move |_| async move {
                    let block = select! {
                        _ = tx.closed() => {
                            debug!(
                                reason = "consensus dropped receiver",
                                "skipping re-proposal verification"
                            );
                            return;
                        },
                        block = block_rx => match block {
                            Ok(block) => block,
                            Err(_) => {
                                debug!(
                                    ?payload,
                                    reason = "failed to fetch block for re-proposal verification",
                                    "skipping re-proposal verification"
                                );
                                // Fetch failure is an availability issue, not an explicit
                                // invalidity proof. Do not synthesize `false` here.
                                return;
                            }
                        },
                    };

                    if !is_valid_reproposal_at_verify(&epocher, block.height(), round.epoch()) {
                        debug!(
                            height = %block.height(),
                            "re-proposal is not at epoch boundary"
                        );
                        task_tx.send_lossy(false);
                        tx.send_lossy(false);
                        return;
                    }

                    // Valid re-proposal. Notify the marshal and complete the
                    // verification task for `certify`.
                    marshal.verified(round, block).await;
                    task_tx.send_lossy(true);
                    tx.send_lossy(true);
                });
            return rx;
        }

        // Inform the shard engine of an externally proposed commitment.
        self.shards
            .discovered(
                payload,
                consensus_context.leader.clone(),
                consensus_context.round,
            )
            .await;

        // Kick off deferred verification early to hide verification latency behind
        // shard validity checks and network latency for collecting votes.
        let round = consensus_context.round;
        let task = self.deferred_verify(consensus_context, payload, None);
        self.verification_tasks.insert(round, payload, task);

        match scheme.me() {
            Some(_) => {
                // Subscribe to shard validity. The subscription completes when a valid shard arrives.
                let validity_rx = self.shards.subscribe_shard(payload).await;
                let (tx, rx) = oneshot::channel();
                self.context
                    .with_label("shard_validity_wait")
                    .spawn(|_| async move {
                        // On failure nothing is sent: a missing shard is an availability
                        // issue, not an invalidity proof.
                        if validity_rx.await.is_ok() {
                            tx.send_lossy(true);
                        }
                    });
                rx
            }
            None => {
                // If we are not participating, there's no shard to verify; just accept the proposal.
                //
                // Later, when certifying, we will wait to receive the block from the network.
                let (tx, rx) = oneshot::channel();
                tx.send_lossy(true);
                rx
            }
        }
    }
}
797
impl<E, A, B, C, H, Z, S, ES> CertifiableAutomaton for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = Z::Scheme,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// Certifies a notarized payload, reusing a pending verification task when one
    /// exists or falling back to the block's embedded context.
    async fn certify(&mut self, round: Round, payload: Self::Digest) -> oneshot::Receiver<bool> {
        // First, check for an in-progress verification task from `verify()`.
        let task = self.verification_tasks.take(round, payload);
        if let Some(task) = task {
            return task;
        }

        // No in-progress task means we never verified this proposal locally.
        // We can use the block's embedded context to move to the next view. If a Byzantine
        // proposer embedded a malicious context, the f+1 honest validators from the notarizing quorum
        // will verify against the proper context and reject the mismatch, preventing a 2f+1
        // finalization quorum.
        //
        // Subscribe to the block and verify using its embedded context once available.
        debug!(
            ?round,
            ?payload,
            "subscribing to block for certification using embedded context"
        );
        let block_rx = self
            .marshal
            .subscribe_by_commitment(Some(round), payload)
            .await;
        let mut marshaled = self.clone();
        let shards = self.shards.clone();
        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("certify")
            .with_attribute("round", round)
            .spawn(move |_| async move {
                let block = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping certification"
                        );
                        return;
                    },
                    result = block_rx => match result {
                        Ok(block) => block,
                        Err(_) => {
                            debug!(
                                ?payload,
                                reason = "failed to fetch block for certification",
                                "skipping certification"
                            );
                            return;
                        }
                    },
                };

                // Re-proposal detection for certify path: we don't have the consensus
                // context, only the block's embedded context from original proposal.
                // Infer re-proposal from:
                // 1. Block is at epoch boundary (only boundary blocks can be re-proposed)
                // 2. Certification round's view > embedded context's view (re-proposals
                //    retain their original embedded context, so a later view indicates
                //    the block was re-proposed)
                // 3. Same epoch (re-proposals don't cross epoch boundaries)
                let embedded_context = block.context();
                let is_reproposal = is_inferred_reproposal_at_certify(
                    &marshaled.epocher,
                    block.height(),
                    embedded_context.round,
                    round,
                );
                if is_reproposal {
                    // NOTE: It is possible that, during crash recovery, we call
                    // `marshal.verified` twice for the same block. That function is
                    // idempotent, so this is safe.
                    marshaled.marshal.verified(round, block).await;
                    tx.send_lossy(true);
                    return;
                }

                // Inform the shard engine of an externally proposed commitment.
                shards
                    .discovered(
                        payload,
                        embedded_context.leader.clone(),
                        embedded_context.round,
                    )
                    .await;

                // Use the block's embedded context for verification, passing the
                // prefetched block to avoid fetching it again inside deferred_verify.
                let verify_rx = marshaled.deferred_verify(embedded_context, payload, Some(block));
                if let Ok(result) = verify_rx.await {
                    tx.send_lossy(result);
                }
            });
        rx
    }
}
908
909impl<E, A, B, C, H, Z, S, ES> Relay for Marshaled<E, A, B, C, H, Z, S, ES>
910where
911 E: Rng + Storage + Spawner + Metrics + Clock,
912 A: Application<
913 E,
914 Block = B,
915 Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
916 >,
917 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
918 C: CodingScheme,
919 H: Hasher,
920 Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
921 S: Strategy,
922 ES: Epocher,
923{
924 type Digest = Commitment;
925
926 /// Broadcasts a previously built block to the network.
927 ///
928 /// This uses the cached block from the last proposal operation. If no block was built or
929 /// the digest does not match the cached block, the broadcast is skipped with a warning.
930 async fn broadcast(&mut self, commitment: Self::Digest) {
931 let Some((round, block)) = self.last_built.lock().take() else {
932 warn!("missing block to broadcast");
933 return;
934 };
935
936 if block.commitment() != commitment {
937 warn!(
938 round = %round,
939 commitment = %block.commitment(),
940 height = %block.height(),
941 "skipping requested broadcast of block with mismatched commitment"
942 );
943 return;
944 }
945
946 debug!(
947 round = %round,
948 commitment = %block.commitment(),
949 height = %block.height(),
950 "requested broadcast of built block"
951 );
952
953 self.shards.proposed(round, block).await;
954 }
955}
956
957impl<E, A, B, C, H, Z, S, ES> Reporter for Marshaled<E, A, B, C, H, Z, S, ES>
958where
959 E: Rng + Storage + Spawner + Metrics + Clock,
960 A: Application<
961 E,
962 Block = B,
963 Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
964 > + Reporter<Activity = Update<B>>,
965 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
966 C: CodingScheme,
967 H: Hasher,
968 Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
969 S: Strategy,
970 ES: Epocher,
971{
972 type Activity = A::Activity;
973
974 /// Relays a report to the underlying [`Application`] and cleans up old verification data.
975 async fn report(&mut self, update: Self::Activity) {
976 // Clean up verification tasks and contexts for rounds <= the finalized round.
977 if let Update::Tip(round, _, _) = &update {
978 self.verification_tasks.retain_after(round);
979 }
980 self.application.report(update).await
981 }
982}
983
984/// Fetches the parent block given its digest and optional round.
985///
986/// This is a helper function used during proposal and verification to retrieve the parent
987/// block. If the parent digest matches the genesis block, it returns the genesis block
988/// directly without querying the marshal. Otherwise, it subscribes to the marshal to await
989/// the parent block's availability.
990///
991/// `parent_round` is an optional resolver hint. Callers should only provide a hint when
992/// the source context is trusted/validated. Untrusted paths should pass `None`.
993///
994/// Returns an error if the marshal subscription is cancelled.
995#[allow(clippy::type_complexity)]
996async fn fetch_parent<E, S, A, B, C, H>(
997 parent_commitment: Commitment,
998 parent_round: Option<Round>,
999 application: &mut A,
1000 marshal: &mut core::Mailbox<S, Coding<B, C, H, S::PublicKey>>,
1001 cached_genesis: Arc<OnceLock<(Commitment, CodedBlock<B, C, H>)>>,
1002) -> Either<Ready<Result<CodedBlock<B, C, H>, RecvError>>, oneshot::Receiver<CodedBlock<B, C, H>>>
1003where
1004 E: Rng + Spawner + Metrics + Clock,
1005 S: CertificateScheme,
1006 A: Application<E, Block = B, Context = Context<Commitment, S::PublicKey>>,
1007 B: CertifiableBlock<Context = Context<Commitment, S::PublicKey>>,
1008 C: CodingScheme,
1009 H: Hasher,
1010{
1011 if cached_genesis.get().is_none() {
1012 let genesis = application.genesis().await;
1013 let genesis_coding_commitment = genesis_coding_commitment::<H, _>(&genesis);
1014 let coded_genesis = CodedBlock::<B, C, H>::new_trusted(genesis, genesis_coding_commitment);
1015 let _ = cached_genesis.set((genesis_coding_commitment, coded_genesis));
1016 }
1017
1018 let (genesis_commitment, coded_genesis) = cached_genesis
1019 .get()
1020 .expect("genesis cache should be initialized");
1021 if parent_commitment == *genesis_commitment {
1022 Either::Left(ready(Ok(coded_genesis.clone())))
1023 } else {
1024 Either::Right(
1025 marshal
1026 .subscribe_by_commitment(parent_round, parent_commitment)
1027 .await,
1028 )
1029 }
1030}
1031
1032/// Constructs the [`Commitment`] for the genesis block.
1033fn genesis_coding_commitment<H: Hasher, B: CertifiableBlock>(block: &B) -> Commitment {
1034 Commitment::from((
1035 block.digest(),
1036 block.digest(),
1037 hash_context::<H, _>(&block.context()),
1038 GENESIS_CODING_CONFIG,
1039 ))
1040}