1use crate::{
74 marshal::{
75 application::{
76 validation::{is_inferred_reproposal_at_certify, Stage},
77 verification_tasks::VerificationTasks,
78 },
79 core::{CommitmentFallback, DigestFallback, Mailbox},
80 standard::{
81 validation::{precheck_epoch_and_reproposal, verify_with_parent, Decision},
82 Standard,
83 },
84 Update,
85 },
86 simplex::{types::Context, Plan},
87 types::{Epocher, Round},
88 Application, Automaton, CertifiableAutomaton, CertifiableBlock, Epochable, Relay, Reporter,
89};
90use commonware_actor::Feedback;
91use commonware_cryptography::{certificate::Scheme, Digestible};
92use commonware_macros::select;
93use commonware_p2p::Recipients;
94use commonware_runtime::{
95 telemetry::metrics::{
96 histogram::{Buckets, Timed},
97 MetricsExt as _,
98 },
99 Clock, Metrics, Spawner,
100};
101use commonware_utils::{
102 channel::{fallible::OneshotExt, oneshot},
103 sync::AsyncMutex,
104};
105use rand::Rng;
106use std::sync::Arc;
107use tracing::debug;
108
109pub struct Deferred<E, S, A, B, ES>
138where
139 E: Rng + Spawner + Metrics + Clock,
140 S: Scheme,
141 A: Application<E>,
142 B: CertifiableBlock,
143 ES: Epocher,
144{
145 context: Arc<AsyncMutex<E>>,
146 application: A,
147 marshal: Mailbox<S, Standard<B>>,
148 epocher: ES,
149 verification_tasks: VerificationTasks<<B as Digestible>::Digest>,
150
151 build_duration: Timed,
152 proposal_parent_fetch_duration: Timed,
153 ancestor_fetch_duration: Timed,
154}
155
156impl<E, S, A, B, ES> Clone for Deferred<E, S, A, B, ES>
157where
158 E: Rng + Spawner + Metrics + Clock,
159 S: Scheme,
160 A: Application<E>,
161 B: CertifiableBlock,
162 ES: Epocher,
163{
164 fn clone(&self) -> Self {
165 Self {
166 context: self.context.clone(),
167 application: self.application.clone(),
168 marshal: self.marshal.clone(),
169 epocher: self.epocher.clone(),
170 verification_tasks: self.verification_tasks.clone(),
171 build_duration: self.build_duration.clone(),
172 proposal_parent_fetch_duration: self.proposal_parent_fetch_duration.clone(),
173 ancestor_fetch_duration: self.ancestor_fetch_duration.clone(),
174 }
175 }
176}
177
178impl<E, S, A, B, ES> Deferred<E, S, A, B, ES>
179where
180 E: Rng + Spawner + Metrics + Clock,
181 S: Scheme,
182 A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
183 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
184 ES: Epocher,
185{
186 pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
188 let build_histogram = context.histogram(
189 "build_duration",
190 "Histogram of time taken for the application to build a new block, in seconds",
191 Buckets::LOCAL,
192 );
193 let build_duration = Timed::new(build_histogram);
194 let parent_fetch_histogram = context.histogram(
195 "parent_fetch_duration",
196 "Histogram of time taken to fetch a parent block in propose, in seconds",
197 Buckets::LOCAL,
198 );
199 let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram);
200 let ancestor_fetch_histogram = context.histogram(
201 "ancestor_fetch_duration",
202 "Histogram of time taken to fetch a block via the ancestry stream, in seconds",
203 Buckets::LOCAL,
204 );
205 let ancestor_fetch_duration = Timed::new(ancestor_fetch_histogram);
206
207 Self {
208 context: Arc::new(AsyncMutex::new(context)),
209 application,
210 marshal,
211 epocher,
212 verification_tasks: VerificationTasks::new(),
213
214 build_duration,
215 proposal_parent_fetch_duration,
216 ancestor_fetch_duration,
217 }
218 }
219
220 #[inline]
230 async fn deferred_verify(
231 &mut self,
232 context: <Self as Automaton>::Context,
233 block: B,
234 stage: Stage,
235 ) -> oneshot::Receiver<bool> {
236 let mut marshal = self.marshal.clone();
237 let mut application = self.application.clone();
238 let (mut tx, rx) = oneshot::channel();
239 let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
240 let runtime_context = self
241 .context
242 .lock()
243 .await
244 .child("deferred_verify")
245 .with_attribute("round", context.round);
246 runtime_context.spawn(move |runtime_context| async move {
247 let application_valid = match verify_with_parent(
256 runtime_context,
257 context,
258 block,
259 &mut application,
260 &mut marshal,
261 &mut tx,
262 stage,
263 ancestor_fetch_duration,
264 )
265 .await
266 {
267 Some(valid) => valid,
268 None => return,
269 };
270 tx.send_lossy(application_valid);
271 });
272
273 rx
274 }
275
276 async fn certify_from_embedded_context(
277 &mut self,
278 round: Round,
279 digest: B::Digest,
280 ) -> oneshot::Receiver<bool> {
281 debug!(
296 ?round,
297 ?digest,
298 "subscribing to block for certification using embedded context"
299 );
300 let block_rx = self
301 .marshal
302 .subscribe_by_digest(digest, DigestFallback::FetchByRound { round });
303 let mut marshaled = self.clone();
304 let epocher = self.epocher.clone();
305 let (mut tx, rx) = oneshot::channel();
306 let context = self
307 .context
308 .lock()
309 .await
310 .child("certify")
311 .with_attribute("round", round);
312 context.spawn(move |_| async move {
313 let block = select! {
314 _ = tx.closed() => {
315 debug!(
316 reason = "consensus dropped receiver",
317 "skipping certification"
318 );
319 return;
320 },
321 result = block_rx => match result {
322 Ok(block) => block,
323 Err(_) => {
324 debug!(
325 ?digest,
326 reason = "failed to fetch block for certification",
327 "skipping certification"
328 );
329 return;
330 }
331 },
332 };
333
334 let embedded_context = block.context();
341 let is_reproposal = is_inferred_reproposal_at_certify(
342 &epocher,
343 block.height(),
344 embedded_context.round,
345 round,
346 );
347 if is_reproposal {
348 if !marshaled.marshal.certified(round, block).await {
352 debug!(?round, "marshal unable to accept block");
353 return;
354 }
355 tx.send_lossy(true);
356 return;
357 }
358
359 let verify_rx = marshaled
360 .deferred_verify(embedded_context, block, Stage::Certified)
361 .await;
362 if let Ok(result) = verify_rx.await {
363 tx.send_lossy(result);
364 }
365 });
366 rx
367 }
368
369 async fn certify_from_existing_task(
370 &mut self,
371 round: Round,
372 digest: B::Digest,
373 task: oneshot::Receiver<bool>,
374 ) -> oneshot::Receiver<bool> {
375 self.marshal.hint_notarized(round, digest);
380
381 let mut marshaled = self.clone();
382 let (mut tx, rx) = oneshot::channel();
383 let context = self
384 .context
385 .lock()
386 .await
387 .child("certify_existing")
388 .with_attribute("round", round);
389 context.spawn(move |_| async move {
390 let result = select! {
391 _ = tx.closed() => {
392 debug!(
393 reason = "consensus dropped receiver",
394 "skipping certification"
395 );
396 return;
397 },
398 result = task => result,
399 };
400 match result {
401 Ok(result) => {
402 tx.send_lossy(result);
403 }
404 Err(_) => {
405 debug!(
406 ?round,
407 ?digest,
408 "verification task closed before certification, falling back to embedded context"
409 );
410 let fallback = marshaled.certify_from_embedded_context(round, digest).await;
411 let result = select! {
412 _ = tx.closed() => {
413 debug!(
414 reason = "consensus dropped receiver",
415 "skipping certification"
416 );
417 return;
418 },
419 result = fallback => result,
420 };
421 if let Ok(result) = result {
422 tx.send_lossy(result);
423 }
424 }
425 }
426 });
427 rx
428 }
429}
430
431impl<E, S, A, B, ES> Automaton for Deferred<E, S, A, B, ES>
432where
433 E: Rng + Spawner + Metrics + Clock,
434 S: Scheme,
435 A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
436 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
437 ES: Epocher,
438{
439 type Digest = B::Digest;
440 type Context = Context<Self::Digest, S::PublicKey>;
441
442 async fn propose(
453 &mut self,
454 consensus_context: Context<Self::Digest, S::PublicKey>,
455 ) -> oneshot::Receiver<Self::Digest> {
456 let marshal = self.marshal.clone();
457 let mut application = self.application.clone();
458 let epocher = self.epocher.clone();
459
460 let build_duration = self.build_duration.clone();
462 let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone();
463 let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
464
465 let (mut tx, rx) = oneshot::channel();
466 let context = self
467 .context
468 .lock()
469 .await
470 .child("propose")
471 .with_attribute("round", consensus_context.round);
472 context.spawn(move |runtime_context| async move {
473 if let Some(block) = marshal.get_verified(consensus_context.round).await {
487 let block_context = block.context();
488 if block_context != consensus_context {
489 debug!(
490 round = ?consensus_context.round,
491 ?consensus_context,
492 ?block_context,
493 "skipping proposal: cached verified block context no longer matches"
494 );
495 return;
496 }
497 let digest = block.digest();
498 let success = tx.send_lossy(digest);
499 debug!(
500 round = ?consensus_context.round,
501 ?digest,
502 success,
503 "reused verified block from marshal on leader recovery"
504 );
505 return;
506 }
507
508 let (parent_view, parent_commitment) = consensus_context.parent;
517 let parent_request = marshal.subscribe_by_commitment(
518 parent_commitment,
519 CommitmentFallback::FetchByRound {
520 round: Round::new(consensus_context.epoch(), parent_view),
521 },
522 );
523
524 let parent_timer = proposal_parent_fetch_duration.timer(&runtime_context);
525 let parent = select! {
526 _ = tx.closed() => {
527 debug!(reason = "consensus dropped receiver", "skipping proposal");
528 return;
529 },
530 result = parent_request => match result {
531 Ok(parent) => parent,
532 Err(_) => {
533 debug!(
534 ?parent_commitment,
535 reason = "failed to fetch parent block",
536 "skipping proposal"
537 );
538 return;
539 }
540 },
541 };
542 parent_timer.observe(&runtime_context);
543
544 let last_in_epoch = epocher
548 .last(consensus_context.epoch())
549 .expect("current epoch should exist");
550 if parent.height() == last_in_epoch {
551 let digest = parent.digest();
552 if !marshal.verified(consensus_context.round, parent).await {
553 debug!(
554 round = ?consensus_context.round,
555 ?digest,
556 "marshal rejected re-proposed boundary block"
557 );
558 return;
559 }
560 let success = tx.send_lossy(digest);
561 debug!(
562 round = ?consensus_context.round,
563 ?digest,
564 success,
565 "re-proposed parent block at epoch boundary"
566 );
567 return;
568 }
569
570 let ancestor_stream = marshal.ancestor_stream(
571 Arc::new(runtime_context.child("ancestor_stream")),
572 [parent],
573 ancestor_fetch_duration,
574 );
575 let build_request = application.propose(
576 (
577 runtime_context.child("app_propose"),
578 consensus_context.clone(),
579 ),
580 ancestor_stream,
581 );
582
583 let build_timer = build_duration.timer(&runtime_context);
584 let built_block = select! {
585 _ = tx.closed() => {
586 debug!(reason = "consensus dropped receiver", "skipping proposal");
587 return;
588 },
589 result = build_request => match result {
590 Some(block) => block,
591 None => {
592 debug!(
593 ?parent_commitment,
594 reason = "block building failed",
595 "skipping proposal"
596 );
597 return;
598 }
599 },
600 };
601 build_timer.observe(&runtime_context);
602
603 let digest = built_block.digest();
604 if !marshal.proposed(consensus_context.round, built_block).await {
605 debug!(
606 round = ?consensus_context.round,
607 ?digest,
608 "marshal rejected proposed block"
609 );
610 return;
611 }
612 let success = tx.send_lossy(digest);
613 debug!(
614 round = ?consensus_context.round,
615 ?digest,
616 success,
617 "proposed new block"
618 );
619 });
620 rx
621 }
622
623 async fn verify(
624 &mut self,
625 context: Context<Self::Digest, S::PublicKey>,
626 digest: Self::Digest,
627 ) -> oneshot::Receiver<bool> {
628 let mut marshal = self.marshal.clone();
629 let mut marshaled = self.clone();
630 let round = context.round;
631
632 let (task_tx, task_rx) = oneshot::channel();
637 self.verification_tasks.insert(round, digest, task_rx);
638
639 let (mut tx, rx) = oneshot::channel();
640 let runtime_context = self
641 .context
642 .lock()
643 .await
644 .child("optimistic_verify")
645 .with_attribute("round", round);
646 runtime_context.spawn(move |_| async move {
647 let block_request = marshal.subscribe_by_digest(digest, DigestFallback::Wait);
648 let block = select! {
649 _ = tx.closed() => {
650 debug!(
651 reason = "consensus dropped receiver",
652 "skipping optimistic verification"
653 );
654 return;
655 },
656 result = block_request => match result {
657 Ok(block) => block,
658 Err(_) => {
659 debug!(
660 ?digest,
661 reason = "failed to fetch block for optimistic verification",
662 "skipping optimistic verification"
663 );
664 return;
665 }
666 },
667 };
668
669 let Some(decision) = precheck_epoch_and_reproposal(
677 &marshaled.epocher,
678 &mut marshal,
679 &context,
680 digest,
681 block,
682 )
683 .await
684 else {
685 return;
686 };
687 let block = match decision {
688 Decision::Complete(valid) => {
689 task_tx.send_lossy(valid);
692 tx.send_lossy(valid);
693 return;
694 }
695 Decision::Continue(block) => block,
696 };
697
698 if block.context() != context {
707 debug!(
708 ?context,
709 block_context = ?block.context(),
710 "block-embedded context does not match consensus context during optimistic verification"
711 );
712 task_tx.send_lossy(false);
713 tx.send_lossy(false);
714 return;
715 }
716
717 let deferred_rx = marshaled
722 .deferred_verify(context, block, Stage::Verified)
723 .await;
724 tx.send_lossy(true);
725 if let Ok(result) = deferred_rx.await {
726 task_tx.send_lossy(result);
727 }
728 });
729 rx
730 }
731}
732
733impl<E, S, A, B, ES> CertifiableAutomaton for Deferred<E, S, A, B, ES>
734where
735 E: Rng + Spawner + Metrics + Clock,
736 S: Scheme,
737 A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
738 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
739 ES: Epocher,
740{
741 async fn certify(&mut self, round: Round, digest: Self::Digest) -> oneshot::Receiver<bool> {
742 let task = self.verification_tasks.take(round, digest);
744 if let Some(task) = task {
745 return self.certify_from_existing_task(round, digest, task).await;
746 }
747
748 self.certify_from_embedded_context(round, digest).await
749 }
750}
751
752impl<E, S, A, B, ES> Relay for Deferred<E, S, A, B, ES>
753where
754 E: Rng + Spawner + Metrics + Clock,
755 S: Scheme,
756 A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
757 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
758 ES: Epocher,
759{
760 type Digest = B::Digest;
761 type PublicKey = S::PublicKey;
762 type Plan = Plan<S::PublicKey>;
763
764 fn broadcast(&mut self, commitment: Self::Digest, plan: Plan<S::PublicKey>) -> Feedback {
765 let (round, recipients) = match plan {
766 Plan::Propose { round } => (round, Recipients::All),
767 Plan::Forward { round, recipients } => (round, recipients),
768 };
769 self.marshal.forward(round, commitment, recipients)
770 }
771}
772
773impl<E, S, A, B, ES> Reporter for Deferred<E, S, A, B, ES>
774where
775 E: Rng + Spawner + Metrics + Clock,
776 S: Scheme,
777 A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
778 + Reporter<Activity = Update<B>>,
779 B: CertifiableBlock<Context = <A as Application<E>>::Context>,
780 ES: Epocher,
781{
782 type Activity = A::Activity;
783
784 fn report(&mut self, update: Self::Activity) -> Feedback {
786 if let Update::Tip(round, _, _) = &update {
788 self.verification_tasks.retain_after(round);
789 }
790 self.application.report(update)
791 }
792}
793
794#[cfg(test)]
795mod tests {
796 use super::Deferred;
797 use crate::{
798 marshal::mocks::{
799 harness::{
800 default_leader, make_raw_block, setup_network_with_participants, Ctx,
801 StandardHarness, TestHarness, B, BLOCKS_PER_EPOCH, NAMESPACE, NUM_VALIDATORS, S, V,
802 },
803 verifying::{GatedVerifyingApp, MockVerifyingApp},
804 },
805 simplex::scheme::bls12381_threshold::vrf as bls12381_threshold_vrf,
806 types::{Epoch, Epocher, FixedEpocher, Height, Round, View},
807 Automaton, CertifiableAutomaton,
808 };
809 use commonware_broadcast::Broadcaster;
810 use commonware_cryptography::{
811 certificate::{mocks::Fixture, ConstantProvider},
812 sha256::Sha256,
813 Digestible, Hasher as _,
814 };
815 use commonware_macros::{select, test_traced};
816 use commonware_runtime::{deterministic, Clock, Runner, Supervisor as _};
817 use commonware_utils::{channel::fallible::OneshotExt, NZUsize};
818 use std::time::Duration;
819
820 #[test_traced("INFO")]
821 fn test_certify_lower_view_after_higher_view() {
822 let runner = deterministic::Runner::timed(Duration::from_secs(60));
823 runner.start(|mut context| async move {
824 let Fixture {
825 participants,
826 schemes,
827 ..
828 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
829 let mut oracle = setup_network_with_participants(
830 context.child("network"),
831 NZUsize!(1),
832 participants.clone(),
833 )
834 .await;
835
836 let me = participants[0].clone();
837
838 let setup = StandardHarness::setup_validator(
839 context.child("validator").with_attribute("index", 0),
840 &mut oracle,
841 me.clone(),
842 ConstantProvider::new(schemes[0].clone()),
843 )
844 .await;
845 let marshal = setup.mailbox;
846
847 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
848 let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
849
850 let mut marshaled = Deferred::new(
851 context.child("deferred"),
852 mock_app,
853 marshal.clone(),
854 FixedEpocher::new(BLOCKS_PER_EPOCH),
855 );
856
857 let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
859 let parent_digest = parent.digest();
860 assert!(
861 marshal
862 .verified(Round::new(Epoch::new(0), View::new(1)), parent.clone())
863 .await
864 );
865
866 let round_a = Round::new(Epoch::new(0), View::new(5));
868 let context_a = Ctx {
869 round: round_a,
870 leader: me.clone(),
871 parent: (View::new(1), parent_digest),
872 };
873 let block_a = B::new::<Sha256>(context_a.clone(), parent_digest, Height::new(2), 200);
874 let commitment_a = StandardHarness::commitment(&block_a);
875 assert!(marshal.verified(round_a, block_a.clone()).await);
876
877 let round_b = Round::new(Epoch::new(0), View::new(10));
879 let context_b = Ctx {
880 round: round_b,
881 leader: me.clone(),
882 parent: (View::new(1), parent_digest),
883 };
884 let block_b = B::new::<Sha256>(context_b.clone(), parent_digest, Height::new(2), 300);
885 let commitment_b = StandardHarness::commitment(&block_b);
886 assert!(marshal.verified(round_b, block_b.clone()).await);
887
888 context.sleep(Duration::from_millis(10)).await;
889
890 let _ = marshaled.verify(context_a, commitment_a).await.await;
892
893 let _ = marshaled.verify(context_b, commitment_b).await.await;
895
896 let certify_b = marshaled.certify(round_b, commitment_b).await;
898 assert!(
899 certify_b.await.unwrap(),
900 "Block B certification should succeed"
901 );
902
903 let certify_a = marshaled.certify(round_a, commitment_a).await;
905
906 select! {
907 result = certify_a => {
908 assert!(result.unwrap(), "Block A certification should succeed");
909 },
910 _ = context.sleep(Duration::from_secs(5)) => {
911 panic!("Block A certification timed out");
912 },
913 }
914 })
915 }
916
917 #[test_traced("WARN")]
918 fn test_marshaled_rejects_unsupported_epoch() {
919 #[derive(Clone)]
920 struct LimitedEpocher {
921 inner: FixedEpocher,
922 max_epoch: u64,
923 }
924
925 impl Epocher for LimitedEpocher {
926 fn containing(&self, height: Height) -> Option<crate::types::EpochInfo> {
927 let bounds = self.inner.containing(height)?;
928 if bounds.epoch().get() > self.max_epoch {
929 None
930 } else {
931 Some(bounds)
932 }
933 }
934
935 fn first(&self, epoch: Epoch) -> Option<Height> {
936 if epoch.get() > self.max_epoch {
937 None
938 } else {
939 self.inner.first(epoch)
940 }
941 }
942
943 fn last(&self, epoch: Epoch) -> Option<Height> {
944 if epoch.get() > self.max_epoch {
945 None
946 } else {
947 self.inner.last(epoch)
948 }
949 }
950 }
951
952 let runner = deterministic::Runner::timed(Duration::from_secs(60));
953 runner.start(|mut context| async move {
954 let Fixture {
955 participants,
956 schemes,
957 ..
958 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
959 let mut oracle = setup_network_with_participants(
960 context.child("network"),
961 NZUsize!(1),
962 participants.clone(),
963 )
964 .await;
965
966 let me = participants[0].clone();
967
968 let setup = StandardHarness::setup_validator(
969 context.child("validator").with_attribute("index", 0),
970 &mut oracle,
971 me.clone(),
972 ConstantProvider::new(schemes[0].clone()),
973 )
974 .await;
975 let marshal = setup.mailbox;
976
977 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
978 let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
979 let limited_epocher = LimitedEpocher {
980 inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
981 max_epoch: 0,
982 };
983
984 let mut marshaled = Deferred::new(
985 context.child("deferred"),
986 mock_app,
987 marshal.clone(),
988 limited_epocher,
989 );
990
991 let parent_ctx = Ctx {
993 round: Round::new(Epoch::zero(), View::new(19)),
994 leader: default_leader(),
995 parent: (View::zero(), genesis.digest()),
996 };
997 let parent =
998 B::new::<Sha256>(parent_ctx.clone(), genesis.digest(), Height::new(19), 1000);
999 let parent_digest = parent.digest();
1000 assert!(
1001 marshal
1002 .clone()
1003 .verified(Round::new(Epoch::zero(), View::new(19)), parent.clone())
1004 .await
1005 );
1006
1007 let unsupported_round = Round::new(Epoch::new(1), View::new(20));
1009 let unsupported_context = Ctx {
1010 round: unsupported_round,
1011 leader: me.clone(),
1012 parent: (View::new(19), parent_digest),
1013 };
1014 let block = B::new::<Sha256>(
1015 unsupported_context.clone(),
1016 parent_digest,
1017 Height::new(20),
1018 2000,
1019 );
1020 let block_commitment = StandardHarness::commitment(&block);
1021 assert!(
1022 marshal
1023 .clone()
1024 .verified(unsupported_round, block.clone())
1025 .await
1026 );
1027
1028 context.sleep(Duration::from_millis(10)).await;
1029
1030 let verify_result = marshaled
1033 .verify(unsupported_context, block_commitment)
1034 .await;
1035 let optimistic_result = verify_result.await;
1037
1038 assert!(
1040 !optimistic_result.unwrap(),
1041 "Optimistic verify should reject block in unsupported epoch"
1042 );
1043 })
1044 }
1045
1046 #[test_traced("WARN")]
1052 fn test_marshaled_rejects_mismatched_context() {
1053 let runner = deterministic::Runner::timed(Duration::from_secs(30));
1054 runner.start(|mut context| async move {
1055 let Fixture {
1056 participants,
1057 schemes,
1058 ..
1059 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1060 let mut oracle = setup_network_with_participants(
1061 context.child("network"),
1062 NZUsize!(1),
1063 participants.clone(),
1064 )
1065 .await;
1066
1067 let me = participants[0].clone();
1068
1069 let setup = StandardHarness::setup_validator(
1070 context.child("validator").with_attribute("index", 0),
1071 &mut oracle,
1072 me.clone(),
1073 ConstantProvider::new(schemes[0].clone()),
1074 )
1075 .await;
1076 let marshal = setup.mailbox;
1077
1078 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
1079 let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
1080
1081 let mut marshaled = Deferred::new(
1082 context.child("deferred"),
1083 mock_app,
1084 marshal.clone(),
1085 FixedEpocher::new(BLOCKS_PER_EPOCH),
1086 );
1087
1088 let parent_ctx = Ctx {
1090 round: Round::new(Epoch::zero(), View::new(1)),
1091 leader: default_leader(),
1092 parent: (View::zero(), genesis.digest()),
1093 };
1094 let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
1095 let parent_commitment = StandardHarness::commitment(&parent);
1096 assert!(
1097 marshal
1098 .clone()
1099 .verified(Round::new(Epoch::zero(), View::new(1)), parent.clone())
1100 .await
1101 );
1102
1103 let round_a = Round::new(Epoch::zero(), View::new(2));
1105 let context_a = Ctx {
1106 round: round_a,
1107 leader: me.clone(),
1108 parent: (View::new(1), parent_commitment),
1109 };
1110 let block_a = B::new::<Sha256>(context_a, parent.digest(), Height::new(2), 200);
1111 let commitment_a = StandardHarness::commitment(&block_a);
1112 assert!(marshal.verified(round_a, block_a).await);
1113
1114 context.sleep(Duration::from_millis(10)).await;
1115
1116 let round_b = Round::new(Epoch::zero(), View::new(3));
1118 let context_b = Ctx {
1119 round: round_b,
1120 leader: participants[1].clone(),
1121 parent: (View::new(1), parent_commitment),
1122 };
1123
1124 let verify_rx = marshaled.verify(context_b, commitment_a).await;
1125 select! {
1126 result = verify_rx => {
1127 assert!(
1128 !result.unwrap(),
1129 "mismatched context hash should be rejected"
1130 );
1131 },
1132 _ = context.sleep(Duration::from_secs(5)) => {
1133 panic!("verify should reject mismatched context hash promptly");
1134 },
1135 }
1136 })
1137 }
1138
1139 #[test_traced("WARN")]
1143 fn test_deferred_certify_recovers_after_verify_receiver_drop() {
1144 let runner = deterministic::Runner::timed(Duration::from_secs(30));
1145 runner.start(|mut context| async move {
1146 let Fixture {
1147 participants,
1148 schemes,
1149 ..
1150 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1151 let mut oracle = setup_network_with_participants(
1152 context.child("network"),
1153 NZUsize!(1),
1154 participants.clone(),
1155 )
1156 .await;
1157
1158 let me = participants[0].clone();
1159 let setup = StandardHarness::setup_validator(
1160 context.child("validator").with_attribute("index", 0),
1161 &mut oracle,
1162 me.clone(),
1163 ConstantProvider::new(schemes[0].clone()),
1164 )
1165 .await;
1166 let marshal = setup.mailbox;
1167
1168 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
1169 let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
1170 let mut marshaled = Deferred::new(
1171 context.child("deferred"),
1172 mock_app,
1173 marshal.clone(),
1174 FixedEpocher::new(BLOCKS_PER_EPOCH),
1175 );
1176
1177 let round = Round::new(Epoch::zero(), View::new(1));
1178 let block_context = Ctx {
1179 round,
1180 leader: me,
1181 parent: (View::zero(), genesis.digest()),
1182 };
1183 let block =
1184 B::new::<Sha256>(block_context.clone(), genesis.digest(), Height::new(1), 100);
1185 let digest = block.digest();
1186
1187 let verify_rx = marshaled.verify(block_context, digest).await;
1188 drop(verify_rx);
1189
1190 context.sleep(Duration::from_millis(10)).await;
1193
1194 assert!(marshal.proposed(round, block).await);
1195 let certify_rx = marshaled.certify(round, digest).await;
1196 select! {
1197 result = certify_rx => {
1198 assert!(
1199 result.expect("certify result missing"),
1200 "certify should recover after verify receiver drop"
1201 );
1202 },
1203 _ = context.sleep(Duration::from_secs(5)) => {
1204 panic!("certify should recover promptly after verify drop");
1205 },
1206 }
1207 });
1208 }
1209
1210 #[test_traced("WARN")]
1221 fn test_deferred_certify_does_not_bypass_failed_verify_persistence() {
1222 let runner = deterministic::Runner::timed(Duration::from_secs(30));
1223 runner.start(|mut context| async move {
1224 let Fixture {
1225 participants,
1226 schemes,
1227 ..
1228 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1229 let mut oracle = setup_network_with_participants(
1230 context.child("network"),
1231 NZUsize!(1),
1232 participants.clone(),
1233 )
1234 .await;
1235
1236 let me = participants[0].clone();
1237
1238 let setup = StandardHarness::setup_validator(
1239 context.child("validator").with_attribute("index", 0),
1240 &mut oracle,
1241 me.clone(),
1242 ConstantProvider::new(schemes[0].clone()),
1243 )
1244 .await;
1245 let marshal = setup.mailbox;
1246 let buffer = setup.extra;
1247 let marshal_actor_handle = setup.actor_handle;
1248
1249 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
1250 let (mock_app, verify_started, release_verify): (GatedVerifyingApp<B, S>, _, _) =
1251 GatedVerifyingApp::new();
1252 let mut marshaled = Deferred::new(
1253 context.child("deferred"),
1254 mock_app,
1255 marshal.clone(),
1256 FixedEpocher::new(BLOCKS_PER_EPOCH),
1257 );
1258
1259 let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
1263 let parent_digest = parent.digest();
1264
1265 let child_round = Round::new(Epoch::zero(), View::new(2));
1266 let child_ctx = Ctx {
1267 round: child_round,
1268 leader: me,
1269 parent: (View::new(1), parent_digest),
1270 };
1271 let child = B::new::<Sha256>(child_ctx.clone(), parent_digest, Height::new(2), 200);
1272 let child_digest = child.digest();
1273
1274 assert!(
1275 buffer
1276 .broadcast(commonware_p2p::Recipients::Some(vec![]), parent)
1277 .accepted(),
1278 "buffer broadcast for parent should be accepted"
1279 );
1280 assert!(
1281 buffer
1282 .broadcast(commonware_p2p::Recipients::Some(vec![]), child)
1283 .accepted(),
1284 "buffer broadcast for child should be accepted"
1285 );
1286
1287 let optimistic_rx = marshaled.verify(child_ctx, child_digest).await;
1291 let result = optimistic_rx
1292 .await
1293 .expect("optimistic verify should resolve");
1294 assert!(
1295 result,
1296 "optimistic verify should accept the available block"
1297 );
1298
1299 let certify_rx = marshaled.certify(child_round, child_digest).await;
1300 verify_started
1301 .await
1302 .expect("verify should reach application before marshal abort");
1303
1304 marshal_actor_handle.abort();
1307 let _ = marshal_actor_handle.await;
1308 release_verify.send_lossy(());
1309
1310 select! {
1311 result = certify_rx => {
1312 assert!(
1313 result.is_err(),
1314 "certify must not resolve after marshal.verified loses its persistence ack"
1315 );
1316 },
1317 _ = context.sleep(Duration::from_secs(5)) => {
1318 panic!("certify should terminate after marshal abort");
1319 },
1320 }
1321 });
1322 }
1323
1324 #[test_traced("WARN")]
1329 fn test_propose_reuses_verified_block_on_restart() {
1330 let runner = deterministic::Runner::timed(Duration::from_secs(30));
1331 runner.start(|mut context| async move {
1332 let Fixture {
1333 participants,
1334 schemes,
1335 ..
1336 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1337 let mut oracle = setup_network_with_participants(
1338 context.child("network"),
1339 NZUsize!(1),
1340 participants.clone(),
1341 )
1342 .await;
1343
1344 let me = participants[0].clone();
1345 let setup = StandardHarness::setup_validator(
1346 context.child("validator").with_attribute("index", 0),
1347 &mut oracle,
1348 me.clone(),
1349 ConstantProvider::new(schemes[0].clone()),
1350 )
1351 .await;
1352 let marshal = setup.mailbox;
1353
1354 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
1355 let round = Round::new(Epoch::zero(), View::new(1));
1356 let ctx = Ctx {
1357 round,
1358 leader: me.clone(),
1359 parent: (View::zero(), genesis.digest()),
1360 };
1361 let block_a = B::new::<Sha256>(ctx.clone(), genesis.digest(), Height::new(1), 100);
1362 let digest_a = block_a.digest();
1363 assert!(marshal.verified(round, block_a.clone()).await);
1364
1365 let block_b = B::new::<Sha256>(ctx.clone(), genesis.digest(), Height::new(1), 200);
1366 let digest_b = block_b.digest();
1367 assert_ne!(digest_a, digest_b, "test requires distinct digests");
1368
1369 let mock_app: MockVerifyingApp<B, S> =
1370 MockVerifyingApp::new().with_propose_result(block_b);
1371 let mut marshaled = Deferred::new(
1372 context.child("deferred"),
1373 mock_app,
1374 marshal.clone(),
1375 FixedEpocher::new(BLOCKS_PER_EPOCH),
1376 );
1377
1378 let digest_rx = marshaled.propose(ctx).await;
1379 let digest = digest_rx.await.expect("propose must return a digest");
1380 assert_eq!(
1381 digest, digest_a,
1382 "propose must reuse the block marshal already persisted for this round"
1383 );
1384 });
1385 }
1386
1387 #[test_traced("WARN")]
1396 fn test_propose_skips_when_verified_block_context_changed() {
1397 let runner = deterministic::Runner::timed(Duration::from_secs(30));
1398 runner.start(|mut context| async move {
1399 let Fixture {
1400 participants,
1401 schemes,
1402 ..
1403 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1404 let mut oracle = setup_network_with_participants(
1405 context.child("network"),
1406 NZUsize!(1),
1407 participants.clone(),
1408 )
1409 .await;
1410
1411 let me = participants[0].clone();
1412 let setup = StandardHarness::setup_validator(
1413 context.child("validator").with_attribute("index", 0),
1414 &mut oracle,
1415 me.clone(),
1416 ConstantProvider::new(schemes[0].clone()),
1417 )
1418 .await;
1419 let marshal = setup.mailbox;
1420
1421 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
1422
1423 let round = Round::new(Epoch::zero(), View::new(2));
1425 let stale_ctx = Ctx {
1426 round,
1427 leader: me.clone(),
1428 parent: (View::zero(), genesis.digest()),
1429 };
1430 let stale_block = B::new::<Sha256>(stale_ctx, genesis.digest(), Height::new(1), 100);
1431 assert!(marshal.verified(round, stale_block).await);
1432
1433 let new_parent_digest = Sha256::hash(b"late-certified-parent");
1436 let new_ctx = Ctx {
1437 round,
1438 leader: me.clone(),
1439 parent: (View::new(1), new_parent_digest),
1440 };
1441
1442 let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
1443 let mut marshaled = Deferred::new(
1444 context.child("deferred"),
1445 mock_app,
1446 marshal.clone(),
1447 FixedEpocher::new(BLOCKS_PER_EPOCH),
1448 );
1449
1450 let digest_rx = marshaled.propose(new_ctx).await;
1451 assert!(
1452 digest_rx.await.is_err(),
1453 "propose must drop the receiver when the cached block's context no longer matches"
1454 );
1455 });
1456 }
1457}