Skip to main content

commonware_consensus/marshal/standard/
inline.rs

1//! Wrapper for standard marshal with inline verification.
2//!
3//! # Overview
4//!
5//! [`Inline`] adapts any [`VerifyingApplication`] to the marshal/consensus interfaces
6//! while keeping block validation in the [`Automaton::verify`] path. Unlike
7//! [`super::Deferred`], it does not defer application verification to certification.
8//! Instead, it only reports `true` from `verify` after parent/height checks and
9//! application verification complete.
10//!
11//! # Epoch Boundaries
12//!
13//! As with [`super::Deferred`], when the parent is the last block of the epoch,
14//! [`Inline`] re-proposes that boundary block instead of building a new block.
15//! This prevents proposing blocks that would be excluded by epoch transition.
16//!
17//! # Verification Model
18//!
19//! Inline mode intentionally avoids relying on embedded block context. This allows
20//! usage with block types that implement [`crate::Block`] but not
21//! [`crate::CertifiableBlock`].
22//!
23//! Because verification is completed inline, `certify` must only wait for data
24//! availability in marshal. No additional deferred verification state needs to
25//! be awaited at certify time.
26//!
27//! # Usage
28//!
29//! ```rust,ignore
30//! let application = Inline::new(
31//!     context,
32//!     my_application,
33//!     marshal_mailbox,
34//!     epocher,
35//! );
36//! ```
37//!
38//! # When to Use
39//!
40//! Prefer this wrapper when:
41//! - Your application block type is not certifiable.
42//! - You prefer simpler verification semantics over deferred verification latency hiding.
43//! - You are willing to perform full application verification before casting a notarize vote.
44
45use crate::{
46    marshal::{
47        ancestry::AncestorStream,
48        application::validation::LastBuilt,
49        core::Mailbox,
50        standard::{
51            validation::{
52                fetch_parent, precheck_epoch_and_reproposal, verify_with_parent, Decision,
53            },
54            Standard,
55        },
56        Update,
57    },
58    simplex::{types::Context, Plan},
59    types::{Epoch, Epocher, Round},
60    Application, Automaton, Block, CertifiableAutomaton, Epochable, Relay, Reporter,
61    VerifyingApplication,
62};
63use commonware_cryptography::certificate::Scheme;
64use commonware_macros::select;
65use commonware_runtime::{
66    telemetry::metrics::histogram::{Buckets, Timed},
67    Clock, Metrics, Spawner,
68};
69use commonware_utils::{
70    channel::{fallible::OneshotExt, oneshot},
71    sync::Mutex,
72};
73use prometheus_client::metrics::histogram::Histogram;
74use rand::Rng;
75use std::{collections::BTreeSet, sync::Arc};
76use tracing::{debug, warn};
77
/// Tracks `(round, digest)` pairs for which `verify` has already fetched the
/// block, so `certify` can return immediately without re-subscribing to marshal.
///
/// Entries are inserted by `verify` as soon as its block subscription resolves
/// (before any validity checks run), and are pruned in `report` when a tip
/// update arrives: only rounds strictly greater than the tip round are kept.
type AvailableBlocks<D> = Arc<Mutex<BTreeSet<(Round, D)>>>;
81
82/// Waits for a marshal block subscription while allowing consensus to cancel the work.
83async fn await_block_subscription<T, D>(
84    tx: &mut oneshot::Sender<bool>,
85    block_rx: oneshot::Receiver<T>,
86    digest: &D,
87    stage: &'static str,
88) -> Option<T>
89where
90    D: std::fmt::Debug + ?Sized,
91{
92    select! {
93        _ = tx.closed() => {
94            debug!(
95                stage,
96                reason = "consensus dropped receiver",
97                "skipping block wait"
98            );
99            None
100        },
101        result = block_rx => {
102            if result.is_err() {
103                debug!(
104                    stage,
105                    ?digest,
106                    reason = "failed to fetch block",
107                    "skipping block wait"
108                );
109            }
110            result.ok()
111        },
112    }
113}
114
/// Standard marshal wrapper that verifies blocks inline in `verify`.
///
/// # Ancestry Validation
///
/// [`Inline`] always validates immediate ancestry before invoking application
/// verification:
/// - Parent digest matches consensus context's expected parent
/// - Child height is exactly parent height plus one
///
/// This is sufficient because the parent must have already been accepted by consensus.
///
/// # Certifiability
///
/// This wrapper requires only [`crate::Block`] for `B`, not
/// [`crate::CertifiableBlock`]. It is designed for applications that cannot
/// recover consensus context directly from block payloads.
#[derive(Clone)]
pub struct Inline<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: Application<E>,
    B: Block + Clone,
    ES: Epocher,
{
    /// Runtime context used to spawn the propose/verify/certify tasks and to
    /// register metrics.
    context: E,
    /// Wrapped application, consulted for genesis and block building and handed
    /// to the shared validation helpers during verification.
    application: A,
    /// Mailbox of the marshal actor, used to fetch, subscribe to, publish, and
    /// forward blocks.
    marshal: Mailbox<S, Standard<B>>,
    /// Epoch layout, used to look up the last height of an epoch.
    epocher: ES,
    /// Most recently built (or re-proposed) block together with its round.
    /// Written by `propose` and consumed by the relay's `broadcast`.
    last_built: LastBuilt<B>,
    /// `(round, digest)` pairs whose blocks `verify` has already fetched, letting
    /// `certify` answer without another marshal subscription.
    available_blocks: AvailableBlocks<B::Digest>,

    /// Timer over the `build_duration` histogram, started around application
    /// block building in `propose`.
    build_duration: Timed<E>,
}
149
150impl<E, S, A, B, ES> Inline<E, S, A, B, ES>
151where
152    E: Rng + Spawner + Metrics + Clock,
153    S: Scheme,
154    A: VerifyingApplication<
155        E,
156        Block = B,
157        SigningScheme = S,
158        Context = Context<B::Digest, S::PublicKey>,
159    >,
160    B: Block + Clone,
161    ES: Epocher,
162{
163    /// Creates a new inline-verification wrapper.
164    ///
165    /// Registers a `build_duration` histogram for proposal latency and initializes
166    /// the shared "last built block" cache used by [`Relay::broadcast`].
167    pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
168        let build_histogram = Histogram::new(Buckets::LOCAL);
169        context.register(
170            "build_duration",
171            "Histogram of time taken for the application to build a new block, in seconds",
172            build_histogram.clone(),
173        );
174        let build_duration = Timed::new(build_histogram, Arc::new(context.clone()));
175
176        Self {
177            context,
178            application,
179            marshal,
180            epocher,
181            last_built: Arc::new(Mutex::new(None)),
182            available_blocks: Arc::new(Mutex::new(BTreeSet::new())),
183            build_duration,
184        }
185    }
186}
187
188impl<E, S, A, B, ES> Automaton for Inline<E, S, A, B, ES>
189where
190    E: Rng + Spawner + Metrics + Clock,
191    S: Scheme,
192    A: VerifyingApplication<
193        E,
194        Block = B,
195        SigningScheme = S,
196        Context = Context<B::Digest, S::PublicKey>,
197    >,
198    B: Block + Clone,
199    ES: Epocher,
200{
201    type Digest = B::Digest;
202    type Context = Context<Self::Digest, S::PublicKey>;
203
204    /// Returns the genesis digest for `epoch`.
205    ///
206    /// For epoch zero, returns the application genesis digest. For later epochs,
207    /// uses the previous epoch's terminal block from marshal storage.
208    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
209        if epoch.is_zero() {
210            return self.application.genesis().await.digest();
211        }
212
213        let prev = epoch.previous().expect("checked to be non-zero above");
214        let last_height = self
215            .epocher
216            .last(prev)
217            .expect("previous epoch should exist");
218        let Some(block) = self.marshal.get_block(last_height).await else {
219            unreachable!("missing starting epoch block at height {}", last_height);
220        };
221        block.digest()
222    }
223
224    /// Proposes a new block or re-proposes an epoch boundary block.
225    ///
226    /// Proposal runs in a spawned task and returns a receiver for the resulting digest.
227    /// Built/re-proposed blocks are cached in `last_built` so relay can broadcast
228    /// exactly what was proposed.
229    async fn propose(
230        &mut self,
231        consensus_context: Context<Self::Digest, S::PublicKey>,
232    ) -> oneshot::Receiver<Self::Digest> {
233        let mut marshal = self.marshal.clone();
234        let mut application = self.application.clone();
235        let last_built = self.last_built.clone();
236        let epocher = self.epocher.clone();
237        let build_duration = self.build_duration.clone();
238
239        let (mut tx, rx) = oneshot::channel();
240        self.context
241            .with_label("propose")
242            .with_attribute("round", consensus_context.round)
243            .spawn(move |runtime_context| async move {
244                let (parent_view, parent_digest) = consensus_context.parent;
245                let parent_request = fetch_parent(
246                    parent_digest,
247                    // We are guaranteed that the parent round for any `consensus_context` is
248                    // in the same epoch (recall, the boundary block of the previous epoch
249                    // is the genesis block of the current epoch).
250                    Some(Round::new(consensus_context.epoch(), parent_view)),
251                    &mut application,
252                    &mut marshal,
253                )
254                .await;
255
256                let parent = select! {
257                    _ = tx.closed() => {
258                        debug!(reason = "consensus dropped receiver", "skipping proposal");
259                        return;
260                    },
261                    result = parent_request => match result {
262                        Ok(parent) => parent,
263                        Err(_) => {
264                            debug!(
265                                ?parent_digest,
266                                reason = "failed to fetch parent block",
267                                "skipping proposal"
268                            );
269                            return;
270                        }
271                    },
272                };
273
274                // At epoch boundary, re-propose the parent block.
275                let last_in_epoch = epocher
276                    .last(consensus_context.epoch())
277                    .expect("current epoch should exist");
278                if parent.height() == last_in_epoch {
279                    let digest = parent.digest();
280                    {
281                        let mut lock = last_built.lock();
282                        *lock = Some((consensus_context.round, parent));
283                    }
284
285                    let success = tx.send_lossy(digest);
286                    debug!(
287                        round = ?consensus_context.round,
288                        ?digest,
289                        success,
290                        "re-proposed parent block at epoch boundary"
291                    );
292                    return;
293                }
294
295                let ancestor_stream = AncestorStream::new(marshal.clone(), [parent]);
296                let build_request = application.propose(
297                    (
298                        runtime_context.with_label("app_propose"),
299                        consensus_context.clone(),
300                    ),
301                    ancestor_stream,
302                );
303
304                let mut build_timer = build_duration.timer();
305                let built_block = select! {
306                    _ = tx.closed() => {
307                        debug!(reason = "consensus dropped receiver", "skipping proposal");
308                        return;
309                    },
310                    result = build_request => match result {
311                        Some(block) => block,
312                        None => {
313                            debug!(
314                                ?parent_digest,
315                                reason = "block building failed",
316                                "skipping proposal"
317                            );
318                            return;
319                        }
320                    },
321                };
322                build_timer.observe();
323
324                let digest = built_block.digest();
325                {
326                    let mut lock = last_built.lock();
327                    *lock = Some((consensus_context.round, built_block));
328                }
329                let success = tx.send_lossy(digest);
330                debug!(
331                    round = ?consensus_context.round,
332                    ?digest,
333                    success,
334                    "proposed new block"
335                );
336            });
337        rx
338    }
339
340    /// Performs complete verification inline.
341    ///
342    /// This method:
343    /// 1. Fetches the block by digest
344    /// 2. Enforces epoch/re-proposal rules
345    /// 3. Fetches and validates the parent relationship
346    /// 4. Runs application verification over ancestry
347    ///
348    /// It reports `true` only after all verification steps finish. Successful
349    /// verification marks the block as verified in marshal immediately.
350    async fn verify(
351        &mut self,
352        context: Context<Self::Digest, S::PublicKey>,
353        digest: Self::Digest,
354    ) -> oneshot::Receiver<bool> {
355        let mut marshal = self.marshal.clone();
356        let mut application = self.application.clone();
357        let epocher = self.epocher.clone();
358        let available_blocks = self.available_blocks.clone();
359
360        let (mut tx, rx) = oneshot::channel();
361        self.context
362            .with_label("inline_verify")
363            .with_attribute("round", context.round)
364            .spawn(move |runtime_context| async move {
365                // If block can be fetched, mark it as available.
366                let block_request = marshal
367                    .subscribe_by_digest(Some(context.round), digest)
368                    .await;
369                let Some(block) =
370                    await_block_subscription(&mut tx, block_request, &digest, "verification").await
371                else {
372                    return;
373                };
374                available_blocks.lock().insert((context.round, digest));
375
376                // Shared pre-checks:
377                // - Blocks are invalid if they are not in the expected epoch and are
378                //   not a valid boundary re-proposal.
379                // - Re-proposals are detected when `digest == context.parent.1`.
380                // - Re-proposals skip normal parent/height checks because:
381                //   1) the block was already verified when originally proposed
382                //   2) parent-child checks would fail by construction when parent == block
383                let block = match precheck_epoch_and_reproposal(
384                    &epocher,
385                    &mut marshal,
386                    &context,
387                    digest,
388                    block,
389                )
390                .await
391                {
392                    Decision::Complete(valid) => {
393                        // `Complete` means either an immediate reject or a valid
394                        // re-proposal accepted without further ancestry checks.
395                        tx.send_lossy(valid);
396                        return;
397                    }
398                    Decision::Continue(block) => block,
399                };
400
401                // Non-reproposal path: fetch expected parent, validate ancestry, then
402                // run application verification over the ancestry stream.
403                //
404                // The helper returns `None` when work should stop early (for example,
405                // receiver closed or parent unavailable).
406                let application_valid = match verify_with_parent(
407                    runtime_context,
408                    context,
409                    block,
410                    &mut application,
411                    &mut marshal,
412                    &mut tx,
413                )
414                .await
415                {
416                    Some(valid) => valid,
417                    None => return,
418                };
419                tx.send_lossy(application_valid);
420            });
421        rx
422    }
423}
424
425/// Inline mode only waits for block availability during certification.
426impl<E, S, A, B, ES> CertifiableAutomaton for Inline<E, S, A, B, ES>
427where
428    E: Rng + Spawner + Metrics + Clock,
429    S: Scheme,
430    A: VerifyingApplication<
431        E,
432        Block = B,
433        SigningScheme = S,
434        Context = Context<B::Digest, S::PublicKey>,
435    >,
436    B: Block + Clone,
437    ES: Epocher,
438{
439    async fn certify(&mut self, round: Round, digest: Self::Digest) -> oneshot::Receiver<bool> {
440        // If block was already seen, return immediately.
441        if self.available_blocks.lock().contains(&(round, digest)) {
442            let (tx, rx) = oneshot::channel();
443            tx.send_lossy(true);
444            return rx;
445        }
446
447        // Otherwise, subscribe to marshal for block availability.
448        //
449        // TODO(#3393): Avoid fetching the block just to check if it's available.
450        let block_rx = self.marshal.subscribe_by_digest(Some(round), digest).await;
451        let (mut tx, rx) = oneshot::channel();
452        self.context
453            .with_label("inline_certify")
454            .with_attribute("round", round)
455            .spawn(move |_| async move {
456                if await_block_subscription(&mut tx, block_rx, &digest, "certification")
457                    .await
458                    .is_some()
459                {
460                    tx.send_lossy(true);
461                }
462            });
463
464        // We don't need to verify the block here because we could not have
465        // reached certification without a notarization (implying at least f+1
466        // honest validators have verified the block).
467        rx
468    }
469}
470
471impl<E, S, A, B, ES> Relay for Inline<E, S, A, B, ES>
472where
473    E: Rng + Spawner + Metrics + Clock,
474    S: Scheme,
475    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
476    B: Block + Clone,
477    ES: Epocher,
478{
479    type Digest = B::Digest;
480    type PublicKey = S::PublicKey;
481    type Plan = Plan<S::PublicKey>;
482
483    async fn broadcast(&mut self, digest: Self::Digest, plan: Plan<S::PublicKey>) {
484        match plan {
485            Plan::Propose => {
486                let Some((round, block)) = self.last_built.lock().take() else {
487                    warn!("missing block to broadcast");
488                    return;
489                };
490                if block.digest() != digest {
491                    warn!(
492                        round = %round,
493                        digest = %block.digest(),
494                        height = %block.height(),
495                        "skipping requested broadcast of block with mismatched digest"
496                    );
497                    return;
498                }
499                self.marshal.proposed(round, block).await;
500            }
501            Plan::Forward { round, peers } => {
502                self.marshal.forward(round, digest, peers).await;
503            }
504        }
505    }
506}
507
508impl<E, S, A, B, ES> Reporter for Inline<E, S, A, B, ES>
509where
510    E: Rng + Spawner + Metrics + Clock,
511    S: Scheme,
512    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
513        + Reporter<Activity = Update<B>>,
514    B: Block + Clone,
515    ES: Epocher,
516{
517    type Activity = A::Activity;
518
519    /// Forwards consensus activity to the wrapped application reporter.
520    async fn report(&mut self, update: Self::Activity) {
521        if let Update::Tip(tip_round, _, _) = &update {
522            self.available_blocks
523                .lock()
524                .retain(|(round, _)| round > tip_round);
525        }
526        self.application.report(update).await
527    }
528}
529
#[cfg(test)]
mod tests {
    use super::Inline;
    use crate::{
        marshal::mocks::{
            harness::{
                default_leader, make_raw_block, setup_network_with_participants, Ctx,
                StandardHarness, TestHarness, B, BLOCKS_PER_EPOCH, NAMESPACE, NUM_VALIDATORS, S, V,
            },
            verifying::MockVerifyingApp,
        },
        simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Context},
        types::{Epoch, FixedEpocher, Height, Round, View},
        Automaton, Block, CertifiableAutomaton, Relay, VerifyingApplication,
    };
    use commonware_cryptography::{
        certificate::{mocks::Fixture, ConstantProvider, Scheme},
        sha256::Sha256,
        Digestible, Hasher as _,
    };
    use commonware_macros::{select, test_traced};
    use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner};
    use commonware_utils::NZUsize;
    use rand::Rng;
    use std::time::Duration;

    // Never called: this only has to type-check. It proves that `Inline` implements
    // the consensus-facing traits for any `Block`, without needing `CertifiableBlock`.
    #[allow(dead_code)]
    fn assert_non_certifiable_block_supported<E, S, A, B, ES>()
    where
        E: Rng + Spawner + Metrics + Clock,
        S: Scheme,
        A: VerifyingApplication<
            E,
            Block = B,
            SigningScheme = S,
            Context = Context<B::Digest, S::PublicKey>,
        >,
        B: Block + Clone,
        ES: crate::types::Epocher,
    {
        fn requires_automaton<T: Automaton>() {}
        fn requires_certifiable<T: CertifiableAutomaton>() {}
        fn requires_relay<T: Relay>() {}

        requires_automaton::<Inline<E, S, A, B, ES>>();
        requires_certifiable::<Inline<E, S, A, B, ES>>();
        requires_relay::<Inline<E, S, A, B, ES>>();
    }

    /// After `verify` has fetched a block, `certify` for the same `(round, digest)`
    /// must resolve to `true` without hanging.
    #[test_traced("INFO")]
    fn test_certify_returns_immediately_after_verify_fetches_block() {
        deterministic::Runner::timed(Duration::from_secs(30)).start(|mut context| async move {
            // One validator backed by a marshal instance from the test harness.
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let mut network =
                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                    .await;

            let validator = participants[0].clone();
            // Keep the whole setup value bound so its other fields live for the test.
            let validator_setup = StandardHarness::setup_validator(
                context.with_label("validator_0"),
                &mut network,
                validator.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let mailbox = validator_setup.mailbox;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
            let mut wrapper = Inline::new(
                context.clone(),
                app,
                mailbox.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            // Seed the parent and child blocks in marshal so verify can fetch locally.
            let parent_round = Round::new(Epoch::zero(), View::new(1));
            let parent_context = Ctx {
                round: parent_round,
                leader: default_leader(),
                parent: (View::zero(), genesis.digest()),
            };
            let parent_block =
                B::new::<Sha256>(parent_context, genesis.digest(), Height::new(1), 100);
            let parent_digest = parent_block.digest();
            mailbox.clone().proposed(parent_round, parent_block).await;

            let child_round = Round::new(Epoch::zero(), View::new(2));
            let child_context = Ctx {
                round: child_round,
                leader: validator,
                parent: (View::new(1), parent_digest),
            };
            let child_block =
                B::new::<Sha256>(child_context.clone(), parent_digest, Height::new(2), 200);
            let child_digest = child_block.digest();
            mailbox.clone().proposed(child_round, child_block).await;

            // Complete verify first so the block is already available locally.
            let verify_rx = wrapper.verify(child_context, child_digest).await;
            let verified = verify_rx.await.unwrap();
            assert!(
                verified,
                "verify should complete successfully before certify"
            );

            // Certify should return immediately instead of waiting on marshal.
            let certify_rx = wrapper.certify(child_round, child_digest).await;

            select! {
                result = certify_rx => {
                    assert!(
                        result.unwrap(),
                        "certify should return immediately once verify has fetched the block"
                    );
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("certify should not hang after local verify completed");
                },
            }
        });
    }

    /// Without any prior `verify`, `certify` must still resolve to `true` by
    /// waiting on marshal for a block that marshal already holds.
    #[test_traced("INFO")]
    fn test_certify_succeeds_without_verify_task() {
        deterministic::Runner::timed(Duration::from_secs(30)).start(|mut context| async move {
            // One validator backed by a marshal instance from the test harness.
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let mut network =
                setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                    .await;

            let validator = participants[0].clone();
            // Keep the whole setup value bound so its other fields live for the test.
            let validator_setup = StandardHarness::setup_validator(
                context.with_label("validator_0"),
                &mut network,
                validator.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let mailbox = validator_setup.mailbox;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
            let mut wrapper = Inline::new(
                context.clone(),
                app,
                mailbox.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            // Seed the parent and child blocks in marshal without starting a verify task.
            let parent_round = Round::new(Epoch::zero(), View::new(1));
            let parent_context = Ctx {
                round: parent_round,
                leader: default_leader(),
                parent: (View::zero(), genesis.digest()),
            };
            let parent_block =
                B::new::<Sha256>(parent_context, genesis.digest(), Height::new(1), 100);
            let parent_digest = parent_block.digest();
            mailbox.clone().proposed(parent_round, parent_block).await;

            let child_round = Round::new(Epoch::zero(), View::new(2));
            let child_context = Ctx {
                round: child_round,
                leader: validator,
                parent: (View::new(1), parent_digest),
            };
            let child_block =
                B::new::<Sha256>(child_context.clone(), parent_digest, Height::new(2), 200);
            let child_digest = child_block.digest();
            mailbox.clone().proposed(child_round, child_block).await;

            // Certify should still resolve by waiting on marshal block availability directly.
            let certify_rx = wrapper.certify(child_round, child_digest).await;

            select! {
                result = certify_rx => {
                    assert!(
                        result.unwrap(),
                        "certify should resolve once block availability is known"
                    );
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("certify should not hang when block is already available in marshal");
                },
            }
        });
    }
}