1use crate::{
46 marshal::{
47 application::validation::Stage,
48 core::{CommitmentFallback, DigestFallback, Mailbox},
49 standard::{
50 validation::{precheck_epoch_and_reproposal, verify_with_parent, Decision},
51 Standard,
52 },
53 Update,
54 },
55 simplex::{types::Context, Plan},
56 types::{Epocher, Round},
57 Application, Automaton, Block, CertifiableAutomaton, Epochable, Relay, Reporter,
58};
59use commonware_actor::Feedback;
60use commonware_cryptography::certificate::Scheme;
61use commonware_macros::select;
62use commonware_p2p::Recipients;
63use commonware_runtime::{
64 telemetry::metrics::{
65 histogram::{Buckets, Timed},
66 MetricsExt as _,
67 },
68 Clock, Metrics, Spawner,
69};
70use commonware_utils::{
71 channel::{fallible::OneshotExt, oneshot},
72 sync::{AsyncMutex, Mutex},
73};
74use rand::Rng;
75use std::{collections::BTreeSet, sync::Arc};
76use tracing::debug;
77
78type AvailableBlocks<D> = Arc<Mutex<BTreeSet<(Round, D)>>>;
81
82async fn await_block_subscription<T, D>(
84 tx: &mut oneshot::Sender<bool>,
85 block_rx: oneshot::Receiver<T>,
86 digest: &D,
87 stage: &'static str,
88) -> Option<T>
89where
90 D: std::fmt::Debug + ?Sized,
91{
92 select! {
93 _ = tx.closed() => {
94 debug!(
95 stage,
96 reason = "consensus dropped receiver",
97 "skipping block wait"
98 );
99 None
100 },
101 result = block_rx => {
102 if result.is_err() {
103 debug!(
104 stage,
105 ?digest,
106 reason = "failed to fetch block",
107 "skipping block wait"
108 );
109 }
110 result.ok()
111 },
112 }
113}
114
115pub struct Inline<E, S, A, B, ES>
132where
133 E: Rng + Spawner + Metrics + Clock,
134 S: Scheme,
135 A: Application<E>,
136 B: Block + Clone,
137 ES: Epocher,
138{
139 context: Arc<AsyncMutex<E>>,
140 application: A,
141 marshal: Mailbox<S, Standard<B>>,
142 epocher: ES,
143 available_blocks: AvailableBlocks<B::Digest>,
144
145 build_duration: Timed,
146 proposal_parent_fetch_duration: Timed,
147 ancestor_fetch_duration: Timed,
148}
149
150impl<E, S, A, B, ES> Clone for Inline<E, S, A, B, ES>
151where
152 E: Rng + Spawner + Metrics + Clock,
153 S: Scheme,
154 A: Application<E>,
155 B: Block + Clone,
156 ES: Epocher,
157{
158 fn clone(&self) -> Self {
159 Self {
160 context: self.context.clone(),
161 application: self.application.clone(),
162 marshal: self.marshal.clone(),
163 epocher: self.epocher.clone(),
164 available_blocks: self.available_blocks.clone(),
165 build_duration: self.build_duration.clone(),
166 proposal_parent_fetch_duration: self.proposal_parent_fetch_duration.clone(),
167 ancestor_fetch_duration: self.ancestor_fetch_duration.clone(),
168 }
169 }
170}
171
172impl<E, S, A, B, ES> Inline<E, S, A, B, ES>
173where
174 E: Rng + Spawner + Metrics + Clock,
175 S: Scheme,
176 A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
177 B: Block + Clone,
178 ES: Epocher,
179{
180 pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
184 let build_histogram = context.histogram(
185 "build_duration",
186 "Histogram of time taken for the application to build a new block, in seconds",
187 Buckets::LOCAL,
188 );
189 let build_duration = Timed::new(build_histogram);
190 let parent_fetch_histogram = context.histogram(
191 "parent_fetch_duration",
192 "Histogram of time taken to fetch a parent block in propose, in seconds",
193 Buckets::LOCAL,
194 );
195 let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram);
196 let ancestor_fetch_histogram = context.histogram(
197 "ancestor_fetch_duration",
198 "Histogram of time taken to fetch a block via the ancestry stream, in seconds",
199 Buckets::LOCAL,
200 );
201 let ancestor_fetch_duration = Timed::new(ancestor_fetch_histogram);
202
203 Self {
204 context: Arc::new(AsyncMutex::new(context)),
205 application,
206 marshal,
207 epocher,
208 available_blocks: Arc::new(Mutex::new(BTreeSet::new())),
209 build_duration,
210 proposal_parent_fetch_duration,
211 ancestor_fetch_duration,
212 }
213 }
214}
215
216impl<E, S, A, B, ES> Automaton for Inline<E, S, A, B, ES>
217where
218 E: Rng + Spawner + Metrics + Clock,
219 S: Scheme,
220 A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
221 B: Block + Clone,
222 ES: Epocher,
223{
224 type Digest = B::Digest;
225 type Context = Context<Self::Digest, S::PublicKey>;
226
227 async fn propose(
234 &mut self,
235 consensus_context: Context<Self::Digest, S::PublicKey>,
236 ) -> oneshot::Receiver<Self::Digest> {
237 let marshal = self.marshal.clone();
238 let mut application = self.application.clone();
239 let epocher = self.epocher.clone();
240 let build_duration = self.build_duration.clone();
241 let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone();
242 let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
243
244 let (mut tx, rx) = oneshot::channel();
245 let context = self
246 .context
247 .lock()
248 .await
249 .child("propose")
250 .with_attribute("round", consensus_context.round);
251 context.spawn(move |runtime_context| async move {
252 if marshal
263 .get_verified(consensus_context.round)
264 .await
265 .is_some()
266 {
267 debug!(
268 round = ?consensus_context.round,
269 "skipping proposal: verified block already exists for round on restart"
270 );
271 return;
272 }
273
274 let (parent_view, parent_commitment) = consensus_context.parent;
283 let parent_request = marshal.subscribe_by_commitment(
284 parent_commitment,
285 CommitmentFallback::FetchByRound {
286 round: Round::new(consensus_context.epoch(), parent_view),
287 },
288 );
289
290 let parent_timer = proposal_parent_fetch_duration.timer(&runtime_context);
291 let parent = select! {
292 _ = tx.closed() => {
293 debug!(reason = "consensus dropped receiver", "skipping proposal");
294 return;
295 },
296 result = parent_request => match result {
297 Ok(parent) => parent,
298 Err(_) => {
299 debug!(
300 ?parent_commitment,
301 reason = "failed to fetch parent block",
302 "skipping proposal"
303 );
304 return;
305 }
306 },
307 };
308 parent_timer.observe(&runtime_context);
309
310 let last_in_epoch = epocher
312 .last(consensus_context.epoch())
313 .expect("current epoch should exist");
314 if parent.height() == last_in_epoch {
315 let digest = parent.digest();
316 if !marshal.verified(consensus_context.round, parent).await {
317 debug!(
318 round = ?consensus_context.round,
319 ?digest,
320 "marshal rejected re-proposed boundary block"
321 );
322 return;
323 }
324 let success = tx.send_lossy(digest);
325 debug!(
326 round = ?consensus_context.round,
327 ?digest,
328 success,
329 "re-proposed parent block at epoch boundary"
330 );
331 return;
332 }
333
334 let ancestor_stream = marshal.ancestor_stream(
335 Arc::new(runtime_context.child("ancestor_stream")),
336 [parent],
337 ancestor_fetch_duration,
338 );
339 let build_request = application.propose(
340 (
341 runtime_context.child("app_propose"),
342 consensus_context.clone(),
343 ),
344 ancestor_stream,
345 );
346
347 let build_timer = build_duration.timer(&runtime_context);
348 let built_block = select! {
349 _ = tx.closed() => {
350 debug!(reason = "consensus dropped receiver", "skipping proposal");
351 return;
352 },
353 result = build_request => match result {
354 Some(block) => block,
355 None => {
356 debug!(
357 ?parent_commitment,
358 reason = "block building failed",
359 "skipping proposal"
360 );
361 return;
362 }
363 },
364 };
365 build_timer.observe(&runtime_context);
366
367 let digest = built_block.digest();
368 if !marshal.proposed(consensus_context.round, built_block).await {
369 debug!(
370 round = ?consensus_context.round,
371 ?digest,
372 "marshal rejected proposed block"
373 );
374 return;
375 }
376 let success = tx.send_lossy(digest);
377 debug!(
378 round = ?consensus_context.round,
379 ?digest,
380 success,
381 "proposed new block"
382 );
383 });
384 rx
385 }
386
387 async fn verify(
398 &mut self,
399 context: Context<Self::Digest, S::PublicKey>,
400 digest: Self::Digest,
401 ) -> oneshot::Receiver<bool> {
402 let mut marshal = self.marshal.clone();
403 let mut application = self.application.clone();
404 let epocher = self.epocher.clone();
405 let available_blocks = self.available_blocks.clone();
406 let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
407
408 let (mut tx, rx) = oneshot::channel();
409 let runtime_context = self
410 .context
411 .lock()
412 .await
413 .child("inline_verify")
414 .with_attribute("round", context.round);
415 runtime_context.spawn(move |runtime_context| async move {
416 let block_request = marshal.subscribe_by_digest(digest, DigestFallback::Wait);
417 let Some(block) =
418 await_block_subscription(&mut tx, block_request, &digest, "verification").await
419 else {
420 return;
421 };
422
423 let Some(decision) =
431 precheck_epoch_and_reproposal(&epocher, &mut marshal, &context, digest, block)
432 .await
433 else {
434 return;
435 };
436 let block = match decision {
437 Decision::Complete(valid) => {
438 if valid {
439 available_blocks.lock().insert((context.round, digest));
440 }
441 tx.send_lossy(valid);
442 return;
443 }
444 Decision::Continue(block) => block,
445 };
446
447 let round = context.round;
453 let application_valid = match verify_with_parent(
454 runtime_context,
455 context,
456 block,
457 &mut application,
458 &mut marshal,
459 &mut tx,
460 Stage::Verified,
461 ancestor_fetch_duration,
462 )
463 .await
464 {
465 Some(valid) => valid,
466 None => return,
467 };
468 if application_valid {
469 available_blocks.lock().insert((round, digest));
470 }
471 tx.send_lossy(application_valid);
472 });
473 rx
474 }
475}
476
477impl<E, S, A, B, ES> CertifiableAutomaton for Inline<E, S, A, B, ES>
479where
480 E: Rng + Spawner + Metrics + Clock,
481 S: Scheme,
482 A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
483 B: Block + Clone,
484 ES: Epocher,
485{
486 async fn certify(&mut self, round: Round, digest: Self::Digest) -> oneshot::Receiver<bool> {
487 if self.available_blocks.lock().contains(&(round, digest)) {
499 let (tx, rx) = oneshot::channel();
500 tx.send_lossy(true);
501 return rx;
502 }
503
504 let block_rx = self
510 .marshal
511 .subscribe_by_digest(digest, DigestFallback::FetchByRound { round });
512 let marshal = self.marshal.clone();
513 let (mut tx, rx) = oneshot::channel();
514 let context = self
515 .context
516 .lock()
517 .await
518 .child("inline_certify")
519 .with_attribute("round", round);
520 context.spawn(move |_| async move {
521 let Some(block) =
522 await_block_subscription(&mut tx, block_rx, &digest, "certification").await
523 else {
524 return;
525 };
526
527 if marshal.certified(round, block).await {
533 tx.send_lossy(true);
534 }
535 });
536
537 rx
541 }
542}
543
544impl<E, S, A, B, ES> Relay for Inline<E, S, A, B, ES>
545where
546 E: Rng + Spawner + Metrics + Clock,
547 S: Scheme,
548 A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
549 B: Block + Clone,
550 ES: Epocher,
551{
552 type Digest = B::Digest;
553 type PublicKey = S::PublicKey;
554 type Plan = Plan<S::PublicKey>;
555
556 fn broadcast(&mut self, commitment: Self::Digest, plan: Plan<S::PublicKey>) -> Feedback {
557 let (round, recipients) = match plan {
558 Plan::Propose { round } => (round, Recipients::All),
559 Plan::Forward { round, recipients } => (round, recipients),
560 };
561 self.marshal.forward(round, commitment, recipients)
562 }
563}
564
565impl<E, S, A, B, ES> Reporter for Inline<E, S, A, B, ES>
566where
567 E: Rng + Spawner + Metrics + Clock,
568 S: Scheme,
569 A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
570 + Reporter<Activity = Update<B>>,
571 B: Block + Clone,
572 ES: Epocher,
573{
574 type Activity = A::Activity;
575
576 fn report(&mut self, update: Self::Activity) -> Feedback {
578 if let Update::Tip(tip_round, _, _) = &update {
579 self.available_blocks
580 .lock()
581 .retain(|(round, _)| round > tip_round);
582 }
583 self.application.report(update)
584 }
585}
586
587#[cfg(test)]
588mod tests {
589 use super::Inline;
590 use crate::{
591 marshal::mocks::{
592 harness::{
593 default_leader, make_raw_block, setup_network_with_participants, Ctx,
594 StandardHarness, TestHarness, B, BLOCKS_PER_EPOCH, NAMESPACE, NUM_VALIDATORS, S, V,
595 },
596 verifying::{GatedVerifyingApp, MockVerifyingApp},
597 },
598 simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Context},
599 types::{Epoch, FixedEpocher, Height, Round, View},
600 Application, Automaton, Block, CertifiableAutomaton, Relay,
601 };
602 use commonware_broadcast::Broadcaster;
603 use commonware_cryptography::{
604 certificate::{mocks::Fixture, ConstantProvider, Scheme},
605 sha256::Sha256,
606 Digestible, Hasher as _,
607 };
608 use commonware_macros::{select, test_traced};
609 use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner, Supervisor as _};
610 use commonware_utils::{channel::fallible::OneshotExt, NZUsize};
611 use rand::Rng;
612 use std::time::Duration;
613
614 #[allow(dead_code)]
616 fn assert_non_certifiable_block_supported<E, S, A, B, ES>()
617 where
618 E: Rng + Spawner + Metrics + Clock,
619 S: Scheme,
620 A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
621 B: Block + Clone,
622 ES: crate::types::Epocher,
623 {
624 fn assert_automaton<T: Automaton>() {}
625 fn assert_certifiable<T: CertifiableAutomaton>() {}
626 fn assert_relay<T: Relay>() {}
627
628 assert_automaton::<Inline<E, S, A, B, ES>>();
629 assert_certifiable::<Inline<E, S, A, B, ES>>();
630 assert_relay::<Inline<E, S, A, B, ES>>();
631 }
632
633 #[test_traced("INFO")]
634 fn test_certify_returns_immediately_after_verify_fetches_block() {
635 let runner = deterministic::Runner::timed(Duration::from_secs(30));
636 runner.start(|mut context| async move {
637 let Fixture {
638 participants,
639 schemes,
640 ..
641 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
642 let mut oracle = setup_network_with_participants(
643 context.child("network"),
644 NZUsize!(1),
645 participants.clone(),
646 )
647 .await;
648
649 let me = participants[0].clone();
650 let setup = StandardHarness::setup_validator(
651 context.child("validator").with_attribute("index", 0),
652 &mut oracle,
653 me.clone(),
654 ConstantProvider::new(schemes[0].clone()),
655 )
656 .await;
657 let marshal = setup.mailbox;
658
659 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
660 let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
661 let mut inline = Inline::new(
662 context.child("inline"),
663 mock_app,
664 marshal.clone(),
665 FixedEpocher::new(BLOCKS_PER_EPOCH),
666 );
667
668 let parent_round = Round::new(Epoch::zero(), View::new(1));
670 let parent_ctx = Ctx {
671 round: parent_round,
672 leader: default_leader(),
673 parent: (View::zero(), genesis.digest()),
674 };
675 let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
676 let parent_digest = parent.digest();
677 assert!(marshal.verified(parent_round, parent).await);
678
679 let round = Round::new(Epoch::zero(), View::new(2));
680 let verify_context = Ctx {
681 round,
682 leader: me,
683 parent: (View::new(1), parent_digest),
684 };
685 let block =
686 B::new::<Sha256>(verify_context.clone(), parent_digest, Height::new(2), 200);
687 let digest = block.digest();
688 assert!(marshal.verified(round, block).await);
689
690 let verify_rx = inline.verify(verify_context, digest).await;
692 assert!(
693 verify_rx.await.unwrap(),
694 "verify should complete successfully before certify"
695 );
696
697 let certify_rx = inline.certify(round, digest).await;
699
700 select! {
701 result = certify_rx => {
702 assert!(
703 result.unwrap(),
704 "certify should return immediately once verify has fetched the block"
705 );
706 },
707 _ = context.sleep(Duration::from_secs(5)) => {
708 panic!("certify should not hang after local verify completed");
709 },
710 }
711 });
712 }
713
714 #[test_traced("INFO")]
715 fn test_certify_succeeds_without_verify_task() {
716 let runner = deterministic::Runner::timed(Duration::from_secs(30));
717 runner.start(|mut context| async move {
718 let Fixture {
719 participants,
720 schemes,
721 ..
722 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
723 let mut oracle = setup_network_with_participants(
724 context.child("network"),
725 NZUsize!(1),
726 participants.clone(),
727 )
728 .await;
729
730 let me = participants[0].clone();
731 let setup = StandardHarness::setup_validator(
732 context.child("validator").with_attribute("index", 0),
733 &mut oracle,
734 me.clone(),
735 ConstantProvider::new(schemes[0].clone()),
736 )
737 .await;
738 let marshal = setup.mailbox;
739
740 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
741 let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
742 let mut inline = Inline::new(
743 context.child("inline"),
744 mock_app,
745 marshal.clone(),
746 FixedEpocher::new(BLOCKS_PER_EPOCH),
747 );
748
749 let parent_round = Round::new(Epoch::zero(), View::new(1));
751 let parent_ctx = Ctx {
752 round: parent_round,
753 leader: default_leader(),
754 parent: (View::zero(), genesis.digest()),
755 };
756 let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
757 let parent_digest = parent.digest();
758 assert!(marshal.verified(parent_round, parent).await);
759
760 let round = Round::new(Epoch::zero(), View::new(2));
761 let verify_context = Ctx {
762 round,
763 leader: me,
764 parent: (View::new(1), parent_digest),
765 };
766 let block =
767 B::new::<Sha256>(verify_context.clone(), parent_digest, Height::new(2), 200);
768 let digest = block.digest();
769 assert!(marshal.verified(round, block).await);
770
771 let certify_rx = inline.certify(round, digest).await;
773
774 select! {
775 result = certify_rx => {
776 assert!(
777 result.unwrap(),
778 "certify should resolve once block availability is known"
779 );
780 },
781 _ = context.sleep(Duration::from_secs(5)) => {
782 panic!("certify should not hang when block is already available in marshal");
783 },
784 }
785 });
786 }
787
788 #[test_traced("INFO")]
789 fn test_certify_reproposal_uses_available_blocks_after_verify() {
790 let runner = deterministic::Runner::timed(Duration::from_secs(30));
791 runner.start(|mut context| async move {
792 let Fixture {
793 participants,
794 schemes,
795 ..
796 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
797 let mut oracle =
798 setup_network_with_participants(context.child("network"), NZUsize!(1), participants.clone())
799 .await;
800
801 let me = participants[0].clone();
802 let setup = StandardHarness::setup_validator(
803 context.child("validator").with_attribute("index", 0),
804 &mut oracle,
805 me.clone(),
806 ConstantProvider::new(schemes[0].clone()),
807 )
808 .await;
809 let marshal = setup.mailbox;
810 let marshal_actor_handle = setup.actor_handle;
811
812 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
813 let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
814 let mut inline = Inline::new(context.child("inline"),
815 mock_app,
816 marshal.clone(),
817 FixedEpocher::new(BLOCKS_PER_EPOCH),
818 );
819
820 let boundary_height = Height::new(BLOCKS_PER_EPOCH.get() - 1);
821 let boundary_round = Round::new(Epoch::zero(), View::new(boundary_height.get()));
822 let boundary_block = B::new::<Sha256>(
823 Ctx {
824 round: boundary_round,
825 leader: default_leader(),
826 parent: (View::zero(), genesis.digest()),
827 },
828 genesis.digest(),
829 boundary_height,
830 1900,
831 );
832 let boundary_digest = boundary_block.digest();
833 assert!(marshal.verified(boundary_round, boundary_block).await);
834
835 let reproposal_round = Round::new(Epoch::zero(), View::new(boundary_height.get() + 1));
836 let reproposal_context = Ctx {
837 round: reproposal_round,
838 leader: me,
839 parent: (View::new(boundary_height.get()), boundary_digest),
840 };
841
842 let verify_rx = inline.verify(reproposal_context, boundary_digest).await;
843 assert!(
844 verify_rx.await.unwrap(),
845 "verify should accept a valid boundary re-proposal"
846 );
847
848 marshal_actor_handle.abort();
849 drop(marshal);
850 context.sleep(Duration::from_millis(1)).await;
851
852 let certify_rx = inline.certify(reproposal_round, boundary_digest).await;
853 select! {
854 result = certify_rx => {
855 assert!(
856 result.unwrap(),
857 "certify should use the available_blocks fast path for verified re-proposals"
858 );
859 },
860 _ = context.sleep(Duration::from_secs(5)) => {
861 panic!("certify should not depend on marshal after verify cached a re-proposal");
862 },
863 }
864 });
865 }
866
867 #[test_traced("WARN")]
877 fn test_inline_verify_persists_block_before_resolving() {
878 for seed in 0u64..16 {
879 inline_verify_persists_block_before_resolving_at(seed);
880 }
881 }
882
883 fn inline_verify_persists_block_before_resolving_at(seed: u64) {
884 let runner = deterministic::Runner::new(
885 deterministic::Config::new()
886 .with_seed(seed)
887 .with_timeout(Some(Duration::from_secs(60))),
888 );
889 runner.start(|mut context| async move {
890 let Fixture {
891 participants,
892 schemes,
893 ..
894 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
895 let mut oracle = setup_network_with_participants(
896 context.child("network"),
897 NZUsize!(1),
898 participants.clone(),
899 )
900 .await;
901
902 let me = participants[0].clone();
903
904 let setup = StandardHarness::setup_validator(
905 context.child("validator").with_attribute("index", 0),
906 &mut oracle,
907 me.clone(),
908 ConstantProvider::new(schemes[0].clone()),
909 )
910 .await;
911 let marshal = setup.mailbox;
912 let buffer = setup.extra;
913 let actor_handle = setup.actor_handle;
914
915 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
916 let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
917
918 let mut inline = Inline::new(
919 context.child("inline"),
920 mock_app,
921 marshal.clone(),
922 FixedEpocher::new(BLOCKS_PER_EPOCH),
923 );
924
925 let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
928 let parent_digest = parent.digest();
929
930 let child_round = Round::new(Epoch::zero(), View::new(2));
931 let child_ctx = Ctx {
932 round: child_round,
933 leader: me.clone(),
934 parent: (View::new(1), parent_digest),
935 };
936 let child = B::new::<Sha256>(child_ctx.clone(), parent_digest, Height::new(2), 200);
937 let child_digest = child.digest();
938
939 assert!(
940 buffer
941 .broadcast(commonware_p2p::Recipients::Some(vec![]), parent.clone())
942 .accepted(),
943 "buffer broadcast for parent should be accepted"
944 );
945 assert!(
946 buffer
947 .broadcast(commonware_p2p::Recipients::Some(vec![]), child.clone())
948 .accepted(),
949 "buffer broadcast for child should be accepted"
950 );
951
952 let verify_result = inline
956 .verify(child_ctx, child_digest)
957 .await
958 .await
959 .expect("verify result missing");
960 assert!(verify_result, "inline verify should pass");
961
962 actor_handle.abort();
967 drop(inline);
968 drop(marshal);
969 drop(buffer);
970
971 let setup2 = StandardHarness::setup_validator(
975 context
976 .child("validator_restart")
977 .with_attribute("index", 0),
978 &mut oracle,
979 me.clone(),
980 ConstantProvider::new(schemes[0].clone()),
981 )
982 .await;
983 let marshal2 = setup2.mailbox;
984
985 let post_restart = marshal2.get_block(&child_digest).await;
986 assert!(
987 post_restart.is_some(),
988 "verify resolved true so block must be durably persisted (seed={seed})"
989 );
990 });
991 }
992
993 #[test_traced("WARN")]
997 fn test_inline_certify_persists_block_before_resolving() {
998 for seed in 0u64..16 {
999 inline_certify_persists_block_before_resolving_at(seed);
1000 }
1001 }
1002
1003 fn inline_certify_persists_block_before_resolving_at(seed: u64) {
1004 let runner = deterministic::Runner::new(
1005 deterministic::Config::new()
1006 .with_seed(seed)
1007 .with_timeout(Some(Duration::from_secs(60))),
1008 );
1009 runner.start(|mut context| async move {
1010 let Fixture {
1011 participants,
1012 schemes,
1013 ..
1014 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1015 let mut oracle = setup_network_with_participants(
1016 context.child("network"),
1017 NZUsize!(1),
1018 participants.clone(),
1019 )
1020 .await;
1021
1022 let me = participants[0].clone();
1023
1024 let setup = StandardHarness::setup_validator(
1025 context.child("validator").with_attribute("index", 0),
1026 &mut oracle,
1027 me.clone(),
1028 ConstantProvider::new(schemes[0].clone()),
1029 )
1030 .await;
1031 let marshal = setup.mailbox;
1032 let buffer = setup.extra;
1033 let actor_handle = setup.actor_handle;
1034
1035 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
1036 let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
1037 let mut inline = Inline::new(
1038 context.child("inline"),
1039 mock_app,
1040 marshal.clone(),
1041 FixedEpocher::new(BLOCKS_PER_EPOCH),
1042 );
1043
1044 let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
1045 let parent_digest = parent.digest();
1046
1047 let child_round = Round::new(Epoch::zero(), View::new(2));
1048 let child_ctx = Ctx {
1049 round: child_round,
1050 leader: me.clone(),
1051 parent: (View::new(1), parent_digest),
1052 };
1053 let child = B::new::<Sha256>(child_ctx.clone(), parent_digest, Height::new(2), 200);
1054 let child_digest = child.digest();
1055
1056 assert!(
1057 buffer
1058 .broadcast(commonware_p2p::Recipients::Some(vec![]), parent.clone())
1059 .accepted(),
1060 "buffer broadcast for parent should be accepted"
1061 );
1062 assert!(
1063 buffer
1064 .broadcast(commonware_p2p::Recipients::Some(vec![]), child.clone())
1065 .accepted(),
1066 "buffer broadcast for child should be accepted"
1067 );
1068
1069 let verify_rx = inline.verify(child_ctx, child_digest).await;
1070 let certify_result = inline
1071 .certify(child_round, child_digest)
1072 .await
1073 .await
1074 .expect("certify result missing");
1075 assert!(certify_result, "certify should succeed");
1076
1077 actor_handle.abort();
1078 drop(verify_rx);
1079 drop(inline);
1080 drop(marshal);
1081 drop(buffer);
1082
1083 let setup2 = StandardHarness::setup_validator(
1084 context
1085 .child("validator_restart")
1086 .with_attribute("index", 0),
1087 &mut oracle,
1088 me.clone(),
1089 ConstantProvider::new(schemes[0].clone()),
1090 )
1091 .await;
1092 let marshal2 = setup2.mailbox;
1093
1094 let post_restart = marshal2.get_block(&child_digest).await;
1095 assert!(
1096 post_restart.is_some(),
1097 "certify resolved true so block must be durably persisted (seed={seed})"
1098 );
1099 });
1100 }
1101
1102 #[test_traced("WARN")]
1103 fn test_inline_certify_does_not_bypass_failed_verify_persistence() {
1104 let runner = deterministic::Runner::timed(Duration::from_secs(30));
1105 runner.start(|mut context| async move {
1106 let Fixture {
1107 participants,
1108 schemes,
1109 ..
1110 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1111 let mut oracle = setup_network_with_participants(
1112 context.child("network"),
1113 NZUsize!(1),
1114 participants.clone(),
1115 )
1116 .await;
1117
1118 let me = participants[0].clone();
1119
1120 let setup = StandardHarness::setup_validator(
1121 context.child("validator").with_attribute("index", 0),
1122 &mut oracle,
1123 me.clone(),
1124 ConstantProvider::new(schemes[0].clone()),
1125 )
1126 .await;
1127 let marshal = setup.mailbox;
1128 let buffer = setup.extra;
1129 let marshal_actor_handle = setup.actor_handle;
1130
1131 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
1132 let (mock_app, verify_started, release_verify): (GatedVerifyingApp<B, S>, _, _) =
1133 GatedVerifyingApp::new();
1134 let mut inline = Inline::new(
1135 context.child("inline"),
1136 mock_app,
1137 marshal.clone(),
1138 FixedEpocher::new(BLOCKS_PER_EPOCH),
1139 );
1140
1141 let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
1142 let parent_digest = parent.digest();
1143
1144 let child_round = Round::new(Epoch::zero(), View::new(2));
1145 let child_ctx = Ctx {
1146 round: child_round,
1147 leader: me.clone(),
1148 parent: (View::new(1), parent_digest),
1149 };
1150 let child = B::new::<Sha256>(child_ctx.clone(), parent_digest, Height::new(2), 200);
1151 let child_digest = child.digest();
1152
1153 assert!(
1154 buffer
1155 .broadcast(commonware_p2p::Recipients::Some(vec![]), parent)
1156 .accepted(),
1157 "buffer broadcast for parent should be accepted"
1158 );
1159 assert!(
1160 buffer
1161 .broadcast(commonware_p2p::Recipients::Some(vec![]), child)
1162 .accepted(),
1163 "buffer broadcast for child should be accepted"
1164 );
1165
1166 let verify_rx = inline.verify(child_ctx, child_digest).await;
1167 verify_started
1168 .await
1169 .expect("verify should reach application before marshal abort");
1170
1171 marshal_actor_handle.abort();
1174 let _ = marshal_actor_handle.await;
1175 release_verify.send_lossy(());
1176
1177 select! {
1178 result = verify_rx => {
1179 assert!(
1180 result.is_err(),
1181 "verify must not resolve after marshal.verified loses its persistence ack"
1182 );
1183 },
1184 _ = context.sleep(Duration::from_secs(5)) => {
1185 panic!("verify should terminate after marshal abort");
1186 },
1187 }
1188
1189 let certify_rx = inline.certify(child_round, child_digest).await;
1190 select! {
1191 result = certify_rx => {
1192 assert!(
1193 result.is_err(),
1194 "certify must not bypass failed verify persistence via stale availability"
1195 );
1196 },
1197 _ = context.sleep(Duration::from_secs(5)) => {
1198 panic!("certify should terminate after marshal abort");
1199 },
1200 }
1201
1202 drop(inline);
1203 drop(marshal);
1204 drop(buffer);
1205
1206 let setup2 = StandardHarness::setup_validator(
1207 context
1208 .child("validator_restart")
1209 .with_attribute("index", 0),
1210 &mut oracle,
1211 me,
1212 ConstantProvider::new(schemes[0].clone()),
1213 )
1214 .await;
1215 let marshal2 = setup2.mailbox;
1216
1217 let post_restart = marshal2.get_block(&child_digest).await;
1218 assert!(
1219 post_restart.is_none(),
1220 "failed marshal.verified ack must not leave a durably recoverable block"
1221 );
1222 });
1223 }
1224
1225 #[test_traced("WARN")]
1235 fn test_propose_skips_when_verified_block_exists_on_restart() {
1236 let runner = deterministic::Runner::timed(Duration::from_secs(30));
1237 runner.start(|mut context| async move {
1238 let Fixture {
1239 participants,
1240 schemes,
1241 ..
1242 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1243 let mut oracle = setup_network_with_participants(
1244 context.child("network"),
1245 NZUsize!(1),
1246 participants.clone(),
1247 )
1248 .await;
1249
1250 let me = participants[0].clone();
1251 let round = Round::new(Epoch::zero(), View::new(1));
1252 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
1253 let ctx = Ctx {
1254 round,
1255 leader: me.clone(),
1256 parent: (View::zero(), genesis.digest()),
1257 };
1258
1259 let pre_setup = StandardHarness::setup_validator(
1263 context.child("validator").with_attribute("index", 0),
1264 &mut oracle,
1265 me.clone(),
1266 ConstantProvider::new(schemes[0].clone()),
1267 )
1268 .await;
1269 let pre_marshal = pre_setup.mailbox;
1270 let pre_actor = pre_setup.actor_handle;
1271 let pre_extra = pre_setup.extra;
1272 let pre_application = pre_setup.application;
1273
1274 let stale_block = B::new::<Sha256>(ctx.clone(), genesis.digest(), Height::new(1), 100);
1275 assert!(pre_marshal.verified(round, stale_block).await);
1276
1277 pre_actor.abort();
1280 drop(pre_marshal);
1281 drop(pre_extra);
1282 drop(pre_application);
1283
1284 let post_setup = StandardHarness::setup_validator(
1288 context
1289 .child("validator_restart")
1290 .with_attribute("index", 0),
1291 &mut oracle,
1292 me.clone(),
1293 ConstantProvider::new(schemes[0].clone()),
1294 )
1295 .await;
1296 let post_marshal = post_setup.mailbox;
1297
1298 let fresh_block = B::new::<Sha256>(ctx.clone(), genesis.digest(), Height::new(1), 200);
1299 let mock_app: MockVerifyingApp<B, S> =
1300 MockVerifyingApp::new().with_propose_result(fresh_block);
1301 let mut inline = Inline::new(
1302 context.child("inline"),
1303 mock_app,
1304 post_marshal.clone(),
1305 FixedEpocher::new(BLOCKS_PER_EPOCH),
1306 );
1307
1308 let digest_rx = inline.propose(ctx).await;
1309 assert!(
1310 digest_rx.await.is_err(),
1311 "propose must drop the receiver so the voter nullifies the round via timeout"
1312 );
1313 });
1314 }
1315}