// commonware_consensus/marshal/standard/deferred.rs
1//! Wrapper for consensus applications that handles epochs and block dissemination.
2//!
3//! # Overview
4//!
5//! [`Deferred`] is an adapter that wraps any [`VerifyingApplication`] implementation to handle
6//! epoch transitions automatically. It intercepts consensus operations (propose, verify) and
7//! ensures blocks are only produced within valid epoch boundaries.
8//!
9//! # Epoch Boundaries
10//!
11//! When the parent is the last block in an epoch (as determined by the [`Epocher`]), this wrapper
12//! re-proposes that boundary block instead of building a new block. This avoids producing blocks
13//! that would be pruned by the epoch transition.
14//!
15//! # Deferred Verification
16//!
17//! Before casting a notarize vote, [`Deferred`] waits for the block to become available and
18//! then verifies that the block's embedded context matches the consensus context. However, it does not
19//! wait for the application to finish verifying the block contents before voting. This enables verification
20//! to run while we wait for a quorum of votes to form a certificate (hiding verification latency behind network
21//! latency). Once a certificate is formed, we wait on the verification result in [`CertifiableAutomaton::certify`]
22//! before voting to finalize (ensuring no invalid blocks are admitted to the canonical chain).
23//!
24//! # Usage
25//!
26//! Wrap your [`Application`] implementation with [`Deferred::new`] and provide it to your
27//! consensus engine for the [`Automaton`] and [`Relay`]. The wrapper handles all epoch logic transparently.
28//!
29//! ```rust,ignore
30//! let application = Deferred::new(
31//! context,
32//! my_application,
33//! marshal_mailbox,
34//! epocher,
35//! );
36//! ```
37//!
38//! # Implementation Notes
39//!
40//! - Genesis blocks are handled specially: epoch 0 returns the application's genesis block,
41//! while subsequent epochs use the last block of the previous epoch as genesis
42//! - Blocks are automatically verified to be within the current epoch
43//!
44//! # Notarization and Data Availability
45//!
46//! In rare crash cases, it is possible for a notarization certificate to exist without a block being
47//! available to the honest parties if [`CertifiableAutomaton::certify`] fails after a notarization is
48//! formed.
49//!
50//! For this reason, it should not be expected that every notarized payload will be certifiable due
51//! to the lack of an available block. However, if even one honest and online party has the block,
52//! they will attempt to forward it to others via marshal's resolver.
53//!
54//! ```text
55//! ┌───────────────────────────────────────────────────┐
56//! ▼ │
57//! ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
58//! │ B1 │◀──│ B2 │◀──│ B3 │XXX│ B4 │
59//! └─────────────────────┘ └─────────────────────┘ └──────────┬──────────┘ └─────────────────────┘
60//! │
61//! Failed Certify
62//! ```
63//!
64//! # Future Work
65//!
66//! - To further reduce view latency, a participant could optimistically vote for a block prior to
67//! observing its availability during [`Automaton::verify`]. However, this would require updating
68//! other components (like [`crate::marshal`]) to handle backfill where notarization does not imply
69//! a block is fetchable (without modification, a malicious leader that withholds blocks during propose
70//! could get an honest node to exhaust their network rate limit fetching things that don't exist rather
71//! than blocks they need AND can fetch).
72
73use crate::{
74 marshal::{
75 ancestry::AncestorStream,
76 application::{
77 validation::{is_inferred_reproposal_at_certify, LastBuilt},
78 verification_tasks::VerificationTasks,
79 },
80 core::Mailbox,
81 standard::{
82 validation::{
83 fetch_parent, precheck_epoch_and_reproposal, verify_with_parent, Decision,
84 },
85 Standard,
86 },
87 Update,
88 },
89 simplex::{types::Context, Plan},
90 types::{Epoch, Epocher, Round},
91 Application, Automaton, CertifiableAutomaton, CertifiableBlock, Epochable, Relay, Reporter,
92 VerifyingApplication,
93};
94use commonware_cryptography::{certificate::Scheme, Digestible};
95use commonware_macros::select;
96use commonware_runtime::{
97 telemetry::metrics::histogram::{Buckets, Timed},
98 Clock, Metrics, Spawner,
99};
100use commonware_utils::{
101 channel::{fallible::OneshotExt, oneshot},
102 sync::Mutex,
103};
104use rand::Rng;
105use std::sync::Arc;
106use tracing::{debug, warn};
107
108/// An [`Application`] adapter that handles epoch transitions and validates block ancestry.
109///
110/// This wrapper intercepts consensus operations to enforce epoch boundaries and validate
111/// block ancestry. It prevents blocks from being produced outside their valid epoch,
112/// handles the special case of re-proposing boundary blocks at epoch boundaries,
113/// and ensures all blocks have valid parent linkage and contiguous heights.
114///
115/// # Ancestry Validation
116///
117/// Applications wrapped by [`Deferred`] can rely on the following ancestry checks being
118/// performed automatically during verification:
119/// - Parent digest matches the consensus context's expected parent
120/// - Block height is exactly one greater than the parent's height
121///
122/// Verifying only the immediate parent is sufficient since the parent itself must have
123/// been notarized by consensus, which guarantees it was verified and accepted by a quorum.
124/// This means the entire ancestry chain back to genesis is transitively validated.
125///
126/// Applications do not need to re-implement these checks in their own verification logic.
127///
128/// # Context Recovery
129///
130/// With deferred verification, validators wait for data availability (DA) and verify the context
131/// before voting. If a validator crashes after voting but before certification, they lose their in-memory
132/// verification task. When recovering, validators extract context from a [`CertifiableBlock`].
133///
134/// _This embedded context is trustworthy because the notarizing quorum (which contains at least f+1 honest
135/// validators) verified that the block's context matched the consensus context before voting._
#[derive(Clone)]
pub struct Deferred<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: Application<E>,
    B: CertifiableBlock,
    ES: Epocher,
{
    // Runtime handle used to spawn background tasks and register metrics.
    context: E,
    // The wrapped application; cloned into spawned tasks.
    application: A,
    // Mailbox for the marshal actor (block storage, subscription, broadcast).
    marshal: Mailbox<S, Standard<B>>,
    // Maps heights to epochs; used to detect epoch boundaries.
    epocher: ES,
    // Most recently built (or re-proposed) block, keyed by its round, awaiting
    // a `Relay::broadcast` request.
    last_built: LastBuilt<B>,
    // In-flight deferred verification results, keyed by (round, digest), that
    // `certify` retrieves before voting to finalize.
    verification_tasks: VerificationTasks<<B as Digestible>::Digest>,

    // Histogram of how long the application takes to build a new block.
    build_duration: Timed<E>,
}
154
155impl<E, S, A, B, ES> Deferred<E, S, A, B, ES>
156where
157 E: Rng + Spawner + Metrics + Clock,
158 S: Scheme,
159 A: VerifyingApplication<
160 E,
161 Block = B,
162 SigningScheme = S,
163 Context = Context<B::Digest, S::PublicKey>,
164 >,
165 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
166 ES: Epocher,
167{
168 /// Creates a new [`Deferred`] wrapper.
169 pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
170 use prometheus_client::metrics::histogram::Histogram;
171
172 let build_histogram = Histogram::new(Buckets::LOCAL);
173 context.register(
174 "build_duration",
175 "Histogram of time taken for the application to build a new block, in seconds",
176 build_histogram.clone(),
177 );
178 let build_duration = Timed::new(build_histogram, Arc::new(context.clone()));
179
180 Self {
181 context,
182 application,
183 marshal,
184 epocher,
185 last_built: Arc::new(Mutex::new(None)),
186 verification_tasks: VerificationTasks::new(),
187
188 build_duration,
189 }
190 }
191
192 /// Verifies a proposed block's application-level validity.
193 ///
194 /// This method validates that:
195 /// 1. The block's parent digest matches the expected parent
196 /// 2. The block's height is exactly one greater than the parent's height
197 /// 3. The underlying application's verification logic passes
198 ///
199 /// Verification is spawned in a background task and returns a receiver that will contain
200 /// the verification result. Valid blocks are reported to the marshal as verified.
201 #[inline]
202 fn deferred_verify(
203 &mut self,
204 context: <Self as Automaton>::Context,
205 block: B,
206 ) -> oneshot::Receiver<bool> {
207 let mut marshal = self.marshal.clone();
208 let mut application = self.application.clone();
209 let (mut tx, rx) = oneshot::channel();
210 self.context
211 .with_label("deferred_verify")
212 .with_attribute("round", context.round)
213 .spawn(move |runtime_context| async move {
214 // Shared non-reproposal verification:
215 // - fetch parent (using trusted round hint from consensus context)
216 // - validate standard ancestry invariants
217 // - run application verification over ancestry
218 //
219 // The helper preserves the prior early-exit behavior and returns
220 // `None` when work should stop (for example receiver dropped or
221 // parent unavailable).
222 let application_valid = match verify_with_parent(
223 runtime_context,
224 context,
225 block,
226 &mut application,
227 &mut marshal,
228 &mut tx,
229 )
230 .await
231 {
232 Some(valid) => valid,
233 None => return,
234 };
235 tx.send_lossy(application_valid);
236 });
237
238 rx
239 }
240}
241
impl<E, S, A, B, ES> Automaton for Deferred<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = S,
        Context = Context<B::Digest, S::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    ES: Epocher,
{
    type Digest = B::Digest;
    type Context = Context<Self::Digest, S::PublicKey>;

    /// Returns the genesis digest for a given epoch.
    ///
    /// For epoch 0, this returns the application's genesis block digest. For subsequent
    /// epochs, it returns the digest of the last block from the previous epoch, which
    /// serves as the genesis block for the new epoch.
    ///
    /// # Panics
    ///
    /// Panics if a non-zero epoch is requested but the previous epoch's final block is not
    /// available in storage. This indicates a critical error in the consensus engine startup
    /// sequence, as engines must always have the genesis block before starting.
    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
        if epoch.is_zero() {
            return self.application.genesis().await.digest();
        }

        // Non-zero epoch: the genesis block is the last block of the previous epoch.
        let prev = epoch.previous().expect("checked to be non-zero above");
        let last_height = self
            .epocher
            .last(prev)
            .expect("previous epoch should exist");
        let Some(block) = self.marshal.get_block(last_height).await else {
            // A new consensus engine will never be started without having the genesis block
            // of the new epoch (the last block of the previous epoch) already stored.
            unreachable!("missing starting epoch block at height {}", last_height);
        };
        block.digest()
    }

    /// Proposes a new block or re-proposes the epoch boundary block.
    ///
    /// This method builds a new block from the underlying application unless the parent block
    /// is the last block in the current epoch. When at an epoch boundary, it re-proposes the
    /// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
    ///
    /// The proposal operation is spawned in a background task and returns a receiver that will
    /// contain the proposed block's digest when ready. The built block is cached for later
    /// broadcasting.
    async fn propose(
        &mut self,
        consensus_context: Context<Self::Digest, S::PublicKey>,
    ) -> oneshot::Receiver<Self::Digest> {
        // Clone everything the background task needs so `self` is not captured.
        let mut marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let last_built = self.last_built.clone();
        let epocher = self.epocher.clone();

        // Metrics
        let build_duration = self.build_duration.clone();

        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("propose")
            .with_attribute("round", consensus_context.round)
            .spawn(move |runtime_context| async move {
                let (parent_view, parent_digest) = consensus_context.parent;
                let parent_request = fetch_parent(
                    parent_digest,
                    // We are guaranteed that the parent round for any `consensus_context` is
                    // in the same epoch (recall, the boundary block of the previous epoch
                    // is the genesis block of the current epoch).
                    Some(Round::new(consensus_context.epoch(), parent_view)),
                    &mut application,
                    &mut marshal,
                )
                .await;

                // Abandon the proposal as soon as consensus drops the receiver.
                let parent = select! {
                    _ = tx.closed() => {
                        debug!(reason = "consensus dropped receiver", "skipping proposal");
                        return;
                    },
                    result = parent_request => match result {
                        Ok(parent) => parent,
                        Err(_) => {
                            debug!(
                                ?parent_digest,
                                reason = "failed to fetch parent block",
                                "skipping proposal"
                            );
                            return;
                        }
                    },
                };

                // Special case: If the parent block is the last block in the epoch,
                // re-propose it so as not to produce any blocks that will be cut out
                // by the epoch transition.
                let last_in_epoch = epocher
                    .last(consensus_context.epoch())
                    .expect("current epoch should exist");
                if parent.height() == last_in_epoch {
                    let digest = parent.digest();
                    {
                        // Cache the re-proposed block so `Relay::broadcast` can serve it.
                        let mut lock = last_built.lock();
                        *lock = Some((consensus_context.round, parent));
                    }

                    let success = tx.send_lossy(digest);
                    debug!(
                        round = ?consensus_context.round,
                        ?digest,
                        success,
                        "re-proposed parent block at epoch boundary"
                    );
                    return;
                }

                // Normal case: ask the application to build a new block on top
                // of the fetched parent's ancestry.
                let ancestor_stream = AncestorStream::new(marshal.clone(), [parent]);
                let build_request = application.propose(
                    (
                        runtime_context.with_label("app_propose"),
                        consensus_context.clone(),
                    ),
                    ancestor_stream,
                );

                let mut build_timer = build_duration.timer();
                let built_block = select! {
                    _ = tx.closed() => {
                        debug!(reason = "consensus dropped receiver", "skipping proposal");
                        return;
                    },
                    result = build_request => match result {
                        Some(block) => block,
                        None => {
                            debug!(
                                ?parent_digest,
                                reason = "block building failed",
                                "skipping proposal"
                            );
                            return;
                        }
                    },
                };
                build_timer.observe();

                let digest = built_block.digest();
                {
                    // Cache the built block so `Relay::broadcast` can serve it.
                    let mut lock = last_built.lock();
                    *lock = Some((consensus_context.round, built_block));
                }

                let success = tx.send_lossy(digest);
                debug!(
                    round = ?consensus_context.round,
                    ?digest,
                    success,
                    "proposed new block"
                );
            });
        rx
    }

    /// Optimistically verifies a proposed block before casting a notarize vote.
    ///
    /// Waits for the block to become available, runs epoch/re-proposal
    /// pre-checks, and confirms the block's embedded context matches the
    /// consensus context. Full application verification is started in the
    /// background and its result is retrieved later in
    /// [`CertifiableAutomaton::certify`], hiding verification latency behind
    /// quorum formation.
    async fn verify(
        &mut self,
        context: Context<Self::Digest, S::PublicKey>,
        digest: Self::Digest,
    ) -> oneshot::Receiver<bool> {
        let mut marshal = self.marshal.clone();
        // Clone of `self` moved into the spawned task (all fields are cheaply
        // clonable handles).
        let mut marshaled = self.clone();

        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("optimistic_verify")
            .with_attribute("round", context.round)
            .spawn(move |_| async move {
                // Wait for the block to become available (data availability).
                let block_request = marshal.subscribe_by_digest(Some(context.round), digest).await;
                let block = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping optimistic verification"
                        );
                        return;
                    },
                    result = block_request => match result {
                        Ok(block) => block,
                        Err(_) => {
                            debug!(
                                ?digest,
                                reason = "failed to fetch block for optimistic verification",
                                "skipping optimistic verification"
                            );
                            return;
                        }
                    },
                };

                // Shared pre-checks enforce:
                // - Block epoch membership.
                // - Re-proposal detection via `digest == context.parent.1`.
                //
                // Re-proposals return early and skip normal parent/height checks
                // because they were already verified when originally proposed and
                // parent-child checks would fail by construction when parent == block.
                let block = match precheck_epoch_and_reproposal(
                    &marshaled.epocher,
                    &mut marshal,
                    &context,
                    digest,
                    block,
                )
                .await
                {
                    Decision::Complete(valid) => {
                        if valid {
                            // Valid re-proposal. Create a completed verification task for `certify`.
                            let round = context.round;
                            let (task_tx, task_rx) = oneshot::channel();
                            task_tx.send_lossy(true);
                            marshaled.verification_tasks.insert(round, digest, task_rx);
                        }
                        // `Complete` means either immediate rejection or successful
                        // re-proposal handling with no further ancestry validation.
                        tx.send_lossy(valid);
                        return;
                    }
                    Decision::Continue(block) => block,
                };

                // Before casting a notarize vote, ensure the block's embedded context matches
                // the consensus context.
                //
                // This is a critical step - the notarize quorum is guaranteed to have at least
                // f+1 honest validators who will verify against this context, preventing a Byzantine
                // proposer from embedding a malicious context. The other f honest validators who did
                // not vote will later use the block-embedded context to help finalize if Byzantine
                // validators withhold their finalize votes.
                if block.context() != context {
                    debug!(
                        ?context,
                        block_context = ?block.context(),
                        "block-embedded context does not match consensus context during optimistic verification"
                    );
                    tx.send_lossy(false);
                    return;
                }

                // Begin the rest of the verification process asynchronously.
                // The task is stashed so `certify` can await its result.
                let round = context.round;
                let task = marshaled.deferred_verify(context, block);
                marshaled.verification_tasks.insert(round, digest, task);

                // Vote to notarize now; application verification completes later.
                tx.send_lossy(true);
            });
        rx
    }
}
507
impl<E, S, A, B, ES> CertifiableAutomaton for Deferred<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = S,
        Context = Context<B::Digest, S::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    ES: Epocher,
{
    /// Returns a receiver that resolves with the final verification verdict
    /// for `(round, digest)` before voting to finalize.
    ///
    /// If a deferred verification task was started during [`Automaton::verify`],
    /// its receiver is returned directly. Otherwise (e.g. after a crash wiped
    /// the in-memory task), the block is fetched and re-verified using its
    /// embedded context.
    async fn certify(&mut self, round: Round, digest: Self::Digest) -> oneshot::Receiver<bool> {
        // Attempt to retrieve the existing verification task for this (round, payload).
        let task = self.verification_tasks.take(round, digest);
        if let Some(task) = task {
            return task;
        }

        // No in-progress task means we never verified this proposal locally. We can use the
        // block's embedded context to help complete finalization when Byzantine validators
        // withhold their finalize votes. If a Byzantine proposer embedded a malicious context,
        // the f+1 honest validators from the notarizing quorum will verify against the proper
        // context and reject the mismatch, preventing a 2f+1 finalization quorum.
        //
        // Subscribe to the block and verify using its embedded context once available.
        debug!(
            ?round,
            ?digest,
            "subscribing to block for certification using embedded context"
        );
        let block_rx = self.marshal.subscribe_by_digest(Some(round), digest).await;
        let mut marshaled = self.clone();
        let epocher = self.epocher.clone();
        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("certify")
            .with_attribute("round", round)
            .spawn(move |_| async move {
                // Abandon work as soon as consensus drops the receiver.
                let block = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping certification"
                        );
                        return;
                    },
                    result = block_rx => match result {
                        Ok(block) => block,
                        Err(_) => {
                            debug!(
                                ?digest,
                                reason = "failed to fetch block for certification",
                                "skipping certification"
                            );
                            return;
                        }
                    },
                };

                // Re-proposal detection for certify path: we don't have the consensus context,
                // only the block's embedded context from original proposal. Infer re-proposal from:
                // 1. Block is at epoch boundary (only boundary blocks can be re-proposed)
                // 2. Certification round's view > embedded context's view (re-proposals retain their
                //    original embedded context, so a later view indicates the block was re-proposed)
                // 3. Same epoch (re-proposals don't cross epoch boundaries)
                let embedded_context = block.context();
                let is_reproposal = is_inferred_reproposal_at_certify(
                    &epocher,
                    block.height(),
                    embedded_context.round,
                    round,
                );
                if is_reproposal {
                    // NOTE: It is possible that, during crash recovery, we call `marshal.verified`
                    // twice for the same block. That function is idempotent, so this is safe.
                    marshaled.marshal.verified(round, block).await;
                    tx.send_lossy(true);
                    return;
                }

                // Not a re-proposal: run the full deferred verification using the
                // block's embedded context and forward its verdict.
                let verify_rx = marshaled.deferred_verify(embedded_context, block);
                if let Ok(result) = verify_rx.await {
                    tx.send_lossy(result);
                }
            });
        rx
    }
}
598
599impl<E, S, A, B, ES> Relay for Deferred<E, S, A, B, ES>
600where
601 E: Rng + Spawner + Metrics + Clock,
602 S: Scheme,
603 A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
604 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
605 ES: Epocher,
606{
607 type Digest = B::Digest;
608 type PublicKey = S::PublicKey;
609 type Plan = Plan<S::PublicKey>;
610
611 async fn broadcast(&mut self, digest: Self::Digest, plan: Plan<S::PublicKey>) {
612 match plan {
613 Plan::Propose => {
614 let Some((round, block)) = self.last_built.lock().take() else {
615 warn!("missing block to broadcast");
616 return;
617 };
618 if block.digest() != digest {
619 warn!(
620 round = %round,
621 digest = %block.digest(),
622 height = %block.height(),
623 "skipping requested broadcast of block with mismatched digest"
624 );
625 return;
626 }
627 debug!(
628 round = %round,
629 digest = %block.digest(),
630 height = %block.height(),
631 "requested broadcast of built block"
632 );
633 self.marshal.proposed(round, block).await;
634 }
635 Plan::Forward { round, peers } => {
636 self.marshal.forward(round, digest, peers).await;
637 }
638 }
639 }
640}
641
642impl<E, S, A, B, ES> Reporter for Deferred<E, S, A, B, ES>
643where
644 E: Rng + Spawner + Metrics + Clock,
645 S: Scheme,
646 A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
647 + Reporter<Activity = Update<B>>,
648 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
649 ES: Epocher,
650{
651 type Activity = A::Activity;
652
653 /// Relays a report to the underlying [`Application`] and cleans up old verification tasks.
654 async fn report(&mut self, update: Self::Activity) {
655 // Clean up verification tasks for rounds <= the finalized round.
656 if let Update::Tip(round, _, _) = &update {
657 self.verification_tasks.retain_after(round);
658 }
659 self.application.report(update).await
660 }
661}
662
#[cfg(test)]
mod tests {
    use super::Deferred;
    use crate::{
        marshal::mocks::{
            harness::{
                default_leader, make_raw_block, setup_network_with_participants, Ctx,
                StandardHarness, TestHarness, B, BLOCKS_PER_EPOCH, NAMESPACE, NUM_VALIDATORS, S, V,
            },
            verifying::MockVerifyingApp,
        },
        simplex::scheme::bls12381_threshold::vrf as bls12381_threshold_vrf,
        types::{Epoch, Epocher, FixedEpocher, Height, Round, View},
        Automaton, CertifiableAutomaton,
    };
    use commonware_cryptography::{
        certificate::{mocks::Fixture, ConstantProvider},
        sha256::Sha256,
        Digestible, Hasher as _,
    };
    use commonware_macros::{select, test_traced};
    use commonware_runtime::{deterministic, Clock, Metrics, Runner};
    use commonware_utils::NZUsize;
    use std::time::Duration;

