Skip to main content

commonware_consensus/marshal/standard/
deferred.rs

//! Wrapper for consensus applications that handles epochs and block dissemination.
//!
//! # Overview
//!
//! [`Deferred`] is an adapter that wraps any [`Application`] implementation to handle
//! epoch transitions automatically. It intercepts consensus operations (propose, verify) and
//! ensures blocks are only produced within valid epoch boundaries.
//!
//! # Epoch Boundaries
//!
//! When the parent is the last block in an epoch (as determined by the [`Epocher`]), this wrapper
//! re-proposes that boundary block instead of building a new block. This avoids producing blocks
//! that would be pruned by the epoch transition.
//!
//! # Deferred Verification
//!
//! Before casting a notarize vote, [`Deferred`] waits for the block to become available and
//! then verifies that the block's embedded context matches the consensus context. However, it does not
//! wait for the application to finish verifying the block contents before voting. This enables verification
//! to run while we wait for a quorum of votes to form a certificate (hiding verification latency behind network
//! latency). Once a certificate is formed, we wait on the verification result in [`CertifiableAutomaton::certify`]
//! before voting to finalize (ensuring no invalid blocks are admitted to the canonical chain).
//!
//! # Usage
//!
//! Wrap your [`Application`] implementation with [`Deferred::new`] and provide it to your
//! consensus engine for the [`Automaton`] and [`Relay`]. The wrapper handles all epoch logic transparently.
//!
//! ```rust,ignore
//! let application = Deferred::new(
//!     context,
//!     my_application,
//!     marshal_mailbox,
//!     epocher,
//! );
//! ```
//!
//! # Implementation Notes
//!
//! - Genesis blocks are handled specially: epoch 0 returns the application's genesis block,
//!   while subsequent epochs use the last block of the previous epoch as genesis
//! - Blocks are automatically verified to be within the current epoch
//!
//! # Notarization and Data Availability
//!
//! In rare crash cases, it is possible for a notarization certificate to exist without a block being
//! available to the honest parties if [`CertifiableAutomaton::certify`] fails after a notarization is
//! formed.
//!
//! For this reason, it should not be expected that every notarized payload will be certifiable due
//! to the lack of an available block. However, if even one honest and online party has the block,
//! they will attempt to forward it to others via marshal's resolver.
//!
//! ```text
//!                                      ┌───────────────────────────────────────────────────┐
//!                                      ▼                                                   │
//! ┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐   ┌─────────────────────┐
//! │          B1         │◀──│          B2         │◀──│          B3         │XXX│          B4         │
//! └─────────────────────┘   └─────────────────────┘   └──────────┬──────────┘   └─────────────────────┘
//!                                                                │
//!                                                          Failed Certify
//! ```
//!
//! # Future Work
//!
//! - To further reduce view latency, a participant could optimistically vote for a block prior to
//!   observing its availability during [`Automaton::verify`]. However, this would require updating
//!   other components (like [`crate::marshal`]) to handle backfill where notarization does not imply
//!   a block is fetchable (without modification, a malicious leader that withholds blocks during propose
//!   could get an honest node to exhaust their network rate limit fetching things that don't exist rather
//!   than blocks they need AND can fetch).
72
73use crate::{
74    marshal::{
75        application::{
76            validation::{is_inferred_reproposal_at_certify, Stage},
77            verification_tasks::VerificationTasks,
78        },
79        core::{CommitmentFallback, DigestFallback, Mailbox},
80        standard::{
81            validation::{precheck_epoch_and_reproposal, verify_with_parent, Decision},
82            Standard,
83        },
84        Update,
85    },
86    simplex::{types::Context, Plan},
87    types::{Epocher, Round},
88    Application, Automaton, CertifiableAutomaton, CertifiableBlock, Epochable, Relay, Reporter,
89};
90use commonware_actor::Feedback;
91use commonware_cryptography::{certificate::Scheme, Digestible};
92use commonware_macros::select;
93use commonware_p2p::Recipients;
94use commonware_runtime::{
95    telemetry::metrics::{
96        histogram::{Buckets, Timed},
97        MetricsExt as _,
98    },
99    Clock, Metrics, Spawner,
100};
101use commonware_utils::{
102    channel::{fallible::OneshotExt, oneshot},
103    sync::AsyncMutex,
104};
105use rand::Rng;
106use std::sync::Arc;
107use tracing::debug;
108
109/// An [`Application`] adapter that handles epoch transitions and validates block ancestry.
110///
111/// This wrapper intercepts consensus operations to enforce epoch boundaries and validate
112/// block ancestry. It prevents blocks from being produced outside their valid epoch,
113/// handles the special case of re-proposing boundary blocks at epoch boundaries,
114/// and ensures all blocks have valid parent linkage and contiguous heights.
115///
116/// # Ancestry Validation
117///
118/// Applications wrapped by [`Deferred`] can rely on the following ancestry checks being
119/// performed automatically during verification:
120/// - Parent digest matches the consensus context's expected parent
121/// - Block height is exactly one greater than the parent's height
122///
123/// Verifying only the immediate parent is sufficient since the parent itself must have
124/// been notarized by consensus, which guarantees it was verified and accepted by a quorum.
125/// This means the entire ancestry chain back to genesis is transitively validated.
126///
127/// Applications do not need to re-implement these checks in their own verification logic.
128///
129/// # Context Recovery
130///
131/// With deferred verification, validators wait for data availability (DA) and verify the context
132/// before voting. If a validator crashes after voting but before certification, they lose their in-memory
133/// verification task. When recovering, validators extract context from a [`CertifiableBlock`].
134///
135/// _This embedded context is trustworthy because the notarizing quorum (which contains at least f+1 honest
136/// validators) verified that the block's context matched the consensus context before voting._
137pub struct Deferred<E, S, A, B, ES>
138where
139    E: Rng + Spawner + Metrics + Clock,
140    S: Scheme,
141    A: Application<E>,
142    B: CertifiableBlock,
143    ES: Epocher,
144{
145    context: Arc<AsyncMutex<E>>,
146    application: A,
147    marshal: Mailbox<S, Standard<B>>,
148    epocher: ES,
149    verification_tasks: VerificationTasks<<B as Digestible>::Digest>,
150
151    build_duration: Timed,
152    proposal_parent_fetch_duration: Timed,
153    ancestor_fetch_duration: Timed,
154}
155
156impl<E, S, A, B, ES> Clone for Deferred<E, S, A, B, ES>
157where
158    E: Rng + Spawner + Metrics + Clock,
159    S: Scheme,
160    A: Application<E>,
161    B: CertifiableBlock,
162    ES: Epocher,
163{
164    fn clone(&self) -> Self {
165        Self {
166            context: self.context.clone(),
167            application: self.application.clone(),
168            marshal: self.marshal.clone(),
169            epocher: self.epocher.clone(),
170            verification_tasks: self.verification_tasks.clone(),
171            build_duration: self.build_duration.clone(),
172            proposal_parent_fetch_duration: self.proposal_parent_fetch_duration.clone(),
173            ancestor_fetch_duration: self.ancestor_fetch_duration.clone(),
174        }
175    }
176}
177
178impl<E, S, A, B, ES> Deferred<E, S, A, B, ES>
179where
180    E: Rng + Spawner + Metrics + Clock,
181    S: Scheme,
182    A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
183    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
184    ES: Epocher,
185{
186    /// Creates a new [`Deferred`] wrapper.
187    pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
188        let build_histogram = context.histogram(
189            "build_duration",
190            "Histogram of time taken for the application to build a new block, in seconds",
191            Buckets::LOCAL,
192        );
193        let build_duration = Timed::new(build_histogram);
194        let parent_fetch_histogram = context.histogram(
195            "parent_fetch_duration",
196            "Histogram of time taken to fetch a parent block in propose, in seconds",
197            Buckets::LOCAL,
198        );
199        let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram);
200        let ancestor_fetch_histogram = context.histogram(
201            "ancestor_fetch_duration",
202            "Histogram of time taken to fetch a block via the ancestry stream, in seconds",
203            Buckets::LOCAL,
204        );
205        let ancestor_fetch_duration = Timed::new(ancestor_fetch_histogram);
206
207        Self {
208            context: Arc::new(AsyncMutex::new(context)),
209            application,
210            marshal,
211            epocher,
212            verification_tasks: VerificationTasks::new(),
213
214            build_duration,
215            proposal_parent_fetch_duration,
216            ancestor_fetch_duration,
217        }
218    }
219
220    /// Verifies a proposed block's application-level validity.
221    ///
222    /// This method validates that:
223    /// 1. The block's parent digest matches the expected parent
224    /// 2. The block's height is exactly one greater than the parent's height
225    /// 3. The underlying application's verification logic passes
226    ///
227    /// Verification is spawned in a background task and returns a receiver that will contain
228    /// the verification result. Valid blocks are reported to the marshal as verified.
229    #[inline]
230    async fn deferred_verify(
231        &mut self,
232        context: <Self as Automaton>::Context,
233        block: B,
234        stage: Stage,
235    ) -> oneshot::Receiver<bool> {
236        let mut marshal = self.marshal.clone();
237        let mut application = self.application.clone();
238        let (mut tx, rx) = oneshot::channel();
239        let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
240        let runtime_context = self
241            .context
242            .lock()
243            .await
244            .child("deferred_verify")
245            .with_attribute("round", context.round);
246        runtime_context.spawn(move |runtime_context| async move {
247            // Shared non-reproposal verification:
248            // - fetch parent (using trusted round fallback from consensus context)
249            // - validate standard ancestry invariants
250            // - run application verification over ancestry
251            //
252            // The helper preserves the prior early-exit behavior and returns
253            // `None` when work should stop (for example receiver dropped or
254            // parent unavailable).
255            let application_valid = match verify_with_parent(
256                runtime_context,
257                context,
258                block,
259                &mut application,
260                &mut marshal,
261                &mut tx,
262                stage,
263                ancestor_fetch_duration,
264            )
265            .await
266            {
267                Some(valid) => valid,
268                None => return,
269            };
270            tx.send_lossy(application_valid);
271        });
272
273        rx
274    }
275
276    async fn certify_from_embedded_context(
277        &mut self,
278        round: Round,
279        digest: B::Digest,
280    ) -> oneshot::Receiver<bool> {
281        // No in-progress task means we never verified this proposal locally. We can use the
282        // block's embedded context to help complete finalization when Byzantine validators
283        // withhold their finalize votes. If a Byzantine proposer embedded a malicious context,
284        // the f+1 honest validators from the notarizing quorum will verify against the proper
285        // context and reject the mismatch, preventing a 2f+1 finalization quorum.
286        //
287        // We must fetch here rather than only wait for local broadcast delivery. A Byzantine
288        // leader can send a proposal to just f+1 honest validators, collect enough honest
289        // notarize votes to form a notarization, and leave the remaining honest validators
290        // without the block. Those validators need the notarized round to recover the block
291        // and certify; otherwise they can remain stuck if the Byzantine validators stop
292        // participating in the next view.
293        //
294        // Subscribe to the block and verify using its embedded context once available.
295        debug!(
296            ?round,
297            ?digest,
298            "subscribing to block for certification using embedded context"
299        );
300        let block_rx = self
301            .marshal
302            .subscribe_by_digest(digest, DigestFallback::FetchByRound { round });
303        let mut marshaled = self.clone();
304        let epocher = self.epocher.clone();
305        let (mut tx, rx) = oneshot::channel();
306        let context = self
307            .context
308            .lock()
309            .await
310            .child("certify")
311            .with_attribute("round", round);
312        context.spawn(move |_| async move {
313            let block = select! {
314                _ = tx.closed() => {
315                    debug!(
316                        reason = "consensus dropped receiver",
317                        "skipping certification"
318                    );
319                    return;
320                },
321                result = block_rx => match result {
322                    Ok(block) => block,
323                    Err(_) => {
324                        debug!(
325                            ?digest,
326                            reason = "failed to fetch block for certification",
327                            "skipping certification"
328                        );
329                        return;
330                    }
331                },
332            };
333
334            // Re-proposal detection for certify path: we don't have the consensus context,
335            // only the block's embedded context from original proposal. Infer re-proposal from:
336            // 1. Block is at epoch boundary (only boundary blocks can be re-proposed)
337            // 2. Certification round's view > embedded context's view (re-proposals retain their
338            //    original embedded context, so a later view indicates the block was re-proposed)
339            // 3. Same epoch (re-proposals don't cross epoch boundaries)
340            let embedded_context = block.context();
341            let is_reproposal = is_inferred_reproposal_at_certify(
342                &epocher,
343                block.height(),
344                embedded_context.round,
345                round,
346            );
347            if is_reproposal {
348                // Certifier holds a notarization for this block, so route
349                // the write to the notarized cache. `certified` is
350                // idempotent, so crash-recovery double-invocation is safe.
351                if !marshaled.marshal.certified(round, block).await {
352                    debug!(?round, "marshal unable to accept block");
353                    return;
354                }
355                tx.send_lossy(true);
356                return;
357            }
358
359            let verify_rx = marshaled
360                .deferred_verify(embedded_context, block, Stage::Certified)
361                .await;
362            if let Ok(result) = verify_rx.await {
363                tx.send_lossy(result);
364            }
365        });
366        rx
367    }
368
369    async fn certify_from_existing_task(
370        &mut self,
371        round: Round,
372        digest: B::Digest,
373        task: oneshot::Receiver<bool>,
374    ) -> oneshot::Receiver<bool> {
375        // `verify()` waits only on local broadcast delivery; nudge a
376        // round-bound notarized fetch so the existing waiter can be
377        // unblocked if local broadcast never arrives. For the standard
378        // variant, the digest is also the variant commitment.
379        self.marshal.hint_notarized(round, digest);
380
381        let mut marshaled = self.clone();
382        let (mut tx, rx) = oneshot::channel();
383        let context = self
384            .context
385            .lock()
386            .await
387            .child("certify_existing")
388            .with_attribute("round", round);
389        context.spawn(move |_| async move {
390            let result = select! {
391                _ = tx.closed() => {
392                    debug!(
393                        reason = "consensus dropped receiver",
394                        "skipping certification"
395                    );
396                    return;
397                },
398                result = task => result,
399            };
400            match result {
401                Ok(result) => {
402                    tx.send_lossy(result);
403                }
404                Err(_) => {
405                    debug!(
406                        ?round,
407                        ?digest,
408                        "verification task closed before certification, falling back to embedded context"
409                    );
410                    let fallback = marshaled.certify_from_embedded_context(round, digest).await;
411                    let result = select! {
412                        _ = tx.closed() => {
413                            debug!(
414                                reason = "consensus dropped receiver",
415                                "skipping certification"
416                            );
417                            return;
418                        },
419                        result = fallback => result,
420                    };
421                    if let Ok(result) = result {
422                        tx.send_lossy(result);
423                    }
424                }
425            }
426        });
427        rx
428    }
429}
430
431impl<E, S, A, B, ES> Automaton for Deferred<E, S, A, B, ES>
432where
433    E: Rng + Spawner + Metrics + Clock,
434    S: Scheme,
435    A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
436    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
437    ES: Epocher,
438{
439    type Digest = B::Digest;
440    type Context = Context<Self::Digest, S::PublicKey>;
441
442    /// Proposes a new block or re-proposes the epoch boundary block.
443    ///
444    /// This method builds a new block from the underlying application unless the parent block
445    /// is the last block in the current epoch. When at an epoch boundary, it re-proposes the
446    /// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
447    ///
448    /// The proposal operation is spawned in a background task and returns a receiver that will
449    /// contain the proposed block's digest when ready. The built block is persisted via
450    /// [`Mailbox::verified`] before the digest is delivered, so consensus can rely on the
451    /// block surviving restart.
452    async fn propose(
453        &mut self,
454        consensus_context: Context<Self::Digest, S::PublicKey>,
455    ) -> oneshot::Receiver<Self::Digest> {
456        let marshal = self.marshal.clone();
457        let mut application = self.application.clone();
458        let epocher = self.epocher.clone();
459
460        // Metrics
461        let build_duration = self.build_duration.clone();
462        let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone();
463        let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
464
465        let (mut tx, rx) = oneshot::channel();
466        let context = self
467            .context
468            .lock()
469            .await
470            .child("propose")
471            .with_attribute("round", consensus_context.round);
472        context.spawn(move |runtime_context| async move {
473            // On leader recovery, marshal may already hold a verified block
474            // for this round (persisted by a pre-crash propose whose
475            // notarize vote never reached the journal).
476            //
477            // Building a fresh block would land on the same prunable archive
478            // index and be silently dropped, so the stored block is the only proposal
479            // we can broadcast for this round.
480            //
481            // The recovered block is safe to reuse only if its embedded
482            // context matches the context simplex just recovered. Otherwise the
483            // cached block was built against a different parent and cannot be
484            // broadcast under the current header, so drop the receiver
485            // and let the voter nullify the view via timeout.
486            if let Some(block) = marshal.get_verified(consensus_context.round).await {
487                let block_context = block.context();
488                if block_context != consensus_context {
489                    debug!(
490                        round = ?consensus_context.round,
491                        ?consensus_context,
492                        ?block_context,
493                        "skipping proposal: cached verified block context no longer matches"
494                    );
495                    return;
496                }
497                let digest = block.digest();
498                let success = tx.send_lossy(digest);
499                debug!(
500                    round = ?consensus_context.round,
501                    ?digest,
502                    success,
503                    "reused verified block from marshal on leader recovery"
504                );
505                return;
506            }
507
508            // The parent for any consensus context is in the same epoch: the
509            // boundary block of the previous epoch is the genesis block of the
510            // current epoch.
511            //
512            // Proposal context carries the certified parent view/commitment but
513            // not the parent height. The parent may be certified above the
514            // finalized tip, so this must stay round-bound until the block is
515            // returned.
516            let (parent_view, parent_commitment) = consensus_context.parent;
517            let parent_request = marshal.subscribe_by_commitment(
518                parent_commitment,
519                CommitmentFallback::FetchByRound {
520                    round: Round::new(consensus_context.epoch(), parent_view),
521                },
522            );
523
524            let parent_timer = proposal_parent_fetch_duration.timer(&runtime_context);
525            let parent = select! {
526                _ = tx.closed() => {
527                    debug!(reason = "consensus dropped receiver", "skipping proposal");
528                    return;
529                },
530                result = parent_request => match result {
531                    Ok(parent) => parent,
532                    Err(_) => {
533                        debug!(
534                            ?parent_commitment,
535                            reason = "failed to fetch parent block",
536                            "skipping proposal"
537                        );
538                        return;
539                    }
540                },
541            };
542            parent_timer.observe(&runtime_context);
543
544            // Special case: If the parent block is the last block in the epoch,
545            // re-propose it as to not produce any blocks that will be cut out
546            // by the epoch transition.
547            let last_in_epoch = epocher
548                .last(consensus_context.epoch())
549                .expect("current epoch should exist");
550            if parent.height() == last_in_epoch {
551                let digest = parent.digest();
552                if !marshal.verified(consensus_context.round, parent).await {
553                    debug!(
554                        round = ?consensus_context.round,
555                        ?digest,
556                        "marshal rejected re-proposed boundary block"
557                    );
558                    return;
559                }
560                let success = tx.send_lossy(digest);
561                debug!(
562                    round = ?consensus_context.round,
563                    ?digest,
564                    success,
565                    "re-proposed parent block at epoch boundary"
566                );
567                return;
568            }
569
570            let ancestor_stream = marshal.ancestor_stream(
571                Arc::new(runtime_context.child("ancestor_stream")),
572                [parent],
573                ancestor_fetch_duration,
574            );
575            let build_request = application.propose(
576                (
577                    runtime_context.child("app_propose"),
578                    consensus_context.clone(),
579                ),
580                ancestor_stream,
581            );
582
583            let build_timer = build_duration.timer(&runtime_context);
584            let built_block = select! {
585                _ = tx.closed() => {
586                    debug!(reason = "consensus dropped receiver", "skipping proposal");
587                    return;
588                },
589                result = build_request => match result {
590                    Some(block) => block,
591                    None => {
592                        debug!(
593                            ?parent_commitment,
594                            reason = "block building failed",
595                            "skipping proposal"
596                        );
597                        return;
598                    }
599                },
600            };
601            build_timer.observe(&runtime_context);
602
603            let digest = built_block.digest();
604            if !marshal.proposed(consensus_context.round, built_block).await {
605                debug!(
606                    round = ?consensus_context.round,
607                    ?digest,
608                    "marshal rejected proposed block"
609                );
610                return;
611            }
612            let success = tx.send_lossy(digest);
613            debug!(
614                round = ?consensus_context.round,
615                ?digest,
616                success,
617                "proposed new block"
618            );
619        });
620        rx
621    }
622
623    async fn verify(
624        &mut self,
625        context: Context<Self::Digest, S::PublicKey>,
626        digest: Self::Digest,
627    ) -> oneshot::Receiver<bool> {
628        let mut marshal = self.marshal.clone();
629        let mut marshaled = self.clone();
630        let round = context.round;
631
632        // Register the verification task synchronously so `certify` finds a pending
633        // entry even while the optimistic block subscription is still waiting locally.
634        // This lets `certify` take the task and bump a round-bound notarized fetch
635        // via `hint_notarized`.
636        let (task_tx, task_rx) = oneshot::channel();
637        self.verification_tasks.insert(round, digest, task_rx);
638
639        let (mut tx, rx) = oneshot::channel();
640        let runtime_context = self
641            .context
642            .lock()
643            .await
644            .child("optimistic_verify")
645            .with_attribute("round", round);
646        runtime_context.spawn(move |_| async move {
647                let block_request = marshal.subscribe_by_digest(digest, DigestFallback::Wait);
648                let block = select! {
649                    _ = tx.closed() => {
650                        debug!(
651                            reason = "consensus dropped receiver",
652                            "skipping optimistic verification"
653                        );
654                        return;
655                    },
656                    result = block_request => match result {
657                        Ok(block) => block,
658                        Err(_) => {
659                            debug!(
660                                ?digest,
661                                reason = "failed to fetch block for optimistic verification",
662                                "skipping optimistic verification"
663                            );
664                            return;
665                        }
666                    },
667                };
668
669                // Shared pre-checks enforce:
670                // - Block epoch membership.
671                // - Re-proposal detection via `digest == context.parent.1`.
672                //
673                // Re-proposals return early and skip normal parent/height checks
674                // because they were already verified when originally proposed and
675                // parent-child checks would fail by construction when parent == block.
676                let Some(decision) = precheck_epoch_and_reproposal(
677                    &marshaled.epocher,
678                    &mut marshal,
679                    &context,
680                    digest,
681                    block,
682                )
683                .await
684                else {
685                    return;
686                };
687                let block = match decision {
688                    Decision::Complete(valid) => {
689                        // `Complete` means either immediate rejection or successful
690                        // re-proposal handling with no further ancestry validation.
691                        task_tx.send_lossy(valid);
692                        tx.send_lossy(valid);
693                        return;
694                    }
695                    Decision::Continue(block) => block,
696                };
697
698                // Before casting a notarize vote, ensure the block's embedded context matches
699                // the consensus context.
700                //
701                // This is a critical step - the notarize quorum is guaranteed to have at least
702                // f+1 honest validators who will verify against this context, preventing a Byzantine
703                // proposer from embedding a malicious context. The other f honest validators who did
704                // not vote will later use the block-embedded context to help finalize if Byzantine
705                // validators withhold their finalize votes.
706                if block.context() != context {
707                    debug!(
708                        ?context,
709                        block_context = ?block.context(),
710                        "block-embedded context does not match consensus context during optimistic verification"
711                    );
712                    task_tx.send_lossy(false);
713                    tx.send_lossy(false);
714                    return;
715                }
716
717                // Optimistic verify returns immediately; the deferred_verify task
718                // runs in the background and forwards its final verdict to
719                // `task_tx` so `certify` observes the same result via the
720                // synchronously-registered `task_rx`.
721                let deferred_rx = marshaled
722                    .deferred_verify(context, block, Stage::Verified)
723                    .await;
724                tx.send_lossy(true);
725                if let Ok(result) = deferred_rx.await {
726                    task_tx.send_lossy(result);
727                }
728        });
729        rx
730    }
731}
732
733impl<E, S, A, B, ES> CertifiableAutomaton for Deferred<E, S, A, B, ES>
734where
735    E: Rng + Spawner + Metrics + Clock,
736    S: Scheme,
737    A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
738    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
739    ES: Epocher,
740{
741    async fn certify(&mut self, round: Round, digest: Self::Digest) -> oneshot::Receiver<bool> {
742        // Attempt to retrieve the existing verification task for this (round, payload).
743        let task = self.verification_tasks.take(round, digest);
744        if let Some(task) = task {
745            return self.certify_from_existing_task(round, digest, task).await;
746        }
747
748        self.certify_from_embedded_context(round, digest).await
749    }
750}
751
752impl<E, S, A, B, ES> Relay for Deferred<E, S, A, B, ES>
753where
754    E: Rng + Spawner + Metrics + Clock,
755    S: Scheme,
756    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
757    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
758    ES: Epocher,
759{
760    type Digest = B::Digest;
761    type PublicKey = S::PublicKey;
762    type Plan = Plan<S::PublicKey>;
763
764    fn broadcast(&mut self, commitment: Self::Digest, plan: Plan<S::PublicKey>) -> Feedback {
765        let (round, recipients) = match plan {
766            Plan::Propose { round } => (round, Recipients::All),
767            Plan::Forward { round, recipients } => (round, recipients),
768        };
769        self.marshal.forward(round, commitment, recipients)
770    }
771}
772
773impl<E, S, A, B, ES> Reporter for Deferred<E, S, A, B, ES>
774where
775    E: Rng + Spawner + Metrics + Clock,
776    S: Scheme,
777    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
778        + Reporter<Activity = Update<B>>,
779    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
780    ES: Epocher,
781{
782    type Activity = A::Activity;
783
784    /// Relays a report to the underlying [`Application`] and cleans up old verification tasks.
785    fn report(&mut self, update: Self::Activity) -> Feedback {
786        // Clean up verification tasks for rounds <= the finalized round.
787        if let Update::Tip(round, _, _) = &update {
788            self.verification_tasks.retain_after(round);
789        }
790        self.application.report(update)
791    }
792}
793
#[cfg(test)]
mod tests {
    use super::Deferred;
    use crate::{
        marshal::mocks::{
            harness::{
                default_leader, make_raw_block, setup_network_with_participants, Ctx,
                StandardHarness, TestHarness, B, BLOCKS_PER_EPOCH, NAMESPACE, NUM_VALIDATORS, S, V,
            },
            verifying::{GatedVerifyingApp, MockVerifyingApp},
        },
        simplex::scheme::bls12381_threshold::vrf as bls12381_threshold_vrf,
        types::{Epoch, Epocher, FixedEpocher, Height, Round, View},
        Automaton, CertifiableAutomaton,
    };
    use commonware_broadcast::Broadcaster;
    use commonware_cryptography::{
        certificate::{mocks::Fixture, ConstantProvider},
        sha256::Sha256,
        Digestible, Hasher as _,
    };
    use commonware_macros::{select, test_traced};
    use commonware_runtime::{deterministic, Clock, Runner, Supervisor as _};
    use commonware_utils::{channel::fallible::OneshotExt, NZUsize};
    use std::time::Duration;

