commonware_consensus/marshal/coding/marshaled.rs
1//! Wrapper for consensus applications that handles epochs, erasure coding, and block dissemination.
2//!
3//! # Overview
4//!
5//! [`Marshaled`] is an adapter that wraps any [`Application`] implementation to handle
6//! epoch transitions and erasure coded broadcast automatically. It intercepts consensus
7//! operations (propose, verify, certify) and ensures blocks are only produced within valid epoch boundaries.
8//!
9//! # Epoch Boundaries
10//!
11//! An epoch is a fixed number of blocks (the `epoch_length`). When the last block in an epoch
12//! is reached, this wrapper prevents new blocks from being built & proposed until the next epoch begins.
13//! Instead, it re-proposes the boundary block to avoid producing blocks that would be pruned
14//! by the epoch transition.
15//!
16//! # Erasure Coding
17//!
//! This wrapper integrates with a variant of marshal that supports erasure coded broadcast. When a leader
//! proposes a new block, it is automatically erasure encoded and its shards are broadcast to active
20//! participants. When verifying a proposed block (the precondition for notarization), the wrapper
21//! ensures the commitment's context digest matches the consensus context and waits for validation of
22//! the shard assigned to this participant by the proposer. If that shard is valid, the assigned shard is
23//! relayed to all other participants to aid in block reconstruction.
24//!
25//! A participant may still reconstruct the full block from gossiped shards before its designated
26//! leader-delivered shard arrives. That is sufficient for later certification and repair flows, but it
27//! is not treated as notarization readiness: a participant only helps form a notarization once it has
28//! validated the shard it is supposed to echo.
29//!
30//! During certification (the phase between notarization and finalization), the wrapper subscribes to
31//! block reconstruction and validates epoch boundaries, parent commitment, height contiguity, and
32//! that the block's embedded context matches the consensus context before allowing the block to be
33//! certified. If certification fails, the voter can still emit a nullify vote to advance the view.
34//!
35//! # Usage
36//!
37//! Wrap your [`Application`] implementation with [`Marshaled::new`] and provide it to your
38//! consensus engine for the [`Automaton`] and [`Relay`]. The wrapper handles all epoch logic transparently.
39//!
40//! ```rust,ignore
41//! let cfg = MarshaledConfig {
42//! application: my_application,
43//! marshal: marshal_mailbox,
44//! shards: shard_mailbox,
45//! scheme_provider,
46//! epocher,
47//! strategy,
48//! };
49//! let application = Marshaled::new(context, cfg);
50//! ```
51//!
52//! # Implementation Notes
53//!
54//! - Genesis blocks are handled specially: epoch 0 returns the application's genesis block,
55//! while subsequent epochs use the last block of the previous epoch as genesis
56//! - Blocks are automatically verified to be within the current epoch
57//!
58//! # Notarization and Data Availability
59//!
60//! In rare crash cases, it is possible for a notarization certificate to exist without a block being
61//! available to the honest parties (e.g., if the whole network crashed before receiving `f+1` shards
62//! and the proposer went permanently offline). In this case, `certify` will be unable to fetch the
63//! block before timeout and result in a nullification.
64//!
//! For this reason, not every notarized payload should be expected to be certifiable: the block
//! may simply be unavailable. However, if even one honest and online party has the block,
67//! they will attempt to forward it to others via marshal's resolver. This case is already present
68//! in the event of a block that was proposed with invalid codec; Marshal will not be able to reconstruct
69//! the block, and therefore won't serve it.
70//!
71//! ```text
72//! ┌───────────────────────────────────────────────────┐
73//! ▼ │
74//! ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
75//! │ B1 │◀──│ B2 │◀──│ B3 │XXX│ B4 │
76//! └─────────────────────┘ └─────────────────────┘ └──────────┬──────────┘ └─────────────────────┘
77//! │
78//! Failed Certify
79//! ```
80
81use crate::{
82 marshal::{
83 application::{
84 validation::{is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, Stage},
85 verification_tasks::VerificationTasks,
86 },
87 coding::{
88 shards,
89 types::{coding_config_for_participants, hash_context, CodedBlock},
90 validation::{validate_block, validate_proposal, ProposalError},
91 Coding,
92 },
93 core, Update,
94 },
95 simplex::{scheme::Scheme, types::Context, Plan},
96 types::{coding::Commitment, Epoch, Epocher, Round},
97 Application, Automaton, Block, CertifiableAutomaton, CertifiableBlock, Epochable, Heightable,
98 Relay, Reporter,
99};
100use commonware_actor::Feedback;
101use commonware_coding::Scheme as CodingScheme;
102use commonware_cryptography::{
103 certificate::{Provider, Scheme as CertificateScheme},
104 Committable, Digestible, Hasher,
105};
106use commonware_macros::select;
107use commonware_p2p::Recipients;
108use commonware_parallel::Strategy;
109use commonware_runtime::{
110 telemetry::metrics::{
111 histogram::{Buckets, Timed},
112 MetricsExt as _,
113 },
114 Clock, Metrics, Spawner, Storage,
115};
116use commonware_utils::{
117 channel::{fallible::OneshotExt, oneshot},
118 sync::AsyncMutex,
119};
120use rand::Rng;
121use std::sync::Arc;
122use tracing::{debug, warn};
123
/// Configuration for initializing [`Marshaled`].
///
/// Every field is moved, unchanged, into the wrapper by [`Marshaled::new`].
///
/// Type parameters: `A` is the wrapped application, `B` the block type, `C` the erasure
/// coding scheme, `H` the hasher, `Z` the per-epoch signing scheme provider, `S` the
/// parallelism strategy and `ES` the epocher.
// The mailbox field types nest several generic parameters, hence the lint exemption.
#[allow(clippy::type_complexity)]
pub struct MarshaledConfig<A, B, C, H, Z, S, ES>
where
    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// The underlying application to wrap.
    pub application: A,
    /// Mailbox for communicating with the marshal engine (block subscription and storage).
    pub marshal:
        core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
    /// Mailbox for communicating with the shards engine.
    pub shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
    /// Provider for signing schemes scoped by epoch.
    pub scheme_provider: Z,
    /// Strategy for parallel operations (handed to erasure encoding when proposing).
    pub strategy: S,
    /// Strategy for determining epoch boundaries.
    pub epocher: ES,
}
149
/// An [`Application`] adapter that handles epoch transitions and erasure coded broadcast.
///
/// This wrapper intercepts consensus operations to enforce epoch boundaries. It prevents
/// blocks from being produced outside their valid epoch and handles the special case of
/// re-proposing boundary blocks during epoch transitions.
///
/// Clones share the same runtime context (it sits behind an `Arc`); every other field is
/// cloned individually.
// The mailbox field types nest several generic parameters, hence the lint exemption.
#[allow(clippy::type_complexity)]
pub struct Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: Application<E>,
    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// Runtime context, locked briefly to derive a child context for each spawned task.
    context: Arc<AsyncMutex<E>>,
    /// The wrapped application; cloned into background tasks for `propose`/`verify` calls.
    application: A,
    /// Mailbox for the marshal engine.
    marshal: core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
    /// Mailbox for the shards engine.
    shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
    /// Looks up the signing scheme for a given epoch.
    scheme_provider: Z,
    /// Determines epoch boundaries (e.g. the last height of an epoch).
    epocher: ES,
    /// Parallelism strategy handed to erasure encoding.
    strategy: S,
    /// Pending verification results, registered by `verify` so that certification can
    /// pick them up later.
    verification_tasks: VerificationTasks<Commitment>,

    /// Time taken for the application to build a new block.
    build_duration: Timed,
    /// Time taken for the application to verify a block.
    verify_duration: Timed,
    /// Time taken to fetch a parent block while proposing.
    proposal_parent_fetch_duration: Timed,
    /// Time taken to fetch a block via the ancestry stream.
    ancestor_fetch_duration: Timed,
    /// Time taken to erasure encode a block.
    erasure_encode_duration: Timed,
}
182
183impl<E, A, B, C, H, Z, S, ES> Clone for Marshaled<E, A, B, C, H, Z, S, ES>
184where
185 E: Rng + Storage + Spawner + Metrics + Clock,
186 A: Application<E>,
187 B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
188 C: CodingScheme,
189 H: Hasher,
190 Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
191 S: Strategy,
192 ES: Epocher,
193{
194 fn clone(&self) -> Self {
195 Self {
196 context: self.context.clone(),
197 application: self.application.clone(),
198 marshal: self.marshal.clone(),
199 shards: self.shards.clone(),
200 scheme_provider: self.scheme_provider.clone(),
201 epocher: self.epocher.clone(),
202 strategy: self.strategy.clone(),
203 verification_tasks: self.verification_tasks.clone(),
204 build_duration: self.build_duration.clone(),
205 verify_duration: self.verify_duration.clone(),
206 proposal_parent_fetch_duration: self.proposal_parent_fetch_duration.clone(),
207 ancestor_fetch_duration: self.ancestor_fetch_duration.clone(),
208 erasure_encode_duration: self.erasure_encode_duration.clone(),
209 }
210 }
211}
212
213impl<E, A, B, C, H, Z, S, ES> Marshaled<E, A, B, C, H, Z, S, ES>
214where
215 E: Rng + Storage + Spawner + Metrics + Clock,
216 A: Application<
217 E,
218 Block = B,
219 SigningScheme = Z::Scheme,
220 Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
221 >,
222 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
223 C: CodingScheme,
224 H: Hasher,
225 Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
226 S: Strategy,
227 ES: Epocher,
228{
229 /// Creates a new [`Marshaled`] wrapper.
230 ///
231 /// # Panics
232 ///
233 /// Panics if the marshal metadata store cannot be initialized.
234 pub fn new(context: E, cfg: MarshaledConfig<A, B, C, H, Z, S, ES>) -> Self {
235 let MarshaledConfig {
236 application,
237 marshal,
238 shards,
239 scheme_provider,
240 strategy,
241 epocher,
242 } = cfg;
243
244 let build_histogram = context.histogram(
245 "build_duration",
246 "Histogram of time taken for the application to build a new block, in seconds",
247 Buckets::LOCAL,
248 );
249 let build_duration = Timed::new(build_histogram);
250
251 let verify_histogram = context.histogram(
252 "verify_duration",
253 "Histogram of time taken for the application to verify a block, in seconds",
254 Buckets::LOCAL,
255 );
256 let verify_duration = Timed::new(verify_histogram);
257
258 let parent_fetch_histogram = context.histogram(
259 "parent_fetch_duration",
260 "Histogram of time taken to fetch a parent block in proposal, in seconds",
261 Buckets::LOCAL,
262 );
263 let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram);
264
265 let ancestor_fetch_histogram = context.histogram(
266 "ancestor_fetch_duration",
267 "Histogram of time taken to fetch a block via the ancestry stream, in seconds",
268 Buckets::LOCAL,
269 );
270 let ancestor_fetch_duration = Timed::new(ancestor_fetch_histogram);
271
272 let erasure_histogram = context.histogram(
273 "erasure_encode_duration",
274 "Histogram of time taken to erasure encode a block, in seconds",
275 Buckets::LOCAL,
276 );
277 let erasure_encode_duration = Timed::new(erasure_histogram);
278
279 Self {
280 context: Arc::new(AsyncMutex::new(context)),
281 application,
282 marshal,
283 shards,
284 scheme_provider,
285 strategy,
286 epocher,
287 verification_tasks: VerificationTasks::new(),
288
289 build_duration,
290 verify_duration,
291 proposal_parent_fetch_duration,
292 ancestor_fetch_duration,
293 erasure_encode_duration,
294 }
295 }
296
297 /// Verifies a proposed block within epoch boundaries.
298 ///
299 /// This method validates that:
300 /// 1. The block is within the current epoch (unless it's a boundary block re-proposal)
301 /// 2. Re-proposals are only allowed for the last block in an epoch
302 /// 3. The block's parent digest matches the consensus context's expected parent
303 /// 4. The block's height is exactly one greater than the parent's height
304 /// 5. The block's embedded context digest matches the commitment
305 /// 6. The block's embedded context matches the consensus context
306 /// 7. The underlying application's verification logic passes
307 ///
308 /// Verification is spawned in a background task and returns a receiver that will contain
309 /// the verification result.
310 ///
311 /// If `prefetched_block` is provided, it will be used directly instead of fetching from
312 /// the marshal. This is useful in `certify` when we've already fetched the block to
313 /// extract its embedded context.
314 async fn deferred_verify(
315 &mut self,
316 consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
317 commitment: Commitment,
318 prefetched_block: Option<CodedBlock<B, C, H>>,
319 stage: Stage,
320 ) -> oneshot::Receiver<bool> {
321 let mut marshal = self.marshal.clone();
322 let mut application = self.application.clone();
323 let epocher = self.epocher.clone();
324 let verify_duration = self.verify_duration.clone();
325 let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
326
327 let (mut tx, rx) = oneshot::channel();
328 let context = self
329 .context
330 .lock()
331 .await
332 .child("deferred_verify")
333 .with_attribute("round", consensus_context.round);
334 context.spawn(move |runtime_context| async move {
335 let round = consensus_context.round;
336 let (parent_view, parent_commitment) = consensus_context.parent;
337
338 // Get the candidate block either from the caller or by waiting for
339 // local reconstruction. Candidate data remains local-only: a
340 // notarization is not sufficient reason to request it from peers.
341 let block = if let Some(block) = prefetched_block {
342 block
343 } else {
344 let block_request =
345 marshal.subscribe_by_commitment(commitment, core::CommitmentFallback::Wait);
346 select! {
347 _ = tx.closed() => {
348 debug!(
349 reason = "consensus dropped receiver",
350 "skipping verification"
351 );
352 return;
353 },
354 result = block_request => match result {
355 Ok(block) => block,
356 Err(_) => {
357 debug!(
358 reason = "block unavailable",
359 "skipping verification"
360 );
361 return;
362 }
363 },
364 }
365 };
366
367 // The context supplies the certified parent round. Do not derive a
368 // height from the unverified child block for this lookup.
369 let fallback = core::CommitmentFallback::FetchByRound {
370 round: Round::new(consensus_context.epoch(), parent_view),
371 };
372 let parent_request = marshal.subscribe_by_commitment(parent_commitment, fallback);
373 let parent = select! {
374 _ = tx.closed() => {
375 debug!(
376 reason = "consensus dropped receiver",
377 "skipping verification"
378 );
379 return;
380 },
381 result = parent_request => match result {
382 Ok(parent) => parent,
383 Err(_) => {
384 debug!(reason = "failed to fetch parent", "skipping verification");
385 return;
386 }
387 },
388 };
389
390 if let Err(err) = validate_block::<H, _, _>(
391 &epocher,
392 &block,
393 &parent,
394 &consensus_context,
395 commitment,
396 parent_commitment,
397 ) {
398 debug!(
399 ?err,
400 expected_commitment = %commitment,
401 block_commitment = %block.commitment(),
402 expected_parent_commitment = %parent_commitment,
403 parent_commitment = %parent.commitment(),
404 expected_parent = %parent.digest(),
405 block_parent = %block.parent(),
406 parent_height = %parent.height(),
407 block_height = %block.height(),
408 "block failed coded invariant validation"
409 );
410 tx.send_lossy(false);
411 return;
412 }
413
414 let ancestry_stream = marshal.ancestor_stream(
415 Arc::new(runtime_context.child("ancestor_stream")),
416 [block.clone(), parent],
417 ancestor_fetch_duration,
418 );
419 let validity_request = application.verify(
420 (
421 runtime_context.child("app_verify"),
422 consensus_context.clone(),
423 ),
424 ancestry_stream,
425 );
426
427 // If consensus drops the receiver, we can stop work early.
428 let timer = verify_duration.timer(&runtime_context);
429 let application_valid = select! {
430 _ = tx.closed() => {
431 debug!(
432 reason = "consensus dropped receiver",
433 "skipping verification"
434 );
435 return;
436 },
437 is_valid = validity_request => is_valid,
438 };
439 timer.observe(&runtime_context);
440 if application_valid && !stage.store(&mut marshal, round, block).await {
441 debug!(?round, "marshal unable to accept block");
442 return;
443 }
444 tx.send_lossy(application_valid);
445 });
446
447 rx
448 }
449
450 async fn certify_from_embedded_context(
451 &mut self,
452 round: Round,
453 payload: Commitment,
454 ) -> oneshot::Receiver<bool> {
455 // Certify may be reached without an earlier `verify`, so the shard
456 // engine may not know the leader yet. A notarized commitment is still
457 // enough to start reconstruction from sender-indexed gossip shards
458 // already buffered for the commitment.
459 self.shards.notarized(payload, round);
460
461 // No in-progress task means we never verified this proposal locally.
462 // We can use the block's embedded context to move to the next view. If a Byzantine
463 // proposer embedded a malicious context, the f+1 honest validators from the notarizing quorum
464 // will verify against the proper context and reject the mismatch, preventing a 2f+1
465 // finalization quorum.
466 //
467 // We must fetch here rather than only wait for local reconstruction. A Byzantine
468 // leader can send enough shards to just f+1 honest validators, collect enough honest
469 // notarize votes to form a notarization, and leave the remaining honest validators
470 // unable to reconstruct the block. Those validators need the notarized round to
471 // recover and certify; otherwise they can remain stuck if the Byzantine validators
472 // stop participating in the next view.
473 //
474 // Subscribe to the block and verify using its embedded context once available.
475 debug!(
476 ?round,
477 ?payload,
478 "subscribing to block for certification using embedded context"
479 );
480 let block_rx = self
481 .marshal
482 .subscribe_by_commitment(payload, core::CommitmentFallback::FetchByRound { round });
483 let mut marshaled = self.clone();
484 let shards = self.shards.clone();
485 let (mut tx, rx) = oneshot::channel();
486 let context = self
487 .context
488 .lock()
489 .await
490 .child("certify")
491 .with_attribute("round", round);
492 context.spawn(move |_| async move {
493 let block = select! {
494 _ = tx.closed() => {
495 debug!(
496 reason = "consensus dropped receiver",
497 "skipping certification"
498 );
499 return;
500 },
501 result = block_rx => match result {
502 Ok(block) => block,
503 Err(_) => {
504 debug!(
505 ?payload,
506 reason = "failed to fetch block for certification",
507 "skipping certification"
508 );
509 return;
510 }
511 },
512 };
513
514 // Re-proposal detection for certify path: we don't have the consensus
515 // context, only the block's embedded context from original proposal.
516 // Infer re-proposal from:
517 // 1. Block is at epoch boundary (only boundary blocks can be re-proposed)
518 // 2. Certification round's view > embedded context's view (re-proposals
519 // retain their original embedded context, so a later view indicates
520 // the block was re-proposed)
521 // 3. Same epoch (re-proposals don't cross epoch boundaries)
522 let embedded_context = block.context();
523 let is_reproposal = is_inferred_reproposal_at_certify(
524 &marshaled.epocher,
525 block.height(),
526 embedded_context.round,
527 round,
528 );
529 if is_reproposal {
530 // Certifier holds a notarization for this block, so route
531 // the write to the notarized cache. `certified` is
532 // idempotent, so crash-recovery double-invocation is safe.
533 if !marshaled.marshal.certified(round, block).await {
534 debug!(?round, "marshal unable to accept block");
535 return;
536 }
537 tx.send_lossy(true);
538 return;
539 }
540
541 // Inform the shard engine of an externally proposed commitment.
542 shards.discovered(
543 payload,
544 embedded_context.leader.clone(),
545 embedded_context.round,
546 );
547
548 // Use the block's embedded context for verification, passing the
549 // prefetched block to avoid fetching it again inside deferred_verify.
550 let verify_rx = marshaled
551 .deferred_verify(embedded_context, payload, Some(block), Stage::Certified)
552 .await;
553 if let Ok(result) = verify_rx.await {
554 tx.send_lossy(result);
555 }
556 });
557 rx
558 }
559
560 async fn certify_from_existing_task(
561 &mut self,
562 round: Round,
563 payload: Commitment,
564 task: oneshot::Receiver<bool>,
565 ) -> oneshot::Receiver<bool> {
566 // `verify()` intentionally waits only for local candidate data. Once
567 // certification starts, a notarization exists and the same pending
568 // verifier must be unblocked by round-bound recovery if local
569 // reconstruction never completes.
570 self.shards.notarized(payload, round);
571 self.marshal.hint_notarized(round, payload);
572
573 let mut marshaled = self.clone();
574 let (mut tx, rx) = oneshot::channel();
575 let context = self
576 .context
577 .lock()
578 .await
579 .child("certify_existing")
580 .with_attribute("round", round);
581 context.spawn(move |_| async move {
582 let result = select! {
583 _ = tx.closed() => {
584 debug!(
585 reason = "consensus dropped receiver",
586 "skipping certification"
587 );
588 return;
589 },
590 result = task => result,
591 };
592 match result {
593 Ok(result) => {
594 tx.send_lossy(result);
595 }
596 Err(_) => {
597 debug!(
598 ?round,
599 ?payload,
600 "verification task closed before certification, falling back to embedded context"
601 );
602 let fallback = marshaled.certify_from_embedded_context(round, payload).await;
603 let result = select! {
604 _ = tx.closed() => {
605 debug!(
606 reason = "consensus dropped receiver",
607 "skipping certification"
608 );
609 return;
610 },
611 result = fallback => result,
612 };
613 if let Ok(result) = result {
614 tx.send_lossy(result);
615 }
616 }
617 }
618 });
619 rx
620 }
621}
622
623impl<E, A, B, C, H, Z, S, ES> Automaton for Marshaled<E, A, B, C, H, Z, S, ES>
624where
625 E: Rng + Storage + Spawner + Metrics + Clock,
626 A: Application<
627 E,
628 Block = B,
629 SigningScheme = Z::Scheme,
630 Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
631 >,
632 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
633 C: CodingScheme,
634 H: Hasher,
635 Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
636 S: Strategy,
637 ES: Epocher,
638{
639 type Digest = Commitment;
640 type Context = Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>;
641
642 /// Proposes a new block or re-proposes the epoch boundary block.
643 ///
644 /// This method builds a new block from the underlying application unless the parent block
645 /// is the last block in the current epoch. When at an epoch boundary, it re-proposes the
646 /// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
647 ///
648 /// The proposal operation is spawned in a background task and returns a receiver that will
649 /// contain the proposed block's commitment when ready. The built block is persisted via
650 /// [`core::Mailbox::verified`] before the commitment is delivered, so consensus can rely
651 /// on the block surviving restart.
652 async fn propose(
653 &mut self,
654 consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
655 ) -> oneshot::Receiver<Self::Digest> {
656 let marshal = self.marshal.clone();
657 let mut application = self.application.clone();
658 let epocher = self.epocher.clone();
659 let strategy = self.strategy.clone();
660
661 // If there's no scheme for the current epoch, we cannot verify the proposal.
662 // Send back a receiver with a dropped sender.
663 let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
664 debug!(
665 round = %consensus_context.round,
666 "no scheme for epoch, skipping propose"
667 );
668 let (_, rx) = oneshot::channel();
669 return rx;
670 };
671
672 let n_participants =
673 u16::try_from(scheme.participants().len()).expect("too many participants");
674 let coding_config = coding_config_for_participants(n_participants);
675
676 // Metrics
677 let build_duration = self.build_duration.clone();
678 let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone();
679 let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
680 let erasure_encode_duration = self.erasure_encode_duration.clone();
681
682 let (mut tx, rx) = oneshot::channel();
683 let context = self
684 .context
685 .lock()
686 .await
687 .child("propose")
688 .with_attribute("round", consensus_context.round);
689 context.spawn(move |runtime_context| async move {
690 // On leader recovery, marshal may already hold a verified block
691 // for this round (persisted before voting in consensus).
692 //
693 // Building a fresh block would land on the same prunable
694 // archive index and be silently dropped, so the stored block
695 // is the only proposal we can broadcast for this round.
696 //
697 // The recovered block is safe to reuse only if its embedded
698 // context matches the context simplex just recovered.
699 // Otherwise the cached block was built against a different
700 // parent and cannot be broadcast under the current header, so
701 // drop the receiver and let the voter nullify the view via
702 // timeout.
703 if let Some(block) = marshal.get_verified(consensus_context.round).await {
704 let block_context = block.context();
705 if block_context != consensus_context {
706 debug!(
707 round = ?consensus_context.round,
708 ?consensus_context,
709 ?block_context,
710 "skipping proposal: cached verified block context no longer matches"
711 );
712 return;
713 }
714 let commitment = block.commitment();
715 let round = consensus_context.round;
716 let success = tx.send_lossy(commitment);
717 debug!(
718 ?round,
719 ?commitment,
720 success,
721 "reused verified block from marshal on leader recovery"
722 );
723 return;
724 }
725
726 // The parent for any consensus context is in the same epoch: the
727 // boundary block of the previous epoch is the genesis block of the
728 // current epoch.
729 //
730 // Proposal context carries the certified parent view/commitment but
731 // not the parent height. The parent may be certified above the
732 // finalized tip, so this must stay round-bound until the block is
733 // returned.
734 let (parent_view, parent_commitment) = consensus_context.parent;
735 let parent_request = marshal.subscribe_by_commitment(
736 parent_commitment,
737 core::CommitmentFallback::FetchByRound {
738 round: Round::new(consensus_context.epoch(), parent_view),
739 },
740 );
741
742 let parent_timer = proposal_parent_fetch_duration.timer(&runtime_context);
743 let parent = select! {
744 _ = tx.closed() => {
745 debug!(reason = "consensus dropped receiver", "skipping proposal");
746 return;
747 },
748 result = parent_request => match result {
749 Ok(parent) => parent,
750 Err(_) => {
751 debug!(
752 ?parent_commitment,
753 reason = "failed to fetch parent block",
754 "skipping proposal"
755 );
756 return;
757 }
758 },
759 };
760 parent_timer.observe(&runtime_context);
761
762 // Special case: If the parent block is the last block in the epoch,
763 // re-propose it as to not produce any blocks that will be cut out
764 // by the epoch transition.
765 let last_in_epoch = epocher
766 .last(consensus_context.epoch())
767 .expect("current epoch should exist");
768 if parent.height() == last_in_epoch {
769 let commitment = parent.commitment();
770 let round = consensus_context.round;
771 if !marshal.verified(round, parent).await {
772 debug!(
773 ?round,
774 ?commitment,
775 "marshal rejected re-proposed boundary block"
776 );
777 return;
778 }
779 let success = tx.send_lossy(commitment);
780 debug!(
781 ?round,
782 ?commitment,
783 success,
784 "re-proposed parent block at epoch boundary"
785 );
786 return;
787 }
788
789 let ancestor_stream = marshal.ancestor_stream(
790 Arc::new(runtime_context.child("ancestor_stream")),
791 [parent],
792 ancestor_fetch_duration,
793 );
794 let build_request = application.propose(
795 (
796 runtime_context.child("app_propose"),
797 consensus_context.clone(),
798 ),
799 ancestor_stream,
800 );
801
802 let build_timer = build_duration.timer(&runtime_context);
803 let built_block = select! {
804 _ = tx.closed() => {
805 debug!(reason = "consensus dropped receiver", "skipping proposal");
806 return;
807 },
808 result = build_request => match result {
809 Some(block) => block,
810 None => {
811 debug!(
812 ?parent_commitment,
813 reason = "block building failed",
814 "skipping proposal"
815 );
816 return;
817 }
818 },
819 };
820 build_timer.observe(&runtime_context);
821
822 let erasure_timer = erasure_encode_duration.timer(&runtime_context);
823 let coded_block = CodedBlock::<B, C, H>::new(built_block, coding_config, &strategy);
824 erasure_timer.observe(&runtime_context);
825
826 let commitment = coded_block.commitment();
827 let round = consensus_context.round;
828 if !marshal.proposed(round, coded_block).await {
829 debug!(?round, ?commitment, "marshal rejected proposed block");
830 return;
831 }
832 let success = tx.send_lossy(commitment);
833 debug!(?round, ?commitment, success, "proposed new block");
834 });
835 rx
836 }
837
838 /// Verifies a received shard for a given round.
839 ///
840 /// This method validates that:
841 /// 1. The coding configuration matches the expected configuration for the current scheme.
842 /// 2. The commitment's context digest matches the consensus context (unless this is a re-proposal).
843 /// 3. The shard is contained within the consensus commitment.
844 ///
845 /// Verification is spawned in a background task and returns a receiver that will contain
846 /// the verification result. Additionally, this method kicks off deferred verification to
847 /// start block verification early (hidden behind shard validity and network latency).
848 async fn verify(
849 &mut self,
850 consensus_context: Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>,
851 payload: Self::Digest,
852 ) -> oneshot::Receiver<bool> {
853 // If there's no scheme for the current epoch, we cannot vote on the proposal.
854 // Send back a receiver with a dropped sender.
855 let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
856 debug!(
857 round = %consensus_context.round,
858 "no scheme for epoch, skipping verify"
859 );
860 let (_, rx) = oneshot::channel();
861 return rx;
862 };
863
864 let n_participants =
865 u16::try_from(scheme.participants().len()).expect("too many participants");
866 let coding_config = coding_config_for_participants(n_participants);
867 let is_reproposal = payload == consensus_context.parent.1;
868
869 // Validate proposal-level invariants:
870 // - coding config must match active participant set
871 // - context digest must match unless this is a re-proposal
872 let proposal_context = (!is_reproposal).then_some(&consensus_context);
873 if let Err(err) = validate_proposal::<H, _>(payload, coding_config, proposal_context) {
874 match err {
875 ProposalError::CodingConfig => {
876 warn!(
877 round = %consensus_context.round,
878 got = ?payload.config(),
879 expected = ?coding_config,
880 "rejected proposal with unexpected coding configuration"
881 );
882 }
883 ProposalError::ContextDigest => {
884 let expected = hash_context::<H, _>(&consensus_context);
885 let got = payload.context::<H::Digest>();
886 warn!(
887 round = %consensus_context.round,
888 expected = ?expected,
889 got = ?got,
890 "rejected proposal with mismatched context digest"
891 );
892 }
893 }
894
895 let (tx, rx) = oneshot::channel();
896 tx.send_lossy(false);
897 return rx;
898 }
899
900 // Re-proposals skip context-digest validation because the consensus context will point
901 // at the prior epoch-boundary block while the embedded block context is from the
902 // original proposal view.
903 //
904 // Re-proposals also skip shard-validity and deferred verification because:
905 // 1. The block was already verified when originally proposed
906 // 2. The parent-child height check would fail (parent IS the block)
907 // 3. Waiting for shards could stall if the leader doesn't rebroadcast
908 if is_reproposal {
909 // Fetch the block to verify it's at the epoch boundary.
910 // This should be fast since the parent block is typically already cached.
911 let block_rx = self
912 .marshal
913 .subscribe_by_commitment(payload, core::CommitmentFallback::Wait);
914 let marshal = self.marshal.clone();
915 let epocher = self.epocher.clone();
916 let round = consensus_context.round;
917 let verification_tasks = self.verification_tasks.clone();
918
919 // Register a verification task synchronously before spawning work so
920 // `certify` can always find it (no race with task startup).
921 let (task_tx, task_rx) = oneshot::channel();
922 verification_tasks.insert(round, payload, task_rx);
923
924 let (mut tx, rx) = oneshot::channel();
925 let context = self
926 .context
927 .lock()
928 .await
929 .child("verify_reproposal")
930 .with_attribute("round", round);
931 context.spawn(move |_| {
932 async move {
933 let block = select! {
934 _ = tx.closed() => {
935 debug!(
936 reason = "consensus dropped receiver",
937 "skipping re-proposal verification"
938 );
939 return;
940 },
941 block = block_rx => match block {
942 Ok(block) => block,
943 Err(_) => {
944 debug!(
945 ?payload,
946 reason = "failed to fetch block for re-proposal verification",
947 "skipping re-proposal verification"
948 );
949 // Fetch failure is an availability issue, not an explicit
950 // invalidity proof. Do not synthesize `false` here.
951 return;
952 }
953 },
954 };
955
956 if !is_valid_reproposal_at_verify(&epocher, block.height(), round.epoch()) {
957 debug!(
958 height = %block.height(),
959 "re-proposal is not at epoch boundary"
960 );
961 task_tx.send_lossy(false);
962 tx.send_lossy(false);
963 return;
964 }
965
966 // Valid re-proposal: notify the marshal and complete the
967 // verification task for `certify`.
968 if !marshal.verified(round, block).await {
969 debug!(?round, "marshal unable to accept block");
970 return;
971 }
972 task_tx.send_lossy(true);
973 tx.send_lossy(true);
974 }
975 });
976 return rx;
977 }
978
979 // Inform the shard engine of an externally proposed commitment.
980 self.shards.discovered(
981 payload,
982 consensus_context.leader.clone(),
983 consensus_context.round,
984 );
985
986 // Kick off deferred verification early to hide verification latency behind
987 // shard validity checks and network latency for collecting votes.
988 let round = consensus_context.round;
989 let task = self
990 .deferred_verify(consensus_context, payload, None, Stage::Verified)
991 .await;
992 self.verification_tasks.insert(round, payload, task);
993
994 match scheme.me() {
995 Some(_) => {
996 // Subscribe to assigned shard verification. For participants, this
997 // only completes once the leader-delivered shard for our
998 // assigned index has been verified. Reconstructing the block
999 // from peer gossip is useful for certification later, but is
1000 // not enough to emit a notarize vote.
1001 let validity_rx = self.shards.subscribe_assigned_shard_verified(payload);
1002 let (tx, rx) = oneshot::channel();
1003 let context = self
1004 .context
1005 .lock()
1006 .await
1007 .child("shard_validity_wait")
1008 .with_attribute("round", round);
1009 context.spawn(|_| async move {
1010 if validity_rx.await.is_ok() {
1011 tx.send_lossy(true);
1012 }
1013 });
1014 rx
1015 }
1016 None => {
1017 // If we are not participating, there's no shard to verify; just accept the proposal.
1018 //
1019 // Later, when certifying, we will wait to receive the block from the network.
1020 let (tx, rx) = oneshot::channel();
1021 tx.send_lossy(true);
1022 rx
1023 }
1024 }
1025 }
1026}
1027
1028impl<E, A, B, C, H, Z, S, ES> CertifiableAutomaton for Marshaled<E, A, B, C, H, Z, S, ES>
1029where
1030 E: Rng + Storage + Spawner + Metrics + Clock,
1031 A: Application<
1032 E,
1033 Block = B,
1034 SigningScheme = Z::Scheme,
1035 Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
1036 >,
1037 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
1038 C: CodingScheme,
1039 H: Hasher,
1040 Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
1041 S: Strategy,
1042 ES: Epocher,
1043{
1044 async fn certify(&mut self, round: Round, payload: Self::Digest) -> oneshot::Receiver<bool> {
1045 // First, check for an in-progress verification task from `verify()`.
1046 let task = self.verification_tasks.take(round, payload);
1047 if let Some(task) = task {
1048 return self.certify_from_existing_task(round, payload, task).await;
1049 }
1050
1051 self.certify_from_embedded_context(round, payload).await
1052 }
1053}
1054
1055impl<E, A, B, C, H, Z, S, ES> Relay for Marshaled<E, A, B, C, H, Z, S, ES>
1056where
1057 E: Rng + Storage + Spawner + Metrics + Clock,
1058 A: Application<
1059 E,
1060 Block = B,
1061 Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
1062 >,
1063 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
1064 C: CodingScheme,
1065 H: Hasher,
1066 Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
1067 S: Strategy,
1068 ES: Epocher,
1069{
1070 type Digest = Commitment;
1071 type PublicKey = <Z::Scheme as CertificateScheme>::PublicKey;
1072 type Plan = Plan<Self::PublicKey>;
1073
1074 fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) -> Feedback {
1075 // Coding variant does not support targeted forwarding;
1076 // peers reconstruct blocks from erasure-coded shards.
1077 //
1078 // TODO(#3389): Support checked data forwarding for PhasedScheme.
1079 let Plan::Propose { round } = plan else {
1080 return Feedback::Ok;
1081 };
1082 self.marshal.forward(round, commitment, Recipients::All)
1083 }
1084}
1085
1086impl<E, A, B, C, H, Z, S, ES> Reporter for Marshaled<E, A, B, C, H, Z, S, ES>
1087where
1088 E: Rng + Storage + Spawner + Metrics + Clock,
1089 A: Application<
1090 E,
1091 Block = B,
1092 Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
1093 > + Reporter<Activity = Update<B>>,
1094 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
1095 C: CodingScheme,
1096 H: Hasher,
1097 Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
1098 S: Strategy,
1099 ES: Epocher,
1100{
1101 type Activity = A::Activity;
1102
1103 /// Relays a report to the underlying [`Application`] and cleans up old verification data.
1104 fn report(&mut self, update: Self::Activity) -> Feedback {
1105 // Clean up verification tasks and contexts for rounds <= the finalized round.
1106 if let Update::Tip(round, _, _) = &update {
1107 self.verification_tasks.retain_after(round);
1108 }
1109 self.application.report(update)
1110 }
1111}