    /// Certifying a lower-view proposal after a higher-view one must still
    /// succeed: verification tasks are keyed by (round, digest), so a later
    /// round must not clobber or shadow an earlier round's task.
    #[test_traced("INFO")]
    fn test_certify_lower_view_after_higher_view() {
        let runner = deterministic::Runner::timed(Duration::from_secs(60));
        runner.start(|mut context| async move {
            // Build a deterministic validator set and network harness.
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let mut oracle =
                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                    .await;

            let me = participants[0].clone();

            let setup = StandardHarness::setup_validator(
                context.with_label("validator_0"),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal = setup.mailbox;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());

            let mut marshaled = Deferred::new(
                context.clone(),
                mock_app,
                marshal.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            // Create parent block at height 1
            let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
            let parent_digest = parent.digest();
            marshal
                .clone()
                .proposed(Round::new(Epoch::new(0), View::new(1)), parent.clone())
                .await;

            // Block A at view 5 (height 2)
            let round_a = Round::new(Epoch::new(0), View::new(5));
            let context_a = Ctx {
                round: round_a,
                leader: me.clone(),
                parent: (View::new(1), parent_digest),
            };
            let block_a = B::new::<Sha256>(context_a.clone(), parent_digest, Height::new(2), 200);
            let commitment_a = block_a.digest();
            marshal.clone().proposed(round_a, block_a.clone()).await;

            // Block B at view 10 (height 2, different block same height)
            let round_b = Round::new(Epoch::new(0), View::new(10));
            let context_b = Ctx {
                round: round_b,
                leader: me.clone(),
                parent: (View::new(1), parent_digest),
            };
            let block_b = B::new::<Sha256>(context_b.clone(), parent_digest, Height::new(2), 300);
            let commitment_b = block_b.digest();
            marshal.clone().proposed(round_b, block_b.clone()).await;

            // Give the marshal a moment to ingest the proposed blocks.
            context.sleep(Duration::from_millis(10)).await;

            // Step 1: Verify block A at view 5
            let _ = marshaled.verify(context_a, commitment_a).await.await;

            // Step 2: Verify block B at view 10
            let _ = marshaled.verify(context_b, commitment_b).await.await;

            // Step 3: Certify block B at view 10 FIRST
            let certify_b = marshaled.certify(round_b, commitment_b).await;
            assert!(
                certify_b.await.unwrap(),
                "Block B certification should succeed"
            );

            // Step 4: Certify block A at view 5 - should succeed
            let certify_a = marshaled.certify(round_a, commitment_a).await;

            select! {
                result = certify_a => {
                    assert!(result.unwrap(), "Block A certification should succeed");
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("Block A certification timed out");
                },
            }
        })
    }

    /// Blocks that fall in an epoch the epocher does not recognize must be
    /// rejected during optimistic verification.
    #[test_traced("WARN")]
    fn test_marshaled_rejects_unsupported_epoch() {
        /// Wraps [`FixedEpocher`] but refuses to answer for epochs beyond
        /// `max_epoch`, simulating a node that does not yet know about a
        /// future epoch.
        #[derive(Clone)]
        struct LimitedEpocher {
            inner: FixedEpocher,
            max_epoch: u64,
        }

        impl Epocher for LimitedEpocher {
            fn containing(&self, height: Height) -> Option<crate::types::EpochInfo> {
                let bounds = self.inner.containing(height)?;
                if bounds.epoch().get() > self.max_epoch {
                    None
                } else {
                    Some(bounds)
                }
            }

            fn first(&self, epoch: Epoch) -> Option<Height> {
                if epoch.get() > self.max_epoch {
                    None
                } else {
                    self.inner.first(epoch)
                }
            }

            fn last(&self, epoch: Epoch) -> Option<Height> {
                if epoch.get() > self.max_epoch {
                    None
                } else {
                    self.inner.last(epoch)
                }
            }
        }

        let runner = deterministic::Runner::timed(Duration::from_secs(60));
        runner.start(|mut context| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let mut oracle =
                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                    .await;

            let me = participants[0].clone();

            let setup = StandardHarness::setup_validator(
                context.with_label("validator_0"),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal = setup.mailbox;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
            // Only epoch 0 is "supported" by this epocher.
            let limited_epocher = LimitedEpocher {
                inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
                max_epoch: 0,
            };

            let mut marshaled =
                Deferred::new(context.clone(), mock_app, marshal.clone(), limited_epocher);

            // Create a parent block at height 19 (last block in epoch 0, which is supported)
            let parent_ctx = Ctx {
                round: Round::new(Epoch::zero(), View::new(19)),
                leader: default_leader(),
                parent: (View::zero(), genesis.digest()),
            };
            let parent =
                B::new::<Sha256>(parent_ctx.clone(), genesis.digest(), Height::new(19), 1000);
            let parent_digest = parent.digest();
            marshal
                .clone()
                .proposed(Round::new(Epoch::zero(), View::new(19)), parent.clone())
                .await;

            // Create a block at height 20 (first block in epoch 1, which is NOT supported)
            let unsupported_round = Round::new(Epoch::new(1), View::new(20));
            let unsupported_context = Ctx {
                round: unsupported_round,
                leader: me.clone(),
                parent: (View::new(19), parent_digest),
            };
            let block = B::new::<Sha256>(
                unsupported_context.clone(),
                parent_digest,
                Height::new(20),
                2000,
            );
            let block_commitment = block.digest();
            marshal
                .clone()
                .proposed(unsupported_round, block.clone())
                .await;

            // Give the marshal a moment to ingest the proposed blocks.
            context.sleep(Duration::from_millis(10)).await;

            // Call verify and wait for the result (verify returns optimistic result,
            // but also spawns deferred verification)
            let verify_result = marshaled
                .verify(unsupported_context, block_commitment)
                .await;
            // Wait for optimistic verify to complete so the verification task is registered
            let optimistic_result = verify_result.await;

            // The optimistic verify should return false because the block is in an unsupported epoch
            assert!(
                !optimistic_result.unwrap(),
                "Optimistic verify should reject block in unsupported epoch"
            );
        })
    }

    /// Test that marshaled rejects blocks when consensus context doesn't match block's embedded context.
    ///
    /// This tests that when verify() is called with a context that doesn't match what's embedded
    /// in the block, the verification should fail. A Byzantine proposer could broadcast a block
    /// with one embedded context but consensus could call verify() with a different context.
    #[test_traced("WARN")]
    fn test_marshaled_rejects_mismatched_context() {
        let runner = deterministic::Runner::timed(Duration::from_secs(30));
        runner.start(|mut context| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let mut oracle =
                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                    .await;

            let me = participants[0].clone();

            let setup = StandardHarness::setup_validator(
                context.with_label("validator_0"),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal = setup.mailbox;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());

            let mut marshaled = Deferred::new(
                context.clone(),
                mock_app,
                marshal.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            // Create parent block at height 1 so the commitment is well-formed.
            let parent_ctx = Ctx {
                round: Round::new(Epoch::zero(), View::new(1)),
                leader: default_leader(),
                parent: (View::zero(), genesis.digest()),
            };
            let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
            let parent_commitment = parent.digest();
            marshal
                .clone()
                .proposed(Round::new(Epoch::zero(), View::new(1)), parent.clone())
                .await;

            // Build a block with context A (embedded in the block).
            let round_a = Round::new(Epoch::zero(), View::new(2));
            let context_a = Ctx {
                round: round_a,
                leader: me.clone(),
                parent: (View::new(1), parent_commitment),
            };
            let block_a = B::new::<Sha256>(context_a, parent.digest(), Height::new(2), 200);
            let commitment_a = block_a.digest();
            marshal.clone().proposed(round_a, block_a).await;

            // Give the marshal a moment to ingest the proposed blocks.
            context.sleep(Duration::from_millis(10)).await;

            // Verify using a different consensus context B (hash mismatch).
            let round_b = Round::new(Epoch::zero(), View::new(3));
            let context_b = Ctx {
                round: round_b,
                leader: participants[1].clone(),
                parent: (View::new(1), parent_commitment),
            };

            let verify_rx = marshaled.verify(context_b, commitment_a).await;
            select! {
                result = verify_rx => {
                    assert!(
                        !result.unwrap(),
                        "mismatched context hash should be rejected"
                    );
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("verify should reject mismatched context hash promptly");
                },
            }
        })
    }
}