// commonware_consensus/marshal/standard/deferred.rs
//! Wrapper for consensus applications that handles epochs and block dissemination.
//!
//! # Overview
//!
//! [`Deferred`] is an adapter that wraps any [`VerifyingApplication`] implementation to handle
//! epoch transitions automatically. It intercepts consensus operations (propose, verify) and
//! ensures blocks are only produced within valid epoch boundaries.
//!
//! # Epoch Boundaries
//!
//! When the parent is the last block in an epoch (as determined by the [`Epocher`]), this wrapper
//! re-proposes that boundary block instead of building a new block. This avoids producing blocks
//! that would be pruned by the epoch transition.
//!
//! # Deferred Verification
//!
//! Before casting a notarize vote, [`Deferred`] waits for the block to become available and
//! then verifies that the block's embedded context matches the consensus context. However, it does not
//! wait for the application to finish verifying the block contents before voting. This enables verification
//! to run while we wait for a quorum of votes to form a certificate (hiding verification latency behind network
//! latency). Once a certificate is formed, we wait on the verification result in [`CertifiableAutomaton::certify`]
//! before voting to finalize (ensuring no invalid blocks are admitted to the canonical chain).
//!
//! # Usage
//!
//! Wrap your [`Application`] implementation with [`Deferred::new`] and provide it to your
//! consensus engine for the [`Automaton`] and [`Relay`]. The wrapper handles all epoch logic transparently.
//!
//! ```rust,ignore
//! let application = Deferred::new(
//!     context,
//!     my_application,
//!     marshal_mailbox,
//!     epocher,
//! );
//! ```
//!
//! # Implementation Notes
//!
//! - Genesis blocks are handled specially: epoch 0 returns the application's genesis block,
//!   while subsequent epochs use the last block of the previous epoch as genesis
//! - Blocks are automatically verified to be within the current epoch
//!
//! # Notarization and Data Availability
//!
//! In rare crash cases, it is possible for a notarization certificate to exist without a block being
//! available to the honest parties if [`CertifiableAutomaton::certify`] fails after a notarization is
//! formed.
//!
//! For this reason, it should not be expected that every notarized payload will be certifiable due
//! to the lack of an available block. However, if even one honest and online party has the block,
//! they will attempt to forward it to others via marshal's resolver.
//!
//! ```text
//!                                      ┌───────────────────────────────────────────────────┐
//!                                      ▼                                                   │
//! ┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐
//! │          B1         │◀──│          B2         │◀──│          B3         │XXX│          B4         │
//! └─────────────────────┘   └─────────────────────┘   └──────────┬──────────┘   └─────────────────────┘
//!                                                                │
//!                                                          Failed Certify
//! ```
//!
//! # Future Work
//!
//! - To further reduce view latency, a participant could optimistically vote for a block prior to
//!   observing its availability during [`Automaton::verify`]. However, this would require updating
//!   other components (like [`crate::marshal`]) to handle backfill where notarization does not imply
//!   a block is fetchable (without modification, a malicious leader that withholds blocks during propose
//!   could get an honest node to exhaust their network rate limit fetching things that don't exist rather
//!   than blocks they need AND can fetch).
use crate::{
    marshal::{
        ancestry::AncestorStream,
        application::{
            validation::{is_inferred_reproposal_at_certify, LastBuilt},
            verification_tasks::VerificationTasks,
        },
        core::Mailbox,
        standard::{
            validation::{
                fetch_parent, precheck_epoch_and_reproposal, verify_with_parent, Decision,
            },
            Standard,
        },
        Update,
    },
    simplex::types::Context,
    types::{Epoch, Epocher, Round},
    Application, Automaton, CertifiableAutomaton, CertifiableBlock, Epochable, Relay, Reporter,
    VerifyingApplication,
};
use commonware_cryptography::{certificate::Scheme, Digestible};
use commonware_macros::select;
use commonware_runtime::{
    telemetry::metrics::histogram::{Buckets, Timed},
    Clock, Metrics, Spawner,
};
use commonware_utils::{
    channel::{fallible::OneshotExt, oneshot},
    sync::Mutex,
};
use rand::Rng;
use std::sync::Arc;
use tracing::{debug, warn};
107
/// An [`Application`] adapter that handles epoch transitions and validates block ancestry.
///
/// This wrapper intercepts consensus operations to enforce epoch boundaries and validate
/// block ancestry. It prevents blocks from being produced outside their valid epoch,
/// handles the special case of re-proposing boundary blocks at epoch boundaries,
/// and ensures all blocks have valid parent linkage and contiguous heights.
///
/// # Ancestry Validation
///
/// Applications wrapped by [`Deferred`] can rely on the following ancestry checks being
/// performed automatically during verification:
/// - Parent digest matches the consensus context's expected parent
/// - Block height is exactly one greater than the parent's height
///
/// Verifying only the immediate parent is sufficient since the parent itself must have
/// been notarized by consensus, which guarantees it was verified and accepted by a quorum.
/// This means the entire ancestry chain back to genesis is transitively validated.
///
/// Applications do not need to re-implement these checks in their own verification logic.
///
/// # Context Recovery
///
/// With deferred verification, validators wait for data availability (DA) and verify the context
/// before voting. If a validator crashes after voting but before certification, they lose their in-memory
/// verification task. When recovering, validators extract context from a [`CertifiableBlock`].
///
/// _This embedded context is trustworthy because the notarizing quorum (which contains at least f+1 honest
/// validators) verified that the block's context matched the consensus context before voting._
#[derive(Clone)]
pub struct Deferred<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: Application<E>,
    B: CertifiableBlock,
    ES: Epocher,
{
    // Runtime handle used to spawn background proposal/verification tasks.
    context: E,
    // The wrapped application implementation.
    application: A,
    // Mailbox to the marshal, used to fetch, subscribe to, and report blocks.
    marshal: Mailbox<S, Standard<B>>,
    // Defines epoch boundaries (e.g. the last height of each epoch).
    epocher: ES,
    // Cache of the most recently built (or re-proposed) block, consumed by `broadcast`.
    last_built: LastBuilt<B>,
    // In-flight verification results keyed by (round, digest), awaited during `certify`.
    verification_tasks: VerificationTasks<<B as Digestible>::Digest>,

    // Histogram of time spent by the application building a new block.
    build_duration: Timed<E>,
}
154
impl<E, S, A, B, ES> Deferred<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = S,
        Context = Context<B::Digest, S::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    ES: Epocher,
{
    /// Creates a new [`Deferred`] wrapper.
    ///
    /// Registers a `build_duration` histogram with the runtime's metrics registry to track
    /// how long the application takes to build new blocks.
    pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
        use prometheus_client::metrics::histogram::Histogram;

        let build_histogram = Histogram::new(Buckets::LOCAL);
        context.register(
            "build_duration",
            "Histogram of time taken for the application to build a new block, in seconds",
            build_histogram.clone(),
        );
        let build_duration = Timed::new(build_histogram, Arc::new(context.clone()));

        Self {
            context,
            application,
            marshal,
            epocher,
            last_built: Arc::new(Mutex::new(None)),
            verification_tasks: VerificationTasks::new(),

            build_duration,
        }
    }

    /// Verifies a proposed block's application-level validity.
    ///
    /// This method validates that:
    /// 1. The block's parent digest matches the expected parent
    /// 2. The block's height is exactly one greater than the parent's height
    /// 3. The underlying application's verification logic passes
    ///
    /// Verification is spawned in a background task and returns a receiver that will contain
    /// the verification result. Valid blocks are reported to the marshal as verified.
    #[inline]
    fn deferred_verify(
        &mut self,
        context: <Self as Automaton>::Context,
        block: B,
    ) -> oneshot::Receiver<bool> {
        // Clone handles so the spawned task owns everything it needs.
        let mut marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("deferred_verify")
            .with_attribute("round", context.round)
            .spawn(move |runtime_context| async move {
                // Shared non-reproposal verification:
                // - fetch parent (using trusted round hint from consensus context)
                // - validate standard ancestry invariants
                // - run application verification over ancestry
                //
                // The helper preserves the prior early-exit behavior and returns
                // `None` when work should stop (for example receiver dropped or
                // parent unavailable).
                let application_valid = match verify_with_parent(
                    runtime_context,
                    context,
                    block,
                    &mut application,
                    &mut marshal,
                    &mut tx,
                )
                .await
                {
                    Some(valid) => valid,
                    None => return,
                };
                // Deliver the result; ignore a dropped receiver (consensus moved on).
                tx.send_lossy(application_valid);
            });

        rx
    }
}
241
impl<E, S, A, B, ES> Automaton for Deferred<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = S,
        Context = Context<B::Digest, S::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    ES: Epocher,
{
    type Digest = B::Digest;
    type Context = Context<Self::Digest, S::PublicKey>;

    /// Returns the genesis digest for a given epoch.
    ///
    /// For epoch 0, this returns the application's genesis block digest. For subsequent
    /// epochs, it returns the digest of the last block from the previous epoch, which
    /// serves as the genesis block for the new epoch.
    ///
    /// # Panics
    ///
    /// Panics if a non-zero epoch is requested but the previous epoch's final block is not
    /// available in storage. This indicates a critical error in the consensus engine startup
    /// sequence, as engines must always have the genesis block before starting.
    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
        if epoch.is_zero() {
            return self.application.genesis().await.digest();
        }

        let prev = epoch.previous().expect("checked to be non-zero above");
        // The last height of the previous epoch identifies this epoch's genesis block.
        let last_height = self
            .epocher
            .last(prev)
            .expect("previous epoch should exist");
        let Some(block) = self.marshal.get_block(last_height).await else {
            // A new consensus engine will never be started without having the genesis block
            // of the new epoch (the last block of the previous epoch) already stored.
            unreachable!("missing starting epoch block at height {}", last_height);
        };
        block.digest()
    }

    /// Proposes a new block or re-proposes the epoch boundary block.
    ///
    /// This method builds a new block from the underlying application unless the parent block
    /// is the last block in the current epoch. When at an epoch boundary, it re-proposes the
    /// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
    ///
    /// The proposal operation is spawned in a background task and returns a receiver that will
    /// contain the proposed block's digest when ready. The built block is cached for later
    /// broadcasting.
    async fn propose(
        &mut self,
        consensus_context: Context<Self::Digest, S::PublicKey>,
    ) -> oneshot::Receiver<Self::Digest> {
        // Clone handles so the spawned task owns everything it needs.
        let mut marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let last_built = self.last_built.clone();
        let epocher = self.epocher.clone();

        // Metrics
        let build_duration = self.build_duration.clone();

        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("propose")
            .with_attribute("round", consensus_context.round)
            .spawn(move |runtime_context| async move {
                let (parent_view, parent_digest) = consensus_context.parent;
                let parent_request = fetch_parent(
                    parent_digest,
                    // We are guaranteed that the parent round for any `consensus_context` is
                    // in the same epoch (recall, the boundary block of the previous epoch
                    // is the genesis block of the current epoch).
                    Some(Round::new(consensus_context.epoch(), parent_view)),
                    &mut application,
                    &mut marshal,
                )
                .await;

                // Abort early if consensus stops caring about this proposal.
                let parent = select! {
                    _ = tx.closed() => {
                        debug!(reason = "consensus dropped receiver", "skipping proposal");
                        return;
                    },
                    result = parent_request => match result {
                        Ok(parent) => parent,
                        Err(_) => {
                            debug!(
                                ?parent_digest,
                                reason = "failed to fetch parent block",
                                "skipping proposal"
                            );
                            return;
                        }
                    },
                };

                // Special case: If the parent block is the last block in the epoch,
                // re-propose it as to not produce any blocks that will be cut out
                // by the epoch transition.
                let last_in_epoch = epocher
                    .last(consensus_context.epoch())
                    .expect("current epoch should exist");
                if parent.height() == last_in_epoch {
                    let digest = parent.digest();
                    // Cache the re-proposed block so `broadcast` can serve it later.
                    {
                        let mut lock = last_built.lock();
                        *lock = Some((consensus_context.round, parent));
                    }

                    let success = tx.send_lossy(digest);
                    debug!(
                        round = ?consensus_context.round,
                        ?digest,
                        success,
                        "re-proposed parent block at epoch boundary"
                    );
                    return;
                }

                // Build a new block on top of the parent, giving the application a
                // stream over the block's ancestry.
                let ancestor_stream = AncestorStream::new(marshal.clone(), [parent]);
                let build_request = application.propose(
                    (
                        runtime_context.with_label("app_propose"),
                        consensus_context.clone(),
                    ),
                    ancestor_stream,
                );

                let mut build_timer = build_duration.timer();
                let built_block = select! {
                    _ = tx.closed() => {
                        debug!(reason = "consensus dropped receiver", "skipping proposal");
                        return;
                    },
                    result = build_request => match result {
                        Some(block) => block,
                        None => {
                            debug!(
                                ?parent_digest,
                                reason = "block building failed",
                                "skipping proposal"
                            );
                            return;
                        }
                    },
                };
                build_timer.observe();

                let digest = built_block.digest();
                // Cache the built block so `broadcast` can serve it later.
                {
                    let mut lock = last_built.lock();
                    *lock = Some((consensus_context.round, built_block));
                }

                let success = tx.send_lossy(digest);
                debug!(
                    round = ?consensus_context.round,
                    ?digest,
                    success,
                    "proposed new block"
                );
            });
        rx
    }

    /// Optimistically verifies a proposed block before voting to notarize.
    ///
    /// Waits for the block to become available, runs epoch/re-proposal pre-checks, and
    /// confirms the block's embedded context matches the consensus context. Full
    /// application-level verification is then spawned in the background (tracked in
    /// `verification_tasks` and awaited later in [`CertifiableAutomaton::certify`]), so
    /// the returned receiver may resolve `true` before content verification completes.
    async fn verify(
        &mut self,
        context: Context<Self::Digest, S::PublicKey>,
        digest: Self::Digest,
    ) -> oneshot::Receiver<bool> {
        let mut marshal = self.marshal.clone();
        let mut marshaled = self.clone();

        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("optimistic_verify")
            .with_attribute("round", context.round)
            .spawn(move |_| async move {
                // Wait for the block to become available (data availability).
                let block_request = marshal.subscribe_by_digest(Some(context.round), digest).await;
                let block = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping optimistic verification"
                        );
                        return;
                    },
                    result = block_request => match result {
                        Ok(block) => block,
                        Err(_) => {
                            debug!(
                                ?digest,
                                reason = "failed to fetch block for optimistic verification",
                                "skipping optimistic verification"
                            );
                            return;
                        }
                    },
                };

                // Shared pre-checks enforce:
                // - Block epoch membership.
                // - Re-proposal detection via `digest == context.parent.1`.
                //
                // Re-proposals return early and skip normal parent/height checks
                // because they were already verified when originally proposed and
                // parent-child checks would fail by construction when parent == block.
                let block = match precheck_epoch_and_reproposal(
                    &marshaled.epocher,
                    &mut marshal,
                    &context,
                    digest,
                    block,
                )
                .await
                {
                    Decision::Complete(valid) => {
                        if valid {
                            // Valid re-proposal. Create a completed verification task for `certify`.
                            let round = context.round;
                            let (task_tx, task_rx) = oneshot::channel();
                            task_tx.send_lossy(true);
                            marshaled.verification_tasks.insert(round, digest, task_rx);
                        }
                        // `Complete` means either immediate rejection or successful
                        // re-proposal handling with no further ancestry validation.
                        tx.send_lossy(valid);
                        return;
                    }
                    Decision::Continue(block) => block,
                };

                // Before casting a notarize vote, ensure the block's embedded context matches
                // the consensus context.
                //
                // This is a critical step - the notarize quorum is guaranteed to have at least
                // f+1 honest validators who will verify against this context, preventing a Byzantine
                // proposer from embedding a malicious context. The other f honest validators who did
                // not vote will later use the block-embedded context to help finalize if Byzantine
                // validators withhold their finalize votes.
                if block.context() != context {
                    debug!(
                        ?context,
                        block_context = ?block.context(),
                        "block-embedded context does not match consensus context during optimistic verification"
                    );
                    tx.send_lossy(false);
                    return;
                }

                // Begin the rest of the verification process asynchronously.
                let round = context.round;
                let task = marshaled.deferred_verify(context, block);
                marshaled.verification_tasks.insert(round, digest, task);

                tx.send_lossy(true);
            });
        rx
    }
}
507
impl<E, S, A, B, ES> CertifiableAutomaton for Deferred<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = S,
        Context = Context<B::Digest, S::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    ES: Epocher,
{
    /// Resolves whether a notarized payload may be finalized.
    ///
    /// If an in-progress verification task from `verify` exists for this (round, digest),
    /// its receiver is returned directly. Otherwise (e.g. after a crash wiped in-memory
    /// state), the block is fetched and verified using its embedded context.
    async fn certify(&mut self, round: Round, digest: Self::Digest) -> oneshot::Receiver<bool> {
        // Attempt to retrieve the existing verification task for this (round, payload).
        let task = self.verification_tasks.take(round, digest);
        if let Some(task) = task {
            return task;
        }

        // No in-progress task means we never verified this proposal locally. We can use the
        // block's embedded context to help complete finalization when Byzantine validators
        // withhold their finalize votes. If a Byzantine proposer embedded a malicious context,
        // the f+1 honest validators from the notarizing quorum will verify against the proper
        // context and reject the mismatch, preventing a 2f+1 finalization quorum.
        //
        // Subscribe to the block and verify using its embedded context once available.
        debug!(
            ?round,
            ?digest,
            "subscribing to block for certification using embedded context"
        );
        let block_rx = self.marshal.subscribe_by_digest(Some(round), digest).await;
        let mut marshaled = self.clone();
        let epocher = self.epocher.clone();
        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("certify")
            .with_attribute("round", round)
            .spawn(move |_| async move {
                // Abort early if consensus stops caring about this certification.
                let block = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping certification"
                        );
                        return;
                    },
                    result = block_rx => match result {
                        Ok(block) => block,
                        Err(_) => {
                            debug!(
                                ?digest,
                                reason = "failed to fetch block for certification",
                                "skipping certification"
                            );
                            return;
                        }
                    },
                };

                // Re-proposal detection for certify path: we don't have the consensus context,
                // only the block's embedded context from original proposal. Infer re-proposal from:
                // 1. Block is at epoch boundary (only boundary blocks can be re-proposed)
                // 2. Certification round's view > embedded context's view (re-proposals retain their
                //    original embedded context, so a later view indicates the block was re-proposed)
                // 3. Same epoch (re-proposals don't cross epoch boundaries)
                let embedded_context = block.context();
                let is_reproposal = is_inferred_reproposal_at_certify(
                    &epocher,
                    block.height(),
                    embedded_context.round,
                    round,
                );
                if is_reproposal {
                    // NOTE: It is possible that, during crash recovery, we call `marshal.verified`
                    // twice for the same block. That function is idempotent, so this is safe.
                    marshaled.marshal.verified(round, block).await;
                    tx.send_lossy(true);
                    return;
                }

                // Not a re-proposal: run full verification against the embedded context.
                let verify_rx = marshaled.deferred_verify(embedded_context, block);
                if let Ok(result) = verify_rx.await {
                    tx.send_lossy(result);
                }
            });
        rx
    }
}
598
599impl<E, S, A, B, ES> Relay for Deferred<E, S, A, B, ES>
600where
601    E: Rng + Spawner + Metrics + Clock,
602    S: Scheme,
603    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
604    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
605    ES: Epocher,
606{
607    type Digest = B::Digest;
608
609    /// Broadcasts a previously built block to the network.
610    ///
611    /// This uses the cached block from the last proposal operation. If no block was built or
612    /// the digest does not match the cached block, the broadcast is skipped with a warning.
613    async fn broadcast(&mut self, digest: Self::Digest) {
614        let Some((round, block)) = self.last_built.lock().take() else {
615            warn!("missing block to broadcast");
616            return;
617        };
618
619        if block.digest() != digest {
620            warn!(
621                round = %round,
622                digest = %block.digest(),
623                height = %block.height(),
624                "skipping requested broadcast of block with mismatched digest"
625            );
626            return;
627        }
628
629        debug!(
630            round = %round,
631            digest = %block.digest(),
632            height = %block.height(),
633            "requested broadcast of built block"
634        );
635        self.marshal.proposed(round, block).await;
636    }
637}
638
639impl<E, S, A, B, ES> Reporter for Deferred<E, S, A, B, ES>
640where
641    E: Rng + Spawner + Metrics + Clock,
642    S: Scheme,
643    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
644        + Reporter<Activity = Update<B>>,
645    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
646    ES: Epocher,
647{
648    type Activity = A::Activity;
649
650    /// Relays a report to the underlying [`Application`] and cleans up old verification tasks.
651    async fn report(&mut self, update: Self::Activity) {
652        // Clean up verification tasks for rounds <= the finalized round.
653        if let Update::Tip(round, _, _) = &update {
654            self.verification_tasks.retain_after(round);
655        }
656        self.application.report(update).await
657    }
658}
659
660#[cfg(test)]
661mod tests {
662    use super::Deferred;
663    use crate::{
664        marshal::mocks::{
665            harness::{
666                default_leader, make_raw_block, setup_network, Ctx, StandardHarness, TestHarness,
667                B, BLOCKS_PER_EPOCH, NAMESPACE, NUM_VALIDATORS, S, V,
668            },
669            verifying::MockVerifyingApp,
670        },
671        simplex::scheme::bls12381_threshold::vrf as bls12381_threshold_vrf,
672        types::{Epoch, Epocher, FixedEpocher, Height, Round, View},
673        Automaton, CertifiableAutomaton,
674    };
675    use commonware_cryptography::{
676        certificate::{mocks::Fixture, ConstantProvider},
677        sha256::Sha256,
678        Digestible, Hasher as _,
679    };
680    use commonware_macros::{select, test_traced};
681    use commonware_runtime::{deterministic, Clock, Metrics, Runner};
682    use std::time::Duration;
683
684    #[test_traced("INFO")]
685    fn test_certify_lower_view_after_higher_view() {
686        let runner = deterministic::Runner::timed(Duration::from_secs(60));
687        runner.start(|mut context| async move {
688            let mut oracle = setup_network(context.clone(), None);
689            let Fixture {
690                participants,
691                schemes,
692                ..
693            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
694
695            let me = participants[0].clone();
696
697            let setup = StandardHarness::setup_validator(
698                context.with_label("validator_0"),
699                &mut oracle,
700                me.clone(),
701                ConstantProvider::new(schemes[0].clone()),
702            )
703            .await;
704            let marshal = setup.mailbox;
705
706            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
707            let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
708
709            let mut marshaled = Deferred::new(
710                context.clone(),
711                mock_app,
712                marshal.clone(),
713                FixedEpocher::new(BLOCKS_PER_EPOCH),
714            );
715
716            // Create parent block at height 1
717            let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
718            let parent_digest = parent.digest();
719            marshal
720                .clone()
721                .proposed(Round::new(Epoch::new(0), View::new(1)), parent.clone())
722                .await;
723
724            // Block A at view 5 (height 2)
725            let round_a = Round::new(Epoch::new(0), View::new(5));
726            let context_a = Ctx {
727                round: round_a,
728                leader: me.clone(),
729                parent: (View::new(1), parent_digest),
730            };
731            let block_a = B::new::<Sha256>(context_a.clone(), parent_digest, Height::new(2), 200);
732            let commitment_a = block_a.digest();
733            marshal.clone().proposed(round_a, block_a.clone()).await;
734
735            // Block B at view 10 (height 2, different block same height)
736            let round_b = Round::new(Epoch::new(0), View::new(10));
737            let context_b = Ctx {
738                round: round_b,
739                leader: me.clone(),
740                parent: (View::new(1), parent_digest),
741            };
742            let block_b = B::new::<Sha256>(context_b.clone(), parent_digest, Height::new(2), 300);
743            let commitment_b = block_b.digest();
744            marshal.clone().proposed(round_b, block_b.clone()).await;
745
746            context.sleep(Duration::from_millis(10)).await;
747
748            // Step 1: Verify block A at view 5
749            let _ = marshaled.verify(context_a, commitment_a).await.await;
750
751            // Step 2: Verify block B at view 10
752            let _ = marshaled.verify(context_b, commitment_b).await.await;
753
754            // Step 3: Certify block B at view 10 FIRST
755            let certify_b = marshaled.certify(round_b, commitment_b).await;
756            assert!(
757                certify_b.await.unwrap(),
758                "Block B certification should succeed"
759            );
760
761            // Step 4: Certify block A at view 5 - should succeed
762            let certify_a = marshaled.certify(round_a, commitment_a).await;
763
764            select! {
765                result = certify_a => {
766                    assert!(result.unwrap(), "Block A certification should succeed");
767                },
768                _ = context.sleep(Duration::from_secs(5)) => {
769                    panic!("Block A certification timed out");
770                },
771            }
772        })
773    }
774
    /// A block whose height falls in an epoch the [`Epocher`] cannot resolve
    /// must be rejected by the optimistic verify step.
    #[test_traced("WARN")]
    fn test_marshaled_rejects_unsupported_epoch() {
        /// Wraps [`FixedEpocher`] but reports no epoch information above
        /// `max_epoch`, simulating a node that has not yet learned about
        /// later epochs.
        #[derive(Clone)]
        struct LimitedEpocher {
            inner: FixedEpocher,
            max_epoch: u64,
        }

        impl Epocher for LimitedEpocher {
            // Delegate to the inner epocher, then hide any epoch beyond `max_epoch`.
            fn containing(&self, height: Height) -> Option<crate::types::EpochInfo> {
                let bounds = self.inner.containing(height)?;
                if bounds.epoch().get() > self.max_epoch {
                    None
                } else {
                    Some(bounds)
                }
            }

            fn first(&self, epoch: Epoch) -> Option<Height> {
                if epoch.get() > self.max_epoch {
                    None
                } else {
                    self.inner.first(epoch)
                }
            }

            fn last(&self, epoch: Epoch) -> Option<Height> {
                if epoch.get() > self.max_epoch {
                    None
                } else {
                    self.inner.last(epoch)
                }
            }
        }

        let runner = deterministic::Runner::timed(Duration::from_secs(60));
        runner.start(|mut context| async move {
            // Single-validator harness backed by a deterministic network.
            let mut oracle = setup_network(context.clone(), None);
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);

            let me = participants[0].clone();

            let setup = StandardHarness::setup_validator(
                context.with_label("validator_0"),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal = setup.mailbox;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
            // Only epoch 0 is resolvable; epoch 1 and beyond are unknown to this node.
            let limited_epocher = LimitedEpocher {
                inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
                max_epoch: 0,
            };

            let mut marshaled =
                Deferred::new(context.clone(), mock_app, marshal.clone(), limited_epocher);

            // Create a parent block at height 19 (last block in epoch 0, which is supported)
            let parent_ctx = Ctx {
                round: Round::new(Epoch::zero(), View::new(19)),
                leader: default_leader(),
                parent: (View::zero(), genesis.digest()),
            };
            let parent =
                B::new::<Sha256>(parent_ctx.clone(), genesis.digest(), Height::new(19), 1000);
            let parent_digest = parent.digest();
            marshal
                .clone()
                .proposed(Round::new(Epoch::zero(), View::new(19)), parent.clone())
                .await;

            // Create a block at height 20 (first block in epoch 1, which is NOT supported)
            let unsupported_round = Round::new(Epoch::new(1), View::new(20));
            let unsupported_context = Ctx {
                round: unsupported_round,
                leader: me.clone(),
                parent: (View::new(19), parent_digest),
            };
            let block = B::new::<Sha256>(
                unsupported_context.clone(),
                parent_digest,
                Height::new(20),
                2000,
            );
            let block_commitment = block.digest();
            marshal
                .clone()
                .proposed(unsupported_round, block.clone())
                .await;

            // Let the marshal process the proposed blocks before verifying.
            context.sleep(Duration::from_millis(10)).await;

            // Call verify and wait for the result (verify returns optimistic result,
            // but also spawns deferred verification)
            let verify_result = marshaled
                .verify(unsupported_context, block_commitment)
                .await;
            // Wait for optimistic verify to complete so the verification task is registered
            let optimistic_result = verify_result.await;

            // The optimistic verify should return false because the block is in an unsupported epoch
            assert!(
                !optimistic_result.unwrap(),
                "Optimistic verify should reject block in unsupported epoch"
            );
        })
    }
890
    /// Test that marshaled rejects blocks when consensus context doesn't match block's embedded context.
    ///
    /// This tests that when verify() is called with a context that doesn't match what's embedded
    /// in the block, the verification should fail. A Byzantine proposer could broadcast a block
    /// with one embedded context but consensus could call verify() with a different context.
    #[test_traced("WARN")]
    fn test_marshaled_rejects_mismatched_context() {
        let runner = deterministic::Runner::timed(Duration::from_secs(30));
        runner.start(|mut context| async move {
            // Single-validator harness backed by a deterministic network.
            let mut oracle = setup_network(context.clone(), None);
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);

            let me = participants[0].clone();

            let setup = StandardHarness::setup_validator(
                context.with_label("validator_0"),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal = setup.mailbox;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());

            let mut marshaled = Deferred::new(
                context.clone(),
                mock_app,
                marshal.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            // Create parent block at height 1 so the commitment is well-formed.
            let parent_ctx = Ctx {
                round: Round::new(Epoch::zero(), View::new(1)),
                leader: default_leader(),
                parent: (View::zero(), genesis.digest()),
            };
            let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
            let parent_commitment = parent.digest();
            marshal
                .clone()
                .proposed(Round::new(Epoch::zero(), View::new(1)), parent.clone())
                .await;

            // Build a block with context A (embedded in the block).
            let round_a = Round::new(Epoch::zero(), View::new(2));
            let context_a = Ctx {
                round: round_a,
                leader: me.clone(),
                parent: (View::new(1), parent_commitment),
            };
            let block_a = B::new::<Sha256>(context_a, parent.digest(), Height::new(2), 200);
            let commitment_a = block_a.digest();
            marshal.clone().proposed(round_a, block_a).await;

            // Let the marshal process the proposed blocks before verifying.
            context.sleep(Duration::from_millis(10)).await;

            // Verify using a different consensus context B (hash mismatch).
            // Context B deliberately differs from block A's embedded context in
            // both round (view 3 vs 2) and leader (participants[1] vs me).
            let round_b = Round::new(Epoch::zero(), View::new(3));
            let context_b = Ctx {
                round: round_b,
                leader: participants[1].clone(),
                parent: (View::new(1), parent_commitment),
            };

            let verify_rx = marshaled.verify(context_b, commitment_a).await;
            // Bound the wait: rejection must be prompt, not dependent on the
            // runner's overall 30s timeout.
            select! {
                result = verify_rx => {
                    assert!(
                        !result.unwrap(),
                        "mismatched context hash should be rejected"
                    );
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("verify should reject mismatched context hash promptly");
                },
            }
        })
    }
976}