commonware_consensus/application/
marshaled.rs

1//! Wrapper for consensus applications that handles epochs and block dissemination.
2//!
3//! # Overview
4//!
5//! [Marshaled] is an adapter that wraps any [VerifyingApplication] implementation to handle
6//! epoch transitions automatically. It intercepts consensus operations (propose, verify) and
7//! ensures blocks are only produced within valid epoch boundaries.
8//!
9//! # Epoch Boundaries
10//!
11//! An epoch is a fixed number of blocks (the `epoch_length`). When the last block in an epoch
12//! is reached, this wrapper prevents new blocks from being built & proposed until the next epoch begins.
13//! Instead, it re-proposes the boundary block to avoid producing blocks that would be pruned
14//! by the epoch transition.
15//!
16//! # Usage
17//!
18//! Wrap your application implementation with [Marshaled::new] and provide it to your
19//! consensus engine for the [Automaton] and [Relay]. The wrapper handles all epoch logic transparently.
20//!
21//! ```rust,ignore
22//! let application = Marshaled::new(
23//!     context,
24//!     my_application,
25//!     marshal_mailbox,
26//!     BLOCKS_PER_EPOCH,
27//! );
28//! ```
29//!
30//! # Implementation Notes
31//!
32//! - Genesis blocks are handled specially: epoch 0 returns the application's genesis block,
33//!   while subsequent epochs use the last block of the previous epoch as genesis
34//! - Blocks are automatically verified to be within the current epoch
35
36use crate::{
37    marshal::{self, ingress::mailbox::AncestorStream, Update},
38    simplex::types::Context,
39    types::{Epoch, Epocher, Round},
40    Application, Automaton, Block, CertifiableAutomaton, Epochable, Relay, Reporter,
41    VerifyingApplication,
42};
43use commonware_cryptography::certificate::Scheme;
44use commonware_runtime::{telemetry::metrics::status::GaugeExt, Clock, Metrics, Spawner};
45use commonware_utils::futures::ClosedExt;
46use futures::{
47    channel::oneshot::{self, Canceled},
48    future::{select, try_join, Either, Ready},
49    lock::Mutex,
50    pin_mut,
51};
52use prometheus_client::metrics::gauge::Gauge;
53use rand::Rng;
54use std::{sync::Arc, time::Instant};
55use tracing::{debug, warn};
56
57/// An [Application] adapter that handles epoch transitions and validates block ancestry.
58///
59/// This wrapper intercepts consensus operations to enforce epoch boundaries and validate
60/// block ancestry. It prevents blocks from being produced outside their valid epoch,
61/// handles the special case of re-proposing boundary blocks during epoch transitions,
62/// and ensures all blocks have valid parent linkage and contiguous heights.
63///
64/// # Ancestry Validation
65///
66/// Applications wrapped by [Marshaled] can rely on the following ancestry checks being
67/// performed automatically during verification:
68/// - Parent commitment matches the consensus context's expected parent
69/// - Block height is exactly one greater than the parent's height
70///
71/// Verifying only the immediate parent is sufficient since the parent itself must have
72/// been notarized by consensus, which guarantees it was verified and accepted by a quorum.
73/// This means the entire ancestry chain back to genesis is transitively validated.
74///
75/// Applications do not need to re-implement these checks in their own verification logic.
76#[derive(Clone)]
77pub struct Marshaled<E, S, A, B, ES>
78where
79    E: Rng + Spawner + Metrics + Clock,
80    S: Scheme,
81    A: Application<E>,
82    B: Block,
83    ES: Epocher,
84{
85    context: E,
86    application: A,
87    marshal: marshal::Mailbox<S, B>,
88    epocher: ES,
89    last_built: Arc<Mutex<Option<(Round, B)>>>,
90
91    build_duration: Gauge,
92}
93
94impl<E, S, A, B, ES> Marshaled<E, S, A, B, ES>
95where
96    E: Rng + Spawner + Metrics + Clock,
97    S: Scheme,
98    A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>,
99    B: Block,
100    ES: Epocher,
101{
102    /// Creates a new [Marshaled] wrapper.
103    pub fn new(context: E, application: A, marshal: marshal::Mailbox<S, B>, epocher: ES) -> Self {
104        let build_duration = Gauge::default();
105        context.register(
106            "build_duration",
107            "Time taken for the application to build a new block, in milliseconds",
108            build_duration.clone(),
109        );
110
111        Self {
112            context,
113            application,
114            marshal,
115            epocher,
116            last_built: Arc::new(Mutex::new(None)),
117
118            build_duration,
119        }
120    }
121}
122
123impl<E, S, A, B, ES> Automaton for Marshaled<E, S, A, B, ES>
124where
125    E: Rng + Spawner + Metrics + Clock,
126    S: Scheme,
127    A: VerifyingApplication<
128        E,
129        Block = B,
130        SigningScheme = S,
131        Context = Context<B::Commitment, S::PublicKey>,
132    >,
133    B: Block,
134    ES: Epocher,
135{
136    type Digest = B::Commitment;
137    type Context = Context<Self::Digest, S::PublicKey>;
138
139    /// Returns the genesis commitment for a given epoch.
140    ///
141    /// For epoch 0, this returns the application's genesis block commitment. For subsequent
142    /// epochs, it returns the commitment of the last block from the previous epoch, which
143    /// serves as the genesis block for the new epoch.
144    ///
145    /// # Panics
146    ///
147    /// Panics if a non-zero epoch is requested but the previous epoch's final block is not
148    /// available in storage. This indicates a critical error in the consensus engine startup
149    /// sequence, as engines must always have the genesis block before starting.
150    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
151        if epoch.is_zero() {
152            return self.application.genesis().await.commitment();
153        }
154
155        let prev = epoch.previous().expect("checked to be non-zero above");
156        let last_height = self
157            .epocher
158            .last(prev)
159            .expect("previous epoch should exist");
160        let Some(block) = self.marshal.get_block(last_height).await else {
161            // A new consensus engine will never be started without having the genesis block
162            // of the new epoch (the last block of the previous epoch) already stored.
163            unreachable!("missing starting epoch block at height {}", last_height);
164        };
165        block.commitment()
166    }
167
168    /// Proposes a new block or re-proposes the epoch boundary block.
169    ///
170    /// This method builds a new block from the underlying application unless the parent block
171    /// is the last block in the current epoch. When at an epoch boundary, it re-proposes the
172    /// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
173    ///
174    /// The proposal operation is spawned in a background task and returns a receiver that will
175    /// contain the proposed block's commitment when ready. The built block is cached for later
176    /// broadcasting.
177    async fn propose(
178        &mut self,
179        consensus_context: Context<Self::Digest, S::PublicKey>,
180    ) -> oneshot::Receiver<Self::Digest> {
181        let mut marshal = self.marshal.clone();
182        let mut application = self.application.clone();
183        let last_built = self.last_built.clone();
184        let epocher = self.epocher.clone();
185
186        // Metrics
187        let build_duration = self.build_duration.clone();
188
189        let (mut tx, rx) = oneshot::channel();
190        self.context
191            .with_label("propose")
192            .spawn(move |runtime_context| async move {
193                // Create a future for tracking if the receiver is dropped, which could allow
194                // us to cancel work early.
195                let tx_closed = tx.closed();
196                pin_mut!(tx_closed);
197
198                let (parent_view, parent_commitment) = consensus_context.parent;
199                let parent_request = fetch_parent(
200                    parent_commitment,
201                    Some(Round::new(consensus_context.epoch(), parent_view)),
202                    &mut application,
203                    &mut marshal,
204                )
205                .await;
206                pin_mut!(parent_request);
207
208                let parent = match select(parent_request, &mut tx_closed).await {
209                    Either::Left((Ok(parent), _)) => parent,
210                    Either::Left((Err(_), _)) => {
211                        debug!(
212                            ?parent_commitment,
213                            reason = "failed to fetch parent block",
214                            "skipping proposal"
215                        );
216                        return;
217                    }
218                    Either::Right(_) => {
219                        debug!(reason = "consensus dropped receiver", "skipping proposal");
220                        return;
221                    }
222                };
223
224                // Special case: If the parent block is the last block in the epoch,
225                // re-propose it as to not produce any blocks that will be cut out
226                // by the epoch transition.
227                let last_in_epoch = epocher
228                    .last(consensus_context.epoch())
229                    .expect("current epoch should exist");
230                if parent.height() == last_in_epoch {
231                    let digest = parent.commitment();
232                    {
233                        let mut lock = last_built.lock().await;
234                        *lock = Some((consensus_context.round, parent));
235                    }
236
237                    let result = tx.send(digest);
238                    debug!(
239                        round = ?consensus_context.round,
240                        ?digest,
241                        success = result.is_ok(),
242                        "re-proposed parent block at epoch boundary"
243                    );
244                    return;
245                }
246
247                let ancestor_stream = AncestorStream::new(marshal.clone(), [parent]);
248                let build_request = application.propose(
249                    (
250                        runtime_context.with_label("app_propose"),
251                        consensus_context.clone(),
252                    ),
253                    ancestor_stream,
254                );
255                pin_mut!(build_request);
256
257                let start = Instant::now();
258                let built_block = match select(build_request, &mut tx_closed).await {
259                    Either::Left((Some(block), _)) => block,
260                    Either::Left((None, _)) => {
261                        debug!(
262                            ?parent_commitment,
263                            reason = "block building failed",
264                            "skipping proposal"
265                        );
266                        return;
267                    }
268                    Either::Right(_) => {
269                        debug!(reason = "consensus dropped receiver", "skipping proposal");
270                        return;
271                    }
272                };
273                let _ = build_duration.try_set(start.elapsed().as_millis());
274
275                let digest = built_block.commitment();
276                {
277                    let mut lock = last_built.lock().await;
278                    *lock = Some((consensus_context.round, built_block));
279                }
280
281                let result = tx.send(digest);
282                debug!(
283                    round = ?consensus_context.round,
284                    ?digest,
285                    success = result.is_ok(),
286                    "proposed new block"
287                );
288            });
289        rx
290    }
291
292    /// Verifies a proposed block within epoch boundaries.
293    ///
294    /// This method validates that:
295    /// 1. The block is within the current epoch (unless it's a boundary block re-proposal)
296    /// 2. Re-proposals are only allowed for the last block in an epoch
297    /// 3. The block's parent commitment matches the consensus context's expected parent
298    /// 4. The block's height is exactly one greater than the parent's height
299    /// 5. The underlying application's verification logic passes
300    ///
301    /// Verification is spawned in a background task and returns a receiver that will contain
302    /// the verification result. Valid blocks are reported to the marshal as verified.
303    async fn verify(
304        &mut self,
305        context: Context<Self::Digest, S::PublicKey>,
306        digest: Self::Digest,
307    ) -> oneshot::Receiver<bool> {
308        let mut marshal = self.marshal.clone();
309        let mut application = self.application.clone();
310        let epocher = self.epocher.clone();
311
312        let (mut tx, rx) = oneshot::channel();
313        self.context
314            .with_label("verify")
315            .spawn(move |runtime_context| async move {
316                // Create a future for tracking if the receiver is dropped, which could allow
317                // us to cancel work early.
318                let tx_closed = tx.closed();
319                pin_mut!(tx_closed);
320
321                let (parent_view, parent_commitment) = context.parent;
322                let parent_request = fetch_parent(
323                    parent_commitment,
324                    Some(Round::new(context.epoch(), parent_view)),
325                    &mut application,
326                    &mut marshal,
327                )
328                .await;
329                let block_request = marshal.subscribe(None, digest).await;
330                let block_requests = try_join(parent_request, block_request);
331                pin_mut!(block_requests);
332
333                // If consensus drops the rceiver, we can stop work early.
334                let (parent, block) = match select(block_requests, &mut tx_closed).await {
335                    Either::Left((Ok((parent, block)), _)) => (parent, block),
336                    Either::Left((Err(_), _)) => {
337                        debug!(
338                            reason = "failed to fetch parent or block",
339                            "skipping verification"
340                        );
341                        return;
342                    }
343                    Either::Right(_) => {
344                        debug!(
345                            reason = "consensus dropped receiver",
346                            "skipping verification"
347                        );
348                        return;
349                    }
350                };
351
352                // You can only re-propose the same block if it's the last height in the epoch.
353                if parent.commitment() == block.commitment() {
354                    let last_in_epoch = epocher
355                        .last(context.epoch())
356                        .expect("current epoch should exist");
357                    if block.height() == last_in_epoch {
358                        marshal.verified(context.round, block).await;
359                        let _ = tx.send(true);
360                    } else {
361                        let _ = tx.send(false);
362                    }
363                    return;
364                }
365
366                // Blocks are invalid if they are not within the current epoch and they aren't
367                // a re-proposal of the boundary block.
368                let Some(block_bounds) = epocher.containing(block.height()) else {
369                    debug!(
370                        height = block.height(),
371                        "block height not covered by epoch strategy"
372                    );
373                    let _ = tx.send(false);
374                    return;
375                };
376                if block_bounds.epoch() != context.epoch() {
377                    let _ = tx.send(false);
378                    return;
379                }
380
381                // Validate that the block's parent commitment matches what consensus expects.
382                if block.parent() != parent.commitment() {
383                    debug!(
384                        block_parent = %block.parent(),
385                        expected_parent = %parent.commitment(),
386                        "block parent commitment does not match expected parent"
387                    );
388                    let _ = tx.send(false);
389                    return;
390                }
391
392                // Validate that heights are contiguous.
393                if parent.height().checked_add(1) != Some(block.height()) {
394                    debug!(
395                        parent_height = parent.height(),
396                        block_height = block.height(),
397                        "block height is not contiguous with parent height"
398                    );
399                    let _ = tx.send(false);
400                    return;
401                }
402
403                let ancestry_stream = AncestorStream::new(marshal.clone(), [block.clone(), parent]);
404                let validity_request = application.verify(
405                    (runtime_context.with_label("app_verify"), context.clone()),
406                    ancestry_stream,
407                );
408                pin_mut!(validity_request);
409
410                // If consensus drops the rceiver, we can stop work early.
411                let application_valid = match select(validity_request, &mut tx_closed).await {
412                    Either::Left((is_valid, _)) => is_valid,
413                    Either::Right(_) => {
414                        debug!(
415                            reason = "consensus dropped receiver",
416                            "skipping verification"
417                        );
418                        return;
419                    }
420                };
421
422                if application_valid {
423                    marshal.verified(context.round, block).await;
424                }
425                let _ = tx.send(application_valid);
426            });
427        rx
428    }
429}
430
431impl<E, S, A, B, ES> CertifiableAutomaton for Marshaled<E, S, A, B, ES>
432where
433    E: Rng + Spawner + Metrics + Clock,
434    S: Scheme,
435    A: VerifyingApplication<
436        E,
437        Block = B,
438        SigningScheme = S,
439        Context = Context<B::Commitment, S::PublicKey>,
440    >,
441    B: Block,
442    ES: Epocher,
443{
444    // Uses default certify implementation which always returns true
445}
446
447impl<E, S, A, B, ES> Relay for Marshaled<E, S, A, B, ES>
448where
449    E: Rng + Spawner + Metrics + Clock,
450    S: Scheme,
451    A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>,
452    B: Block,
453    ES: Epocher,
454{
455    type Digest = B::Commitment;
456
457    /// Broadcasts a previously built block to the network.
458    ///
459    /// This uses the cached block from the last proposal operation. If no block was built or
460    /// the commitment does not match the cached block, the broadcast is skipped with a warning.
461    async fn broadcast(&mut self, commitment: Self::Digest) {
462        let Some((round, block)) = self.last_built.lock().await.clone() else {
463            warn!("missing block to broadcast");
464            return;
465        };
466
467        if block.commitment() != commitment {
468            warn!(
469                round = %round,
470                commitment = %block.commitment(),
471                height = block.height(),
472                "skipping requested broadcast of block with mismatched commitment"
473            );
474            return;
475        }
476
477        debug!(
478            round = %round,
479            commitment = %block.commitment(),
480            height = block.height(),
481            "requested broadcast of built block"
482        );
483        self.marshal.proposed(round, block).await;
484    }
485}
486
487impl<E, S, A, B, ES> Reporter for Marshaled<E, S, A, B, ES>
488where
489    E: Rng + Spawner + Metrics + Clock,
490    S: Scheme,
491    A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>
492        + Reporter<Activity = Update<B>>,
493    B: Block,
494    ES: Epocher,
495{
496    type Activity = A::Activity;
497
498    /// Relays a report to the underlying [Application].
499    async fn report(&mut self, update: Self::Activity) {
500        self.application.report(update).await
501    }
502}
503
504/// Fetches the parent block given its commitment and optional round.
505///
506/// This is a helper function used during proposal and verification to retrieve the parent
507/// block. If the parent commitment matches the genesis block, it returns the genesis block
508/// directly without querying the marshal. Otherwise, it subscribes to the marshal to await
509/// the parent block's availability.
510///
511/// Returns an error if the marshal subscription is cancelled.
512#[inline]
513async fn fetch_parent<E, S, A, B>(
514    parent_commitment: B::Commitment,
515    parent_round: Option<Round>,
516    application: &mut A,
517    marshal: &mut marshal::Mailbox<S, B>,
518) -> Either<Ready<Result<B, Canceled>>, oneshot::Receiver<B>>
519where
520    E: Rng + Spawner + Metrics + Clock,
521    S: Scheme,
522    A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>,
523    B: Block,
524{
525    let genesis = application.genesis().await;
526    if parent_commitment == genesis.commitment() {
527        Either::Left(futures::future::ready(Ok(genesis)))
528    } else {
529        Either::Right(marshal.subscribe(parent_round, parent_commitment).await)
530    }
531}