Skip to main content

commonware_consensus/application/
marshaled.rs

1//! Wrapper for consensus applications that handles epochs and block dissemination.
2//!
3//! # Overview
4//!
5//! [`Marshaled`] is an adapter that wraps any [`VerifyingApplication`] implementation to handle
6//! epoch transitions automatically. It intercepts consensus operations (propose, verify) and
7//! ensures blocks are only produced within valid epoch boundaries.
8//!
9//! # Epoch Boundaries
10//!
11//! When the parent is the last block in an epoch (as determined by the [`Epocher`]), this wrapper
12//! re-proposes that boundary block instead of building a new block. This avoids producing blocks
13//! that would be pruned by the epoch transition.
14//!
15//! # Deferred Verification
16//!
17//! Before casting a notarize vote, [`Marshaled`] waits for the block to become available and
18//! then verifies that the block's embedded context matches the consensus context. However, it does not
19//! wait for the application to finish verifying the block contents before voting. This enables verification
20//! to run while we wait for a quorum of votes to form a certificate (hiding verification latency behind network
21//! latency). Once a certificate is formed, we wait on the verification result in [`CertifiableAutomaton::certify`]
22//! before voting to finalize (ensuring no invalid blocks are admitted to the canonical chain).
23//!
24//! # Usage
25//!
26//! Wrap your [`Application`] implementation with [`Marshaled::new`] and provide it to your
27//! consensus engine for the [`Automaton`] and [`Relay`]. The wrapper handles all epoch logic transparently.
28//!
29//! ```rust,ignore
30//! let application = Marshaled::new(
31//!     context,
32//!     my_application,
33//!     marshal_mailbox,
34//!     epocher,
35//! );
36//! ```
37//!
38//! # Implementation Notes
39//!
40//! - Genesis blocks are handled specially: epoch 0 returns the application's genesis block,
41//!   while subsequent epochs use the last block of the previous epoch as genesis
42//! - Blocks are automatically verified to be within the current epoch
43//!
44//! # Future Work
45//!
46//! - To further reduce view latency, a participant could optimistically vote for a block prior to
47//!   observing its availability during [`Automaton::verify`]. However, this would require updating
48//!   other components (like [`crate::marshal`]) to handle backfill where notarization does not imply
49//!   a block is fetchable (without modification, a malicious leader that withholds blocks during propose
50//!   could get an honest node to exhaust their network rate limit fetching things that don't exist rather
51//!   than blocks they need AND can fetch).
52
53use crate::{
54    marshal::{self, ingress::mailbox::AncestorStream, Update},
55    simplex::types::Context,
56    types::{Epoch, Epocher, Height, Round},
57    Application, Automaton, Block, CertifiableAutomaton, CertifiableBlock, Epochable, Relay,
58    Reporter, VerifyingApplication,
59};
60use commonware_cryptography::{certificate::Scheme, Committable};
61use commonware_macros::select;
62use commonware_runtime::{telemetry::metrics::status::GaugeExt, Clock, Metrics, Spawner};
63use commonware_utils::channel::{
64    fallible::OneshotExt,
65    oneshot::{self, error::RecvError},
66};
67use futures::{
68    future::{ready, Either, Ready},
69    lock::Mutex,
70};
71use prometheus_client::metrics::gauge::Gauge;
72use rand::Rng;
73use std::{collections::HashMap, sync::Arc, time::Instant};
74use tracing::{debug, warn};
75
/// Verification results keyed by the `(round, commitment)` of the proposal they belong to.
///
/// Each receiver resolves to `true` or `false` once verification of that proposal completes.
type TasksMap<B> = HashMap<(Round, <B as Committable>::Commitment), oneshot::Receiver<bool>>;
77
/// An [`Application`] adapter that handles epoch transitions and validates block ancestry.
///
/// This wrapper intercepts consensus operations to enforce epoch boundaries and validate
/// block ancestry. It prevents blocks from being produced outside their valid epoch,
/// handles the special case of re-proposing boundary blocks at epoch boundaries,
/// and ensures all blocks have valid parent linkage and contiguous heights.
///
/// # Ancestry Validation
///
/// Applications wrapped by [`Marshaled`] can rely on the following ancestry checks being
/// performed automatically during verification:
/// - Parent commitment matches the consensus context's expected parent
/// - Block height is exactly one greater than the parent's height
///
/// Verifying only the immediate parent is sufficient since the parent itself must have
/// been notarized by consensus, which guarantees it was verified and accepted by a quorum.
/// This means the entire ancestry chain back to genesis is transitively validated.
///
/// Applications do not need to re-implement these checks in their own verification logic.
///
/// # Context Recovery
///
/// With deferred verification, validators wait for data availability (DA) and verify the context
/// before voting. If a validator crashes after voting but before certification, they lose their in-memory
/// verification task. When recovering, validators extract context from a [`CertifiableBlock`].
///
/// _This embedded context is trustworthy because the notarizing quorum (which contains at least f+1 honest
/// validators) verified that the block's context matched the consensus context before voting._
#[derive(Clone)]
pub struct Marshaled<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: Application<E>,
    B: CertifiableBlock,
    ES: Epocher,
{
    /// Runtime context used to register metrics and to spawn the background tasks.
    context: E,
    /// The wrapped application that builds and verifies blocks.
    application: A,
    /// Mailbox of the marshal, used to subscribe to blocks and to report proposed and
    /// verified blocks.
    marshal: marshal::Mailbox<S, B>,
    /// Source of epoch boundaries (the last height of an epoch, the epoch containing a height).
    epocher: ES,
    /// Block cached by the most recent proposal together with its round; read by `broadcast`.
    last_built: Arc<Mutex<Option<(Round, B)>>>,
    /// Verification tasks registered by `verify`, claimed by `certify` and pruned by `report`.
    /// Shared between clones of this wrapper.
    verification_tasks: Arc<Mutex<TasksMap<B>>>,

    /// Time taken for the application to build a new block, in milliseconds.
    build_duration: Gauge,
}
124
125impl<E, S, A, B, ES> Marshaled<E, S, A, B, ES>
126where
127    E: Rng + Spawner + Metrics + Clock,
128    S: Scheme,
129    A: VerifyingApplication<
130        E,
131        Block = B,
132        SigningScheme = S,
133        Context = Context<B::Commitment, S::PublicKey>,
134    >,
135    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
136    ES: Epocher,
137{
138    /// Creates a new [`Marshaled`] wrapper.
139    pub fn new(context: E, application: A, marshal: marshal::Mailbox<S, B>, epocher: ES) -> Self {
140        let build_duration = Gauge::default();
141        context.register(
142            "build_duration",
143            "Time taken for the application to build a new block, in milliseconds",
144            build_duration.clone(),
145        );
146
147        Self {
148            context,
149            application,
150            marshal,
151            epocher,
152            last_built: Arc::new(Mutex::new(None)),
153            verification_tasks: Arc::new(Mutex::new(HashMap::new())),
154
155            build_duration,
156        }
157    }
158
159    /// Verifies a proposed block's application-level validity.
160    ///
161    /// This method validates that:
162    /// 1. The block's parent commitment matches the expected parent
163    /// 2. The block's height is exactly one greater than the parent's height
164    /// 3. The underlying application's verification logic passes
165    ///
166    /// Verification is spawned in a background task and returns a receiver that will contain
167    /// the verification result. Valid blocks are reported to the marshal as verified.
168    #[inline]
169    async fn deferred_verify(
170        &mut self,
171        context: <Self as Automaton>::Context,
172        block: B,
173    ) -> oneshot::Receiver<bool> {
174        let mut marshal = self.marshal.clone();
175        let mut application = self.application.clone();
176        let (mut tx, rx) = oneshot::channel();
177        self.context
178            .with_label("deferred_verify")
179            .with_attribute("round", context.round)
180            .spawn(move |runtime_context| async move {
181                let (parent_view, parent_commitment) = context.parent;
182                let parent_request = fetch_parent(
183                    parent_commitment,
184                    Some(Round::new(context.epoch(), parent_view)),
185                    &mut application,
186                    &mut marshal,
187                )
188                .await;
189
190                // If consensus drops the receiver, we can stop work early.
191                let parent = select! {
192                    _ = tx.closed() => {
193                        debug!(
194                            reason = "consensus dropped receiver",
195                            "skipping verification"
196                        );
197                        return;
198                    },
199                    result = parent_request => match result {
200                        Ok(parent) => parent,
201                        Err(_) => {
202                            debug!(
203                                reason = "failed to fetch parent or block",
204                                "skipping verification"
205                            );
206                            return;
207                        }
208                    },
209                };
210
211                // Validate parent commitment and height contiguity.
212                if block.parent() != parent.commitment() || parent.commitment() != parent_commitment
213                {
214                    debug!(
215                        block_parent = %block.parent(),
216                        expected_parent = %parent.commitment(),
217                        "block parent commitment does not match expected parent"
218                    );
219                    tx.send_lossy(false);
220                    return;
221                }
222                if parent.height().next() != block.height() {
223                    debug!(
224                        parent_height = %parent.height(),
225                        block_height = %block.height(),
226                        "block height is not contiguous with parent height"
227                    );
228                    tx.send_lossy(false);
229                    return;
230                }
231
232                // Request verification from the application.
233                let ancestry_stream = AncestorStream::new(marshal.clone(), [block.clone(), parent]);
234                let validity_request = application.verify(
235                    (runtime_context.with_label("app_verify"), context.clone()),
236                    ancestry_stream,
237                );
238                // If consensus drops the receiver, we can stop work early.
239                let application_valid = select! {
240                    _ = tx.closed() => {
241                        debug!(
242                            reason = "consensus dropped receiver",
243                            "skipping verification"
244                        );
245                        return;
246                    },
247                    valid = validity_request => valid,
248                };
249
250                // Handle the verification result.
251                if application_valid {
252                    marshal.verified(context.round, block).await;
253                }
254                tx.send_lossy(application_valid);
255            });
256
257        rx
258    }
259}
260
261impl<E, S, A, B, ES> Automaton for Marshaled<E, S, A, B, ES>
262where
263    E: Rng + Spawner + Metrics + Clock,
264    S: Scheme,
265    A: VerifyingApplication<
266        E,
267        Block = B,
268        SigningScheme = S,
269        Context = Context<B::Commitment, S::PublicKey>,
270    >,
271    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
272    ES: Epocher,
273{
274    type Digest = B::Commitment;
275    type Context = Context<Self::Digest, S::PublicKey>;
276
277    /// Returns the genesis commitment for a given epoch.
278    ///
279    /// For epoch 0, this returns the application's genesis block commitment. For subsequent
280    /// epochs, it returns the commitment of the last block from the previous epoch, which
281    /// serves as the genesis block for the new epoch.
282    ///
283    /// # Panics
284    ///
285    /// Panics if a non-zero epoch is requested but the previous epoch's final block is not
286    /// available in storage. This indicates a critical error in the consensus engine startup
287    /// sequence, as engines must always have the genesis block before starting.
288    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
289        if epoch.is_zero() {
290            return self.application.genesis().await.commitment();
291        }
292
293        let prev = epoch.previous().expect("checked to be non-zero above");
294        let last_height = self
295            .epocher
296            .last(prev)
297            .expect("previous epoch should exist");
298        let Some(block) = self.marshal.get_block(last_height).await else {
299            // A new consensus engine will never be started without having the genesis block
300            // of the new epoch (the last block of the previous epoch) already stored.
301            unreachable!("missing starting epoch block at height {}", last_height);
302        };
303        block.commitment()
304    }
305
306    /// Proposes a new block or re-proposes the epoch boundary block.
307    ///
308    /// This method builds a new block from the underlying application unless the parent block
309    /// is the last block in the current epoch. When at an epoch boundary, it re-proposes the
310    /// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
311    ///
312    /// The proposal operation is spawned in a background task and returns a receiver that will
313    /// contain the proposed block's commitment when ready. The built block is cached for later
314    /// broadcasting.
315    async fn propose(
316        &mut self,
317        consensus_context: Context<Self::Digest, S::PublicKey>,
318    ) -> oneshot::Receiver<Self::Digest> {
319        let mut marshal = self.marshal.clone();
320        let mut application = self.application.clone();
321        let last_built = self.last_built.clone();
322        let epocher = self.epocher.clone();
323
324        // Metrics
325        let build_duration = self.build_duration.clone();
326
327        let (mut tx, rx) = oneshot::channel();
328        self.context
329            .with_label("propose")
330            .with_attribute("round", consensus_context.round)
331            .spawn(move |runtime_context| async move {
332                let (parent_view, parent_commitment) = consensus_context.parent;
333                let parent_request = fetch_parent(
334                    parent_commitment,
335                    Some(Round::new(consensus_context.epoch(), parent_view)),
336                    &mut application,
337                    &mut marshal,
338                )
339                .await;
340
341                let parent = select! {
342                    _ = tx.closed() => {
343                        debug!(reason = "consensus dropped receiver", "skipping proposal");
344                        return;
345                    },
346                    result = parent_request => match result {
347                        Ok(parent) => parent,
348                        Err(_) => {
349                            debug!(
350                                ?parent_commitment,
351                                reason = "failed to fetch parent block",
352                                "skipping proposal"
353                            );
354                            return;
355                        }
356                    },
357                };
358
359                // Special case: If the parent block is the last block in the epoch,
360                // re-propose it as to not produce any blocks that will be cut out
361                // by the epoch transition.
362                let last_in_epoch = epocher
363                    .last(consensus_context.epoch())
364                    .expect("current epoch should exist");
365                if parent.height() == last_in_epoch {
366                    let digest = parent.commitment();
367                    {
368                        let mut lock = last_built.lock().await;
369                        *lock = Some((consensus_context.round, parent));
370                    }
371
372                    let success = tx.send_lossy(digest);
373                    debug!(
374                        round = ?consensus_context.round,
375                        ?digest,
376                        success,
377                        "re-proposed parent block at epoch boundary"
378                    );
379                    return;
380                }
381
382                let ancestor_stream = AncestorStream::new(marshal.clone(), [parent]);
383                let build_request = application.propose(
384                    (
385                        runtime_context.with_label("app_propose"),
386                        consensus_context.clone(),
387                    ),
388                    ancestor_stream,
389                );
390
391                let start = Instant::now();
392
393                let built_block = select! {
394                    _ = tx.closed() => {
395                        debug!(reason = "consensus dropped receiver", "skipping proposal");
396                        return;
397                    },
398                    block = build_request => match block {
399                        Some(block) => block,
400                        _ => {
401                            debug!(
402                                ?parent_commitment,
403                                reason = "block building failed",
404                                "skipping proposal"
405                            );
406                            return;
407                        }
408                    },
409                };
410                let _ = build_duration.try_set(start.elapsed().as_millis());
411
412                let digest = built_block.commitment();
413                {
414                    let mut lock = last_built.lock().await;
415                    *lock = Some((consensus_context.round, built_block));
416                }
417
418                let success = tx.send_lossy(digest);
419                debug!(
420                    round = ?consensus_context.round,
421                    ?digest,
422                    success,
423                    "proposed new block"
424                );
425            });
426        rx
427    }
428
429    async fn verify(
430        &mut self,
431        context: Context<Self::Digest, S::PublicKey>,
432        commitment: Self::Digest,
433    ) -> oneshot::Receiver<bool> {
434        let mut marshal = self.marshal.clone();
435        let mut marshaled = self.clone();
436
437        let (mut tx, rx) = oneshot::channel();
438        self.context
439            .with_label("optimistic_verify")
440            .with_attribute("round", context.round)
441            .spawn(move |_| async move {
442                let block_request = marshal.subscribe(Some(context.round), commitment).await;
443
444                let block = select! {
445                    _ = tx.closed() => {
446                        debug!(
447                            reason = "consensus dropped receiver",
448                            "skipping optimistic verification"
449                        );
450                        return;
451                    },
452                    result = block_request => match result {
453                        Ok(block) => block,
454                        Err(_) => {
455                            debug!(
456                                ?commitment,
457                                reason = "failed to fetch block for optimistic verification",
458                                "skipping optimistic verification"
459                            );
460                            return;
461                        }
462                    },
463                };
464
465                // Blocks are invalid if they are not within the current epoch and they aren't
466                // a re-proposal of the boundary block.
467                let Some(block_bounds) = marshaled.epocher.containing(block.height()) else {
468                    debug!(
469                        height = %block.height(),
470                        "block height not in any known epoch"
471                    );
472                    tx.send_lossy(false);
473                    return;
474                };
475                if block_bounds.epoch() != context.epoch() {
476                    debug!(
477                        epoch = %context.epoch(),
478                        block_epoch = %block_bounds.epoch(),
479                        "block is not in the current epoch"
480                    );
481                    tx.send_lossy(false);
482                    return;
483                }
484
485                // Re-proposal detection: consensus signals a re-proposal by setting
486                // context.parent to the block being verified (commitment == context.parent.1).
487                //
488                // Re-proposals skip normal verification because:
489                // 1. The block was already verified when originally proposed
490                // 2. The parent-child height check would fail (parent IS the block)
491                let is_reproposal = commitment == context.parent.1;
492                if is_reproposal {
493                    if !is_at_epoch_boundary(&marshaled.epocher, block.height(), context.epoch()) {
494                        debug!(
495                            height = %block.height(),
496                            last_in_epoch = %block_bounds.last(),
497                            "re-proposal is not at epoch boundary"
498                        );
499                        tx.send_lossy(false);
500                        return;
501                    }
502
503                    // Valid re-proposal. Create a completed verification task for `certify`
504                    let round = context.round;
505                    marshal.verified(round, block).await;
506
507                    let (task_tx, task_rx) = oneshot::channel();
508                    task_tx.send_lossy(true);
509                    marshaled
510                        .verification_tasks
511                        .lock()
512                        .await
513                        .insert((round, commitment), task_rx);
514
515                    tx.send_lossy(true);
516                    return;
517                }
518
519                // Before casting a notarize vote, ensure the block's embedded context matches
520                // the consensus context.
521                //
522                // This is a critical step - the notarize quorum is guaranteed to have at least
523                // f+1 honest validators who will verify against this context, preventing a Byzantine
524                // proposer from embedding a malicious context. The other f honest validators who did
525                // not vote will later use the block-embedded context to help finalize if Byzantine
526                // validators withhold their finalize votes.
527                if block.context() != context {
528                    debug!(
529                        ?context,
530                        block_context = ?block.context(),
531                        "block-embedded context does not match consensus context during optimistic verification"
532                    );
533                    tx.send_lossy(false);
534                    return;
535                }
536
537                // Begin the rest of the verification process asynchronously.
538                let round = context.round;
539                let task = marshaled.deferred_verify(context, block).await;
540                marshaled
541                    .verification_tasks
542                    .lock()
543                    .await
544                    .insert((round, commitment), task);
545
546                tx.send_lossy(true);
547            });
548        rx
549    }
550}
551
552impl<E, S, A, B, ES> CertifiableAutomaton for Marshaled<E, S, A, B, ES>
553where
554    E: Rng + Spawner + Metrics + Clock,
555    S: Scheme,
556    A: VerifyingApplication<
557        E,
558        Block = B,
559        SigningScheme = S,
560        Context = Context<B::Commitment, S::PublicKey>,
561    >,
562    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
563    ES: Epocher,
564{
565    async fn certify(&mut self, round: Round, commitment: Self::Digest) -> oneshot::Receiver<bool> {
566        // Attempt to retrieve the existing verification task for this (round, payload).
567        let mut tasks_guard = self.verification_tasks.lock().await;
568        let task = tasks_guard.remove(&(round, commitment));
569        drop(tasks_guard);
570        if let Some(task) = task {
571            return task;
572        }
573
574        // No in-progress task means we never verified this proposal locally. We can use the
575        // block's embedded context to help complete finalization when Byzantine validators
576        // withhold their finalize votes. If a Byzantine proposer embedded a malicious context,
577        // the f+1 honest validators from the notarizing quorum will verify against the proper
578        // context and reject the mismatch, preventing a 2f+1 finalization quorum.
579        //
580        // Subscribe to the block and verify using its embedded context once available.
581        debug!(
582            ?round,
583            ?commitment,
584            "subscribing to block for certification using embedded context"
585        );
586        let block_rx = self.marshal.subscribe(Some(round), commitment).await;
587        let mut marshaled = self.clone();
588        let epocher = self.epocher.clone();
589        let (mut tx, rx) = oneshot::channel();
590        self.context
591            .with_label("certify")
592            .with_attribute("round", round)
593            .spawn(move |_| async move {
594                let block = select! {
595                    _ = tx.closed() => {
596                        debug!(
597                            reason = "consensus dropped receiver",
598                            "skipping certification"
599                        );
600                        return;
601                    },
602                    result = block_rx => match result {
603                        Ok(block) => block,
604                        Err(_) => {
605                            debug!(
606                                ?commitment,
607                                reason = "failed to fetch block for certification",
608                                "skipping certification"
609                            );
610                            return;
611                        }
612                    },
613                };
614
615                // Re-proposal detection for certify path: we don't have the consensus context,
616                // only the block's embedded context from original proposal. Infer re-proposal from:
617                // 1. Block is at epoch boundary (only boundary blocks can be re-proposed)
618                // 2. Certification round's view > embedded context's view (re-proposals retain their
619                //    original embedded context, so a later view indicates the block was re-proposed)
620                // 3. Same epoch (re-proposals don't cross epoch boundaries)
621                let embedded_context = block.context();
622                let is_reproposal =
623                    is_at_epoch_boundary(&epocher, block.height(), embedded_context.round.epoch())
624                        && round.view() > embedded_context.round.view()
625                        && round.epoch() == embedded_context.round.epoch();
626                if is_reproposal {
627                    // NOTE: It is possible that, during crash recovery, we call `marshal.verified`
628                    // twice for the same block. That function is idempotent, so this is safe.
629                    marshaled.marshal.verified(round, block).await;
630                    tx.send_lossy(true);
631                    return;
632                }
633
634                let verify_rx = marshaled.deferred_verify(embedded_context, block).await;
635                if let Ok(result) = verify_rx.await {
636                    tx.send_lossy(result);
637                }
638            });
639        rx
640    }
641}
642
643impl<E, S, A, B, ES> Relay for Marshaled<E, S, A, B, ES>
644where
645    E: Rng + Spawner + Metrics + Clock,
646    S: Scheme,
647    A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>,
648    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
649    ES: Epocher,
650{
651    type Digest = B::Commitment;
652
653    /// Broadcasts a previously built block to the network.
654    ///
655    /// This uses the cached block from the last proposal operation. If no block was built or
656    /// the commitment does not match the cached block, the broadcast is skipped with a warning.
657    async fn broadcast(&mut self, commitment: Self::Digest) {
658        let Some((round, block)) = self.last_built.lock().await.clone() else {
659            warn!("missing block to broadcast");
660            return;
661        };
662
663        if block.commitment() != commitment {
664            warn!(
665                round = %round,
666                commitment = %block.commitment(),
667                height = %block.height(),
668                "skipping requested broadcast of block with mismatched commitment"
669            );
670            return;
671        }
672
673        debug!(
674            round = %round,
675            commitment = %block.commitment(),
676            height = %block.height(),
677            "requested broadcast of built block"
678        );
679        self.marshal.proposed(round, block).await;
680    }
681}
682
683impl<E, S, A, B, ES> Reporter for Marshaled<E, S, A, B, ES>
684where
685    E: Rng + Spawner + Metrics + Clock,
686    S: Scheme,
687    A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>
688        + Reporter<Activity = Update<B>>,
689    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
690    ES: Epocher,
691{
692    type Activity = A::Activity;
693
694    /// Relays a report to the underlying [`Application`] and cleans up old verification tasks.
695    async fn report(&mut self, update: Self::Activity) {
696        // Clean up verification tasks for rounds <= the finalized round.
697        if let Update::Tip(round, _, _) = &update {
698            let mut tasks_guard = self.verification_tasks.lock().await;
699            tasks_guard.retain(|(task_round, _), _| task_round > round);
700        }
701        self.application.report(update).await
702    }
703}
704
705/// Returns true if the block is at an epoch boundary (last block in its epoch).
706///
707/// This is used to validate re-proposals, which are only allowed for boundary blocks.
708#[inline]
709fn is_at_epoch_boundary<ES: Epocher>(epocher: &ES, block_height: Height, epoch: Epoch) -> bool {
710    epocher.last(epoch).is_some_and(|last| last == block_height)
711}
712
713/// Fetches the parent block given its commitment and optional round.
714///
715/// This is a helper function used during proposal and verification to retrieve the parent
716/// block. If the parent commitment matches the genesis block, it returns the genesis block
717/// directly without querying the marshal. Otherwise, it subscribes to the marshal to await
718/// the parent block's availability.
719///
720/// Returns an error if the marshal subscription is cancelled.
721#[inline]
722async fn fetch_parent<E, S, A, B>(
723    parent_commitment: B::Commitment,
724    parent_round: Option<Round>,
725    application: &mut A,
726    marshal: &mut marshal::Mailbox<S, B>,
727) -> Either<Ready<Result<B, RecvError>>, oneshot::Receiver<B>>
728where
729    E: Rng + Spawner + Metrics + Clock,
730    S: Scheme,
731    A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>,
732    B: Block,
733{
734    let genesis = application.genesis().await;
735    if parent_commitment == genesis.commitment() {
736        Either::Left(ready(Ok(genesis)))
737    } else {
738        Either::Right(marshal.subscribe(parent_round, parent_commitment).await)
739    }
740}