    /// Two competing height-2 blocks (views 5 and 10) are both verified, then certified out of
    /// view order: the view-10 block first, the view-5 block second. Both certifications are
    /// expected to succeed.
    #[test_traced("INFO")]
    fn test_certify_lower_view_after_higher_view() {
        let executor = deterministic::Runner::timed(Duration::from_secs(60));
        executor.start(|mut env| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut env, NAMESPACE, NUM_VALIDATORS);
            let mut network = setup_network_with_participants(
                env.child("network"),
                NZUsize!(1),
                participants.clone(),
            )
            .await;

            let local = participants[0].clone();

            let validator = StandardHarness::setup_validator(
                env.child("validator").with_attribute("index", 0),
                &mut network,
                local.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let mailbox = validator.mailbox;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let app: MockVerifyingApp<B, S> = MockVerifyingApp::new();

            let mut deferred = Deferred::new(
                env.child("deferred"),
                app,
                mailbox.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            // Shared parent of both candidates, stored at height 1.
            let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
            let parent_digest = parent.digest();
            let parent_round = Round::new(Epoch::new(0), View::new(1));
            let parent_stored = mailbox.verified(parent_round, parent.clone()).await;
            assert!(parent_stored);

            // Lower-view candidate: view 5, height 2.
            let lower_round = Round::new(Epoch::new(0), View::new(5));
            let lower_ctx = Ctx {
                round: lower_round,
                leader: local.clone(),
                parent: (View::new(1), parent_digest),
            };
            let lower_block =
                B::new::<Sha256>(lower_ctx.clone(), parent_digest, Height::new(2), 200);
            let lower_commitment = StandardHarness::commitment(&lower_block);
            let lower_stored = mailbox.verified(lower_round, lower_block.clone()).await;
            assert!(lower_stored);

            // Higher-view candidate: view 10, same height but a different block.
            let higher_round = Round::new(Epoch::new(0), View::new(10));
            let higher_ctx = Ctx {
                round: higher_round,
                leader: local.clone(),
                parent: (View::new(1), parent_digest),
            };
            let higher_block =
                B::new::<Sha256>(higher_ctx.clone(), parent_digest, Height::new(2), 300);
            let higher_commitment = StandardHarness::commitment(&higher_block);
            let higher_stored = mailbox.verified(higher_round, higher_block.clone()).await;
            assert!(higher_stored);

            env.sleep(Duration::from_millis(10)).await;

            // Verify in ascending view order: view 5 first ...
            let lower_verify = deferred.verify(lower_ctx, lower_commitment).await;
            let _ = lower_verify.await;

            // ... then view 10.
            let higher_verify = deferred.verify(higher_ctx, higher_commitment).await;
            let _ = higher_verify.await;

            // Certify in the opposite order, starting with the higher view.
            let higher_certify = deferred.certify(higher_round, higher_commitment).await;
            let higher_verdict = higher_certify.await.unwrap();
            assert!(higher_verdict, "Block B certification should succeed");

            // The lower view must still be certifiable afterwards, and promptly.
            let lower_certify = deferred.certify(lower_round, lower_commitment).await;

            select! {
                verdict = lower_certify => {
                    assert!(verdict.unwrap(), "Block A certification should succeed");
                },
                _ = env.sleep(Duration::from_secs(5)) => {
                    panic!("Block A certification timed out");
                },
            }
        });
    }

