commonware_consensus/application/marshaled.rs
1//! Wrapper for consensus applications that handles epochs and block dissemination.
2//!
3//! # Overview
4//!
5//! [Marshaled] is an adapter that wraps any [VerifyingApplication] implementation to handle
6//! epoch transitions automatically. It intercepts consensus operations (propose, verify) and
7//! ensures blocks are only produced within valid epoch boundaries.
8//!
9//! # Epoch Boundaries
10//!
11//! An epoch is a fixed number of blocks (the `epoch_length`). When the last block in an epoch
12//! is reached, this wrapper prevents new blocks from being built & proposed until the next epoch begins.
13//! Instead, it re-proposes the boundary block to avoid producing blocks that would be pruned
14//! by the epoch transition.
15//!
16//! # Usage
17//!
18//! Wrap your application implementation with [Marshaled::new] and provide it to your
19//! consensus engine for the [Automaton] and [Relay]. The wrapper handles all epoch logic transparently.
20//!
21//! ```rust,ignore
22//! let application = Marshaled::new(
23//! context,
24//! my_application,
25//! marshal_mailbox,
26//! BLOCKS_PER_EPOCH,
27//! );
28//! ```
29//!
30//! # Implementation Notes
31//!
32//! - Genesis blocks are handled specially: epoch 0 returns the application's genesis block,
33//! while subsequent epochs use the last block of the previous epoch as genesis
34//! - Blocks are automatically verified to be within the current epoch
35
36use crate::{
37 marshal::{self, ingress::mailbox::AncestorStream, Update},
38 simplex::types::Context,
39 types::{Epoch, Epocher, Round},
40 Application, Automaton, Block, CertifiableAutomaton, Epochable, Relay, Reporter,
41 VerifyingApplication,
42};
43use commonware_cryptography::certificate::Scheme;
44use commonware_runtime::{telemetry::metrics::status::GaugeExt, Clock, Metrics, Spawner};
45use commonware_utils::futures::ClosedExt;
46use futures::{
47 channel::oneshot::{self, Canceled},
48 future::{select, try_join, Either, Ready},
49 lock::Mutex,
50 pin_mut,
51};
52use prometheus_client::metrics::gauge::Gauge;
53use rand::Rng;
54use std::{sync::Arc, time::Instant};
55use tracing::{debug, warn};
56
57/// An [Application] adapter that handles epoch transitions and validates block ancestry.
58///
59/// This wrapper intercepts consensus operations to enforce epoch boundaries and validate
60/// block ancestry. It prevents blocks from being produced outside their valid epoch,
61/// handles the special case of re-proposing boundary blocks during epoch transitions,
62/// and ensures all blocks have valid parent linkage and contiguous heights.
63///
64/// # Ancestry Validation
65///
66/// Applications wrapped by [Marshaled] can rely on the following ancestry checks being
67/// performed automatically during verification:
68/// - Parent commitment matches the consensus context's expected parent
69/// - Block height is exactly one greater than the parent's height
70///
71/// Verifying only the immediate parent is sufficient since the parent itself must have
72/// been notarized by consensus, which guarantees it was verified and accepted by a quorum.
73/// This means the entire ancestry chain back to genesis is transitively validated.
74///
75/// Applications do not need to re-implement these checks in their own verification logic.
76#[derive(Clone)]
77pub struct Marshaled<E, S, A, B, ES>
78where
79 E: Rng + Spawner + Metrics + Clock,
80 S: Scheme,
81 A: Application<E>,
82 B: Block,
83 ES: Epocher,
84{
85 context: E,
86 application: A,
87 marshal: marshal::Mailbox<S, B>,
88 epocher: ES,
89 last_built: Arc<Mutex<Option<(Round, B)>>>,
90
91 build_duration: Gauge,
92}
93
94impl<E, S, A, B, ES> Marshaled<E, S, A, B, ES>
95where
96 E: Rng + Spawner + Metrics + Clock,
97 S: Scheme,
98 A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>,
99 B: Block,
100 ES: Epocher,
101{
102 /// Creates a new [Marshaled] wrapper.
103 pub fn new(context: E, application: A, marshal: marshal::Mailbox<S, B>, epocher: ES) -> Self {
104 let build_duration = Gauge::default();
105 context.register(
106 "build_duration",
107 "Time taken for the application to build a new block, in milliseconds",
108 build_duration.clone(),
109 );
110
111 Self {
112 context,
113 application,
114 marshal,
115 epocher,
116 last_built: Arc::new(Mutex::new(None)),
117
118 build_duration,
119 }
120 }
121}
122
123impl<E, S, A, B, ES> Automaton for Marshaled<E, S, A, B, ES>
124where
125 E: Rng + Spawner + Metrics + Clock,
126 S: Scheme,
127 A: VerifyingApplication<
128 E,
129 Block = B,
130 SigningScheme = S,
131 Context = Context<B::Commitment, S::PublicKey>,
132 >,
133 B: Block,
134 ES: Epocher,
135{
136 type Digest = B::Commitment;
137 type Context = Context<Self::Digest, S::PublicKey>;
138
139 /// Returns the genesis commitment for a given epoch.
140 ///
141 /// For epoch 0, this returns the application's genesis block commitment. For subsequent
142 /// epochs, it returns the commitment of the last block from the previous epoch, which
143 /// serves as the genesis block for the new epoch.
144 ///
145 /// # Panics
146 ///
147 /// Panics if a non-zero epoch is requested but the previous epoch's final block is not
148 /// available in storage. This indicates a critical error in the consensus engine startup
149 /// sequence, as engines must always have the genesis block before starting.
150 async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
151 if epoch.is_zero() {
152 return self.application.genesis().await.commitment();
153 }
154
155 let prev = epoch.previous().expect("checked to be non-zero above");
156 let last_height = self
157 .epocher
158 .last(prev)
159 .expect("previous epoch should exist");
160 let Some(block) = self.marshal.get_block(last_height).await else {
161 // A new consensus engine will never be started without having the genesis block
162 // of the new epoch (the last block of the previous epoch) already stored.
163 unreachable!("missing starting epoch block at height {}", last_height);
164 };
165 block.commitment()
166 }
167
168 /// Proposes a new block or re-proposes the epoch boundary block.
169 ///
170 /// This method builds a new block from the underlying application unless the parent block
171 /// is the last block in the current epoch. When at an epoch boundary, it re-proposes the
172 /// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
173 ///
174 /// The proposal operation is spawned in a background task and returns a receiver that will
175 /// contain the proposed block's commitment when ready. The built block is cached for later
176 /// broadcasting.
177 async fn propose(
178 &mut self,
179 consensus_context: Context<Self::Digest, S::PublicKey>,
180 ) -> oneshot::Receiver<Self::Digest> {
181 let mut marshal = self.marshal.clone();
182 let mut application = self.application.clone();
183 let last_built = self.last_built.clone();
184 let epocher = self.epocher.clone();
185
186 // Metrics
187 let build_duration = self.build_duration.clone();
188
189 let (mut tx, rx) = oneshot::channel();
190 self.context
191 .with_label("propose")
192 .spawn(move |runtime_context| async move {
193 // Create a future for tracking if the receiver is dropped, which could allow
194 // us to cancel work early.
195 let tx_closed = tx.closed();
196 pin_mut!(tx_closed);
197
198 let (parent_view, parent_commitment) = consensus_context.parent;
199 let parent_request = fetch_parent(
200 parent_commitment,
201 Some(Round::new(consensus_context.epoch(), parent_view)),
202 &mut application,
203 &mut marshal,
204 )
205 .await;
206 pin_mut!(parent_request);
207
208 let parent = match select(parent_request, &mut tx_closed).await {
209 Either::Left((Ok(parent), _)) => parent,
210 Either::Left((Err(_), _)) => {
211 debug!(
212 ?parent_commitment,
213 reason = "failed to fetch parent block",
214 "skipping proposal"
215 );
216 return;
217 }
218 Either::Right(_) => {
219 debug!(reason = "consensus dropped receiver", "skipping proposal");
220 return;
221 }
222 };
223
224 // Special case: If the parent block is the last block in the epoch,
225 // re-propose it as to not produce any blocks that will be cut out
226 // by the epoch transition.
227 let last_in_epoch = epocher
228 .last(consensus_context.epoch())
229 .expect("current epoch should exist");
230 if parent.height() == last_in_epoch {
231 let digest = parent.commitment();
232 {
233 let mut lock = last_built.lock().await;
234 *lock = Some((consensus_context.round, parent));
235 }
236
237 let result = tx.send(digest);
238 debug!(
239 round = ?consensus_context.round,
240 ?digest,
241 success = result.is_ok(),
242 "re-proposed parent block at epoch boundary"
243 );
244 return;
245 }
246
247 let ancestor_stream = AncestorStream::new(marshal.clone(), [parent]);
248 let build_request = application.propose(
249 (
250 runtime_context.with_label("app_propose"),
251 consensus_context.clone(),
252 ),
253 ancestor_stream,
254 );
255 pin_mut!(build_request);
256
257 let start = Instant::now();
258 let built_block = match select(build_request, &mut tx_closed).await {
259 Either::Left((Some(block), _)) => block,
260 Either::Left((None, _)) => {
261 debug!(
262 ?parent_commitment,
263 reason = "block building failed",
264 "skipping proposal"
265 );
266 return;
267 }
268 Either::Right(_) => {
269 debug!(reason = "consensus dropped receiver", "skipping proposal");
270 return;
271 }
272 };
273 let _ = build_duration.try_set(start.elapsed().as_millis());
274
275 let digest = built_block.commitment();
276 {
277 let mut lock = last_built.lock().await;
278 *lock = Some((consensus_context.round, built_block));
279 }
280
281 let result = tx.send(digest);
282 debug!(
283 round = ?consensus_context.round,
284 ?digest,
285 success = result.is_ok(),
286 "proposed new block"
287 );
288 });
289 rx
290 }
291
292 /// Verifies a proposed block within epoch boundaries.
293 ///
294 /// This method validates that:
295 /// 1. The block is within the current epoch (unless it's a boundary block re-proposal)
296 /// 2. Re-proposals are only allowed for the last block in an epoch
297 /// 3. The block's parent commitment matches the consensus context's expected parent
298 /// 4. The block's height is exactly one greater than the parent's height
299 /// 5. The underlying application's verification logic passes
300 ///
301 /// Verification is spawned in a background task and returns a receiver that will contain
302 /// the verification result. Valid blocks are reported to the marshal as verified.
303 async fn verify(
304 &mut self,
305 context: Context<Self::Digest, S::PublicKey>,
306 digest: Self::Digest,
307 ) -> oneshot::Receiver<bool> {
308 let mut marshal = self.marshal.clone();
309 let mut application = self.application.clone();
310 let epocher = self.epocher.clone();
311
312 let (mut tx, rx) = oneshot::channel();
313 self.context
314 .with_label("verify")
315 .spawn(move |runtime_context| async move {
316 // Create a future for tracking if the receiver is dropped, which could allow
317 // us to cancel work early.
318 let tx_closed = tx.closed();
319 pin_mut!(tx_closed);
320
321 let (parent_view, parent_commitment) = context.parent;
322 let parent_request = fetch_parent(
323 parent_commitment,
324 Some(Round::new(context.epoch(), parent_view)),
325 &mut application,
326 &mut marshal,
327 )
328 .await;
329 let block_request = marshal.subscribe(None, digest).await;
330 let block_requests = try_join(parent_request, block_request);
331 pin_mut!(block_requests);
332
333 // If consensus drops the rceiver, we can stop work early.
334 let (parent, block) = match select(block_requests, &mut tx_closed).await {
335 Either::Left((Ok((parent, block)), _)) => (parent, block),
336 Either::Left((Err(_), _)) => {
337 debug!(
338 reason = "failed to fetch parent or block",
339 "skipping verification"
340 );
341 return;
342 }
343 Either::Right(_) => {
344 debug!(
345 reason = "consensus dropped receiver",
346 "skipping verification"
347 );
348 return;
349 }
350 };
351
352 // You can only re-propose the same block if it's the last height in the epoch.
353 if parent.commitment() == block.commitment() {
354 let last_in_epoch = epocher
355 .last(context.epoch())
356 .expect("current epoch should exist");
357 if block.height() == last_in_epoch {
358 marshal.verified(context.round, block).await;
359 let _ = tx.send(true);
360 } else {
361 let _ = tx.send(false);
362 }
363 return;
364 }
365
366 // Blocks are invalid if they are not within the current epoch and they aren't
367 // a re-proposal of the boundary block.
368 let Some(block_bounds) = epocher.containing(block.height()) else {
369 debug!(
370 height = block.height(),
371 "block height not covered by epoch strategy"
372 );
373 let _ = tx.send(false);
374 return;
375 };
376 if block_bounds.epoch() != context.epoch() {
377 let _ = tx.send(false);
378 return;
379 }
380
381 // Validate that the block's parent commitment matches what consensus expects.
382 if block.parent() != parent.commitment() {
383 debug!(
384 block_parent = %block.parent(),
385 expected_parent = %parent.commitment(),
386 "block parent commitment does not match expected parent"
387 );
388 let _ = tx.send(false);
389 return;
390 }
391
392 // Validate that heights are contiguous.
393 if parent.height().checked_add(1) != Some(block.height()) {
394 debug!(
395 parent_height = parent.height(),
396 block_height = block.height(),
397 "block height is not contiguous with parent height"
398 );
399 let _ = tx.send(false);
400 return;
401 }
402
403 let ancestry_stream = AncestorStream::new(marshal.clone(), [block.clone(), parent]);
404 let validity_request = application.verify(
405 (runtime_context.with_label("app_verify"), context.clone()),
406 ancestry_stream,
407 );
408 pin_mut!(validity_request);
409
410 // If consensus drops the rceiver, we can stop work early.
411 let application_valid = match select(validity_request, &mut tx_closed).await {
412 Either::Left((is_valid, _)) => is_valid,
413 Either::Right(_) => {
414 debug!(
415 reason = "consensus dropped receiver",
416 "skipping verification"
417 );
418 return;
419 }
420 };
421
422 if application_valid {
423 marshal.verified(context.round, block).await;
424 }
425 let _ = tx.send(application_valid);
426 });
427 rx
428 }
429}
430
431impl<E, S, A, B, ES> CertifiableAutomaton for Marshaled<E, S, A, B, ES>
432where
433 E: Rng + Spawner + Metrics + Clock,
434 S: Scheme,
435 A: VerifyingApplication<
436 E,
437 Block = B,
438 SigningScheme = S,
439 Context = Context<B::Commitment, S::PublicKey>,
440 >,
441 B: Block,
442 ES: Epocher,
443{
444 // Uses default certify implementation which always returns true
445}
446
447impl<E, S, A, B, ES> Relay for Marshaled<E, S, A, B, ES>
448where
449 E: Rng + Spawner + Metrics + Clock,
450 S: Scheme,
451 A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>,
452 B: Block,
453 ES: Epocher,
454{
455 type Digest = B::Commitment;
456
457 /// Broadcasts a previously built block to the network.
458 ///
459 /// This uses the cached block from the last proposal operation. If no block was built or
460 /// the commitment does not match the cached block, the broadcast is skipped with a warning.
461 async fn broadcast(&mut self, commitment: Self::Digest) {
462 let Some((round, block)) = self.last_built.lock().await.clone() else {
463 warn!("missing block to broadcast");
464 return;
465 };
466
467 if block.commitment() != commitment {
468 warn!(
469 round = %round,
470 commitment = %block.commitment(),
471 height = block.height(),
472 "skipping requested broadcast of block with mismatched commitment"
473 );
474 return;
475 }
476
477 debug!(
478 round = %round,
479 commitment = %block.commitment(),
480 height = block.height(),
481 "requested broadcast of built block"
482 );
483 self.marshal.proposed(round, block).await;
484 }
485}
486
487impl<E, S, A, B, ES> Reporter for Marshaled<E, S, A, B, ES>
488where
489 E: Rng + Spawner + Metrics + Clock,
490 S: Scheme,
491 A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>
492 + Reporter<Activity = Update<B>>,
493 B: Block,
494 ES: Epocher,
495{
496 type Activity = A::Activity;
497
498 /// Relays a report to the underlying [Application].
499 async fn report(&mut self, update: Self::Activity) {
500 self.application.report(update).await
501 }
502}
503
504/// Fetches the parent block given its commitment and optional round.
505///
506/// This is a helper function used during proposal and verification to retrieve the parent
507/// block. If the parent commitment matches the genesis block, it returns the genesis block
508/// directly without querying the marshal. Otherwise, it subscribes to the marshal to await
509/// the parent block's availability.
510///
511/// Returns an error if the marshal subscription is cancelled.
512#[inline]
513async fn fetch_parent<E, S, A, B>(
514 parent_commitment: B::Commitment,
515 parent_round: Option<Round>,
516 application: &mut A,
517 marshal: &mut marshal::Mailbox<S, B>,
518) -> Either<Ready<Result<B, Canceled>>, oneshot::Receiver<B>>
519where
520 E: Rng + Spawner + Metrics + Clock,
521 S: Scheme,
522 A: Application<E, Block = B, Context = Context<B::Commitment, S::PublicKey>>,
523 B: Block,
524{
525 let genesis = application.genesis().await;
526 if parent_commitment == genesis.commitment() {
527 Either::Left(futures::future::ready(Ok(genesis)))
528 } else {
529 Either::Right(marshal.subscribe(parent_round, parent_commitment).await)
530 }
531}