commonware_consensus/application/marshaled.rs
1//! Wrapper for consensus applications that handles epochs and block dissemination.
2//!
3//! # Overview
4//!
5//! [`Marshaled`] is an adapter that wraps any [`VerifyingApplication`] implementation to handle
6//! epoch transitions automatically. It intercepts consensus operations (propose, verify) and
7//! ensures blocks are only produced within valid epoch boundaries.
8//!
9//! # Epoch Boundaries
10//!
11//! When the parent is the last block in an epoch (as determined by the [`Epocher`]), this wrapper
12//! re-proposes that boundary block instead of building a new block. This avoids producing blocks
13//! that would be pruned by the epoch transition.
14//!
15//! # Deferred Verification
16//!
17//! Before casting a notarize vote, [`Marshaled`] waits for the block to become available and
18//! then verifies that the block's embedded context matches the consensus context. However, it does not
19//! wait for the application to finish verifying the block contents before voting. This enables verification
20//! to run while we wait for a quorum of votes to form a certificate (hiding verification latency behind network
21//! latency). Once a certificate is formed, we wait on the verification result in [`CertifiableAutomaton::certify`]
22//! before voting to finalize (ensuring no invalid blocks are admitted to the canonical chain).
23//!
24//! # Usage
25//!
26//! Wrap your [`Application`] implementation with [`Marshaled::new`] and provide it to your
27//! consensus engine for the [`Automaton`] and [`Relay`]. The wrapper handles all epoch logic transparently.
28//!
//! ```rust,ignore
//! let application = Marshaled::new(
//!     context,
//!     my_application,
//!     marshal_mailbox,
//!     epocher,
//! );
//! ```
37//!
38//! # Implementation Notes
39//!
//! - Genesis blocks are handled specially: epoch 0 returns the application's genesis block,
//!   while subsequent epochs use the last block of the previous epoch as genesis
42//! - Blocks are automatically verified to be within the current epoch
43//!
44//! # Future Work
45//!
//! - To further reduce view latency, a participant could optimistically vote for a block prior to
//!   observing its availability during [`Automaton::verify`]. However, this would require updating
//!   other components (like [`crate::marshal`]) to handle backfill where notarization does not imply
//!   a block is fetchable (without modification, a malicious leader that withholds blocks during
//!   propose could get an honest node to exhaust their network rate limit fetching things that don't
//!   exist rather than blocks they need AND can fetch).
52
53use crate::{
54 marshal::{self, ingress::mailbox::AncestorStream, Update},
55 simplex::types::Context,
56 types::{Epoch, Epocher, Height, Round},
57 Application, Automaton, Block, CertifiableAutomaton, CertifiableBlock, Epochable, Relay,
58 Reporter, VerifyingApplication,
59};
60use commonware_cryptography::{certificate::Scheme, Committable};
61use commonware_macros::select;
62use commonware_runtime::{telemetry::metrics::status::GaugeExt, Clock, Metrics, Spawner};
63use commonware_utils::channel::{
64 fallible::OneshotExt,
65 oneshot::{self, error::RecvError},
66};
67use futures::{
68 future::{ready, Either, Ready},
69 lock::Mutex,
70};
71use prometheus_client::metrics::gauge::Gauge;
72use rand::Rng;
73use std::{collections::HashMap, sync::Arc, time::Instant};
74use tracing::{debug, warn};
75
/// Maps a `(round, block commitment)` pair to the receiver that will yield the boolean
/// verdict of that block's in-flight deferred verification task (see `deferred_verify`).
type TasksMap<B> = HashMap<(Round, <B as Committable>::Commitment), oneshot::Receiver<bool>>;
77
/// An [`Application`] adapter that handles epoch transitions and validates block ancestry.
///
/// This wrapper intercepts consensus operations to enforce epoch boundaries and validate
/// block ancestry. It prevents blocks from being produced outside their valid epoch,
/// handles the special case of re-proposing boundary blocks at epoch boundaries,
/// and ensures all blocks have valid parent linkage and contiguous heights.
///
/// # Ancestry Validation
///
/// Applications wrapped by [`Marshaled`] can rely on the following ancestry checks being
/// performed automatically during verification:
/// - Parent commitment matches the consensus context's expected parent
/// - Block height is exactly one greater than the parent's height
///
/// Verifying only the immediate parent is sufficient since the parent itself must have
/// been notarized by consensus, which guarantees it was verified and accepted by a quorum.
/// This means the entire ancestry chain back to genesis is transitively validated.
///
/// Applications do not need to re-implement these checks in their own verification logic.
///
/// # Context Recovery
///
/// With deferred verification, validators wait for data availability (DA) and verify the context
/// before voting. If a validator crashes after voting but before certification, they lose their in-memory
/// verification task. When recovering, validators extract context from a [`CertifiableBlock`].
///
/// _This embedded context is trustworthy because the notarizing quorum (which contains at least f+1 honest
/// validators) verified that the block's context matched the consensus context before voting._
#[derive(Clone)]
pub struct Marshaled<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: Application<E>,
    B: CertifiableBlock,
    ES: Epocher,
{
    // Runtime context used to spawn background tasks and register metrics.
    context: E,
    // The wrapped application that builds and verifies blocks.
    application: A,
    // Mailbox to the marshal (block storage, subscriptions, dissemination).
    marshal: marshal::Mailbox<S, B>,
    // Source of epoch boundaries (which heights start/end each epoch).
    epocher: ES,
    // Most recently built (or re-proposed) block, cached for `Relay::broadcast`.
    // Shared across clones so propose/broadcast can run on different handles.
    last_built: Arc<Mutex<Option<(Round, B)>>>,
    // In-flight deferred verification results, keyed by (round, commitment);
    // consumed by `CertifiableAutomaton::certify` and pruned in `Reporter::report`.
    verification_tasks: Arc<Mutex<TasksMap<B>>>,

    // Time taken by the application to build a new block, in milliseconds.
    build_duration: Gauge,
}
124
impl<E, S, A, B, ES> Marshaled<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = S,
        Context = Context<B::Commitment, S::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    ES: Epocher,
{
    /// Creates a new [`Marshaled`] wrapper.
    pub fn new(context: E, application: A, marshal: marshal::Mailbox<S, B>, epocher: ES) -> Self {
        let build_duration = Gauge::default();
        context.register(
            "build_duration",
            "Time taken for the application to build a new block, in milliseconds",
            build_duration.clone(),
        );

        Self {
            context,
            application,
            marshal,
            epocher,
            last_built: Arc::new(Mutex::new(None)),
            verification_tasks: Arc::new(Mutex::new(HashMap::new())),

            build_duration,
        }
    }

    /// Verifies a proposed block's application-level validity.
    ///
    /// This method validates that:
    /// 1. The block's parent commitment matches the expected parent
    /// 2. The block's height is exactly one greater than the parent's height
    /// 3. The underlying application's verification logic passes
    ///
    /// Verification is spawned in a background task and returns a receiver that will contain
    /// the verification result. Valid blocks are reported to the marshal as verified.
    ///
    /// Note: if the parent (or block) cannot be fetched, the sender is dropped without a
    /// value, so the returned receiver resolves to an error rather than `false`.
    #[inline]
    async fn deferred_verify(
        &mut self,
        context: <Self as Automaton>::Context,
        block: B,
    ) -> oneshot::Receiver<bool> {
        let mut marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("deferred_verify")
            .with_attribute("round", context.round)
            .spawn(move |runtime_context| async move {
                // Resolve the parent block: either the epoch's genesis block or a
                // marshal subscription that resolves once the parent is available.
                let (parent_view, parent_commitment) = context.parent;
                let parent_request = fetch_parent(
                    parent_commitment,
                    Some(Round::new(context.epoch(), parent_view)),
                    &mut application,
                    &mut marshal,
                )
                .await;

                // If consensus drops the receiver, we can stop work early.
                let parent = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping verification"
                        );
                        return;
                    },
                    result = parent_request => match result {
                        Ok(parent) => parent,
                        Err(_) => {
                            debug!(
                                reason = "failed to fetch parent or block",
                                "skipping verification"
                            );
                            return;
                        }
                    },
                };

                // Validate parent commitment and height contiguity. The second clause is
                // defensive: the fetched parent should match `parent_commitment` by
                // construction (it was looked up by that commitment).
                if block.parent() != parent.commitment() || parent.commitment() != parent_commitment
                {
                    debug!(
                        block_parent = %block.parent(),
                        expected_parent = %parent.commitment(),
                        "block parent commitment does not match expected parent"
                    );
                    tx.send_lossy(false);
                    return;
                }
                if parent.height().next() != block.height() {
                    debug!(
                        parent_height = %parent.height(),
                        block_height = %block.height(),
                        "block height is not contiguous with parent height"
                    );
                    tx.send_lossy(false);
                    return;
                }

                // Request verification from the application, seeding the ancestry stream
                // with the block and its already-fetched parent.
                let ancestry_stream = AncestorStream::new(marshal.clone(), [block.clone(), parent]);
                let validity_request = application.verify(
                    (runtime_context.with_label("app_verify"), context.clone()),
                    ancestry_stream,
                );
                // If consensus drops the receiver, we can stop work early.
                let application_valid = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping verification"
                        );
                        return;
                    },
                    valid = validity_request => valid,
                };

                // Handle the verification result: only valid blocks are reported to the
                // marshal as verified; the verdict is sent to the caller either way.
                if application_valid {
                    marshal.verified(context.round, block).await;
                }
                tx.send_lossy(application_valid);
            });

        rx
    }
}
260
impl<E, S, A, B, ES> Automaton for Marshaled<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = S,
        Context = Context<B::Commitment, S::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    ES: Epocher,
{
    type Digest = B::Commitment;
    type Context = Context<Self::Digest, S::PublicKey>;

    /// Returns the genesis commitment for a given epoch.
    ///
    /// For epoch 0, this returns the application's genesis block commitment. For subsequent
    /// epochs, it returns the commitment of the last block from the previous epoch, which
    /// serves as the genesis block for the new epoch.
    ///
    /// # Panics
    ///
    /// Panics if a non-zero epoch is requested but the previous epoch's final block is not
    /// available in storage. This indicates a critical error in the consensus engine startup
    /// sequence, as engines must always have the genesis block before starting.
    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
        if epoch.is_zero() {
            return self.application.genesis().await.commitment();
        }

        let prev = epoch.previous().expect("checked to be non-zero above");
        let last_height = self
            .epocher
            .last(prev)
            .expect("previous epoch should exist");
        let Some(block) = self.marshal.get_block(last_height).await else {
            // A new consensus engine will never be started without having the genesis block
            // of the new epoch (the last block of the previous epoch) already stored.
            unreachable!("missing starting epoch block at height {}", last_height);
        };
        block.commitment()
    }

    /// Proposes a new block or re-proposes the epoch boundary block.
    ///
    /// This method builds a new block from the underlying application unless the parent block
    /// is the last block in the current epoch. When at an epoch boundary, it re-proposes the
    /// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
    ///
    /// The proposal operation is spawned in a background task and returns a receiver that will
    /// contain the proposed block's commitment when ready. The built block is cached for later
    /// broadcasting.
    async fn propose(
        &mut self,
        consensus_context: Context<Self::Digest, S::PublicKey>,
    ) -> oneshot::Receiver<Self::Digest> {
        let mut marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let last_built = self.last_built.clone();
        let epocher = self.epocher.clone();

        // Metrics
        let build_duration = self.build_duration.clone();

        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("propose")
            .with_attribute("round", consensus_context.round)
            .spawn(move |runtime_context| async move {
                // Resolve the parent block before building on top of it.
                let (parent_view, parent_commitment) = consensus_context.parent;
                let parent_request = fetch_parent(
                    parent_commitment,
                    Some(Round::new(consensus_context.epoch(), parent_view)),
                    &mut application,
                    &mut marshal,
                )
                .await;

                // If consensus drops the receiver, we can stop work early.
                let parent = select! {
                    _ = tx.closed() => {
                        debug!(reason = "consensus dropped receiver", "skipping proposal");
                        return;
                    },
                    result = parent_request => match result {
                        Ok(parent) => parent,
                        Err(_) => {
                            debug!(
                                ?parent_commitment,
                                reason = "failed to fetch parent block",
                                "skipping proposal"
                            );
                            return;
                        }
                    },
                };

                // Special case: If the parent block is the last block in the epoch,
                // re-propose it so as not to produce any blocks that will be cut out
                // by the epoch transition.
                let last_in_epoch = epocher
                    .last(consensus_context.epoch())
                    .expect("current epoch should exist");
                if parent.height() == last_in_epoch {
                    let digest = parent.commitment();
                    // Cache the re-proposed block so `Relay::broadcast` can serve it.
                    {
                        let mut lock = last_built.lock().await;
                        *lock = Some((consensus_context.round, parent));
                    }

                    let success = tx.send_lossy(digest);
                    debug!(
                        round = ?consensus_context.round,
                        ?digest,
                        success,
                        "re-proposed parent block at epoch boundary"
                    );
                    return;
                }

                // Ask the application to build a new block on top of the parent.
                let ancestor_stream = AncestorStream::new(marshal.clone(), [parent]);
                let build_request = application.propose(
                    (
                        runtime_context.with_label("app_propose"),
                        consensus_context.clone(),
                    ),
                    ancestor_stream,
                );

                let start = Instant::now();

                // If consensus drops the receiver, we can stop work early.
                let built_block = select! {
                    _ = tx.closed() => {
                        debug!(reason = "consensus dropped receiver", "skipping proposal");
                        return;
                    },
                    block = build_request => match block {
                        Some(block) => block,
                        _ => {
                            debug!(
                                ?parent_commitment,
                                reason = "block building failed",
                                "skipping proposal"
                            );
                            return;
                        }
                    },
                };
                let _ = build_duration.try_set(start.elapsed().as_millis());

                // Cache the built block so `Relay::broadcast` can serve it.
                let digest = built_block.commitment();
                {
                    let mut lock = last_built.lock().await;
                    *lock = Some((consensus_context.round, built_block));
                }

                let success = tx.send_lossy(digest);
                debug!(
                    round = ?consensus_context.round,
                    ?digest,
                    success,
                    "proposed new block"
                );
            });
        rx
    }

    /// Optimistically verifies a block before casting a notarize vote.
    ///
    /// Waits for the block to become available, checks that it falls within the current
    /// epoch, handles the boundary-block re-proposal special case, and confirms the
    /// block-embedded context matches the consensus context. Application-level
    /// verification is then spawned in the background (via `deferred_verify`) and its
    /// result is awaited later in [`CertifiableAutomaton::certify`]; this method
    /// resolves `true` as soon as the cheap checks pass.
    async fn verify(
        &mut self,
        context: Context<Self::Digest, S::PublicKey>,
        commitment: Self::Digest,
    ) -> oneshot::Receiver<bool> {
        let mut marshal = self.marshal.clone();
        let mut marshaled = self.clone();

        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("optimistic_verify")
            .with_attribute("round", context.round)
            .spawn(move |_| async move {
                // Wait for the block bytes to become available (data availability).
                let block_request = marshal.subscribe(Some(context.round), commitment).await;

                let block = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping optimistic verification"
                        );
                        return;
                    },
                    result = block_request => match result {
                        Ok(block) => block,
                        Err(_) => {
                            debug!(
                                ?commitment,
                                reason = "failed to fetch block for optimistic verification",
                                "skipping optimistic verification"
                            );
                            return;
                        }
                    },
                };

                // Blocks are invalid if they are not within the current epoch and they aren't
                // a re-proposal of the boundary block.
                let Some(block_bounds) = marshaled.epocher.containing(block.height()) else {
                    debug!(
                        height = %block.height(),
                        "block height not in any known epoch"
                    );
                    tx.send_lossy(false);
                    return;
                };
                if block_bounds.epoch() != context.epoch() {
                    debug!(
                        epoch = %context.epoch(),
                        block_epoch = %block_bounds.epoch(),
                        "block is not in the current epoch"
                    );
                    tx.send_lossy(false);
                    return;
                }

                // Re-proposal detection: consensus signals a re-proposal by setting
                // context.parent to the block being verified (commitment == context.parent.1).
                //
                // Re-proposals skip normal verification because:
                // 1. The block was already verified when originally proposed
                // 2. The parent-child height check would fail (parent IS the block)
                let is_reproposal = commitment == context.parent.1;
                if is_reproposal {
                    // Only the last block in the epoch may be re-proposed.
                    if !is_at_epoch_boundary(&marshaled.epocher, block.height(), context.epoch()) {
                        debug!(
                            height = %block.height(),
                            last_in_epoch = %block_bounds.last(),
                            "re-proposal is not at epoch boundary"
                        );
                        tx.send_lossy(false);
                        return;
                    }

                    // Valid re-proposal. Create a completed verification task for `certify`
                    let round = context.round;
                    marshal.verified(round, block).await;

                    // Pre-resolve the task so `certify` finds an immediate `true`.
                    let (task_tx, task_rx) = oneshot::channel();
                    task_tx.send_lossy(true);
                    marshaled
                        .verification_tasks
                        .lock()
                        .await
                        .insert((round, commitment), task_rx);

                    tx.send_lossy(true);
                    return;
                }

                // Before casting a notarize vote, ensure the block's embedded context matches
                // the consensus context.
                //
                // This is a critical step - the notarize quorum is guaranteed to have at least
                // f+1 honest validators who will verify against this context, preventing a Byzantine
                // proposer from embedding a malicious context. The other f honest validators who did
                // not vote will later use the block-embedded context to help finalize if Byzantine
                // validators withhold their finalize votes.
                if block.context() != context {
                    debug!(
                        ?context,
                        block_context = ?block.context(),
                        "block-embedded context does not match consensus context during optimistic verification"
                    );
                    tx.send_lossy(false);
                    return;
                }

                // Begin the rest of the verification process asynchronously; the result is
                // stored for retrieval in `certify`.
                let round = context.round;
                let task = marshaled.deferred_verify(context, block).await;
                marshaled
                    .verification_tasks
                    .lock()
                    .await
                    .insert((round, commitment), task);

                tx.send_lossy(true);
            });
        rx
    }
}
551
impl<E, S, A, B, ES> CertifiableAutomaton for Marshaled<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = S,
        Context = Context<B::Commitment, S::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    ES: Epocher,
{
    /// Returns a receiver that resolves with the verification verdict for a notarized block.
    ///
    /// Normally this hands back the deferred verification task started in
    /// [`Automaton::verify`]. If no task exists (e.g. after a crash, or if this node never
    /// saw the proposal), it falls back to fetching the block and verifying against the
    /// block's embedded context.
    async fn certify(&mut self, round: Round, commitment: Self::Digest) -> oneshot::Receiver<bool> {
        // Attempt to retrieve the existing verification task for this (round, payload).
        let mut tasks_guard = self.verification_tasks.lock().await;
        let task = tasks_guard.remove(&(round, commitment));
        drop(tasks_guard);
        if let Some(task) = task {
            return task;
        }

        // No in-progress task means we never verified this proposal locally. We can use the
        // block's embedded context to help complete finalization when Byzantine validators
        // withhold their finalize votes. If a Byzantine proposer embedded a malicious context,
        // the f+1 honest validators from the notarizing quorum will verify against the proper
        // context and reject the mismatch, preventing a 2f+1 finalization quorum.
        //
        // Subscribe to the block and verify using its embedded context once available.
        debug!(
            ?round,
            ?commitment,
            "subscribing to block for certification using embedded context"
        );
        let block_rx = self.marshal.subscribe(Some(round), commitment).await;
        let mut marshaled = self.clone();
        let epocher = self.epocher.clone();
        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("certify")
            .with_attribute("round", round)
            .spawn(move |_| async move {
                // If consensus drops the receiver, we can stop work early.
                let block = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping certification"
                        );
                        return;
                    },
                    result = block_rx => match result {
                        Ok(block) => block,
                        Err(_) => {
                            debug!(
                                ?commitment,
                                reason = "failed to fetch block for certification",
                                "skipping certification"
                            );
                            return;
                        }
                    },
                };

                // Re-proposal detection for certify path: we don't have the consensus context,
                // only the block's embedded context from original proposal. Infer re-proposal from:
                // 1. Block is at epoch boundary (only boundary blocks can be re-proposed)
                // 2. Certification round's view > embedded context's view (re-proposals retain their
                //    original embedded context, so a later view indicates the block was re-proposed)
                // 3. Same epoch (re-proposals don't cross epoch boundaries)
                let embedded_context = block.context();
                let is_reproposal =
                    is_at_epoch_boundary(&epocher, block.height(), embedded_context.round.epoch())
                        && round.view() > embedded_context.round.view()
                        && round.epoch() == embedded_context.round.epoch();
                if is_reproposal {
                    // NOTE: It is possible that, during crash recovery, we call `marshal.verified`
                    // twice for the same block. That function is idempotent, so this is safe.
                    marshaled.marshal.verified(round, block).await;
                    tx.send_lossy(true);
                    return;
                }

                // Run full deferred verification against the embedded context. If the inner
                // task drops its sender (e.g. parent fetch failed), we drop ours too, so the
                // caller observes a recv error rather than a verdict.
                let verify_rx = marshaled.deferred_verify(embedded_context, block).await;
                if let Ok(result) = verify_rx.await {
                    tx.send_lossy(result);
                }
            });
        rx
    }
}
642
643impl<E, S, A, B, ES> Relay for Marshaled<E, S, A, B, ES>
644where
645 E: Rng + Spawner + Metrics + Clock,
646 S: Scheme,
647 A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>,
648 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
649 ES: Epocher,
650{
651 type Digest = B::Commitment;
652
653 /// Broadcasts a previously built block to the network.
654 ///
655 /// This uses the cached block from the last proposal operation. If no block was built or
656 /// the commitment does not match the cached block, the broadcast is skipped with a warning.
657 async fn broadcast(&mut self, commitment: Self::Digest) {
658 let Some((round, block)) = self.last_built.lock().await.clone() else {
659 warn!("missing block to broadcast");
660 return;
661 };
662
663 if block.commitment() != commitment {
664 warn!(
665 round = %round,
666 commitment = %block.commitment(),
667 height = %block.height(),
668 "skipping requested broadcast of block with mismatched commitment"
669 );
670 return;
671 }
672
673 debug!(
674 round = %round,
675 commitment = %block.commitment(),
676 height = %block.height(),
677 "requested broadcast of built block"
678 );
679 self.marshal.proposed(round, block).await;
680 }
681}
682
683impl<E, S, A, B, ES> Reporter for Marshaled<E, S, A, B, ES>
684where
685 E: Rng + Spawner + Metrics + Clock,
686 S: Scheme,
687 A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>
688 + Reporter<Activity = Update<B>>,
689 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
690 ES: Epocher,
691{
692 type Activity = A::Activity;
693
694 /// Relays a report to the underlying [`Application`] and cleans up old verification tasks.
695 async fn report(&mut self, update: Self::Activity) {
696 // Clean up verification tasks for rounds <= the finalized round.
697 if let Update::Tip(round, _, _) = &update {
698 let mut tasks_guard = self.verification_tasks.lock().await;
699 tasks_guard.retain(|(task_round, _), _| task_round > round);
700 }
701 self.application.report(update).await
702 }
703}
704
705/// Returns true if the block is at an epoch boundary (last block in its epoch).
706///
707/// This is used to validate re-proposals, which are only allowed for boundary blocks.
708#[inline]
709fn is_at_epoch_boundary<ES: Epocher>(epocher: &ES, block_height: Height, epoch: Epoch) -> bool {
710 epocher.last(epoch).is_some_and(|last| last == block_height)
711}
712
713/// Fetches the parent block given its commitment and optional round.
714///
715/// This is a helper function used during proposal and verification to retrieve the parent
716/// block. If the parent commitment matches the genesis block, it returns the genesis block
717/// directly without querying the marshal. Otherwise, it subscribes to the marshal to await
718/// the parent block's availability.
719///
720/// Returns an error if the marshal subscription is cancelled.
721#[inline]
722async fn fetch_parent<E, S, A, B>(
723 parent_commitment: B::Commitment,
724 parent_round: Option<Round>,
725 application: &mut A,
726 marshal: &mut marshal::Mailbox<S, B>,
727) -> Either<Ready<Result<B, RecvError>>, oneshot::Receiver<B>>
728where
729 E: Rng + Spawner + Metrics + Clock,
730 S: Scheme,
731 A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>,
732 B: Block,
733{
734 let genesis = application.genesis().await;
735 if parent_commitment == genesis.commitment() {
736 Either::Left(ready(Ok(genesis)))
737 } else {
738 Either::Right(marshal.subscribe(parent_round, parent_commitment).await)
739 }
740}