    /// A block whose height falls in an epoch the [`Epocher`] does not support must be
    /// rejected by the optimistic verify path.
    #[test_traced("WARN")]
    fn test_marshaled_rejects_unsupported_epoch() {
        /// Wraps a [`FixedEpocher`] and reports every epoch above `max_epoch` as unknown.
        #[derive(Clone)]
        struct LimitedEpocher {
            inner: FixedEpocher,
            max_epoch: u64,
        }

        impl Epocher for LimitedEpocher {
            fn containing(&self, height: Height) -> Option<crate::types::EpochInfo> {
                // Keep the inner answer only when its epoch is within the supported range.
                self.inner
                    .containing(height)
                    .filter(|bounds| bounds.epoch().get() <= self.max_epoch)
            }

            fn first(&self, epoch: Epoch) -> Option<Height> {
                if epoch.get() > self.max_epoch {
                    return None;
                }
                self.inner.first(epoch)
            }

            fn last(&self, epoch: Epoch) -> Option<Height> {
                if epoch.get() > self.max_epoch {
                    return None;
                }
                self.inner.last(epoch)
            }
        }

        let executor = deterministic::Runner::timed(Duration::from_secs(60));
        executor.start(|mut env| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut env, NAMESPACE, NUM_VALIDATORS);
            let mut network = setup_network_with_participants(
                env.child("network"),
                NZUsize!(1),
                participants.clone(),
            )
            .await;

            let local = participants[0].clone();

            let validator = StandardHarness::setup_validator(
                env.child("validator").with_attribute("index", 0),
                &mut network,
                local.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let mailbox = validator.mailbox;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
            // Only epoch 0 is recognized by this epocher.
            let epoch_zero_only = LimitedEpocher {
                inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
                max_epoch: 0,
            };

            let mut deferred = Deferred::new(
                env.child("deferred"),
                app,
                mailbox.clone(),
                epoch_zero_only,
            );

            // Parent at height 19: the final block of epoch 0, which is still supported.
            let parent_round = Round::new(Epoch::zero(), View::new(19));
            let parent_ctx = Ctx {
                round: parent_round,
                leader: default_leader(),
                parent: (View::zero(), genesis.digest()),
            };
            let parent =
                B::new::<Sha256>(parent_ctx.clone(), genesis.digest(), Height::new(19), 1000);
            let parent_digest = parent.digest();
            let parent_stored = mailbox
                .clone()
                .verified(Round::new(Epoch::zero(), View::new(19)), parent.clone())
                .await;
            assert!(parent_stored);

            // Child at height 20: the first block of epoch 1, which is NOT supported.
            let unsupported_round = Round::new(Epoch::new(1), View::new(20));
            let unsupported_ctx = Ctx {
                round: unsupported_round,
                leader: local.clone(),
                parent: (View::new(19), parent_digest),
            };
            let child = B::new::<Sha256>(
                unsupported_ctx.clone(),
                parent_digest,
                Height::new(20),
                2000,
            );
            let child_commitment = StandardHarness::commitment(&child);
            let child_stored = mailbox
                .clone()
                .verified(unsupported_round, child.clone())
                .await;
            assert!(child_stored);

            env.sleep(Duration::from_millis(10)).await;

            // `verify` hands back the optimistic receiver; awaiting it yields the optimistic
            // verdict for the block.
            let optimistic_rx = deferred.verify(unsupported_ctx, child_commitment).await;
            let optimistic_verdict = optimistic_rx.await;

            // The block sits in an unsupported epoch, so the optimistic verdict must be false.
            assert!(
                !optimistic_verdict.unwrap(),
                "Optimistic verify should reject block in unsupported epoch"
            );
        });
    }

    /// Verification must fail when the context supplied by consensus differs from the context
    /// embedded in the block.
    ///
    /// A Byzantine proposer could broadcast a block carrying one context while consensus
    /// invokes `verify()` with another; such a mismatch has to be rejected.
    #[test_traced("WARN")]
    fn test_marshaled_rejects_mismatched_context() {
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut env| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut env, NAMESPACE, NUM_VALIDATORS);
            let mut network = setup_network_with_participants(
                env.child("network"),
                NZUsize!(1),
                participants.clone(),
            )
            .await;

            let local = participants[0].clone();

            let validator = StandardHarness::setup_validator(
                env.child("validator").with_attribute("index", 0),
                &mut network,
                local.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let mailbox = validator.mailbox;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let app: MockVerifyingApp<B, S> = MockVerifyingApp::new();

            let mut deferred = Deferred::new(
                env.child("deferred"),
                app,
                mailbox.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            // A height-1 parent gives the child a well-formed commitment to point at.
            let parent_ctx = Ctx {
                round: Round::new(Epoch::zero(), View::new(1)),
                leader: default_leader(),
                parent: (View::zero(), genesis.digest()),
            };
            let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
            let parent_commitment = StandardHarness::commitment(&parent);
            let parent_stored = mailbox
                .clone()
                .verified(Round::new(Epoch::zero(), View::new(1)), parent.clone())
                .await;
            assert!(parent_stored);

            // The block itself embeds this context (view 2, led by us).
            let embedded_round = Round::new(Epoch::zero(), View::new(2));
            let embedded_ctx = Ctx {
                round: embedded_round,
                leader: local.clone(),
                parent: (View::new(1), parent_commitment),
            };
            let block = B::new::<Sha256>(embedded_ctx, parent.digest(), Height::new(2), 200);
            let block_commitment = StandardHarness::commitment(&block);
            let block_stored = mailbox.verified(embedded_round, block).await;
            assert!(block_stored);

            env.sleep(Duration::from_millis(10)).await;

            // Consensus presents a different context: another view and another leader.
            let consensus_round = Round::new(Epoch::zero(), View::new(3));
            let consensus_ctx = Ctx {
                round: consensus_round,
                leader: participants[1].clone(),
                parent: (View::new(1), parent_commitment),
            };

            let verdict_rx = deferred.verify(consensus_ctx, block_commitment).await;
            select! {
                verdict = verdict_rx => {
                    assert!(
                        !verdict.unwrap(),
                        "mismatched context hash should be rejected"
                    );
                },
                _ = env.sleep(Duration::from_secs(5)) => {
                    panic!("verify should reject mismatched context hash promptly");
                },
            }
        });
    }

    /// If the optimistic verify receiver is dropped before the block is available, the
    /// synchronously-registered verification task can end up closed. `certify` must then fall
    /// back to the embedded-context path rather than hand the closed task to consensus.
    #[test_traced("WARN")]
    fn test_deferred_certify_recovers_after_verify_receiver_drop() {
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut env| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut env, NAMESPACE, NUM_VALIDATORS);
            let mut network = setup_network_with_participants(
                env.child("network"),
                NZUsize!(1),
                participants.clone(),
            )
            .await;

            let local = participants[0].clone();
            let validator = StandardHarness::setup_validator(
                env.child("validator").with_attribute("index", 0),
                &mut network,
                local.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let mailbox = validator.mailbox;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
            let mut deferred = Deferred::new(
                env.child("deferred"),
                app,
                mailbox.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            let round = Round::new(Epoch::zero(), View::new(1));
            let embedded_ctx = Ctx {
                round,
                leader: local,
                parent: (View::zero(), genesis.digest()),
            };
            let block =
                B::new::<Sha256>(embedded_ctx.clone(), genesis.digest(), Height::new(1), 100);
            let digest = block.digest();

            // Start verification and immediately abandon its receiver.
            drop(deferred.verify(embedded_ctx, digest).await);

            // Let the optimistic task notice the dropped receiver while it is still waiting
            // on the block subscription.
            env.sleep(Duration::from_millis(10)).await;

            // Only now does the block become available to marshal.
            let block_stored = mailbox.proposed(round, block).await;
            assert!(block_stored);

            let verdict_rx = deferred.certify(round, digest).await;
            select! {
                verdict = verdict_rx => {
                    assert!(
                        verdict.expect("certify result missing"),
                        "certify should recover after verify receiver drop"
                    );
                },
                _ = env.sleep(Duration::from_secs(5)) => {
                    panic!("certify should recover promptly after verify drop");
                },
            }
        });
    }

    /// Regression: a `true` from `certify` drives the finalize vote, so it has to imply that
    /// the block was durably persisted. In deferred mode `verify()` spawns the
    /// `deferred_verify` background task and `certify()` hands back that same receiver; the
    /// persistence ack is issued inside `verify_with_parent` once `app.verify` has returned.
    ///
    /// The gated app keeps `app.verify()` parked until the test releases it. That opens a
    /// deterministic window to abort the marshal actor after the optimistic path ran but
    /// before the persistence-ack path does. With the ack in place, `verified()` yields false
    /// once the actor is gone, `verify_with_parent` returns `None`, and the sender is dropped
    /// unresolved, so the certify receiver is asserted to error.
    #[test_traced("WARN")]
    fn test_deferred_certify_does_not_bypass_failed_verify_persistence() {
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut env| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut env, NAMESPACE, NUM_VALIDATORS);
            let mut network = setup_network_with_participants(
                env.child("network"),
                NZUsize!(1),
                participants.clone(),
            )
            .await;

            let local = participants[0].clone();

            let validator = StandardHarness::setup_validator(
                env.child("validator").with_attribute("index", 0),
                &mut network,
                local.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let mailbox = validator.mailbox;
            let buffer = validator.extra;
            let actor_handle = validator.actor_handle;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let (gated_app, verify_started, release_verify): (GatedVerifyingApp<B, S>, _, _) =
                GatedVerifyingApp::new();
            let mut deferred = Deferred::new(
                env.child("deferred"),
                gated_app,
                mailbox.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            // Parent and child are seeded through the buffer only (in memory), so
            // `deferred_verify` can fetch them without the persisted marshal path.
            let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
            let parent_digest = parent.digest();

            let child_round = Round::new(Epoch::zero(), View::new(2));
            let child_ctx = Ctx {
                round: child_round,
                leader: local,
                parent: (View::new(1), parent_digest),
            };
            let child = B::new::<Sha256>(child_ctx.clone(), parent_digest, Height::new(2), 200);
            let child_digest = child.digest();

            let parent_feedback =
                buffer.broadcast(commonware_p2p::Recipients::Some(vec![]), parent);
            assert!(
                parent_feedback.accepted(),
                "buffer broadcast for parent should be accepted"
            );
            let child_feedback = buffer.broadcast(commonware_p2p::Recipients::Some(vec![]), child);
            assert!(
                child_feedback.accepted(),
                "buffer broadcast for child should be accepted"
            );

            // Launch the optimistic verify; it spawns `deferred_verify`, whose gated
            // `app.verify` stays blocked until released. That pause is the window in which
            // the marshal actor gets aborted.
            let optimistic_rx = deferred.verify(child_ctx, child_digest).await;
            let optimistic_verdict = optimistic_rx
                .await
                .expect("optimistic verify should resolve");
            assert!(
                optimistic_verdict,
                "optimistic verify should accept the available block"
            );

            let verdict_rx = deferred.certify(child_round, child_digest).await;
            verify_started
                .await
                .expect("verify should reach application before marshal abort");

            // Marshal must be fully shut down before `app.verify` is released; this makes
            // the subsequent persistence ack fail deterministically.
            actor_handle.abort();
            let _ = actor_handle.await;
            release_verify.send_lossy(());

            select! {
                verdict = verdict_rx => {
                    assert!(
                        verdict.is_err(),
                        "certify must not resolve after marshal.verified loses its persistence ack"
                    );
                },
                _ = env.sleep(Duration::from_secs(5)) => {
                    panic!("certify should terminate after marshal abort");
                },
            }
        });
    }

    /// Regression: if marshal already holds a verified block for a round (left over from a
    /// pre-crash propose), a restarted leader's `propose` must hand back that block's digest
    /// rather than ask the application for a new block.
    /// See `standard::inline::tests::test_propose_reuses_verified_block_on_restart`.
    #[test_traced("WARN")]
    fn test_propose_reuses_verified_block_on_restart() {
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut env| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut env, NAMESPACE, NUM_VALIDATORS);
            let mut network = setup_network_with_participants(
                env.child("network"),
                NZUsize!(1),
                participants.clone(),
            )
            .await;

            let local = participants[0].clone();
            let validator = StandardHarness::setup_validator(
                env.child("validator").with_attribute("index", 0),
                &mut network,
                local.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let mailbox = validator.mailbox;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let round = Round::new(Epoch::zero(), View::new(1));
            let propose_ctx = Ctx {
                round,
                leader: local.clone(),
                parent: (View::zero(), genesis.digest()),
            };

            // The block marshal already persisted for this round.
            let persisted =
                B::new::<Sha256>(propose_ctx.clone(), genesis.digest(), Height::new(1), 100);
            let persisted_digest = persisted.digest();
            let persisted_stored = mailbox.verified(round, persisted.clone()).await;
            assert!(persisted_stored);

            // The block the application would build if it were asked again.
            let fresh =
                B::new::<Sha256>(propose_ctx.clone(), genesis.digest(), Height::new(1), 200);
            let fresh_digest = fresh.digest();
            assert_ne!(
                persisted_digest, fresh_digest,
                "test requires distinct digests"
            );

            let app: MockVerifyingApp<B, S> = MockVerifyingApp::new().with_propose_result(fresh);
            let mut deferred = Deferred::new(
                env.child("deferred"),
                app,
                mailbox.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            let proposal_rx = deferred.propose(propose_ctx).await;
            let proposed_digest = proposal_rx.await.expect("propose must return a digest");
            assert_eq!(
                proposed_digest, persisted_digest,
                "propose must reuse the block marshal already persisted for this round"
            );
        });
    }

    /// Regression: a pre-crash leader may have persisted a verified block for a round while
    /// the simplex `Notarize` never made it into the journal. Replay can then recover a
    /// `consensus_context` whose parent differs from the one the cached block was built on
    /// (e.g. a late certification of an older view changes the parent chosen by
    /// `State::find_parent`). The restarted leader must not broadcast that stale cached
    /// block; it has to drop the receiver so the voter nullifies the view via
    /// `MissingProposal`.
    #[test_traced("WARN")]
    fn test_propose_skips_when_verified_block_context_changed() {
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut env| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut env, NAMESPACE, NUM_VALIDATORS);
            let mut network = setup_network_with_participants(
                env.child("network"),
                NZUsize!(1),
                participants.clone(),
            )
            .await;

            let local = participants[0].clone();
            let validator = StandardHarness::setup_validator(
                env.child("validator").with_attribute("index", 0),
                &mut network,
                local.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let mailbox = validator.mailbox;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);

            // Cache a stale block for view 2 that was built directly on genesis.
            let round = Round::new(Epoch::zero(), View::new(2));
            let cached_ctx = Ctx {
                round,
                leader: local.clone(),
                parent: (View::zero(), genesis.digest()),
            };
            let cached_block =
                B::new::<Sha256>(cached_ctx, genesis.digest(), Height::new(1), 100);
            let cached_stored = mailbox.verified(round, cached_block).await;
            assert!(cached_stored);

            // Replay scenario: parent selection for the same round now names a different
            // parent view and digest than the cached block was built against.
            let replayed_parent = Sha256::hash(b"late-certified-parent");
            let replayed_ctx = Ctx {
                round,
                leader: local.clone(),
                parent: (View::new(1), replayed_parent),
            };

            let app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
            let mut deferred = Deferred::new(
                env.child("deferred"),
                app,
                mailbox.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            let proposal_rx = deferred.propose(replayed_ctx).await;
            let proposal = proposal_rx.await;
            assert!(
                proposal.is_err(),
                "propose must drop the receiver when the cached block's context no longer matches"
            );
        });
    }
}