Skip to main content

commonware_consensus/marshal/standard/
deferred.rs

1//! Wrapper for consensus applications that handles epochs and block dissemination.
2//!
3//! # Overview
4//!
5//! [`Deferred`] is an adapter that wraps any [`VerifyingApplication`] implementation to handle
6//! epoch transitions automatically. It intercepts consensus operations (propose, verify) and
7//! ensures blocks are only produced within valid epoch boundaries.
8//!
9//! # Epoch Boundaries
10//!
11//! When the parent is the last block in an epoch (as determined by the [`Epocher`]), this wrapper
12//! re-proposes that boundary block instead of building a new block. This avoids producing blocks
13//! that would be pruned by the epoch transition.
14//!
15//! # Deferred Verification
16//!
17//! Before casting a notarize vote, [`Deferred`] waits for the block to become available and
18//! then verifies that the block's embedded context matches the consensus context. However, it does not
19//! wait for the application to finish verifying the block contents before voting. This enables verification
20//! to run while we wait for a quorum of votes to form a certificate (hiding verification latency behind network
21//! latency). Once a certificate is formed, we wait on the verification result in [`CertifiableAutomaton::certify`]
22//! before voting to finalize (ensuring no invalid blocks are admitted to the canonical chain).
23//!
24//! # Usage
25//!
26//! Wrap your [`Application`] implementation with [`Deferred::new`] and provide it to your
27//! consensus engine for the [`Automaton`] and [`Relay`]. The wrapper handles all epoch logic transparently.
28//!
29//! ```rust,ignore
30//! let application = Deferred::new(
31//!     context,
32//!     my_application,
33//!     marshal_mailbox,
34//!     epocher,
35//! );
36//! ```
37//!
38//! # Implementation Notes
39//!
40//! - Genesis blocks are handled specially: epoch 0 returns the application's genesis block,
41//!   while subsequent epochs use the last block of the previous epoch as genesis
42//! - Blocks are automatically verified to be within the current epoch
43//!
44//! # Notarization and Data Availability
45//!
46//! In rare crash cases, it is possible for a notarization certificate to exist without a block being
47//! available to the honest parties if [`CertifiableAutomaton::certify`] fails after a notarization is
48//! formed.
49//!
50//! For this reason, it should not be expected that every notarized payload will be certifiable due
51//! to the lack of an available block. However, if even one honest and online party has the block,
52//! they will attempt to forward it to others via marshal's resolver.
53//!
54//! ```text
55//!                                      ┌───────────────────────────────────────────────────┐
56//!                                      ▼                                                   │
57//! ┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐
58//! │          B1         │◀──│          B2         │◀──│          B3         │XXX│          B4         │
59//! └─────────────────────┘   └─────────────────────┘   └──────────┬──────────┘   └─────────────────────┘
60//!                                                                │
61//!                                                          Failed Certify
62//! ```
63//!
64//! # Future Work
65//!
66//! - To further reduce view latency, a participant could optimistically vote for a block prior to
67//!   observing its availability during [`Automaton::verify`]. However, this would require updating
68//!   other components (like [`crate::marshal`]) to handle backfill where notarization does not imply
69//!   a block is fetchable (without modification, a malicious leader that withholds blocks during propose
70//!   could get an honest node to exhaust their network rate limit fetching things that don't exist rather
71//!   than blocks they need AND can fetch).
72
73use crate::{
74    marshal::{
75        ancestry::AncestorStream,
76        application::{
77            validation::{is_inferred_reproposal_at_certify, LastBuilt},
78            verification_tasks::VerificationTasks,
79        },
80        core::Mailbox,
81        standard::{
82            validation::{
83                fetch_parent, precheck_epoch_and_reproposal, verify_with_parent, Decision,
84            },
85            Standard,
86        },
87        Update,
88    },
89    simplex::{types::Context, Plan},
90    types::{Epoch, Epocher, Round},
91    Application, Automaton, CertifiableAutomaton, CertifiableBlock, Epochable, Relay, Reporter,
92    VerifyingApplication,
93};
94use commonware_cryptography::{certificate::Scheme, Digestible};
95use commonware_macros::select;
96use commonware_runtime::{
97    telemetry::metrics::histogram::{Buckets, Timed},
98    Clock, Metrics, Spawner,
99};
100use commonware_utils::{
101    channel::{fallible::OneshotExt, oneshot},
102    sync::Mutex,
103};
104use rand::Rng;
105use std::sync::Arc;
106use tracing::{debug, warn};
107
108/// An [`Application`] adapter that handles epoch transitions and validates block ancestry.
109///
110/// This wrapper intercepts consensus operations to enforce epoch boundaries and validate
111/// block ancestry. It prevents blocks from being produced outside their valid epoch,
112/// handles the special case of re-proposing boundary blocks at epoch boundaries,
113/// and ensures all blocks have valid parent linkage and contiguous heights.
114///
115/// # Ancestry Validation
116///
117/// Applications wrapped by [`Deferred`] can rely on the following ancestry checks being
118/// performed automatically during verification:
119/// - Parent digest matches the consensus context's expected parent
120/// - Block height is exactly one greater than the parent's height
121///
122/// Verifying only the immediate parent is sufficient since the parent itself must have
123/// been notarized by consensus, which guarantees it was verified and accepted by a quorum.
124/// This means the entire ancestry chain back to genesis is transitively validated.
125///
126/// Applications do not need to re-implement these checks in their own verification logic.
127///
128/// # Context Recovery
129///
130/// With deferred verification, validators wait for data availability (DA) and verify the context
131/// before voting. If a validator crashes after voting but before certification, they lose their in-memory
132/// verification task. When recovering, validators extract context from a [`CertifiableBlock`].
133///
134/// _This embedded context is trustworthy because the notarizing quorum (which contains at least f+1 honest
135/// validators) verified that the block's context matched the consensus context before voting._
136#[derive(Clone)]
137pub struct Deferred<E, S, A, B, ES>
138where
139    E: Rng + Spawner + Metrics + Clock,
140    S: Scheme,
141    A: Application<E>,
142    B: CertifiableBlock,
143    ES: Epocher,
144{
145    context: E,
146    application: A,
147    marshal: Mailbox<S, Standard<B>>,
148    epocher: ES,
149    last_built: LastBuilt<B>,
150    verification_tasks: VerificationTasks<<B as Digestible>::Digest>,
151
152    build_duration: Timed<E>,
153}
154
155impl<E, S, A, B, ES> Deferred<E, S, A, B, ES>
156where
157    E: Rng + Spawner + Metrics + Clock,
158    S: Scheme,
159    A: VerifyingApplication<
160        E,
161        Block = B,
162        SigningScheme = S,
163        Context = Context<B::Digest, S::PublicKey>,
164    >,
165    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
166    ES: Epocher,
167{
168    /// Creates a new [`Deferred`] wrapper.
169    pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
170        use prometheus_client::metrics::histogram::Histogram;
171
172        let build_histogram = Histogram::new(Buckets::LOCAL);
173        context.register(
174            "build_duration",
175            "Histogram of time taken for the application to build a new block, in seconds",
176            build_histogram.clone(),
177        );
178        let build_duration = Timed::new(build_histogram, Arc::new(context.clone()));
179
180        Self {
181            context,
182            application,
183            marshal,
184            epocher,
185            last_built: Arc::new(Mutex::new(None)),
186            verification_tasks: VerificationTasks::new(),
187
188            build_duration,
189        }
190    }
191
192    /// Verifies a proposed block's application-level validity.
193    ///
194    /// This method validates that:
195    /// 1. The block's parent digest matches the expected parent
196    /// 2. The block's height is exactly one greater than the parent's height
197    /// 3. The underlying application's verification logic passes
198    ///
199    /// Verification is spawned in a background task and returns a receiver that will contain
200    /// the verification result. Valid blocks are reported to the marshal as verified.
201    #[inline]
202    fn deferred_verify(
203        &mut self,
204        context: <Self as Automaton>::Context,
205        block: B,
206    ) -> oneshot::Receiver<bool> {
207        let mut marshal = self.marshal.clone();
208        let mut application = self.application.clone();
209        let (mut tx, rx) = oneshot::channel();
210        self.context
211            .with_label("deferred_verify")
212            .with_attribute("round", context.round)
213            .spawn(move |runtime_context| async move {
214                // Shared non-reproposal verification:
215                // - fetch parent (using trusted round hint from consensus context)
216                // - validate standard ancestry invariants
217                // - run application verification over ancestry
218                //
219                // The helper preserves the prior early-exit behavior and returns
220                // `None` when work should stop (for example receiver dropped or
221                // parent unavailable).
222                let application_valid = match verify_with_parent(
223                    runtime_context,
224                    context,
225                    block,
226                    &mut application,
227                    &mut marshal,
228                    &mut tx,
229                )
230                .await
231                {
232                    Some(valid) => valid,
233                    None => return,
234                };
235                tx.send_lossy(application_valid);
236            });
237
238        rx
239    }
240}
241
242impl<E, S, A, B, ES> Automaton for Deferred<E, S, A, B, ES>
243where
244    E: Rng + Spawner + Metrics + Clock,
245    S: Scheme,
246    A: VerifyingApplication<
247        E,
248        Block = B,
249        SigningScheme = S,
250        Context = Context<B::Digest, S::PublicKey>,
251    >,
252    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
253    ES: Epocher,
254{
255    type Digest = B::Digest;
256    type Context = Context<Self::Digest, S::PublicKey>;
257
258    /// Returns the genesis digest for a given epoch.
259    ///
260    /// For epoch 0, this returns the application's genesis block digest. For subsequent
261    /// epochs, it returns the digest of the last block from the previous epoch, which
262    /// serves as the genesis block for the new epoch.
263    ///
264    /// # Panics
265    ///
266    /// Panics if a non-zero epoch is requested but the previous epoch's final block is not
267    /// available in storage. This indicates a critical error in the consensus engine startup
268    /// sequence, as engines must always have the genesis block before starting.
269    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
270        if epoch.is_zero() {
271            return self.application.genesis().await.digest();
272        }
273
274        let prev = epoch.previous().expect("checked to be non-zero above");
275        let last_height = self
276            .epocher
277            .last(prev)
278            .expect("previous epoch should exist");
279        let Some(block) = self.marshal.get_block(last_height).await else {
280            // A new consensus engine will never be started without having the genesis block
281            // of the new epoch (the last block of the previous epoch) already stored.
282            unreachable!("missing starting epoch block at height {}", last_height);
283        };
284        block.digest()
285    }
286
287    /// Proposes a new block or re-proposes the epoch boundary block.
288    ///
289    /// This method builds a new block from the underlying application unless the parent block
290    /// is the last block in the current epoch. When at an epoch boundary, it re-proposes the
291    /// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
292    ///
293    /// The proposal operation is spawned in a background task and returns a receiver that will
294    /// contain the proposed block's digest when ready. The built block is cached for later
295    /// broadcasting.
296    async fn propose(
297        &mut self,
298        consensus_context: Context<Self::Digest, S::PublicKey>,
299    ) -> oneshot::Receiver<Self::Digest> {
300        let mut marshal = self.marshal.clone();
301        let mut application = self.application.clone();
302        let last_built = self.last_built.clone();
303        let epocher = self.epocher.clone();
304
305        // Metrics
306        let build_duration = self.build_duration.clone();
307
308        let (mut tx, rx) = oneshot::channel();
309        self.context
310            .with_label("propose")
311            .with_attribute("round", consensus_context.round)
312            .spawn(move |runtime_context| async move {
313                let (parent_view, parent_digest) = consensus_context.parent;
314                let parent_request = fetch_parent(
315                    parent_digest,
316                    // We are guaranteed that the parent round for any `consensus_context` is
317                    // in the same epoch (recall, the boundary block of the previous epoch
318                    // is the genesis block of the current epoch).
319                    Some(Round::new(consensus_context.epoch(), parent_view)),
320                    &mut application,
321                    &mut marshal,
322                )
323                .await;
324
325                let parent = select! {
326                    _ = tx.closed() => {
327                        debug!(reason = "consensus dropped receiver", "skipping proposal");
328                        return;
329                    },
330                    result = parent_request => match result {
331                        Ok(parent) => parent,
332                        Err(_) => {
333                            debug!(
334                                ?parent_digest,
335                                reason = "failed to fetch parent block",
336                                "skipping proposal"
337                            );
338                            return;
339                        }
340                    },
341                };
342
343                // Special case: If the parent block is the last block in the epoch,
344                // re-propose it as to not produce any blocks that will be cut out
345                // by the epoch transition.
346                let last_in_epoch = epocher
347                    .last(consensus_context.epoch())
348                    .expect("current epoch should exist");
349                if parent.height() == last_in_epoch {
350                    let digest = parent.digest();
351                    {
352                        let mut lock = last_built.lock();
353                        *lock = Some((consensus_context.round, parent));
354                    }
355
356                    let success = tx.send_lossy(digest);
357                    debug!(
358                        round = ?consensus_context.round,
359                        ?digest,
360                        success,
361                        "re-proposed parent block at epoch boundary"
362                    );
363                    return;
364                }
365
366                let ancestor_stream = AncestorStream::new(marshal.clone(), [parent]);
367                let build_request = application.propose(
368                    (
369                        runtime_context.with_label("app_propose"),
370                        consensus_context.clone(),
371                    ),
372                    ancestor_stream,
373                );
374
375                let mut build_timer = build_duration.timer();
376                let built_block = select! {
377                    _ = tx.closed() => {
378                        debug!(reason = "consensus dropped receiver", "skipping proposal");
379                        return;
380                    },
381                    result = build_request => match result {
382                        Some(block) => block,
383                        None => {
384                            debug!(
385                                ?parent_digest,
386                                reason = "block building failed",
387                                "skipping proposal"
388                            );
389                            return;
390                        }
391                    },
392                };
393                build_timer.observe();
394
395                let digest = built_block.digest();
396                {
397                    let mut lock = last_built.lock();
398                    *lock = Some((consensus_context.round, built_block));
399                }
400
401                let success = tx.send_lossy(digest);
402                debug!(
403                    round = ?consensus_context.round,
404                    ?digest,
405                    success,
406                    "proposed new block"
407                );
408            });
409        rx
410    }
411
412    async fn verify(
413        &mut self,
414        context: Context<Self::Digest, S::PublicKey>,
415        digest: Self::Digest,
416    ) -> oneshot::Receiver<bool> {
417        let mut marshal = self.marshal.clone();
418        let mut marshaled = self.clone();
419
420        let (mut tx, rx) = oneshot::channel();
421        self.context
422            .with_label("optimistic_verify")
423            .with_attribute("round", context.round)
424            .spawn(move |_| async move {
425                let block_request = marshal.subscribe_by_digest(Some(context.round), digest).await;
426                let block = select! {
427                    _ = tx.closed() => {
428                        debug!(
429                            reason = "consensus dropped receiver",
430                            "skipping optimistic verification"
431                        );
432                        return;
433                    },
434                    result = block_request => match result {
435                        Ok(block) => block,
436                        Err(_) => {
437                            debug!(
438                                ?digest,
439                                reason = "failed to fetch block for optimistic verification",
440                                "skipping optimistic verification"
441                            );
442                            return;
443                        }
444                    },
445                };
446
447                // Shared pre-checks enforce:
448                // - Block epoch membership.
449                // - Re-proposal detection via `digest == context.parent.1`.
450                //
451                // Re-proposals return early and skip normal parent/height checks
452                // because they were already verified when originally proposed and
453                // parent-child checks would fail by construction when parent == block.
454                let block = match precheck_epoch_and_reproposal(
455                    &marshaled.epocher,
456                    &mut marshal,
457                    &context,
458                    digest,
459                    block,
460                )
461                .await
462                {
463                    Decision::Complete(valid) => {
464                        if valid {
465                            // Valid re-proposal. Create a completed verification task for `certify`.
466                            let round = context.round;
467                            let (task_tx, task_rx) = oneshot::channel();
468                            task_tx.send_lossy(true);
469                            marshaled.verification_tasks.insert(round, digest, task_rx);
470                        }
471                        // `Complete` means either immediate rejection or successful
472                        // re-proposal handling with no further ancestry validation.
473                        tx.send_lossy(valid);
474                        return;
475                    }
476                    Decision::Continue(block) => block,
477                };
478
479                // Before casting a notarize vote, ensure the block's embedded context matches
480                // the consensus context.
481                //
482                // This is a critical step - the notarize quorum is guaranteed to have at least
483                // f+1 honest validators who will verify against this context, preventing a Byzantine
484                // proposer from embedding a malicious context. The other f honest validators who did
485                // not vote will later use the block-embedded context to help finalize if Byzantine
486                // validators withhold their finalize votes.
487                if block.context() != context {
488                    debug!(
489                        ?context,
490                        block_context = ?block.context(),
491                        "block-embedded context does not match consensus context during optimistic verification"
492                    );
493                    tx.send_lossy(false);
494                    return;
495                }
496
497                // Begin the rest of the verification process asynchronously.
498                let round = context.round;
499                let task = marshaled.deferred_verify(context, block);
500                marshaled.verification_tasks.insert(round, digest, task);
501
502                tx.send_lossy(true);
503            });
504        rx
505    }
506}
507
508impl<E, S, A, B, ES> CertifiableAutomaton for Deferred<E, S, A, B, ES>
509where
510    E: Rng + Spawner + Metrics + Clock,
511    S: Scheme,
512    A: VerifyingApplication<
513        E,
514        Block = B,
515        SigningScheme = S,
516        Context = Context<B::Digest, S::PublicKey>,
517    >,
518    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
519    ES: Epocher,
520{
521    async fn certify(&mut self, round: Round, digest: Self::Digest) -> oneshot::Receiver<bool> {
522        // Attempt to retrieve the existing verification task for this (round, payload).
523        let task = self.verification_tasks.take(round, digest);
524        if let Some(task) = task {
525            return task;
526        }
527
528        // No in-progress task means we never verified this proposal locally. We can use the
529        // block's embedded context to help complete finalization when Byzantine validators
530        // withhold their finalize votes. If a Byzantine proposer embedded a malicious context,
531        // the f+1 honest validators from the notarizing quorum will verify against the proper
532        // context and reject the mismatch, preventing a 2f+1 finalization quorum.
533        //
534        // Subscribe to the block and verify using its embedded context once available.
535        debug!(
536            ?round,
537            ?digest,
538            "subscribing to block for certification using embedded context"
539        );
540        let block_rx = self.marshal.subscribe_by_digest(Some(round), digest).await;
541        let mut marshaled = self.clone();
542        let epocher = self.epocher.clone();
543        let (mut tx, rx) = oneshot::channel();
544        self.context
545            .with_label("certify")
546            .with_attribute("round", round)
547            .spawn(move |_| async move {
548                let block = select! {
549                    _ = tx.closed() => {
550                        debug!(
551                            reason = "consensus dropped receiver",
552                            "skipping certification"
553                        );
554                        return;
555                    },
556                    result = block_rx => match result {
557                        Ok(block) => block,
558                        Err(_) => {
559                            debug!(
560                                ?digest,
561                                reason = "failed to fetch block for certification",
562                                "skipping certification"
563                            );
564                            return;
565                        }
566                    },
567                };
568
569                // Re-proposal detection for certify path: we don't have the consensus context,
570                // only the block's embedded context from original proposal. Infer re-proposal from:
571                // 1. Block is at epoch boundary (only boundary blocks can be re-proposed)
572                // 2. Certification round's view > embedded context's view (re-proposals retain their
573                //    original embedded context, so a later view indicates the block was re-proposed)
574                // 3. Same epoch (re-proposals don't cross epoch boundaries)
575                let embedded_context = block.context();
576                let is_reproposal = is_inferred_reproposal_at_certify(
577                    &epocher,
578                    block.height(),
579                    embedded_context.round,
580                    round,
581                );
582                if is_reproposal {
583                    // NOTE: It is possible that, during crash recovery, we call `marshal.verified`
584                    // twice for the same block. That function is idempotent, so this is safe.
585                    marshaled.marshal.verified(round, block).await;
586                    tx.send_lossy(true);
587                    return;
588                }
589
590                let verify_rx = marshaled.deferred_verify(embedded_context, block);
591                if let Ok(result) = verify_rx.await {
592                    tx.send_lossy(result);
593                }
594            });
595        rx
596    }
597}
598
599impl<E, S, A, B, ES> Relay for Deferred<E, S, A, B, ES>
600where
601    E: Rng + Spawner + Metrics + Clock,
602    S: Scheme,
603    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
604    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
605    ES: Epocher,
606{
607    type Digest = B::Digest;
608    type PublicKey = S::PublicKey;
609    type Plan = Plan<S::PublicKey>;
610
611    async fn broadcast(&mut self, digest: Self::Digest, plan: Plan<S::PublicKey>) {
612        match plan {
613            Plan::Propose => {
614                let Some((round, block)) = self.last_built.lock().take() else {
615                    warn!("missing block to broadcast");
616                    return;
617                };
618                if block.digest() != digest {
619                    warn!(
620                        round = %round,
621                        digest = %block.digest(),
622                        height = %block.height(),
623                        "skipping requested broadcast of block with mismatched digest"
624                    );
625                    return;
626                }
627                debug!(
628                    round = %round,
629                    digest = %block.digest(),
630                    height = %block.height(),
631                    "requested broadcast of built block"
632                );
633                self.marshal.proposed(round, block).await;
634            }
635            Plan::Forward { round, peers } => {
636                self.marshal.forward(round, digest, peers).await;
637            }
638        }
639    }
640}
641
642impl<E, S, A, B, ES> Reporter for Deferred<E, S, A, B, ES>
643where
644    E: Rng + Spawner + Metrics + Clock,
645    S: Scheme,
646    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
647        + Reporter<Activity = Update<B>>,
648    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
649    ES: Epocher,
650{
651    type Activity = A::Activity;
652
653    /// Relays a report to the underlying [`Application`] and cleans up old verification tasks.
654    async fn report(&mut self, update: Self::Activity) {
655        // Clean up verification tasks for rounds <= the finalized round.
656        if let Update::Tip(round, _, _) = &update {
657            self.verification_tasks.retain_after(round);
658        }
659        self.application.report(update).await
660    }
661}
662
663#[cfg(test)]
664mod tests {
665    use super::Deferred;
666    use crate::{
667        marshal::mocks::{
668            harness::{
669                default_leader, make_raw_block, setup_network_with_participants, Ctx,
670                StandardHarness, TestHarness, B, BLOCKS_PER_EPOCH, NAMESPACE, NUM_VALIDATORS, S, V,
671            },
672            verifying::MockVerifyingApp,
673        },
674        simplex::scheme::bls12381_threshold::vrf as bls12381_threshold_vrf,
675        types::{Epoch, Epocher, FixedEpocher, Height, Round, View},
676        Automaton, CertifiableAutomaton,
677    };
678    use commonware_cryptography::{
679        certificate::{mocks::Fixture, ConstantProvider},
680        sha256::Sha256,
681        Digestible, Hasher as _,
682    };
683    use commonware_macros::{select, test_traced};
684    use commonware_runtime::{deterministic, Clock, Metrics, Runner};
685    use commonware_utils::NZUsize;
686    use std::time::Duration;
687
688    #[test_traced("INFO")]
689    fn test_certify_lower_view_after_higher_view() {
           // Scenario: two sibling blocks share the same parent and height but carry different
           // rounds (A at view 5, B at view 10). Both are verified, then B is certified before A.
           // The test asserts that certifying the lower-view block A afterwards still resolves
           // to `true` instead of failing or never completing.
690        let runner = deterministic::Runner::timed(Duration::from_secs(60));
691        runner.start(|mut context| async move {
692            let Fixture {
693                participants,
694                schemes,
695                ..
696            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
697            let mut oracle =
698                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
699                    .await;
700
701            let me = participants[0].clone();
702
               // Only participant 0 is set up as a validator; its marshal mailbox receives every
               // block proposed below.
703            let setup = StandardHarness::setup_validator(
704                context.with_label("validator_0"),
705                &mut oracle,
706                me.clone(),
707                ConstantProvider::new(schemes[0].clone()),
708            )
709            .await;
710            let marshal = setup.mailbox;
711
712            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
713            let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
714
               // `marshaled` is the `Deferred` wrapper under test, sharing the same marshal mailbox.
715            let mut marshaled = Deferred::new(
716                context.clone(),
717                mock_app,
718                marshal.clone(),
719                FixedEpocher::new(BLOCKS_PER_EPOCH),
720            );
721
722            // Create parent block at height 1
723            let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
724            let parent_digest = parent.digest();
725            marshal
726                .clone()
727                .proposed(Round::new(Epoch::new(0), View::new(1)), parent.clone())
728                .await;
729
730            // Block A at view 5 (height 2)
731            let round_a = Round::new(Epoch::new(0), View::new(5));
732            let context_a = Ctx {
733                round: round_a,
734                leader: me.clone(),
735                parent: (View::new(1), parent_digest),
736            };
737            let block_a = B::new::<Sha256>(context_a.clone(), parent_digest, Height::new(2), 200);
738            let commitment_a = block_a.digest();
739            marshal.clone().proposed(round_a, block_a.clone()).await;
740
741            // Block B at view 10 (height 2, different block same height)
742            let round_b = Round::new(Epoch::new(0), View::new(10));
743            let context_b = Ctx {
744                round: round_b,
745                leader: me.clone(),
746                parent: (View::new(1), parent_digest),
747            };
748            let block_b = B::new::<Sha256>(context_b.clone(), parent_digest, Height::new(2), 300);
749            let commitment_b = block_b.digest();
750            marshal.clone().proposed(round_b, block_b.clone()).await;
751
               // Short pause before verifying. NOTE(review): presumably this lets the marshal
               // actor process the three proposals above -- confirm.
752            context.sleep(Duration::from_millis(10)).await;
753
754            // Step 1: Verify block A at view 5
               // The verify outcome is discarded (`let _`); this test only asserts on certify.
755            let _ = marshaled.verify(context_a, commitment_a).await.await;
756
757            // Step 2: Verify block B at view 10
758            let _ = marshaled.verify(context_b, commitment_b).await.await;
759
760            // Step 3: Certify block B at view 10 FIRST
761            let certify_b = marshaled.certify(round_b, commitment_b).await;
762            assert!(
763                certify_b.await.unwrap(),
764                "Block B certification should succeed"
765            );
766
767            // Step 4: Certify block A at view 5 - should succeed
768            let certify_a = marshaled.certify(round_a, commitment_a).await;
769
               // Race the certify result against a 5-second sleep so that a certification that
               // never resolves fails with an explicit message instead of hanging.
770            select! {
771                result = certify_a => {
772                    assert!(result.unwrap(), "Block A certification should succeed");
773                },
774                _ = context.sleep(Duration::from_secs(5)) => {
775                    panic!("Block A certification timed out");
776                },
777            }
778        })
779    }
780
781    #[test_traced("WARN")]
782    fn test_marshaled_rejects_unsupported_epoch() {
           // Scenario: the epocher handed to `Deferred` only recognizes epoch 0. A block whose
           // round is in epoch 1 is proposed and verified; the test asserts that `verify`
           // resolves to `false` for it.

           // Test-only epocher: delegates to a `FixedEpocher` but answers `None` for any epoch
           // greater than `max_epoch`.
783        #[derive(Clone)]
784        struct LimitedEpocher {
785            inner: FixedEpocher,
786            max_epoch: u64,
787        }
788
789        impl Epocher for LimitedEpocher {
               // Looks the height up in `inner`, then hides the result if its epoch exceeds
               // `max_epoch`. A `None` from `inner` is propagated unchanged via `?`.
790            fn containing(&self, height: Height) -> Option<crate::types::EpochInfo> {
791                let bounds = self.inner.containing(height)?;
792                if bounds.epoch().get() > self.max_epoch {
793                    None
794                } else {
795                    Some(bounds)
796                }
797            }
798
               // `None` for epochs above `max_epoch`; otherwise whatever `inner.first` returns.
799            fn first(&self, epoch: Epoch) -> Option<Height> {
800                if epoch.get() > self.max_epoch {
801                    None
802                } else {
803                    self.inner.first(epoch)
804                }
805            }
806
               // `None` for epochs above `max_epoch`; otherwise whatever `inner.last` returns.
807            fn last(&self, epoch: Epoch) -> Option<Height> {
808                if epoch.get() > self.max_epoch {
809                    None
810                } else {
811                    self.inner.last(epoch)
812                }
813            }
814        }
815
816        let runner = deterministic::Runner::timed(Duration::from_secs(60));
817        runner.start(|mut context| async move {
818            let Fixture {
819                participants,
820                schemes,
821                ..
822            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
823            let mut oracle =
824                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
825                    .await;
826
827            let me = participants[0].clone();
828
829            let setup = StandardHarness::setup_validator(
830                context.with_label("validator_0"),
831                &mut oracle,
832                me.clone(),
833                ConstantProvider::new(schemes[0].clone()),
834            )
835            .await;
836            let marshal = setup.mailbox;
837
838            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
839            let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
               // `max_epoch: 0` means every query about epoch 1 or later returns `None`.
840            let limited_epocher = LimitedEpocher {
841                inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
842                max_epoch: 0,
843            };
844
845            let mut marshaled =
846                Deferred::new(context.clone(), mock_app, marshal.clone(), limited_epocher);
847
848            // Create a parent block at height 19 (last block in epoch 0, which is supported)
               // NOTE(review): "last block in epoch 0" assumes BLOCKS_PER_EPOCH is 20 -- the
               // constant is defined outside this test; confirm.
849            let parent_ctx = Ctx {
850                round: Round::new(Epoch::zero(), View::new(19)),
851                leader: default_leader(),
852                parent: (View::zero(), genesis.digest()),
853            };
854            let parent =
855                B::new::<Sha256>(parent_ctx.clone(), genesis.digest(), Height::new(19), 1000);
856            let parent_digest = parent.digest();
857            marshal
858                .clone()
859                .proposed(Round::new(Epoch::zero(), View::new(19)), parent.clone())
860                .await;
861
862            // Create a block at height 20 (first block in epoch 1, which is NOT supported)
863            let unsupported_round = Round::new(Epoch::new(1), View::new(20));
864            let unsupported_context = Ctx {
865                round: unsupported_round,
866                leader: me.clone(),
867                parent: (View::new(19), parent_digest),
868            };
869            let block = B::new::<Sha256>(
870                unsupported_context.clone(),
871                parent_digest,
872                Height::new(20),
873                2000,
874            );
875            let block_commitment = block.digest();
876            marshal
877                .clone()
878                .proposed(unsupported_round, block.clone())
879                .await;
880
               // Short pause before verifying. NOTE(review): presumably this lets the marshal
               // actor process the proposals above -- confirm.
881            context.sleep(Duration::from_millis(10)).await;
882
883            // Call verify and wait for the result (verify returns optimistic result,
884            // but also spawns deferred verification)
885            let verify_result = marshaled
886                .verify(unsupported_context, block_commitment)
887                .await;
888            // Wait for optimistic verify to complete so the verification task is registered
889            let optimistic_result = verify_result.await;
890
891            // The optimistic verify should return false because the block is in an unsupported epoch
               // `unwrap()` also fails the test if the awaited result is an `Err` rather than a
               // verdict.
892            assert!(
893                !optimistic_result.unwrap(),
894                "Optimistic verify should reject block in unsupported epoch"
895            );
896        })
897    }
898
899    /// Test that marshaled rejects blocks when consensus context doesn't match block's embedded context.
900    ///
901    /// This tests that when verify() is called with a context that doesn't match what's embedded
902    /// in the block, the verification should fail. A Byzantine proposer could broadcast a block
903    /// with one embedded context but consensus could call verify() with a different context.
904    #[test_traced("WARN")]
905    fn test_marshaled_rejects_mismatched_context() {
906        let runner = deterministic::Runner::timed(Duration::from_secs(30));
907        runner.start(|mut context| async move {
908            let Fixture {
909                participants,
910                schemes,
911                ..
912            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
913            let mut oracle =
914                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
915                    .await;
916
917            let me = participants[0].clone();
918
919            let setup = StandardHarness::setup_validator(
920                context.with_label("validator_0"),
921                &mut oracle,
922                me.clone(),
923                ConstantProvider::new(schemes[0].clone()),
924            )
925            .await;
926            let marshal = setup.mailbox;
927
928            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
929            let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
930
               // `marshaled` is the `Deferred` wrapper under test, sharing the same marshal mailbox.
931            let mut marshaled = Deferred::new(
932                context.clone(),
933                mock_app,
934                marshal.clone(),
935                FixedEpocher::new(BLOCKS_PER_EPOCH),
936            );
937
938            // Create parent block at height 1 so the commitment is well-formed.
939            let parent_ctx = Ctx {
940                round: Round::new(Epoch::zero(), View::new(1)),
941                leader: default_leader(),
942                parent: (View::zero(), genesis.digest()),
943            };
944            let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
945            let parent_commitment = parent.digest();
946            marshal
947                .clone()
948                .proposed(Round::new(Epoch::zero(), View::new(1)), parent.clone())
949                .await;
950
951            // Build a block with context A (embedded in the block).
               // Context A: view 2, led by participant 0, child of the height-1 parent.
952            let round_a = Round::new(Epoch::zero(), View::new(2));
953            let context_a = Ctx {
954                round: round_a,
955                leader: me.clone(),
956                parent: (View::new(1), parent_commitment),
957            };
958            let block_a = B::new::<Sha256>(context_a, parent.digest(), Height::new(2), 200);
959            let commitment_a = block_a.digest();
960            marshal.clone().proposed(round_a, block_a).await;
961
               // Short pause before verifying. NOTE(review): presumably this lets the marshal
               // actor process the proposals above -- confirm.
962            context.sleep(Duration::from_millis(10)).await;
963
964            // Verify using a different consensus context B (hash mismatch).
               // Context B differs from the embedded context A in its round (view 3 vs. view 2)
               // and its leader (participants[1] vs. participants[0]); the parent is identical.
965            let round_b = Round::new(Epoch::zero(), View::new(3));
966            let context_b = Ctx {
967                round: round_b,
968                leader: participants[1].clone(),
969                parent: (View::new(1), parent_commitment),
970            };
971
               // Block A's commitment is paired with context B. The result is raced against a
               // 5-second sleep so that a verify that never resolves fails with an explicit
               // message instead of hanging.
972            let verify_rx = marshaled.verify(context_b, commitment_a).await;
973            select! {
974                result = verify_rx => {
975                    assert!(
976                        !result.unwrap(),
977                        "mismatched context hash should be rejected"
978                    );
979                },
980                _ = context.sleep(Duration::from_secs(5)) => {
981                    panic!("verify should reject mismatched context hash promptly");
982                },
983            }
984        })
985    }
986}