commonware_consensus/marshal/standard/deferred.rs
1//! Wrapper for consensus applications that handles epochs and block dissemination.
2//!
3//! # Overview
4//!
5//! [`Deferred`] is an adapter that wraps any [`VerifyingApplication`] implementation to handle
6//! epoch transitions automatically. It intercepts consensus operations (propose, verify) and
7//! ensures blocks are only produced within valid epoch boundaries.
8//!
9//! # Epoch Boundaries
10//!
11//! When the parent is the last block in an epoch (as determined by the [`Epocher`]), this wrapper
12//! re-proposes that boundary block instead of building a new block. This avoids producing blocks
13//! that would be pruned by the epoch transition.
14//!
15//! # Deferred Verification
16//!
17//! Before casting a notarize vote, [`Deferred`] waits for the block to become available and
18//! then verifies that the block's embedded context matches the consensus context. However, it does not
19//! wait for the application to finish verifying the block contents before voting. This enables verification
20//! to run while we wait for a quorum of votes to form a certificate (hiding verification latency behind network
21//! latency). Once a certificate is formed, we wait on the verification result in [`CertifiableAutomaton::certify`]
22//! before voting to finalize (ensuring no invalid blocks are admitted to the canonical chain).
23//!
24//! # Usage
25//!
26//! Wrap your [`Application`] implementation with [`Deferred::new`] and provide it to your
27//! consensus engine for the [`Automaton`] and [`Relay`]. The wrapper handles all epoch logic transparently.
28//!
29//! ```rust,ignore
30//! let application = Deferred::new(
31//! context,
32//! my_application,
33//! marshal_mailbox,
34//! epocher,
35//! );
36//! ```
37//!
38//! # Implementation Notes
39//!
40//! - Genesis blocks are handled specially: epoch 0 returns the application's genesis block,
41//! while subsequent epochs use the last block of the previous epoch as genesis
42//! - Blocks are automatically verified to be within the current epoch
43//!
44//! # Notarization and Data Availability
45//!
46//! In rare crash cases, it is possible for a notarization certificate to exist without a block being
47//! available to the honest parties if [`CertifiableAutomaton::certify`] fails after a notarization is
48//! formed.
49//!
50//! For this reason, it should not be expected that every notarized payload will be certifiable due
51//! to the lack of an available block. However, if even one honest and online party has the block,
52//! they will attempt to forward it to others via marshal's resolver.
53//!
54//! ```text
55//! ┌───────────────────────────────────────────────────┐
56//! ▼ │
57//! ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
58//! │ B1 │◀──│ B2 │◀──│ B3 │XXX│ B4 │
59//! └─────────────────────┘ └─────────────────────┘ └──────────┬──────────┘ └─────────────────────┘
60//! │
61//! Failed Certify
62//! ```
63//!
64//! # Future Work
65//!
66//! - To further reduce view latency, a participant could optimistically vote for a block prior to
67//! observing its availability during [`Automaton::verify`]. However, this would require updating
68//! other components (like [`crate::marshal`]) to handle backfill where notarization does not imply
69//! a block is fetchable (without modification, a malicious leader that withholds blocks during propose
70//! could get an honest node to exhaust their network rate limit fetching things that don't exist rather
71//! than blocks they need AND can fetch).
72
73use crate::{
74 marshal::{
75 ancestry::AncestorStream,
76 application::{
77 validation::{is_inferred_reproposal_at_certify, LastBuilt},
78 verification_tasks::VerificationTasks,
79 },
80 core::Mailbox,
81 standard::{
82 validation::{
83 fetch_parent, precheck_epoch_and_reproposal, verify_with_parent, Decision,
84 },
85 Standard,
86 },
87 Update,
88 },
89 simplex::types::Context,
90 types::{Epoch, Epocher, Round},
91 Application, Automaton, CertifiableAutomaton, CertifiableBlock, Epochable, Relay, Reporter,
92 VerifyingApplication,
93};
94use commonware_cryptography::{certificate::Scheme, Digestible};
95use commonware_macros::select;
96use commonware_runtime::{
97 telemetry::metrics::histogram::{Buckets, Timed},
98 Clock, Metrics, Spawner,
99};
100use commonware_utils::{
101 channel::{fallible::OneshotExt, oneshot},
102 sync::Mutex,
103};
104use rand::Rng;
105use std::sync::Arc;
106use tracing::{debug, warn};
107
108/// An [`Application`] adapter that handles epoch transitions and validates block ancestry.
109///
110/// This wrapper intercepts consensus operations to enforce epoch boundaries and validate
111/// block ancestry. It prevents blocks from being produced outside their valid epoch,
112/// handles the special case of re-proposing boundary blocks at epoch boundaries,
113/// and ensures all blocks have valid parent linkage and contiguous heights.
114///
115/// # Ancestry Validation
116///
117/// Applications wrapped by [`Deferred`] can rely on the following ancestry checks being
118/// performed automatically during verification:
119/// - Parent digest matches the consensus context's expected parent
120/// - Block height is exactly one greater than the parent's height
121///
122/// Verifying only the immediate parent is sufficient since the parent itself must have
123/// been notarized by consensus, which guarantees it was verified and accepted by a quorum.
124/// This means the entire ancestry chain back to genesis is transitively validated.
125///
126/// Applications do not need to re-implement these checks in their own verification logic.
127///
128/// # Context Recovery
129///
130/// With deferred verification, validators wait for data availability (DA) and verify the context
131/// before voting. If a validator crashes after voting but before certification, they lose their in-memory
132/// verification task. When recovering, validators extract context from a [`CertifiableBlock`].
133///
134/// _This embedded context is trustworthy because the notarizing quorum (which contains at least f+1 honest
135/// validators) verified that the block's context matched the consensus context before voting._
136#[derive(Clone)]
137pub struct Deferred<E, S, A, B, ES>
138where
139 E: Rng + Spawner + Metrics + Clock,
140 S: Scheme,
141 A: Application<E>,
142 B: CertifiableBlock,
143 ES: Epocher,
144{
145 context: E,
146 application: A,
147 marshal: Mailbox<S, Standard<B>>,
148 epocher: ES,
149 last_built: LastBuilt<B>,
150 verification_tasks: VerificationTasks<<B as Digestible>::Digest>,
151
152 build_duration: Timed<E>,
153}
154
155impl<E, S, A, B, ES> Deferred<E, S, A, B, ES>
156where
157 E: Rng + Spawner + Metrics + Clock,
158 S: Scheme,
159 A: VerifyingApplication<
160 E,
161 Block = B,
162 SigningScheme = S,
163 Context = Context<B::Digest, S::PublicKey>,
164 >,
165 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
166 ES: Epocher,
167{
168 /// Creates a new [`Deferred`] wrapper.
169 pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
170 use prometheus_client::metrics::histogram::Histogram;
171
172 let build_histogram = Histogram::new(Buckets::LOCAL);
173 context.register(
174 "build_duration",
175 "Histogram of time taken for the application to build a new block, in seconds",
176 build_histogram.clone(),
177 );
178 let build_duration = Timed::new(build_histogram, Arc::new(context.clone()));
179
180 Self {
181 context,
182 application,
183 marshal,
184 epocher,
185 last_built: Arc::new(Mutex::new(None)),
186 verification_tasks: VerificationTasks::new(),
187
188 build_duration,
189 }
190 }
191
192 /// Verifies a proposed block's application-level validity.
193 ///
194 /// This method validates that:
195 /// 1. The block's parent digest matches the expected parent
196 /// 2. The block's height is exactly one greater than the parent's height
197 /// 3. The underlying application's verification logic passes
198 ///
199 /// Verification is spawned in a background task and returns a receiver that will contain
200 /// the verification result. Valid blocks are reported to the marshal as verified.
201 #[inline]
202 fn deferred_verify(
203 &mut self,
204 context: <Self as Automaton>::Context,
205 block: B,
206 ) -> oneshot::Receiver<bool> {
207 let mut marshal = self.marshal.clone();
208 let mut application = self.application.clone();
209 let (mut tx, rx) = oneshot::channel();
210 self.context
211 .with_label("deferred_verify")
212 .with_attribute("round", context.round)
213 .spawn(move |runtime_context| async move {
214 // Shared non-reproposal verification:
215 // - fetch parent (using trusted round hint from consensus context)
216 // - validate standard ancestry invariants
217 // - run application verification over ancestry
218 //
219 // The helper preserves the prior early-exit behavior and returns
220 // `None` when work should stop (for example receiver dropped or
221 // parent unavailable).
222 let application_valid = match verify_with_parent(
223 runtime_context,
224 context,
225 block,
226 &mut application,
227 &mut marshal,
228 &mut tx,
229 )
230 .await
231 {
232 Some(valid) => valid,
233 None => return,
234 };
235 tx.send_lossy(application_valid);
236 });
237
238 rx
239 }
240}
241
242impl<E, S, A, B, ES> Automaton for Deferred<E, S, A, B, ES>
243where
244 E: Rng + Spawner + Metrics + Clock,
245 S: Scheme,
246 A: VerifyingApplication<
247 E,
248 Block = B,
249 SigningScheme = S,
250 Context = Context<B::Digest, S::PublicKey>,
251 >,
252 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
253 ES: Epocher,
254{
255 type Digest = B::Digest;
256 type Context = Context<Self::Digest, S::PublicKey>;
257
258 /// Returns the genesis digest for a given epoch.
259 ///
260 /// For epoch 0, this returns the application's genesis block digest. For subsequent
261 /// epochs, it returns the digest of the last block from the previous epoch, which
262 /// serves as the genesis block for the new epoch.
263 ///
264 /// # Panics
265 ///
266 /// Panics if a non-zero epoch is requested but the previous epoch's final block is not
267 /// available in storage. This indicates a critical error in the consensus engine startup
268 /// sequence, as engines must always have the genesis block before starting.
269 async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
270 if epoch.is_zero() {
271 return self.application.genesis().await.digest();
272 }
273
274 let prev = epoch.previous().expect("checked to be non-zero above");
275 let last_height = self
276 .epocher
277 .last(prev)
278 .expect("previous epoch should exist");
279 let Some(block) = self.marshal.get_block(last_height).await else {
280 // A new consensus engine will never be started without having the genesis block
281 // of the new epoch (the last block of the previous epoch) already stored.
282 unreachable!("missing starting epoch block at height {}", last_height);
283 };
284 block.digest()
285 }
286
287 /// Proposes a new block or re-proposes the epoch boundary block.
288 ///
289 /// This method builds a new block from the underlying application unless the parent block
290 /// is the last block in the current epoch. When at an epoch boundary, it re-proposes the
291 /// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
292 ///
293 /// The proposal operation is spawned in a background task and returns a receiver that will
294 /// contain the proposed block's digest when ready. The built block is cached for later
295 /// broadcasting.
296 async fn propose(
297 &mut self,
298 consensus_context: Context<Self::Digest, S::PublicKey>,
299 ) -> oneshot::Receiver<Self::Digest> {
300 let mut marshal = self.marshal.clone();
301 let mut application = self.application.clone();
302 let last_built = self.last_built.clone();
303 let epocher = self.epocher.clone();
304
305 // Metrics
306 let build_duration = self.build_duration.clone();
307
308 let (mut tx, rx) = oneshot::channel();
309 self.context
310 .with_label("propose")
311 .with_attribute("round", consensus_context.round)
312 .spawn(move |runtime_context| async move {
313 let (parent_view, parent_digest) = consensus_context.parent;
314 let parent_request = fetch_parent(
315 parent_digest,
316 // We are guaranteed that the parent round for any `consensus_context` is
317 // in the same epoch (recall, the boundary block of the previous epoch
318 // is the genesis block of the current epoch).
319 Some(Round::new(consensus_context.epoch(), parent_view)),
320 &mut application,
321 &mut marshal,
322 )
323 .await;
324
325 let parent = select! {
326 _ = tx.closed() => {
327 debug!(reason = "consensus dropped receiver", "skipping proposal");
328 return;
329 },
330 result = parent_request => match result {
331 Ok(parent) => parent,
332 Err(_) => {
333 debug!(
334 ?parent_digest,
335 reason = "failed to fetch parent block",
336 "skipping proposal"
337 );
338 return;
339 }
340 },
341 };
342
343 // Special case: If the parent block is the last block in the epoch,
344 // re-propose it as to not produce any blocks that will be cut out
345 // by the epoch transition.
346 let last_in_epoch = epocher
347 .last(consensus_context.epoch())
348 .expect("current epoch should exist");
349 if parent.height() == last_in_epoch {
350 let digest = parent.digest();
351 {
352 let mut lock = last_built.lock();
353 *lock = Some((consensus_context.round, parent));
354 }
355
356 let success = tx.send_lossy(digest);
357 debug!(
358 round = ?consensus_context.round,
359 ?digest,
360 success,
361 "re-proposed parent block at epoch boundary"
362 );
363 return;
364 }
365
366 let ancestor_stream = AncestorStream::new(marshal.clone(), [parent]);
367 let build_request = application.propose(
368 (
369 runtime_context.with_label("app_propose"),
370 consensus_context.clone(),
371 ),
372 ancestor_stream,
373 );
374
375 let mut build_timer = build_duration.timer();
376 let built_block = select! {
377 _ = tx.closed() => {
378 debug!(reason = "consensus dropped receiver", "skipping proposal");
379 return;
380 },
381 result = build_request => match result {
382 Some(block) => block,
383 None => {
384 debug!(
385 ?parent_digest,
386 reason = "block building failed",
387 "skipping proposal"
388 );
389 return;
390 }
391 },
392 };
393 build_timer.observe();
394
395 let digest = built_block.digest();
396 {
397 let mut lock = last_built.lock();
398 *lock = Some((consensus_context.round, built_block));
399 }
400
401 let success = tx.send_lossy(digest);
402 debug!(
403 round = ?consensus_context.round,
404 ?digest,
405 success,
406 "proposed new block"
407 );
408 });
409 rx
410 }
411
412 async fn verify(
413 &mut self,
414 context: Context<Self::Digest, S::PublicKey>,
415 digest: Self::Digest,
416 ) -> oneshot::Receiver<bool> {
417 let mut marshal = self.marshal.clone();
418 let mut marshaled = self.clone();
419
420 let (mut tx, rx) = oneshot::channel();
421 self.context
422 .with_label("optimistic_verify")
423 .with_attribute("round", context.round)
424 .spawn(move |_| async move {
425 let block_request = marshal.subscribe_by_digest(Some(context.round), digest).await;
426 let block = select! {
427 _ = tx.closed() => {
428 debug!(
429 reason = "consensus dropped receiver",
430 "skipping optimistic verification"
431 );
432 return;
433 },
434 result = block_request => match result {
435 Ok(block) => block,
436 Err(_) => {
437 debug!(
438 ?digest,
439 reason = "failed to fetch block for optimistic verification",
440 "skipping optimistic verification"
441 );
442 return;
443 }
444 },
445 };
446
447 // Shared pre-checks enforce:
448 // - Block epoch membership.
449 // - Re-proposal detection via `digest == context.parent.1`.
450 //
451 // Re-proposals return early and skip normal parent/height checks
452 // because they were already verified when originally proposed and
453 // parent-child checks would fail by construction when parent == block.
454 let block = match precheck_epoch_and_reproposal(
455 &marshaled.epocher,
456 &mut marshal,
457 &context,
458 digest,
459 block,
460 )
461 .await
462 {
463 Decision::Complete(valid) => {
464 if valid {
465 // Valid re-proposal. Create a completed verification task for `certify`.
466 let round = context.round;
467 let (task_tx, task_rx) = oneshot::channel();
468 task_tx.send_lossy(true);
469 marshaled.verification_tasks.insert(round, digest, task_rx);
470 }
471 // `Complete` means either immediate rejection or successful
472 // re-proposal handling with no further ancestry validation.
473 tx.send_lossy(valid);
474 return;
475 }
476 Decision::Continue(block) => block,
477 };
478
479 // Before casting a notarize vote, ensure the block's embedded context matches
480 // the consensus context.
481 //
482 // This is a critical step - the notarize quorum is guaranteed to have at least
483 // f+1 honest validators who will verify against this context, preventing a Byzantine
484 // proposer from embedding a malicious context. The other f honest validators who did
485 // not vote will later use the block-embedded context to help finalize if Byzantine
486 // validators withhold their finalize votes.
487 if block.context() != context {
488 debug!(
489 ?context,
490 block_context = ?block.context(),
491 "block-embedded context does not match consensus context during optimistic verification"
492 );
493 tx.send_lossy(false);
494 return;
495 }
496
497 // Begin the rest of the verification process asynchronously.
498 let round = context.round;
499 let task = marshaled.deferred_verify(context, block);
500 marshaled.verification_tasks.insert(round, digest, task);
501
502 tx.send_lossy(true);
503 });
504 rx
505 }
506}
507
508impl<E, S, A, B, ES> CertifiableAutomaton for Deferred<E, S, A, B, ES>
509where
510 E: Rng + Spawner + Metrics + Clock,
511 S: Scheme,
512 A: VerifyingApplication<
513 E,
514 Block = B,
515 SigningScheme = S,
516 Context = Context<B::Digest, S::PublicKey>,
517 >,
518 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
519 ES: Epocher,
520{
521 async fn certify(&mut self, round: Round, digest: Self::Digest) -> oneshot::Receiver<bool> {
522 // Attempt to retrieve the existing verification task for this (round, payload).
523 let task = self.verification_tasks.take(round, digest);
524 if let Some(task) = task {
525 return task;
526 }
527
528 // No in-progress task means we never verified this proposal locally. We can use the
529 // block's embedded context to help complete finalization when Byzantine validators
530 // withhold their finalize votes. If a Byzantine proposer embedded a malicious context,
531 // the f+1 honest validators from the notarizing quorum will verify against the proper
532 // context and reject the mismatch, preventing a 2f+1 finalization quorum.
533 //
534 // Subscribe to the block and verify using its embedded context once available.
535 debug!(
536 ?round,
537 ?digest,
538 "subscribing to block for certification using embedded context"
539 );
540 let block_rx = self.marshal.subscribe_by_digest(Some(round), digest).await;
541 let mut marshaled = self.clone();
542 let epocher = self.epocher.clone();
543 let (mut tx, rx) = oneshot::channel();
544 self.context
545 .with_label("certify")
546 .with_attribute("round", round)
547 .spawn(move |_| async move {
548 let block = select! {
549 _ = tx.closed() => {
550 debug!(
551 reason = "consensus dropped receiver",
552 "skipping certification"
553 );
554 return;
555 },
556 result = block_rx => match result {
557 Ok(block) => block,
558 Err(_) => {
559 debug!(
560 ?digest,
561 reason = "failed to fetch block for certification",
562 "skipping certification"
563 );
564 return;
565 }
566 },
567 };
568
569 // Re-proposal detection for certify path: we don't have the consensus context,
570 // only the block's embedded context from original proposal. Infer re-proposal from:
571 // 1. Block is at epoch boundary (only boundary blocks can be re-proposed)
572 // 2. Certification round's view > embedded context's view (re-proposals retain their
573 // original embedded context, so a later view indicates the block was re-proposed)
574 // 3. Same epoch (re-proposals don't cross epoch boundaries)
575 let embedded_context = block.context();
576 let is_reproposal = is_inferred_reproposal_at_certify(
577 &epocher,
578 block.height(),
579 embedded_context.round,
580 round,
581 );
582 if is_reproposal {
583 // NOTE: It is possible that, during crash recovery, we call `marshal.verified`
584 // twice for the same block. That function is idempotent, so this is safe.
585 marshaled.marshal.verified(round, block).await;
586 tx.send_lossy(true);
587 return;
588 }
589
590 let verify_rx = marshaled.deferred_verify(embedded_context, block);
591 if let Ok(result) = verify_rx.await {
592 tx.send_lossy(result);
593 }
594 });
595 rx
596 }
597}
598
599impl<E, S, A, B, ES> Relay for Deferred<E, S, A, B, ES>
600where
601 E: Rng + Spawner + Metrics + Clock,
602 S: Scheme,
603 A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
604 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
605 ES: Epocher,
606{
607 type Digest = B::Digest;
608
609 /// Broadcasts a previously built block to the network.
610 ///
611 /// This uses the cached block from the last proposal operation. If no block was built or
612 /// the digest does not match the cached block, the broadcast is skipped with a warning.
613 async fn broadcast(&mut self, digest: Self::Digest) {
614 let Some((round, block)) = self.last_built.lock().take() else {
615 warn!("missing block to broadcast");
616 return;
617 };
618
619 if block.digest() != digest {
620 warn!(
621 round = %round,
622 digest = %block.digest(),
623 height = %block.height(),
624 "skipping requested broadcast of block with mismatched digest"
625 );
626 return;
627 }
628
629 debug!(
630 round = %round,
631 digest = %block.digest(),
632 height = %block.height(),
633 "requested broadcast of built block"
634 );
635 self.marshal.proposed(round, block).await;
636 }
637}
638
639impl<E, S, A, B, ES> Reporter for Deferred<E, S, A, B, ES>
640where
641 E: Rng + Spawner + Metrics + Clock,
642 S: Scheme,
643 A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
644 + Reporter<Activity = Update<B>>,
645 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
646 ES: Epocher,
647{
648 type Activity = A::Activity;
649
650 /// Relays a report to the underlying [`Application`] and cleans up old verification tasks.
651 async fn report(&mut self, update: Self::Activity) {
652 // Clean up verification tasks for rounds <= the finalized round.
653 if let Update::Tip(round, _, _) = &update {
654 self.verification_tasks.retain_after(round);
655 }
656 self.application.report(update).await
657 }
658}
659
660#[cfg(test)]
661mod tests {
662 use super::Deferred;
663 use crate::{
664 marshal::mocks::{
665 harness::{
666 default_leader, make_raw_block, setup_network, Ctx, StandardHarness, TestHarness,
667 B, BLOCKS_PER_EPOCH, NAMESPACE, NUM_VALIDATORS, S, V,
668 },
669 verifying::MockVerifyingApp,
670 },
671 simplex::scheme::bls12381_threshold::vrf as bls12381_threshold_vrf,
672 types::{Epoch, Epocher, FixedEpocher, Height, Round, View},
673 Automaton, CertifiableAutomaton,
674 };
675 use commonware_cryptography::{
676 certificate::{mocks::Fixture, ConstantProvider},
677 sha256::Sha256,
678 Digestible, Hasher as _,
679 };
680 use commonware_macros::{select, test_traced};
681 use commonware_runtime::{deterministic, Clock, Metrics, Runner};
682 use std::time::Duration;
683
684 #[test_traced("INFO")]
685 fn test_certify_lower_view_after_higher_view() {
686 let runner = deterministic::Runner::timed(Duration::from_secs(60));
687 runner.start(|mut context| async move {
688 let mut oracle = setup_network(context.clone(), None);
689 let Fixture {
690 participants,
691 schemes,
692 ..
693 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
694
695 let me = participants[0].clone();
696
697 let setup = StandardHarness::setup_validator(
698 context.with_label("validator_0"),
699 &mut oracle,
700 me.clone(),
701 ConstantProvider::new(schemes[0].clone()),
702 )
703 .await;
704 let marshal = setup.mailbox;
705
706 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
707 let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
708
709 let mut marshaled = Deferred::new(
710 context.clone(),
711 mock_app,
712 marshal.clone(),
713 FixedEpocher::new(BLOCKS_PER_EPOCH),
714 );
715
716 // Create parent block at height 1
717 let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
718 let parent_digest = parent.digest();
719 marshal
720 .clone()
721 .proposed(Round::new(Epoch::new(0), View::new(1)), parent.clone())
722 .await;
723
724 // Block A at view 5 (height 2)
725 let round_a = Round::new(Epoch::new(0), View::new(5));
726 let context_a = Ctx {
727 round: round_a,
728 leader: me.clone(),
729 parent: (View::new(1), parent_digest),
730 };
731 let block_a = B::new::<Sha256>(context_a.clone(), parent_digest, Height::new(2), 200);
732 let commitment_a = block_a.digest();
733 marshal.clone().proposed(round_a, block_a.clone()).await;
734
735 // Block B at view 10 (height 2, different block same height)
736 let round_b = Round::new(Epoch::new(0), View::new(10));
737 let context_b = Ctx {
738 round: round_b,
739 leader: me.clone(),
740 parent: (View::new(1), parent_digest),
741 };
742 let block_b = B::new::<Sha256>(context_b.clone(), parent_digest, Height::new(2), 300);
743 let commitment_b = block_b.digest();
744 marshal.clone().proposed(round_b, block_b.clone()).await;
745
746 context.sleep(Duration::from_millis(10)).await;
747
748 // Step 1: Verify block A at view 5
749 let _ = marshaled.verify(context_a, commitment_a).await.await;
750
751 // Step 2: Verify block B at view 10
752 let _ = marshaled.verify(context_b, commitment_b).await.await;
753
754 // Step 3: Certify block B at view 10 FIRST
755 let certify_b = marshaled.certify(round_b, commitment_b).await;
756 assert!(
757 certify_b.await.unwrap(),
758 "Block B certification should succeed"
759 );
760
761 // Step 4: Certify block A at view 5 - should succeed
762 let certify_a = marshaled.certify(round_a, commitment_a).await;
763
764 select! {
765 result = certify_a => {
766 assert!(result.unwrap(), "Block A certification should succeed");
767 },
768 _ = context.sleep(Duration::from_secs(5)) => {
769 panic!("Block A certification timed out");
770 },
771 }
772 })
773 }
774
775 #[test_traced("WARN")]
776 fn test_marshaled_rejects_unsupported_epoch() {
777 #[derive(Clone)]
778 struct LimitedEpocher {
779 inner: FixedEpocher,
780 max_epoch: u64,
781 }
782
783 impl Epocher for LimitedEpocher {
784 fn containing(&self, height: Height) -> Option<crate::types::EpochInfo> {
785 let bounds = self.inner.containing(height)?;
786 if bounds.epoch().get() > self.max_epoch {
787 None
788 } else {
789 Some(bounds)
790 }
791 }
792
793 fn first(&self, epoch: Epoch) -> Option<Height> {
794 if epoch.get() > self.max_epoch {
795 None
796 } else {
797 self.inner.first(epoch)
798 }
799 }
800
801 fn last(&self, epoch: Epoch) -> Option<Height> {
802 if epoch.get() > self.max_epoch {
803 None
804 } else {
805 self.inner.last(epoch)
806 }
807 }
808 }
809
810 let runner = deterministic::Runner::timed(Duration::from_secs(60));
811 runner.start(|mut context| async move {
812 let mut oracle = setup_network(context.clone(), None);
813 let Fixture {
814 participants,
815 schemes,
816 ..
817 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
818
819 let me = participants[0].clone();
820
821 let setup = StandardHarness::setup_validator(
822 context.with_label("validator_0"),
823 &mut oracle,
824 me.clone(),
825 ConstantProvider::new(schemes[0].clone()),
826 )
827 .await;
828 let marshal = setup.mailbox;
829
830 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
831 let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
832 let limited_epocher = LimitedEpocher {
833 inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
834 max_epoch: 0,
835 };
836
837 let mut marshaled =
838 Deferred::new(context.clone(), mock_app, marshal.clone(), limited_epocher);
839
840 // Create a parent block at height 19 (last block in epoch 0, which is supported)
841 let parent_ctx = Ctx {
842 round: Round::new(Epoch::zero(), View::new(19)),
843 leader: default_leader(),
844 parent: (View::zero(), genesis.digest()),
845 };
846 let parent =
847 B::new::<Sha256>(parent_ctx.clone(), genesis.digest(), Height::new(19), 1000);
848 let parent_digest = parent.digest();
849 marshal
850 .clone()
851 .proposed(Round::new(Epoch::zero(), View::new(19)), parent.clone())
852 .await;
853
854 // Create a block at height 20 (first block in epoch 1, which is NOT supported)
855 let unsupported_round = Round::new(Epoch::new(1), View::new(20));
856 let unsupported_context = Ctx {
857 round: unsupported_round,
858 leader: me.clone(),
859 parent: (View::new(19), parent_digest),
860 };
861 let block = B::new::<Sha256>(
862 unsupported_context.clone(),
863 parent_digest,
864 Height::new(20),
865 2000,
866 );
867 let block_commitment = block.digest();
868 marshal
869 .clone()
870 .proposed(unsupported_round, block.clone())
871 .await;
872
873 context.sleep(Duration::from_millis(10)).await;
874
875 // Call verify and wait for the result (verify returns optimistic result,
876 // but also spawns deferred verification)
877 let verify_result = marshaled
878 .verify(unsupported_context, block_commitment)
879 .await;
880 // Wait for optimistic verify to complete so the verification task is registered
881 let optimistic_result = verify_result.await;
882
883 // The optimistic verify should return false because the block is in an unsupported epoch
884 assert!(
885 !optimistic_result.unwrap(),
886 "Optimistic verify should reject block in unsupported epoch"
887 );
888 })
889 }
890
891 /// Test that marshaled rejects blocks when consensus context doesn't match block's embedded context.
892 ///
893 /// This tests that when verify() is called with a context that doesn't match what's embedded
894 /// in the block, the verification should fail. A Byzantine proposer could broadcast a block
895 /// with one embedded context but consensus could call verify() with a different context.
896 #[test_traced("WARN")]
897 fn test_marshaled_rejects_mismatched_context() {
898 let runner = deterministic::Runner::timed(Duration::from_secs(30));
899 runner.start(|mut context| async move {
900 let mut oracle = setup_network(context.clone(), None);
901 let Fixture {
902 participants,
903 schemes,
904 ..
905 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
906
907 let me = participants[0].clone();
908
909 let setup = StandardHarness::setup_validator(
910 context.with_label("validator_0"),
911 &mut oracle,
912 me.clone(),
913 ConstantProvider::new(schemes[0].clone()),
914 )
915 .await;
916 let marshal = setup.mailbox;
917
918 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
919 let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
920
921 let mut marshaled = Deferred::new(
922 context.clone(),
923 mock_app,
924 marshal.clone(),
925 FixedEpocher::new(BLOCKS_PER_EPOCH),
926 );
927
928 // Create parent block at height 1 so the commitment is well-formed.
929 let parent_ctx = Ctx {
930 round: Round::new(Epoch::zero(), View::new(1)),
931 leader: default_leader(),
932 parent: (View::zero(), genesis.digest()),
933 };
934 let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
935 let parent_commitment = parent.digest();
936 marshal
937 .clone()
938 .proposed(Round::new(Epoch::zero(), View::new(1)), parent.clone())
939 .await;
940
941 // Build a block with context A (embedded in the block).
942 let round_a = Round::new(Epoch::zero(), View::new(2));
943 let context_a = Ctx {
944 round: round_a,
945 leader: me.clone(),
946 parent: (View::new(1), parent_commitment),
947 };
948 let block_a = B::new::<Sha256>(context_a, parent.digest(), Height::new(2), 200);
949 let commitment_a = block_a.digest();
950 marshal.clone().proposed(round_a, block_a).await;
951
952 context.sleep(Duration::from_millis(10)).await;
953
954 // Verify using a different consensus context B (hash mismatch).
955 let round_b = Round::new(Epoch::zero(), View::new(3));
956 let context_b = Ctx {
957 round: round_b,
958 leader: participants[1].clone(),
959 parent: (View::new(1), parent_commitment),
960 };
961
962 let verify_rx = marshaled.verify(context_b, commitment_a).await;
963 select! {
964 result = verify_rx => {
965 assert!(
966 !result.unwrap(),
967 "mismatched context hash should be rejected"
968 );
969 },
970 _ = context.sleep(Duration::from_secs(5)) => {
971 panic!("verify should reject mismatched context hash promptly");
972 },
973 }
974 })
975 }
976}