Skip to main content

commonware_consensus/marshal/standard/
inline.rs

1//! Wrapper for standard marshal with inline verification.
2//!
3//! # Overview
4//!
5//! [`Inline`] adapts any [`Application`] to the marshal/consensus interfaces
6//! while keeping block validation in the [`Automaton::verify`] path. Unlike
7//! [`super::Deferred`], it does not defer application verification to certification.
8//! Instead, it only reports `true` from `verify` after parent/height checks and
9//! application verification complete.
10//!
11//! # Epoch Boundaries
12//!
13//! As with [`super::Deferred`], when the parent is the last block of the epoch,
14//! [`Inline`] re-proposes that boundary block instead of building a new block.
15//! This prevents proposing blocks that would be excluded by epoch transition.
16//!
17//! # Verification Model
18//!
19//! Inline mode intentionally avoids relying on embedded block context. This allows
20//! usage with block types that implement [`crate::Block`] but not
21//! [`crate::CertifiableBlock`].
22//!
23//! Because verification is completed inline, `certify` must only wait for data
24//! availability in marshal. No additional deferred verification state needs to
25//! be awaited at certify time.
26//!
27//! # Usage
28//!
29//! ```rust,ignore
30//! let application = Inline::new(
31//!     context,
32//!     my_application,
33//!     marshal_mailbox,
34//!     epocher,
35//! );
36//! ```
37//!
38//! # When to Use
39//!
40//! Prefer this wrapper when:
41//! - Your application block type is not certifiable.
42//! - You prefer simpler verification semantics over deferred verification latency hiding.
43//! - You are willing to perform full application verification before casting a notarize vote.
44
45use crate::{
46    marshal::{
47        application::validation::Stage,
48        core::{CommitmentFallback, DigestFallback, Mailbox},
49        standard::{
50            validation::{precheck_epoch_and_reproposal, verify_with_parent, Decision},
51            Standard,
52        },
53        Update,
54    },
55    simplex::{types::Context, Plan},
56    types::{Epocher, Round},
57    Application, Automaton, Block, CertifiableAutomaton, Epochable, Relay, Reporter,
58};
59use commonware_actor::Feedback;
60use commonware_cryptography::certificate::Scheme;
61use commonware_macros::select;
62use commonware_p2p::Recipients;
63use commonware_runtime::{
64    telemetry::metrics::{
65        histogram::{Buckets, Timed},
66        MetricsExt as _,
67    },
68    Clock, Metrics, Spawner,
69};
70use commonware_utils::{
71    channel::{fallible::OneshotExt, oneshot},
72    sync::{AsyncMutex, Mutex},
73};
74use rand::Rng;
75use std::{collections::BTreeSet, sync::Arc};
76use tracing::debug;
77
78/// Tracks `(round, digest)` pairs for which `verify` has already fetched the
79/// block, so `certify` can return immediately without re-subscribing to marshal.
80type AvailableBlocks<D> = Arc<Mutex<BTreeSet<(Round, D)>>>;
81
82/// Waits for a marshal block subscription while allowing consensus to cancel the work.
83async fn await_block_subscription<T, D>(
84    tx: &mut oneshot::Sender<bool>,
85    block_rx: oneshot::Receiver<T>,
86    digest: &D,
87    stage: &'static str,
88) -> Option<T>
89where
90    D: std::fmt::Debug + ?Sized,
91{
92    select! {
93        _ = tx.closed() => {
94            debug!(
95                stage,
96                reason = "consensus dropped receiver",
97                "skipping block wait"
98            );
99            None
100        },
101        result = block_rx => {
102            if result.is_err() {
103                debug!(
104                    stage,
105                    ?digest,
106                    reason = "failed to fetch block",
107                    "skipping block wait"
108                );
109            }
110            result.ok()
111        },
112    }
113}
114
115/// Standard marshal wrapper that verifies blocks inline in `verify`.
116///
117/// # Ancestry Validation
118///
119/// [`Inline`] always validates immediate ancestry before invoking application
120/// verification:
121/// - Parent digest matches consensus context's expected parent
122/// - Child height is exactly parent height plus one
123///
124/// This is sufficient because the parent must have already been accepted by consensus.
125///
126/// # Certifiability
127///
128/// This wrapper requires only [`crate::Block`] for `B`, not
129/// [`crate::CertifiableBlock`]. It is designed for applications that cannot
130/// recover consensus context directly from block payloads.
131pub struct Inline<E, S, A, B, ES>
132where
133    E: Rng + Spawner + Metrics + Clock,
134    S: Scheme,
135    A: Application<E>,
136    B: Block + Clone,
137    ES: Epocher,
138{
139    context: Arc<AsyncMutex<E>>,
140    application: A,
141    marshal: Mailbox<S, Standard<B>>,
142    epocher: ES,
143    available_blocks: AvailableBlocks<B::Digest>,
144
145    build_duration: Timed,
146    proposal_parent_fetch_duration: Timed,
147    ancestor_fetch_duration: Timed,
148}
149
150impl<E, S, A, B, ES> Clone for Inline<E, S, A, B, ES>
151where
152    E: Rng + Spawner + Metrics + Clock,
153    S: Scheme,
154    A: Application<E>,
155    B: Block + Clone,
156    ES: Epocher,
157{
158    fn clone(&self) -> Self {
159        Self {
160            context: self.context.clone(),
161            application: self.application.clone(),
162            marshal: self.marshal.clone(),
163            epocher: self.epocher.clone(),
164            available_blocks: self.available_blocks.clone(),
165            build_duration: self.build_duration.clone(),
166            proposal_parent_fetch_duration: self.proposal_parent_fetch_duration.clone(),
167            ancestor_fetch_duration: self.ancestor_fetch_duration.clone(),
168        }
169    }
170}
171
172impl<E, S, A, B, ES> Inline<E, S, A, B, ES>
173where
174    E: Rng + Spawner + Metrics + Clock,
175    S: Scheme,
176    A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
177    B: Block + Clone,
178    ES: Epocher,
179{
180    /// Creates a new inline-verification wrapper.
181    ///
182    /// Registers a `build_duration` histogram for proposal latency.
183    pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
184        let build_histogram = context.histogram(
185            "build_duration",
186            "Histogram of time taken for the application to build a new block, in seconds",
187            Buckets::LOCAL,
188        );
189        let build_duration = Timed::new(build_histogram);
190        let parent_fetch_histogram = context.histogram(
191            "parent_fetch_duration",
192            "Histogram of time taken to fetch a parent block in propose, in seconds",
193            Buckets::LOCAL,
194        );
195        let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram);
196        let ancestor_fetch_histogram = context.histogram(
197            "ancestor_fetch_duration",
198            "Histogram of time taken to fetch a block via the ancestry stream, in seconds",
199            Buckets::LOCAL,
200        );
201        let ancestor_fetch_duration = Timed::new(ancestor_fetch_histogram);
202
203        Self {
204            context: Arc::new(AsyncMutex::new(context)),
205            application,
206            marshal,
207            epocher,
208            available_blocks: Arc::new(Mutex::new(BTreeSet::new())),
209            build_duration,
210            proposal_parent_fetch_duration,
211            ancestor_fetch_duration,
212        }
213    }
214}
215
impl<E, S, A, B, ES> Automaton for Inline<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
    B: Block + Clone,
    ES: Epocher,
{
    type Digest = B::Digest;
    type Context = Context<Self::Digest, S::PublicKey>;

    /// Proposes a new block or re-proposes an epoch boundary block.
    ///
    /// Proposal runs in a spawned task and returns a receiver for the resulting digest.
    /// The digest is sent only after marshal accepts the block. A freshly built block is
    /// handed over via [`Mailbox::proposed`]; a re-proposed epoch boundary block goes via
    /// [`Mailbox::verified`]. A digest received from `propose()` therefore implies the
    /// block is recoverable after restart. The recovery check at the top of the task
    /// relies on this, since it finds such blocks with `get_verified`.
    ///
    /// The task exits without sending (the receiver yields no value) when:
    /// - a verified block already exists for the round
    /// - consensus drops the receiver
    /// - the parent cannot be fetched
    /// - the application fails to build a block
    /// - marshal rejects the block
    async fn propose(
        &mut self,
        consensus_context: Context<Self::Digest, S::PublicKey>,
    ) -> oneshot::Receiver<Self::Digest> {
        // Owned handles moved into the spawned task below.
        let marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let epocher = self.epocher.clone();
        let build_duration = self.build_duration.clone();
        let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone();
        let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();

        let (mut tx, rx) = oneshot::channel();
        let context = self
            .context
            .lock()
            .await
            .child("propose")
            .with_attribute("round", consensus_context.round);
        context.spawn(move |runtime_context| async move {
            // On leader recovery, marshal may already hold a verified block
            // for this round (persisted by a pre-crash propose whose
            // notarize vote never reached the journal).
            //
            // The parent context recovered by simplex may differ from the one
            // the cached block was built against, so the stored block is not safe
            // to reuse. Building a fresh block instead would land on the same
            // prunable archive index and be silently dropped.
            //
            // Skip this view and let the voter nullify it via timeout.
            if marshal
                .get_verified(consensus_context.round)
                .await
                .is_some()
            {
                debug!(
                    round = ?consensus_context.round,
                    "skipping proposal: verified block already exists for round on restart"
                );
                return;
            }

            // The parent for any consensus context is in the same epoch: the
            // boundary block of the previous epoch is the genesis block of the
            // current epoch.
            //
            // Proposal context carries the certified parent view/commitment but
            // not the parent height. The parent may be certified above the
            // finalized tip, so this must stay round-bound until the block is
            // returned.
            let (parent_view, parent_commitment) = consensus_context.parent;
            let parent_request = marshal.subscribe_by_commitment(
                parent_commitment,
                CommitmentFallback::FetchByRound {
                    round: Round::new(consensus_context.epoch(), parent_view),
                },
            );

            // Stop waiting for the parent as soon as consensus no longer wants the proposal.
            let parent_timer = proposal_parent_fetch_duration.timer(&runtime_context);
            let parent = select! {
                _ = tx.closed() => {
                    debug!(reason = "consensus dropped receiver", "skipping proposal");
                    return;
                },
                result = parent_request => match result {
                    Ok(parent) => parent,
                    Err(_) => {
                        debug!(
                            ?parent_commitment,
                            reason = "failed to fetch parent block",
                            "skipping proposal"
                        );
                        return;
                    }
                },
            };
            parent_timer.observe(&runtime_context);

            // At epoch boundary, re-propose the parent block.
            let last_in_epoch = epocher
                .last(consensus_context.epoch())
                .expect("current epoch should exist");
            if parent.height() == last_in_epoch {
                let digest = parent.digest();
                if !marshal.verified(consensus_context.round, parent).await {
                    debug!(
                        round = ?consensus_context.round,
                        ?digest,
                        "marshal rejected re-proposed boundary block"
                    );
                    return;
                }
                let success = tx.send_lossy(digest);
                debug!(
                    round = ?consensus_context.round,
                    ?digest,
                    success,
                    "re-proposed parent block at epoch boundary"
                );
                return;
            }

            // Give the application a stream of ancestors starting at the parent.
            let ancestor_stream = marshal.ancestor_stream(
                Arc::new(runtime_context.child("ancestor_stream")),
                [parent],
                ancestor_fetch_duration,
            );
            let build_request = application.propose(
                (
                    runtime_context.child("app_propose"),
                    consensus_context.clone(),
                ),
                ancestor_stream,
            );

            // Abandon the build if consensus drops the receiver first.
            let build_timer = build_duration.timer(&runtime_context);
            let built_block = select! {
                _ = tx.closed() => {
                    debug!(reason = "consensus dropped receiver", "skipping proposal");
                    return;
                },
                result = build_request => match result {
                    Some(block) => block,
                    None => {
                        debug!(
                            ?parent_commitment,
                            reason = "block building failed",
                            "skipping proposal"
                        );
                        return;
                    }
                },
            };
            build_timer.observe(&runtime_context);

            // Hand the block to marshal before releasing the digest to consensus.
            let digest = built_block.digest();
            if !marshal.proposed(consensus_context.round, built_block).await {
                debug!(
                    round = ?consensus_context.round,
                    ?digest,
                    "marshal rejected proposed block"
                );
                return;
            }
            let success = tx.send_lossy(digest);
            debug!(
                round = ?consensus_context.round,
                ?digest,
                success,
                "proposed new block"
            );
        });
        rx
    }

    /// Performs complete verification inline.
    ///
    /// This method:
    /// 1. Waits for the block by digest
    /// 2. Enforces epoch/re-proposal rules
    /// 3. Fetches and validates the parent relationship
    /// 4. Runs application verification over ancestry
    ///
    /// It reports `true` only after all verification steps finish. Successful
    /// verification marks the block as verified in marshal immediately. It also
    /// records `(round, digest)` so a later `certify` for the same pair can
    /// resolve without waiting on marshal.
    ///
    /// If a step stops early, the task exits without sending, so the receiver
    /// yields no value. This happens when the block cannot be fetched, when
    /// consensus drops the receiver, or when the pre-check/parent helper
    /// returns `None`.
    async fn verify(
        &mut self,
        context: Context<Self::Digest, S::PublicKey>,
        digest: Self::Digest,
    ) -> oneshot::Receiver<bool> {
        // Owned handles moved into the spawned task below.
        let mut marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let epocher = self.epocher.clone();
        let available_blocks = self.available_blocks.clone();
        let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();

        let (mut tx, rx) = oneshot::channel();
        let runtime_context = self
            .context
            .lock()
            .await
            .child("inline_verify")
            .with_attribute("round", context.round);
        runtime_context.spawn(move |runtime_context| async move {
            // Uses `DigestFallback::Wait` here, whereas `certify` uses
            // `FetchByRound`; presumably this only waits for the block to
            // arrive rather than fetching it from peers by round.
            let block_request = marshal.subscribe_by_digest(digest, DigestFallback::Wait);
            let Some(block) =
                await_block_subscription(&mut tx, block_request, &digest, "verification").await
            else {
                return;
            };

            // Shared pre-checks:
            // - Blocks are invalid if they are not in the expected epoch and are
            //   not a valid boundary re-proposal.
            // - Re-proposals are detected when `digest == context.parent.1`.
            // - Re-proposals skip normal parent/height checks because:
            //   1) the block was already verified when originally proposed
            //   2) parent-child checks would fail by construction when parent == block
            let Some(decision) =
                precheck_epoch_and_reproposal(&epocher, &mut marshal, &context, digest, block)
                    .await
            else {
                return;
            };
            let block = match decision {
                // The pre-check already reached a verdict; record success so
                // `certify` can short-circuit, then report it.
                Decision::Complete(valid) => {
                    if valid {
                        available_blocks.lock().insert((context.round, digest));
                    }
                    tx.send_lossy(valid);
                    return;
                }
                Decision::Continue(block) => block,
            };

            // Non-reproposal path: fetch expected parent, validate ancestry, then
            // run application verification over the ancestry stream.
            //
            // The helper returns `None` when work should stop early (for example,
            // receiver closed or parent unavailable).
            //
            // `round` is copied out before `context` is moved into the helper.
            let round = context.round;
            let application_valid = match verify_with_parent(
                runtime_context,
                context,
                block,
                &mut application,
                &mut marshal,
                &mut tx,
                Stage::Verified,
                ancestor_fetch_duration,
            )
            .await
            {
                Some(valid) => valid,
                None => return,
            };
            // Record success before reporting so `certify` can take its fast path.
            if application_valid {
                available_blocks.lock().insert((round, digest));
            }
            tx.send_lossy(application_valid);
        });
        rx
    }
}
476
477/// Inline mode only waits for block availability during certification.
478impl<E, S, A, B, ES> CertifiableAutomaton for Inline<E, S, A, B, ES>
479where
480    E: Rng + Spawner + Metrics + Clock,
481    S: Scheme,
482    A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
483    B: Block + Clone,
484    ES: Epocher,
485{
486    async fn certify(&mut self, round: Round, digest: Self::Digest) -> oneshot::Receiver<bool> {
487        // Verify has already run for this (round, digest) and its
488        // success was recorded in `available_blocks`. `verify` does not mark a
489        // round available until `marshal.verified(round, block)` has returned,
490        // and that call blocks on `put_sync` of the block into the round's
491        // verified cache. Because the verified and notarized caches share the
492        // same pruning schedule (both advance together to `min_view`), the
493        // block is already durable for this round and re-persisting it into
494        // the notarized cache would be a redundant `put_sync`. The slow path
495        // below persists through the notarized cache because in that case
496        // verify has not run locally and the block may be held only in the
497        // broadcast buffer, which is not durable.
498        if self.available_blocks.lock().contains(&(round, digest)) {
499            let (tx, rx) = oneshot::channel();
500            tx.send_lossy(true);
501            return rx;
502        }
503
504        // Otherwise, wait for local block availability and recover from peers by
505        // notarized round if necessary. A Byzantine leader can form a notarization
506        // after sending the proposal to only f+1 honest validators; the honest
507        // validators left without the block must fetch it here to certify and
508        // avoid getting stuck if Byzantine validators stop participating.
509        let block_rx = self
510            .marshal
511            .subscribe_by_digest(digest, DigestFallback::FetchByRound { round });
512        let marshal = self.marshal.clone();
513        let (mut tx, rx) = oneshot::channel();
514        let context = self
515            .context
516            .lock()
517            .await
518            .child("inline_certify")
519            .with_attribute("round", round);
520        context.spawn(move |_| async move {
521            let Some(block) =
522                await_block_subscription(&mut tx, block_rx, &digest, "certification").await
523            else {
524                return;
525            };
526
527            // `certify` resolving true drives the finalize vote, so mere
528            // buffered availability is not sufficient here. Persist the
529            // block through marshal before signaling success. The caller
530            // holds a notarization for this block, so route it into the
531            // notarized cache directly rather than the verified cache.
532            if marshal.certified(round, block).await {
533                tx.send_lossy(true);
534            }
535        });
536
537        // We don't need to verify the block here because we could not have
538        // reached certification without a notarization (implying at least f+1
539        // honest validators have verified the block).
540        rx
541    }
542}
543
544impl<E, S, A, B, ES> Relay for Inline<E, S, A, B, ES>
545where
546    E: Rng + Spawner + Metrics + Clock,
547    S: Scheme,
548    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
549    B: Block + Clone,
550    ES: Epocher,
551{
552    type Digest = B::Digest;
553    type PublicKey = S::PublicKey;
554    type Plan = Plan<S::PublicKey>;
555
556    fn broadcast(&mut self, commitment: Self::Digest, plan: Plan<S::PublicKey>) -> Feedback {
557        let (round, recipients) = match plan {
558            Plan::Propose { round } => (round, Recipients::All),
559            Plan::Forward { round, recipients } => (round, recipients),
560        };
561        self.marshal.forward(round, commitment, recipients)
562    }
563}
564
565impl<E, S, A, B, ES> Reporter for Inline<E, S, A, B, ES>
566where
567    E: Rng + Spawner + Metrics + Clock,
568    S: Scheme,
569    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
570        + Reporter<Activity = Update<B>>,
571    B: Block + Clone,
572    ES: Epocher,
573{
574    type Activity = A::Activity;
575
576    /// Forwards consensus activity to the wrapped application reporter.
577    fn report(&mut self, update: Self::Activity) -> Feedback {
578        if let Update::Tip(tip_round, _, _) = &update {
579            self.available_blocks
580                .lock()
581                .retain(|(round, _)| round > tip_round);
582        }
583        self.application.report(update)
584    }
585}
586
587#[cfg(test)]
588mod tests {
589    use super::Inline;
590    use crate::{
591        marshal::mocks::{
592            harness::{
593                default_leader, make_raw_block, setup_network_with_participants, Ctx,
594                StandardHarness, TestHarness, B, BLOCKS_PER_EPOCH, NAMESPACE, NUM_VALIDATORS, S, V,
595            },
596            verifying::{GatedVerifyingApp, MockVerifyingApp},
597        },
598        simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Context},
599        types::{Epoch, FixedEpocher, Height, Round, View},
600        Application, Automaton, Block, CertifiableAutomaton, Relay,
601    };
602    use commonware_broadcast::Broadcaster;
603    use commonware_cryptography::{
604        certificate::{mocks::Fixture, ConstantProvider, Scheme},
605        sha256::Sha256,
606        Digestible, Hasher as _,
607    };
608    use commonware_macros::{select, test_traced};
609    use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner, Supervisor as _};
610    use commonware_utils::{channel::fallible::OneshotExt, NZUsize};
611    use rand::Rng;
612    use std::time::Duration;
613
614    // Compile-time assertion only: inline standard wrapper must not require `CertifiableBlock`.
615    #[allow(dead_code)]
616    fn assert_non_certifiable_block_supported<E, S, A, B, ES>()
617    where
618        E: Rng + Spawner + Metrics + Clock,
619        S: Scheme,
620        A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
621        B: Block + Clone,
622        ES: crate::types::Epocher,
623    {
624        fn assert_automaton<T: Automaton>() {}
625        fn assert_certifiable<T: CertifiableAutomaton>() {}
626        fn assert_relay<T: Relay>() {}
627
628        assert_automaton::<Inline<E, S, A, B, ES>>();
629        assert_certifiable::<Inline<E, S, A, B, ES>>();
630        assert_relay::<Inline<E, S, A, B, ES>>();
631    }
632
    /// Runs `Inline::verify` on a seeded block, then checks that `certify` for the
    /// same `(round, digest)` resolves to `true` within 5 seconds of runtime time.
    /// This is the path where `verify` has already recorded the pair locally.
    #[test_traced("INFO")]
    fn test_certify_returns_immediately_after_verify_fetches_block() {
        let runner = deterministic::Runner::timed(Duration::from_secs(30));
        runner.start(|mut context| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let mut oracle = setup_network_with_participants(
                context.child("network"),
                NZUsize!(1),
                participants.clone(),
            )
            .await;

            // Validator 0 is the local node under test.
            let me = participants[0].clone();
            let setup = StandardHarness::setup_validator(
                context.child("validator").with_attribute("index", 0),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal = setup.mailbox;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
            let mut inline = Inline::new(
                context.child("inline"),
                mock_app,
                marshal.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            // Seed the parent and child blocks in marshal so verify can fetch locally.
            let parent_round = Round::new(Epoch::zero(), View::new(1));
            let parent_ctx = Ctx {
                round: parent_round,
                leader: default_leader(),
                parent: (View::zero(), genesis.digest()),
            };
            let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
            let parent_digest = parent.digest();
            assert!(marshal.verified(parent_round, parent).await);

            // Child at height 2 in view 2, whose context names the view-1 parent.
            let round = Round::new(Epoch::zero(), View::new(2));
            let verify_context = Ctx {
                round,
                leader: me,
                parent: (View::new(1), parent_digest),
            };
            let block =
                B::new::<Sha256>(verify_context.clone(), parent_digest, Height::new(2), 200);
            let digest = block.digest();
            assert!(marshal.verified(round, block).await);

            // Complete verify first so the block is already available locally.
            let verify_rx = inline.verify(verify_context, digest).await;
            assert!(
                verify_rx.await.unwrap(),
                "verify should complete successfully before certify"
            );

            // Certify should return immediately instead of waiting on marshal.
            let certify_rx = inline.certify(round, digest).await;

            // Fail if certify does not resolve within 5 seconds.
            select! {
                result = certify_rx => {
                    assert!(
                        result.unwrap(),
                        "certify should return immediately once verify has fetched the block"
                    );
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("certify should not hang after local verify completed");
                },
            }
        });
    }
713
    /// Certifies a block that marshal already holds, without ever calling `verify`.
    /// Because nothing was recorded locally, `certify` takes its marshal-subscription
    /// path. The test checks that it still resolves to `true` within 5 seconds.
    #[test_traced("INFO")]
    fn test_certify_succeeds_without_verify_task() {
        let runner = deterministic::Runner::timed(Duration::from_secs(30));
        runner.start(|mut context| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let mut oracle = setup_network_with_participants(
                context.child("network"),
                NZUsize!(1),
                participants.clone(),
            )
            .await;

            // Validator 0 is the local node under test.
            let me = participants[0].clone();
            let setup = StandardHarness::setup_validator(
                context.child("validator").with_attribute("index", 0),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal = setup.mailbox;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
            let mut inline = Inline::new(
                context.child("inline"),
                mock_app,
                marshal.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            // Seed the parent and child blocks in marshal without starting a verify task.
            let parent_round = Round::new(Epoch::zero(), View::new(1));
            let parent_ctx = Ctx {
                round: parent_round,
                leader: default_leader(),
                parent: (View::zero(), genesis.digest()),
            };
            let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
            let parent_digest = parent.digest();
            assert!(marshal.verified(parent_round, parent).await);

            // Child at height 2 in view 2, whose context names the view-1 parent.
            let round = Round::new(Epoch::zero(), View::new(2));
            let verify_context = Ctx {
                round,
                leader: me,
                parent: (View::new(1), parent_digest),
            };
            let block =
                B::new::<Sha256>(verify_context.clone(), parent_digest, Height::new(2), 200);
            let digest = block.digest();
            assert!(marshal.verified(round, block).await);

            // Certify should still resolve by waiting on marshal block availability directly.
            let certify_rx = inline.certify(round, digest).await;

            // Fail if certify does not resolve within 5 seconds.
            select! {
                result = certify_rx => {
                    assert!(
                        result.unwrap(),
                        "certify should resolve once block availability is known"
                    );
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("certify should not hang when block is already available in marshal");
                },
            }
        });
    }
787
788    #[test_traced("INFO")]
789    fn test_certify_reproposal_uses_available_blocks_after_verify() {
790        let runner = deterministic::Runner::timed(Duration::from_secs(30));
791        runner.start(|mut context| async move {
792            let Fixture {
793                participants,
794                schemes,
795                ..
796            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
797            let mut oracle =
798                setup_network_with_participants(context.child("network"), NZUsize!(1), participants.clone())
799                    .await;
800
801            let me = participants[0].clone();
802            let setup = StandardHarness::setup_validator(
803                context.child("validator").with_attribute("index", 0),
804                &mut oracle,
805                me.clone(),
806                ConstantProvider::new(schemes[0].clone()),
807            )
808            .await;
809            let marshal = setup.mailbox;
810            let marshal_actor_handle = setup.actor_handle;
811
812            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
813            let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
814            let mut inline = Inline::new(context.child("inline"),
815                mock_app,
816                marshal.clone(),
817                FixedEpocher::new(BLOCKS_PER_EPOCH),
818            );
819
820            let boundary_height = Height::new(BLOCKS_PER_EPOCH.get() - 1);
821            let boundary_round = Round::new(Epoch::zero(), View::new(boundary_height.get()));
822            let boundary_block = B::new::<Sha256>(
823                Ctx {
824                    round: boundary_round,
825                    leader: default_leader(),
826                    parent: (View::zero(), genesis.digest()),
827                },
828                genesis.digest(),
829                boundary_height,
830                1900,
831            );
832            let boundary_digest = boundary_block.digest();
833            assert!(marshal.verified(boundary_round, boundary_block).await);
834
835            let reproposal_round = Round::new(Epoch::zero(), View::new(boundary_height.get() + 1));
836            let reproposal_context = Ctx {
837                round: reproposal_round,
838                leader: me,
839                parent: (View::new(boundary_height.get()), boundary_digest),
840            };
841
842            let verify_rx = inline.verify(reproposal_context, boundary_digest).await;
843            assert!(
844                verify_rx.await.unwrap(),
845                "verify should accept a valid boundary re-proposal"
846            );
847
848            marshal_actor_handle.abort();
849            drop(marshal);
850            context.sleep(Duration::from_millis(1)).await;
851
852            let certify_rx = inline.certify(reproposal_round, boundary_digest).await;
853            select! {
854                result = certify_rx => {
855                    assert!(
856                        result.unwrap(),
857                        "certify should use the available_blocks fast path for verified re-proposals"
858                    );
859                },
860                _ = context.sleep(Duration::from_secs(5)) => {
861                    panic!("certify should not depend on marshal after verify cached a re-proposal");
862                },
863            }
864        });
865    }
866
867    /// Regression: in inline mode, `verify` itself returns true after running
868    /// app verification. That return value drives the notarize vote, so it
869    /// must imply "block is durably persisted" -- otherwise a crash between
870    /// vote and persistence leaves the validator having voted for a block it
871    /// cannot serve.
872    ///
873    /// As with the deferred-mode test, the parent and child are seeded via
874    /// the buffered broadcast layer (in-memory only), bypassing
875    /// `marshal.proposed` which would already persist them.
876    #[test_traced("WARN")]
877    fn test_inline_verify_persists_block_before_resolving() {
878        for seed in 0u64..16 {
879            inline_verify_persists_block_before_resolving_at(seed);
880        }
881    }
882
    /// Runs one seed of the inline `verify` persistence regression.
    ///
    /// Steps:
    /// 1. Seeds a parent (height 1) and a child (height 2) into the buffered
    ///    broadcast cache only.
    /// 2. Drives `Inline::verify` for the child to completion.
    /// 3. Aborts the marshal actor and restarts a validator with the same index.
    ///
    /// Panics unless `get_block` on the restarted marshal returns the child.
    fn inline_verify_persists_block_before_resolving_at(seed: u64) {
        let runner = deterministic::Runner::new(
            deterministic::Config::new()
                .with_seed(seed)
                .with_timeout(Some(Duration::from_secs(60))),
        );
        runner.start(|mut context| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let mut oracle = setup_network_with_participants(
                context.child("network"),
                NZUsize!(1),
                participants.clone(),
            )
            .await;

            let me = participants[0].clone();

            let setup = StandardHarness::setup_validator(
                context.child("validator").with_attribute("index", 0),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal = setup.mailbox;
            let buffer = setup.extra;
            let actor_handle = setup.actor_handle;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();

            let mut inline = Inline::new(
                context.child("inline"),
                mock_app,
                marshal.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            // Build parent (height 1) and child (height 2). Seed both into
            // the buffered broadcast cache (in-memory only).
            let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
            let parent_digest = parent.digest();

            let child_round = Round::new(Epoch::zero(), View::new(2));
            let child_ctx = Ctx {
                round: child_round,
                leader: me.clone(),
                parent: (View::new(1), parent_digest),
            };
            let child = B::new::<Sha256>(child_ctx.clone(), parent_digest, Height::new(2), 200);
            let child_digest = child.digest();

            // Broadcasting to an empty recipient set only populates the local
            // buffer; nothing is sent to peers.
            assert!(
                buffer
                    .broadcast(commonware_p2p::Recipients::Some(vec![]), parent.clone())
                    .accepted(),
                "buffer broadcast for parent should be accepted"
            );
            assert!(
                buffer
                    .broadcast(commonware_p2p::Recipients::Some(vec![]), child.clone())
                    .accepted(),
                "buffer broadcast for child should be accepted"
            );

            // Inline verify runs full validation inline and returns true only
            // after `marshal.verified` completes. With the persistence-ack fix,
            // that call is not acknowledged until the block has been synced to
            // storage (`put_sync`).
            let verify_result = inline
                .verify(child_ctx, child_digest)
                .await
                .await
                .expect("verify result missing");
            assert!(verify_result, "inline verify should pass");

            // Abort the marshal actor synchronously, with no
            // intervening await. If verify returned true but the actor had
            // only enqueued (not processed) the `Verified` message, this
            // abort kills the actor before persistence completes.
            actor_handle.abort();
            drop(inline);
            drop(marshal);
            drop(buffer);

            // Restart from the same partition. The block must be durably
            // persisted - otherwise the validator would have voted notarize
            // for a block it cannot serve from local storage.
            let setup2 = StandardHarness::setup_validator(
                context
                    .child("validator_restart")
                    .with_attribute("index", 0),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal2 = setup2.mailbox;

            let post_restart = marshal2.get_block(&child_digest).await;
            assert!(
                post_restart.is_some(),
                "verify resolved true so block must be durably persisted (seed={seed})"
            );
        });
    }
992
993    /// Regression: `certify` resolving true drives the finalize vote in inline
994    /// mode, so it must imply the block is durably persisted even when the
995    /// certify path subscribed before `verify()` finished.
996    #[test_traced("WARN")]
997    fn test_inline_certify_persists_block_before_resolving() {
998        for seed in 0u64..16 {
999            inline_certify_persists_block_before_resolving_at(seed);
1000        }
1001    }
1002
    /// Runs one seed of the inline `certify` persistence regression.
    ///
    /// Steps:
    /// 1. Seeds a parent (height 1) and a child (height 2) into the buffered
    ///    broadcast cache only.
    /// 2. Starts `verify` for the child, holding its receiver without awaiting it.
    /// 3. Awaits `certify` for the same round and digest.
    /// 4. Once certify resolves `true`, aborts the marshal actor and restarts a
    ///    validator with the same index.
    ///
    /// Panics unless `get_block` on the restarted marshal returns the child.
    fn inline_certify_persists_block_before_resolving_at(seed: u64) {
        let runner = deterministic::Runner::new(
            deterministic::Config::new()
                .with_seed(seed)
                .with_timeout(Some(Duration::from_secs(60))),
        );
        runner.start(|mut context| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let mut oracle = setup_network_with_participants(
                context.child("network"),
                NZUsize!(1),
                participants.clone(),
            )
            .await;

            let me = participants[0].clone();

            let setup = StandardHarness::setup_validator(
                context.child("validator").with_attribute("index", 0),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal = setup.mailbox;
            let buffer = setup.extra;
            let actor_handle = setup.actor_handle;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
            let mut inline = Inline::new(
                context.child("inline"),
                mock_app,
                marshal.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            // Build parent (height 1) and child (height 2).
            let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
            let parent_digest = parent.digest();

            let child_round = Round::new(Epoch::zero(), View::new(2));
            let child_ctx = Ctx {
                round: child_round,
                leader: me.clone(),
                parent: (View::new(1), parent_digest),
            };
            let child = B::new::<Sha256>(child_ctx.clone(), parent_digest, Height::new(2), 200);
            let child_digest = child.digest();

            // Seed both blocks into the buffered broadcast cache only (empty
            // recipient set), bypassing marshal's own persistence paths.
            assert!(
                buffer
                    .broadcast(commonware_p2p::Recipients::Some(vec![]), parent.clone())
                    .accepted(),
                "buffer broadcast for parent should be accepted"
            );
            assert!(
                buffer
                    .broadcast(commonware_p2p::Recipients::Some(vec![]), child.clone())
                    .accepted(),
                "buffer broadcast for child should be accepted"
            );

            // Start verify but keep only its receiver, so certify subscribes
            // while verification may still be in flight.
            let verify_rx = inline.verify(child_ctx, child_digest).await;
            let certify_result = inline
                .certify(child_round, child_digest)
                .await
                .await
                .expect("certify result missing");
            assert!(certify_result, "certify should succeed");

            // Crash right after certify resolves: abort marshal and release every
            // handle (the pending verify result is irrelevant here).
            actor_handle.abort();
            drop(verify_rx);
            drop(inline);
            drop(marshal);
            drop(buffer);

            // Restart with the same validator index; the child must be recoverable
            // from local storage because certify already resolved true.
            let setup2 = StandardHarness::setup_validator(
                context
                    .child("validator_restart")
                    .with_attribute("index", 0),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal2 = setup2.mailbox;

            let post_restart = marshal2.get_block(&child_digest).await;
            assert!(
                post_restart.is_some(),
                "certify resolved true so block must be durably persisted (seed={seed})"
            );
        });
    }
1101
    /// Regression: a lost persistence ack from `marshal.verified` must not be
    /// papered over by `certify`.
    ///
    /// Sequence:
    /// - The application's `verify` is gated.
    /// - Once it has been reached, the marshal actor is aborted and its shutdown
    ///   awaited, and only then is the gate released. The subsequent
    ///   `marshal.verified` call therefore cannot be acknowledged.
    ///
    /// Expected outcomes:
    /// - `verify` and a later `certify` for the same block must both end with a
    ///   dropped receiver (`Err`) rather than a result.
    /// - After a restart, the child block must not be recoverable from storage.
    #[test_traced("WARN")]
    fn test_inline_certify_does_not_bypass_failed_verify_persistence() {
        let runner = deterministic::Runner::timed(Duration::from_secs(30));
        runner.start(|mut context| async move {
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
            let mut oracle = setup_network_with_participants(
                context.child("network"),
                NZUsize!(1),
                participants.clone(),
            )
            .await;

            let me = participants[0].clone();

            let setup = StandardHarness::setup_validator(
                context.child("validator").with_attribute("index", 0),
                &mut oracle,
                me.clone(),
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal = setup.mailbox;
            let buffer = setup.extra;
            let marshal_actor_handle = setup.actor_handle;

            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
            // `verify_started` signals that the app's verify was entered;
            // `release_verify` lets it proceed.
            let (mock_app, verify_started, release_verify): (GatedVerifyingApp<B, S>, _, _) =
                GatedVerifyingApp::new();
            let mut inline = Inline::new(
                context.child("inline"),
                mock_app,
                marshal.clone(),
                FixedEpocher::new(BLOCKS_PER_EPOCH),
            );

            let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
            let parent_digest = parent.digest();

            let child_round = Round::new(Epoch::zero(), View::new(2));
            let child_ctx = Ctx {
                round: child_round,
                leader: me.clone(),
                parent: (View::new(1), parent_digest),
            };
            let child = B::new::<Sha256>(child_ctx.clone(), parent_digest, Height::new(2), 200);
            let child_digest = child.digest();

            // Seed both blocks into the buffered broadcast cache only.
            assert!(
                buffer
                    .broadcast(commonware_p2p::Recipients::Some(vec![]), parent)
                    .accepted(),
                "buffer broadcast for parent should be accepted"
            );
            assert!(
                buffer
                    .broadcast(commonware_p2p::Recipients::Some(vec![]), child)
                    .accepted(),
                "buffer broadcast for child should be accepted"
            );

            let verify_rx = inline.verify(child_ctx, child_digest).await;
            verify_started
                .await
                .expect("verify should reach application before marshal abort");

            // Wait for marshal shutdown to complete before releasing `app.verify`.
            // This makes the later persistence ack fail deterministically.
            marshal_actor_handle.abort();
            let _ = marshal_actor_handle.await;
            release_verify.send_lossy(());

            select! {
                result = verify_rx => {
                    assert!(
                        result.is_err(),
                        "verify must not resolve after marshal.verified loses its persistence ack"
                    );
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("verify should terminate after marshal abort");
                },
            }

            // Certify must not fall back to any in-memory availability left over
            // from the failed verify.
            let certify_rx = inline.certify(child_round, child_digest).await;
            select! {
                result = certify_rx => {
                    assert!(
                        result.is_err(),
                        "certify must not bypass failed verify persistence via stale availability"
                    );
                },
                _ = context.sleep(Duration::from_secs(5)) => {
                    panic!("certify should terminate after marshal abort");
                },
            }

            drop(inline);
            drop(marshal);
            drop(buffer);

            // Restart with the same validator index: nothing should have been
            // durably written for the child.
            let setup2 = StandardHarness::setup_validator(
                context
                    .child("validator_restart")
                    .with_attribute("index", 0),
                &mut oracle,
                me,
                ConstantProvider::new(schemes[0].clone()),
            )
            .await;
            let marshal2 = setup2.mailbox;

            let post_restart = marshal2.get_block(&child_digest).await;
            assert!(
                post_restart.is_none(),
                "failed marshal.verified ack must not leave a durably recoverable block"
            );
        });
    }
1224
1225    /// Regression: if marshal persisted a verified block for a round before
1226    /// a crash (via a prior `propose` call) but the simplex notarize artifact
1227    /// never reached the journal, the restarted leader must skip proposing
1228    /// for that round. The cached block was built against a parent context
1229    /// that replay may have changed, so reusing it can broadcast a proposal
1230    /// whose payload no longer matches the recovered header. Building a
1231    /// fresh block would also be unsafe because the prunable archive silently
1232    /// drops the second write at the same view index. Dropping the receiver
1233    /// lets the voter nullify the view via `MissingProposal`.
1234    #[test_traced("WARN")]
1235    fn test_propose_skips_when_verified_block_exists_on_restart() {
1236        let runner = deterministic::Runner::timed(Duration::from_secs(30));
1237        runner.start(|mut context| async move {
1238            let Fixture {
1239                participants,
1240                schemes,
1241                ..
1242            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1243            let mut oracle = setup_network_with_participants(
1244                context.child("network"),
1245                NZUsize!(1),
1246                participants.clone(),
1247            )
1248            .await;
1249
1250            let me = participants[0].clone();
1251            let round = Round::new(Epoch::zero(), View::new(1));
1252            let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
1253            let ctx = Ctx {
1254                round,
1255                leader: me.clone(),
1256                parent: (View::zero(), genesis.digest()),
1257            };
1258
1259            // Pre-crash: seed `verified_blocks[V=1]` through the live mailbox,
1260            // mirroring an aborted pre-crash `Inline::propose` that persisted
1261            // its verified block before the voter could journal a notarize.
1262            let pre_setup = StandardHarness::setup_validator(
1263                context.child("validator").with_attribute("index", 0),
1264                &mut oracle,
1265                me.clone(),
1266                ConstantProvider::new(schemes[0].clone()),
1267            )
1268            .await;
1269            let pre_marshal = pre_setup.mailbox;
1270            let pre_actor = pre_setup.actor_handle;
1271            let pre_extra = pre_setup.extra;
1272            let pre_application = pre_setup.application;
1273
1274            let stale_block = B::new::<Sha256>(ctx.clone(), genesis.digest(), Height::new(1), 100);
1275            assert!(pre_marshal.verified(round, stale_block).await);
1276
1277            // Simulate a crash: abort the actor and drop every handle so the
1278            // storage partition is fully released before reopening.
1279            pre_actor.abort();
1280            drop(pre_marshal);
1281            drop(pre_extra);
1282            drop(pre_application);
1283
1284            // Post-crash: reopen the same partition. The verified block must
1285            // be recovered from storage during archive restore so that
1286            // `Message::GetVerified` on the new mailbox observes it.
1287            let post_setup = StandardHarness::setup_validator(
1288                context
1289                    .child("validator_restart")
1290                    .with_attribute("index", 0),
1291                &mut oracle,
1292                me.clone(),
1293                ConstantProvider::new(schemes[0].clone()),
1294            )
1295            .await;
1296            let post_marshal = post_setup.mailbox;
1297
1298            let fresh_block = B::new::<Sha256>(ctx.clone(), genesis.digest(), Height::new(1), 200);
1299            let mock_app: MockVerifyingApp<B, S> =
1300                MockVerifyingApp::new().with_propose_result(fresh_block);
1301            let mut inline = Inline::new(
1302                context.child("inline"),
1303                mock_app,
1304                post_marshal.clone(),
1305                FixedEpocher::new(BLOCKS_PER_EPOCH),
1306            );
1307
1308            let digest_rx = inline.propose(ctx).await;
1309            assert!(
1310                digest_rx.await.is_err(),
1311                "propose must drop the receiver so the voter nullifies the round via timeout"
1312            );
1313        });
1314    }
1315}