Skip to main content

commonware_consensus/marshal/standard/
inline.rs

//! Wrapper for standard marshal with inline verification.
//!
//! # Overview
//!
//! [`Inline`] adapts any [`VerifyingApplication`] to the marshal/consensus interfaces
//! while keeping block validation in the [`Automaton::verify`] path. Unlike
//! [`super::Deferred`], it does not defer application verification to certification.
//! Instead, it only reports `true` from `verify` after parent/height checks and
//! application verification complete.
//!
//! # Epoch Boundaries
//!
//! As with [`super::Deferred`], when the parent is the last block of the epoch,
//! [`Inline`] re-proposes that boundary block instead of building a new block.
//! This prevents proposing blocks that would be excluded by epoch transition.
//!
//! # Verification Model
//!
//! Inline mode intentionally avoids relying on embedded block context. This allows
//! usage with block types that implement [`crate::Block`] but not
//! [`crate::CertifiableBlock`].
//!
//! Because verification is completed inline, the default
//! [`CertifiableAutomaton::certify`] behavior (always `true`) is sufficient: no
//! additional deferred verification state must be awaited at certify time.
//!
//! # Usage
//!
//! ```rust,ignore
//! let application = Inline::new(
//!     context,
//!     my_application,
//!     marshal_mailbox,
//!     epocher,
//! );
//! ```
//!
//! # When to Use
//!
//! Prefer this wrapper when:
//! - Your application block type is not certifiable.
//! - You prefer simpler verification semantics over deferred verification latency hiding.
//! - You are willing to perform full application verification before casting a notarize vote.
44
45use crate::{
46    marshal::{
47        ancestry::AncestorStream,
48        application::validation::LastBuilt,
49        core::Mailbox,
50        standard::{
51            validation::{
52                fetch_parent, precheck_epoch_and_reproposal, verify_with_parent, Decision,
53            },
54            Standard,
55        },
56        Update,
57    },
58    simplex::types::Context,
59    types::{Epoch, Epocher, Round},
60    Application, Automaton, Block, CertifiableAutomaton, Epochable, Relay, Reporter,
61    VerifyingApplication,
62};
63use commonware_cryptography::certificate::Scheme;
64use commonware_macros::select;
65use commonware_runtime::{
66    telemetry::metrics::histogram::{Buckets, Timed},
67    Clock, Metrics, Spawner,
68};
69use commonware_utils::{
70    channel::{fallible::OneshotExt, oneshot},
71    sync::Mutex,
72};
73use prometheus_client::metrics::histogram::Histogram;
74use rand::Rng;
75use std::sync::Arc;
76use tracing::{debug, warn};
77
78/// Standard marshal wrapper that verifies blocks inline in `verify`.
79///
80/// # Ancestry Validation
81///
82/// [`Inline`] always validates immediate ancestry before invoking application
83/// verification:
84/// - Parent digest matches consensus context's expected parent
85/// - Child height is exactly parent height plus one
86///
87/// This is sufficient because the parent must have already been accepted by consensus.
88///
89/// # Certifiability
90///
91/// This wrapper requires only [`crate::Block`] for `B`, not
92/// [`crate::CertifiableBlock`]. It is designed for applications that cannot
93/// recover consensus context directly from block payloads.
94#[derive(Clone)]
95pub struct Inline<E, S, A, B, ES>
96where
97    E: Rng + Spawner + Metrics + Clock,
98    S: Scheme,
99    A: Application<E>,
100    B: Block + Clone,
101    ES: Epocher,
102{
103    context: E,
104    application: A,
105    marshal: Mailbox<S, Standard<B>>,
106    epocher: ES,
107    last_built: LastBuilt<B>,
108
109    build_duration: Timed<E>,
110}
111
112impl<E, S, A, B, ES> Inline<E, S, A, B, ES>
113where
114    E: Rng + Spawner + Metrics + Clock,
115    S: Scheme,
116    A: VerifyingApplication<
117        E,
118        Block = B,
119        SigningScheme = S,
120        Context = Context<B::Digest, S::PublicKey>,
121    >,
122    B: Block + Clone,
123    ES: Epocher,
124{
125    /// Creates a new inline-verification wrapper.
126    ///
127    /// Registers a `build_duration` histogram for proposal latency and initializes
128    /// the shared "last built block" cache used by [`Relay::broadcast`].
129    pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
130        let build_histogram = Histogram::new(Buckets::LOCAL);
131        context.register(
132            "build_duration",
133            "Histogram of time taken for the application to build a new block, in seconds",
134            build_histogram.clone(),
135        );
136        let build_duration = Timed::new(build_histogram, Arc::new(context.clone()));
137
138        Self {
139            context,
140            application,
141            marshal,
142            epocher,
143            last_built: Arc::new(Mutex::new(None)),
144            build_duration,
145        }
146    }
147}
148
149impl<E, S, A, B, ES> Automaton for Inline<E, S, A, B, ES>
150where
151    E: Rng + Spawner + Metrics + Clock,
152    S: Scheme,
153    A: VerifyingApplication<
154        E,
155        Block = B,
156        SigningScheme = S,
157        Context = Context<B::Digest, S::PublicKey>,
158    >,
159    B: Block + Clone,
160    ES: Epocher,
161{
162    type Digest = B::Digest;
163    type Context = Context<Self::Digest, S::PublicKey>;
164
165    /// Returns the genesis digest for `epoch`.
166    ///
167    /// For epoch zero, returns the application genesis digest. For later epochs,
168    /// uses the previous epoch's terminal block from marshal storage.
169    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
170        if epoch.is_zero() {
171            return self.application.genesis().await.digest();
172        }
173
174        let prev = epoch.previous().expect("checked to be non-zero above");
175        let last_height = self
176            .epocher
177            .last(prev)
178            .expect("previous epoch should exist");
179        let Some(block) = self.marshal.get_block(last_height).await else {
180            unreachable!("missing starting epoch block at height {}", last_height);
181        };
182        block.digest()
183    }
184
185    /// Proposes a new block or re-proposes an epoch boundary block.
186    ///
187    /// Proposal runs in a spawned task and returns a receiver for the resulting digest.
188    /// Built/re-proposed blocks are cached in `last_built` so relay can broadcast
189    /// exactly what was proposed.
190    async fn propose(
191        &mut self,
192        consensus_context: Context<Self::Digest, S::PublicKey>,
193    ) -> oneshot::Receiver<Self::Digest> {
194        let mut marshal = self.marshal.clone();
195        let mut application = self.application.clone();
196        let last_built = self.last_built.clone();
197        let epocher = self.epocher.clone();
198        let build_duration = self.build_duration.clone();
199
200        let (mut tx, rx) = oneshot::channel();
201        self.context
202            .with_label("propose")
203            .with_attribute("round", consensus_context.round)
204            .spawn(move |runtime_context| async move {
205                let (parent_view, parent_digest) = consensus_context.parent;
206                let parent_request = fetch_parent(
207                    parent_digest,
208                    // We are guaranteed that the parent round for any `consensus_context` is
209                    // in the same epoch (recall, the boundary block of the previous epoch
210                    // is the genesis block of the current epoch).
211                    Some(Round::new(consensus_context.epoch(), parent_view)),
212                    &mut application,
213                    &mut marshal,
214                )
215                .await;
216
217                let parent = select! {
218                    _ = tx.closed() => {
219                        debug!(reason = "consensus dropped receiver", "skipping proposal");
220                        return;
221                    },
222                    result = parent_request => match result {
223                        Ok(parent) => parent,
224                        Err(_) => {
225                            debug!(
226                                ?parent_digest,
227                                reason = "failed to fetch parent block",
228                                "skipping proposal"
229                            );
230                            return;
231                        }
232                    },
233                };
234
235                // At epoch boundary, re-propose the parent block.
236                let last_in_epoch = epocher
237                    .last(consensus_context.epoch())
238                    .expect("current epoch should exist");
239                if parent.height() == last_in_epoch {
240                    let digest = parent.digest();
241                    {
242                        let mut lock = last_built.lock();
243                        *lock = Some((consensus_context.round, parent));
244                    }
245
246                    let success = tx.send_lossy(digest);
247                    debug!(
248                        round = ?consensus_context.round,
249                        ?digest,
250                        success,
251                        "re-proposed parent block at epoch boundary"
252                    );
253                    return;
254                }
255
256                let ancestor_stream = AncestorStream::new(marshal.clone(), [parent]);
257                let build_request = application.propose(
258                    (
259                        runtime_context.with_label("app_propose"),
260                        consensus_context.clone(),
261                    ),
262                    ancestor_stream,
263                );
264
265                let mut build_timer = build_duration.timer();
266                let built_block = select! {
267                    _ = tx.closed() => {
268                        debug!(reason = "consensus dropped receiver", "skipping proposal");
269                        return;
270                    },
271                    result = build_request => match result {
272                        Some(block) => block,
273                        None => {
274                            debug!(
275                                ?parent_digest,
276                                reason = "block building failed",
277                                "skipping proposal"
278                            );
279                            return;
280                        }
281                    },
282                };
283                build_timer.observe();
284
285                let digest = built_block.digest();
286                {
287                    let mut lock = last_built.lock();
288                    *lock = Some((consensus_context.round, built_block));
289                }
290                let success = tx.send_lossy(digest);
291                debug!(
292                    round = ?consensus_context.round,
293                    ?digest,
294                    success,
295                    "proposed new block"
296                );
297            });
298        rx
299    }
300
301    /// Performs complete verification inline.
302    ///
303    /// This method:
304    /// 1. Fetches the block by digest
305    /// 2. Enforces epoch/re-proposal rules
306    /// 3. Fetches and validates the parent relationship
307    /// 4. Runs application verification over ancestry
308    ///
309    /// It reports `true` only after all verification steps finish. Successful
310    /// verification marks the block as verified in marshal immediately.
311    async fn verify(
312        &mut self,
313        context: Context<Self::Digest, S::PublicKey>,
314        digest: Self::Digest,
315    ) -> oneshot::Receiver<bool> {
316        let mut marshal = self.marshal.clone();
317        let mut application = self.application.clone();
318        let epocher = self.epocher.clone();
319
320        let (mut tx, rx) = oneshot::channel();
321        self.context
322            .with_label("inline_verify")
323            .with_attribute("round", context.round)
324            .spawn(move |runtime_context| async move {
325                let block_request = marshal
326                    .subscribe_by_digest(Some(context.round), digest)
327                    .await;
328                let block = select! {
329                    _ = tx.closed() => {
330                        debug!(
331                            reason = "consensus dropped receiver",
332                            "skipping verification"
333                        );
334                        return;
335                    },
336                    result = block_request => match result {
337                        Ok(block) => block,
338                        Err(_) => {
339                            debug!(
340                                ?digest,
341                                reason = "failed to fetch block for verification",
342                                "skipping verification"
343                            );
344                            return;
345                        }
346                    },
347                };
348
349                // Shared pre-checks:
350                // - Blocks are invalid if they are not in the expected epoch and are
351                //   not a valid boundary re-proposal.
352                // - Re-proposals are detected when `digest == context.parent.1`.
353                // - Re-proposals skip normal parent/height checks because:
354                //   1) the block was already verified when originally proposed
355                //   2) parent-child checks would fail by construction when parent == block
356                let block = match precheck_epoch_and_reproposal(
357                    &epocher,
358                    &mut marshal,
359                    &context,
360                    digest,
361                    block,
362                )
363                .await
364                {
365                    Decision::Complete(valid) => {
366                        // `Complete` means either an immediate reject or a valid
367                        // re-proposal accepted without further ancestry checks.
368                        tx.send_lossy(valid);
369                        return;
370                    }
371                    Decision::Continue(block) => block,
372                };
373
374                // Non-reproposal path: fetch expected parent, validate ancestry, then
375                // run application verification over the ancestry stream.
376                // The helper returns `None` when work should stop early (for example,
377                // receiver closed or parent unavailable).
378                let application_valid = match verify_with_parent(
379                    runtime_context,
380                    context,
381                    block,
382                    &mut application,
383                    &mut marshal,
384                    &mut tx,
385                )
386                .await
387                {
388                    Some(valid) => valid,
389                    None => return,
390                };
391                tx.send_lossy(application_valid);
392            });
393        rx
394    }
395}
396
397/// Inline mode relies on the default certification behavior.
398///
399/// Verification is completed during [`Automaton::verify`], so certify does not
400/// need additional wrapper-managed checks.
401impl<E, S, A, B, ES> CertifiableAutomaton for Inline<E, S, A, B, ES>
402where
403    E: Rng + Spawner + Metrics + Clock,
404    S: Scheme,
405    A: VerifyingApplication<
406        E,
407        Block = B,
408        SigningScheme = S,
409        Context = Context<B::Digest, S::PublicKey>,
410    >,
411    B: Block + Clone,
412    ES: Epocher,
413{
414}
415
416impl<E, S, A, B, ES> Relay for Inline<E, S, A, B, ES>
417where
418    E: Rng + Spawner + Metrics + Clock,
419    S: Scheme,
420    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
421    B: Block + Clone,
422    ES: Epocher,
423{
424    type Digest = B::Digest;
425
426    /// Broadcasts the last proposed block, if it matches the requested digest.
427    async fn broadcast(&mut self, digest: Self::Digest) {
428        let Some((round, block)) = self.last_built.lock().take() else {
429            warn!("missing block to broadcast");
430            return;
431        };
432        if block.digest() != digest {
433            warn!(
434                round = %round,
435                digest = %block.digest(),
436                height = %block.height(),
437                "skipping requested broadcast of block with mismatched digest"
438            );
439            return;
440        }
441        self.marshal.proposed(round, block).await;
442    }
443}
444
445impl<E, S, A, B, ES> Reporter for Inline<E, S, A, B, ES>
446where
447    E: Rng + Spawner + Metrics + Clock,
448    S: Scheme,
449    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
450        + Reporter<Activity = Update<B>>,
451    B: Block + Clone,
452    ES: Epocher,
453{
454    type Activity = A::Activity;
455
456    /// Forwards consensus activity to the wrapped application reporter.
457    async fn report(&mut self, update: Self::Activity) {
458        self.application.report(update).await
459    }
460}
461
#[cfg(test)]
mod tests {
    use super::Inline;
    use crate::{
        simplex::types::Context, Automaton, Block, CertifiableAutomaton, Relay,
        VerifyingApplication,
    };
    use commonware_cryptography::certificate::Scheme;
    use commonware_runtime::{Clock, Metrics, Spawner};
    use rand::Rng;

    // Type-level check only (never called): with `B` bounded by plain `Block`
    // rather than `CertifiableBlock`, the inline standard wrapper must still
    // implement every consensus-facing trait.
    #[allow(dead_code)]
    fn assert_non_certifiable_block_supported<E, S, A, B, ES>()
    where
        E: Rng + Spawner + Metrics + Clock,
        S: Scheme,
        A: VerifyingApplication<
            E,
            Block = B,
            SigningScheme = S,
            Context = Context<B::Digest, S::PublicKey>,
        >,
        B: Block + Clone,
        ES: crate::types::Epocher,
    {
        // Each helper compiles only if `T` satisfies the named trait.
        fn requires_automaton<T>()
        where
            T: Automaton,
        {
        }
        fn requires_certifiable<T>()
        where
            T: CertifiableAutomaton,
        {
        }
        fn requires_relay<T>()
        where
            T: Relay,
        {
        }

        requires_automaton::<Inline<E, S, A, B, ES>>();
        requires_certifiable::<Inline<E, S, A, B, ES>>();
        requires_relay::<Inline<E, S, A, B, ES>>();
    }
}