1pub mod elector;
339pub mod scheme;
340pub mod types;
341
342cfg_if::cfg_if! {
343 if #[cfg(not(target_arch = "wasm32"))] {
344 use crate::types::{Round, View, ViewDelta};
345 use commonware_cryptography::PublicKey;
346 use commonware_p2p::Recipients;
347
348 mod actors;
349 pub mod config;
350 pub use config::{Config, Floor, ForwardingPolicy};
351 mod engine;
352 pub use engine::Engine;
353 mod metrics;
354
355 pub(crate) const fn min_active(activity_timeout: ViewDelta, last_finalized: View) -> View {
357 last_finalized.saturating_sub(activity_timeout)
358 }
359
360 pub(crate) fn interesting(
364 activity_timeout: ViewDelta,
365 last_finalized: View,
366 current: View,
367 pending: View,
368 allow_future: bool,
369 ) -> bool {
370 if pending.is_zero() {
372 return false;
373 }
374 if pending < min_active(activity_timeout, last_finalized) {
375 return false;
376 }
377 if !allow_future && pending > current.next() {
378 return false;
379 }
380 true
381 }
382
383 pub enum Plan<P: PublicKey> {
385 Propose {
387 round: Round,
389 },
390 Forward {
392 round: Round,
394 recipients: Recipients<P>,
396 },
397 }
398 }
399}
400
401#[cfg(any(test, feature = "mocks"))]
402pub mod mocks;
403
/// Test-only helper: the quorum size for `n` participants, as computed by
/// `N3f1::quorum`.
#[cfg(test)]
pub(crate) fn quorum(n: u32) -> u32 {
    // `Faults` is imported so that `quorum` resolves on `N3f1`.
    use commonware_utils::{Faults, N3f1};

    N3f1::quorum(n)
}
411
412#[cfg(test)]
413mod tests {
414 use super::*;
415 use crate::{
416 simplex::{
417 elector::{Config as Elector, Elector as ElectorTrait, Random, RoundRobin},
418 mocks::{
419 scheme as scheme_mocks,
420 twins::{self, Elector as TwinsElector},
421 wrapped,
422 },
423 scheme::{
424 bls12381_multisig,
425 bls12381_threshold::{
426 standard as bls12381_threshold_std,
427 vrf::{self as bls12381_threshold_vrf, Seedable},
428 },
429 ed25519, secp256r1, Scheme,
430 },
431 types::{
432 Certificate, Finalization as TFinalization, Finalize as TFinalize,
433 Notarization as TNotarization, Notarize as TNotarize,
434 Nullification as TNullification, Nullify as TNullify, Proposal, Vote,
435 },
436 },
437 types::{Epoch, Participant, Round},
438 Monitor, Viewable,
439 };
440 use commonware_codec::{Decode, DecodeExt, Encode};
441 use commonware_cryptography::{
442 bls12381::primitives::variant::{MinPk, MinSig, Variant},
443 certificate::mocks::Fixture,
444 ed25519::{PrivateKey, PublicKey},
445 sha256::{Digest as Sha256Digest, Digest as D},
446 Hasher as _, Sha256, Signer as _,
447 };
448 use commonware_macros::{select, test_group, test_traced};
449 use commonware_p2p::{
450 simulated::{Config, Link, Network, Oracle, Receiver, Sender, SplitOrigin},
451 utils::mocks::inert_channel,
452 Manager as _, Recipients, Sender as _, TrackedPeers,
453 };
454 use commonware_parallel::Sequential;
455 use commonware_runtime::{
456 buffer::paged::CacheRef, deterministic, telemetry::metrics::count_running_tasks, Clock,
457 IoBuf, Metrics as _, Quota, Runner, Spawner, Supervisor as _,
458 };
459 use commonware_utils::{ordered::Set, sync::Mutex, test_rng, Faults, N3f1, NZUsize, NZU16};
460 use engine::Engine;
461 use futures::future::join_all;
462 use rand::{rngs::StdRng, Rng as _, SeedableRng};
463 use std::{
464 collections::{BTreeMap, HashMap, HashSet},
465 num::{NonZeroU16, NonZeroU32, NonZeroUsize},
466 sync::Arc,
467 time::Duration,
468 };
469 use tracing::{debug, info, warn};
470 use types::Activity;
471
472 const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
473 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
474 const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
475
/// Exercises [`interesting`] at its boundaries with an activity timeout of 10 views.
#[test]
fn test_interesting() {
    let activity_timeout = ViewDelta::new(10);

    // (last_finalized, current, pending, allow_future, expected)
    let cases = [
        // The zero view is never interesting, with or without `allow_future`.
        (View::zero(), View::zero(), View::zero(), false, false),
        (View::zero(), View::new(1), View::zero(), true, false),
        // Below `min_active` (20 - 10 = 10).
        (View::new(20), View::new(25), View::new(5), false, false),
        // Exactly at `min_active`.
        (View::new(20), View::new(25), View::new(10), false, true),
        // Beyond `current.next()` with future views disallowed.
        (View::new(20), View::new(25), View::new(27), false, false),
        // Beyond `current.next()` with future views allowed.
        (View::new(20), View::new(25), View::new(27), true, true),
        // Exactly `current.next()`.
        (View::new(20), View::new(25), View::new(26), false, true),
        // Inside the window.
        (View::new(20), View::new(25), View::new(22), false, true),
        // `last_finalized` smaller than the timeout: the subtraction saturates.
        (View::zero(), View::new(5), View::new(1), false, true),
    ];

    for (last_finalized, current, pending, allow_future, expected) in cases {
        assert_eq!(
            interesting(
                activity_timeout,
                last_finalized,
                current,
                pending,
                allow_future
            ),
            expected,
            "last_finalized={last_finalized} current={current} pending={pending} allow_future={allow_future}"
        );
    }
}
560
561 async fn register_validator(
563 oracle: &mut Oracle<PublicKey, deterministic::Context>,
564 validator: PublicKey,
565 ) -> (
566 (
567 Sender<PublicKey, deterministic::Context>,
568 Receiver<PublicKey>,
569 ),
570 (
571 Sender<PublicKey, deterministic::Context>,
572 Receiver<PublicKey>,
573 ),
574 (
575 Sender<PublicKey, deterministic::Context>,
576 Receiver<PublicKey>,
577 ),
578 ) {
579 let control = oracle.control(validator.clone());
580 let (vote_sender, vote_receiver) = control.register(0, TEST_QUOTA).await.unwrap();
581 let (certificate_sender, certificate_receiver) =
582 control.register(1, TEST_QUOTA).await.unwrap();
583 let (resolver_sender, resolver_receiver) = control.register(2, TEST_QUOTA).await.unwrap();
584 (
585 (vote_sender, vote_receiver),
586 (certificate_sender, certificate_receiver),
587 (resolver_sender, resolver_receiver),
588 )
589 }
590
591 async fn register_validators(
593 oracle: &mut Oracle<PublicKey, deterministic::Context>,
594 validators: &[PublicKey],
595 ) -> HashMap<
596 PublicKey,
597 (
598 (
599 Sender<PublicKey, deterministic::Context>,
600 Receiver<PublicKey>,
601 ),
602 (
603 Sender<PublicKey, deterministic::Context>,
604 Receiver<PublicKey>,
605 ),
606 (
607 Sender<PublicKey, deterministic::Context>,
608 Receiver<PublicKey>,
609 ),
610 ),
611 > {
612 let mut registrations = HashMap::new();
613 for validator in validators.iter() {
614 let registration = register_validator(oracle, validator.clone()).await;
615 registrations.insert(validator.clone(), registration);
616 }
617 registrations
618 }
619
620 async fn start_test_network_with_peers<I>(
621 context: deterministic::Context,
622 peers: I,
623 disconnect_on_block: bool,
624 ) -> Oracle<PublicKey, deterministic::Context>
625 where
626 I: IntoIterator<Item = PublicKey>,
627 {
628 let (network, oracle) = Network::new_with_peers(
629 context.child("network"),
630 Config {
631 max_size: 1024 * 1024,
632 disconnect_on_block,
633 tracked_peer_sets: NZUsize!(1),
634 },
635 peers,
636 )
637 .await;
638 network.start();
639 oracle
640 }
641
642 async fn start_test_network_with_split_peers<I, J>(
643 context: deterministic::Context,
644 primary: I,
645 secondary: J,
646 disconnect_on_block: bool,
647 ) -> Oracle<PublicKey, deterministic::Context>
648 where
649 I: IntoIterator<Item = PublicKey>,
650 J: IntoIterator<Item = PublicKey>,
651 {
652 let (network, oracle) = Network::new_with_split_peers(
653 context.child("network"),
654 Config {
655 max_size: 1024 * 1024,
656 disconnect_on_block,
657 tracked_peer_sets: NZUsize!(1),
658 },
659 primary,
660 secondary,
661 )
662 .await;
663 network.start();
664 oracle
665 }
666
667 enum Action {
669 Link(Link),
670 Update(Link), Unlink,
672 }
673
674 async fn link_validators(
680 oracle: &mut Oracle<PublicKey, deterministic::Context>,
681 validators: &[PublicKey],
682 action: Action,
683 restrict_to: Option<fn(usize, usize, usize) -> bool>,
684 ) {
685 for (i1, v1) in validators.iter().enumerate() {
686 for (i2, v2) in validators.iter().enumerate() {
687 if v2 == v1 {
689 continue;
690 }
691
692 if let Some(f) = restrict_to {
694 if !f(validators.len(), i1, i2) {
695 continue;
696 }
697 }
698
699 match action {
701 Action::Update(_) | Action::Unlink => {
702 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
703 }
704 _ => {}
705 }
706
707 match action {
709 Action::Link(ref link) | Action::Update(ref link) => {
710 oracle
711 .add_link(v1.clone(), v2.clone(), link.clone())
712 .await
713 .unwrap();
714 }
715 _ => {}
716 }
717 }
718 }
719 }
720
/// Counts the lines of `encoded` that contain every string in `patterns` and whose final
/// whitespace-separated token parses as a `u64` greater than zero.
///
/// Lines whose last token is not an unsigned integer (comments, float-valued samples,
/// empty lines) are never counted. An empty `patterns` slice matches every line.
fn count_nonzero_metric_lines(encoded: &str, patterns: &[&str]) -> u32 {
    let matching = encoded
        .lines()
        .filter(|line| patterns.iter().all(|pattern| line.contains(pattern)))
        .filter(|line| {
            // Take the trailing token from the back instead of walking the whole line.
            line.split_whitespace()
                .next_back()
                .and_then(|value| value.parse::<u64>().ok())
                .is_some_and(|value| value > 0)
        })
        .count();

    // Narrowing cast kept for interface compatibility; it would only truncate for more
    // than `u32::MAX` matching lines.
    matching as u32
}
734
735 fn all_online<S, F, L>(mut fixture: F)
736 where
737 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
738 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
739 L: Elector<S>,
740 {
741 let n = 5;
743 let quorum = quorum(n) as usize;
744 let required_containers = View::new(100);
745 let activity_timeout = ViewDelta::new(10);
746 let skip_timeout = ViewDelta::new(5);
747 let namespace = b"consensus".to_vec();
748 let executor = deterministic::Runner::timed(Duration::from_secs(300));
749 executor.start(|mut context| async move {
750 let Fixture {
752 participants,
753 schemes,
754 ..
755 } = fixture(&mut context, &namespace, n);
756 let mut oracle =
757 start_test_network_with_peers(context.child("network"), participants.clone(), true)
758 .await;
759 let mut registrations = register_validators(&mut oracle, &participants).await;
760
761 let link = Link {
763 latency: Duration::from_millis(10),
764 jitter: Duration::from_millis(1),
765 success_rate: 1.0,
766 };
767 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
768
769 let elector = L::default();
771 let relay = Arc::new(mocks::relay::Relay::new());
772 let mut reporters = Vec::new();
773 let mut engine_handlers = Vec::new();
774 for (idx, validator) in participants.iter().enumerate() {
775 let context = context
777 .child("validator")
778 .with_attribute("public_key", validator);
779
780 let reporter_config = mocks::reporter::Config {
782 participants: participants.clone().try_into().unwrap(),
783 scheme: schemes[idx].clone(),
784 elector: elector.clone(),
785 };
786 let reporter =
787 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
788 reporters.push(reporter.clone());
789 let application_cfg = mocks::application::Config {
790 hasher: Sha256::default(),
791 relay: relay.clone(),
792 me: validator.clone(),
793 propose_latency: (10.0, 5.0),
794 verify_latency: (10.0, 5.0),
795 certify_latency: (10.0, 5.0),
796 should_certify: mocks::application::Certifier::Always,
797 };
798 let (actor, application) = mocks::application::Application::new(
799 context.child("application"),
800 application_cfg,
801 );
802 actor.start();
803 let blocker = oracle.control(validator.clone());
804 let cfg = config::Config {
805 scheme: schemes[idx].clone(),
806 elector: elector.clone(),
807 blocker,
808 automaton: application.clone(),
809 relay: application.clone(),
810 reporter: reporter.clone(),
811 strategy: Sequential,
812 partition: validator.to_string(),
813 mailbox_size: NZUsize!(1024),
814 epoch: Epoch::new(333),
815 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
816 Epoch::new(333),
817 )),
818 leader_timeout: Duration::from_secs(1),
819 certification_timeout: Duration::from_secs(2),
820 timeout_retry: Duration::from_secs(10),
821 fetch_timeout: Duration::from_secs(1),
822 activity_timeout,
823 skip_timeout,
824 fetch_concurrent: NZUsize!(4),
825 replay_buffer: NZUsize!(1024 * 1024),
826 write_buffer: NZUsize!(1024 * 1024),
827 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
828 forwarding: ForwardingPolicy::Disabled,
829 };
830 let engine = Engine::new(context.child("engine"), cfg);
831
832 let (pending, recovered, resolver) = registrations
834 .remove(validator)
835 .expect("validator should be registered");
836 engine_handlers.push(engine.start(pending, recovered, resolver));
837 }
838
839 let mut finalizers = Vec::new();
841 for reporter in reporters.iter_mut() {
842 let (mut latest, mut monitor) = reporter.subscribe().await;
843 finalizers.push(context.child("finalizer").spawn(move |_| async move {
844 while latest < required_containers {
845 latest = monitor.recv().await.expect("event missing");
846 }
847 }));
848 }
849 join_all(finalizers).await;
850
851 let latest_complete = required_containers.saturating_sub(activity_timeout);
853 for reporter in reporters.iter() {
854 reporter.assert_no_faults();
856
857 reporter.assert_no_invalid();
859
860 {
862 let certified = reporter.certified.lock();
863 for view in View::range(View::new(1), latest_complete) {
864 if !certified.contains(&view) {
866 panic!("view: {view}");
867 }
868 }
869 }
870
871 let mut notarized = HashMap::new();
873 let mut finalized = HashMap::new();
874 {
875 let notarizes = reporter.notarizes.lock();
876 for view in View::range(View::new(1), latest_complete) {
877 let Some(payloads) = notarizes.get(&view) else {
879 continue;
880 };
881 if payloads.len() > 1 {
882 panic!("view: {view}");
883 }
884 let (digest, notarizers) = payloads.iter().next().unwrap();
885 notarized.insert(view, *digest);
886
887 if notarizers.len() < quorum {
888 panic!("view: {view}");
891 }
892 }
893 }
894 {
895 let notarizations = reporter.notarizations.lock();
896 for view in View::range(View::new(1), latest_complete) {
897 let Some(notarization) = notarizations.get(&view) else {
899 continue;
900 };
901 let Some(digest) = notarized.get(&view) else {
902 continue;
903 };
904 assert_eq!(¬arization.proposal.payload, digest);
905 }
906 }
907 {
908 let finalizes = reporter.finalizes.lock();
909 for view in View::range(View::new(1), latest_complete) {
910 let Some(payloads) = finalizes.get(&view) else {
912 continue;
913 };
914 if payloads.len() > 1 {
915 panic!("view: {view}");
916 }
917 let (digest, finalizers) = payloads.iter().next().unwrap();
918 finalized.insert(view, *digest);
919
920 if view > latest_complete {
922 continue;
923 }
924
925 if finalizers.len() < quorum {
927 panic!("view: {view}");
930 }
931
932 let nullifies = reporter.nullifies.lock();
934 let Some(nullifies) = nullifies.get(&view) else {
935 continue;
936 };
937 for (_, finalizers) in payloads.iter() {
938 for finalizer in finalizers.iter() {
939 if nullifies.contains(finalizer) {
940 panic!("should not nullify and finalize at same view");
941 }
942 }
943 }
944 }
945 }
946 {
947 let finalizations = reporter.finalizations.lock();
948 for view in View::range(View::new(1), latest_complete) {
949 let Some(finalization) = finalizations.get(&view) else {
951 continue;
952 };
953 let Some(digest) = finalized.get(&view) else {
954 continue;
955 };
956 assert_eq!(&finalization.proposal.payload, digest);
957 }
958 }
959 }
960
961 let blocked = oracle.blocked().await.unwrap();
963 assert!(blocked.is_empty());
964 });
965 }
966
967 #[test_group("slow")]
968 #[test_traced]
969 fn test_all_online() {
970 all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
971 all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
972 all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
973 all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
974 all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
975 all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
976 all_online::<_, _, RoundRobin>(ed25519::fixture);
977 all_online::<_, _, RoundRobin>(secp256r1::fixture);
978 }
979
980 #[test_group("slow")]
981 #[test_traced]
982 fn test_non_genesis_floor_joiner_catches_tip() {
983 let n = 5;
986 let active_count = quorum(n) as usize;
987 let initial_tip_target = View::new(15);
988 let activity_timeout = ViewDelta::new(10);
989 let skip_timeout = ViewDelta::new(5);
990 let namespace = b"consensus".to_vec();
991 let executor = deterministic::Runner::timed(Duration::from_secs(300));
992 executor.start(|mut context| async move {
993 let Fixture {
994 participants,
995 schemes,
996 ..
997 } = ed25519::fixture(&mut context, &namespace, n);
998 let mut oracle =
999 start_test_network_with_peers(context.child("network"), participants.clone(), true)
1000 .await;
1001
1002 let active = &participants[..active_count];
1003 let joiner_idx = active_count;
1004 let joiner = participants[joiner_idx].clone();
1005
1006 let link = Link {
1007 latency: Duration::from_millis(10),
1008 jitter: Duration::from_millis(1),
1009 success_rate: 1.0,
1010 };
1011 link_validators(&mut oracle, active, Action::Link(link.clone()), None).await;
1012
1013 let elector = RoundRobin::<Sha256>::default();
1014 let relay = Arc::new(mocks::relay::Relay::new());
1015 let mut reporters = Vec::new();
1016 let mut engine_handlers = Vec::new();
1017
1018 for (idx, validator) in active.iter().enumerate() {
1019 let validator_context = context
1020 .child("validator")
1021 .with_attribute("public_key", validator);
1022
1023 let reporter_config = mocks::reporter::Config {
1024 participants: participants.clone().try_into().unwrap(),
1025 scheme: schemes[idx].clone(),
1026 elector: elector.clone(),
1027 };
1028 let reporter = mocks::reporter::Reporter::new(
1029 validator_context.child("reporter"),
1030 reporter_config,
1031 );
1032 reporters.push(reporter.clone());
1033
1034 let application_cfg = mocks::application::Config {
1035 hasher: Sha256::default(),
1036 relay: relay.clone(),
1037 me: validator.clone(),
1038 propose_latency: (10.0, 5.0),
1039 verify_latency: (10.0, 5.0),
1040 certify_latency: (10.0, 5.0),
1041 should_certify: mocks::application::Certifier::Always,
1042 };
1043 let (actor, application) = mocks::application::Application::new(
1044 validator_context.child("application"),
1045 application_cfg,
1046 );
1047 actor.start();
1048
1049 let cfg = config::Config {
1050 scheme: schemes[idx].clone(),
1051 elector: elector.clone(),
1052 blocker: oracle.control(validator.clone()),
1053 automaton: application.clone(),
1054 relay: application.clone(),
1055 reporter: reporter.clone(),
1056 strategy: Sequential,
1057 partition: format!("joiner_catches_tip_{validator}"),
1058 mailbox_size: NZUsize!(1024),
1059 epoch: Epoch::new(333),
1060 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
1061 Epoch::new(333),
1062 )),
1063 leader_timeout: Duration::from_secs(1),
1064 certification_timeout: Duration::from_secs(2),
1065 timeout_retry: Duration::from_secs(10),
1066 fetch_timeout: Duration::from_secs(1),
1067 activity_timeout,
1068 skip_timeout,
1069 fetch_concurrent: NZUsize!(4),
1070 replay_buffer: NZUsize!(1024 * 1024),
1071 write_buffer: NZUsize!(1024 * 1024),
1072 page_cache: CacheRef::from_pooler(
1073 &validator_context,
1074 PAGE_SIZE,
1075 PAGE_CACHE_SIZE,
1076 ),
1077 forwarding: ForwardingPolicy::Disabled,
1078 };
1079 let engine = Engine::new(validator_context.child("engine"), cfg);
1080 let (pending, recovered, resolver) =
1081 register_validator(&mut oracle, validator.clone()).await;
1082 engine_handlers.push(engine.start(pending, recovered, resolver));
1083 }
1084
1085 let mut finalizers = Vec::new();
1086 for reporter in reporters.iter_mut() {
1087 let (mut latest, mut monitor) = reporter.subscribe().await;
1088 finalizers.push(
1089 context
1090 .child("initial_finalizer")
1091 .spawn(move |_| async move {
1092 while latest < initial_tip_target {
1093 latest = monitor.recv().await.expect("event missing");
1094 }
1095 latest
1096 }),
1097 );
1098 }
1099 let tip_at_join = join_all(finalizers)
1100 .await
1101 .into_iter()
1102 .map(|result| result.expect("initial finalizer failed"))
1103 .min()
1104 .expect("initial validators missing");
1105
1106 let (floor_view, floor_finalization) = {
1107 let finalizations = reporters[0].finalizations.lock();
1108 finalizations
1109 .iter()
1110 .filter(|(view, _)| **view > View::zero() && **view < tip_at_join)
1111 .min_by_key(|(view, _)| view.get())
1112 .map(|(view, finalization)| (*view, finalization.clone()))
1113 .expect("non-genesis floor finalization missing")
1114 };
1115 assert!(floor_view > View::zero());
1116 assert!(floor_view < tip_at_join);
1117
1118 for validator in active.iter() {
1121 oracle
1122 .add_link(joiner.clone(), validator.clone(), link.clone())
1123 .await
1124 .unwrap();
1125 oracle
1126 .add_link(validator.clone(), joiner.clone(), link.clone())
1127 .await
1128 .unwrap();
1129 }
1130
1131 let joiner_context = context
1132 .child("validator")
1133 .with_attribute("public_key", &joiner);
1134 let reporter_config = mocks::reporter::Config {
1135 participants: participants.clone().try_into().unwrap(),
1136 scheme: schemes[joiner_idx].clone(),
1137 elector: elector.clone(),
1138 };
1139 let mut joiner_reporter =
1140 mocks::reporter::Reporter::new(joiner_context.child("reporter"), reporter_config);
1141 reporters.push(joiner_reporter.clone());
1142
1143 let application_cfg = mocks::application::Config {
1144 hasher: Sha256::default(),
1145 relay: relay.clone(),
1146 me: joiner.clone(),
1147 propose_latency: (10.0, 5.0),
1148 verify_latency: (10.0, 5.0),
1149 certify_latency: (10.0, 5.0),
1150 should_certify: mocks::application::Certifier::Always,
1151 };
1152 let (actor, application) = mocks::application::Application::new(
1153 joiner_context.child("application"),
1154 application_cfg,
1155 );
1156 actor.start();
1157
1158 let cfg = config::Config {
1159 scheme: schemes[joiner_idx].clone(),
1160 elector,
1161 blocker: oracle.control(joiner.clone()),
1162 automaton: application.clone(),
1163 relay: application.clone(),
1164 reporter: joiner_reporter.clone(),
1165 strategy: Sequential,
1166 partition: format!("joiner_catches_tip_{joiner}"),
1167 mailbox_size: NZUsize!(1024),
1168 epoch: Epoch::new(333),
1169 floor: config::Floor::Finalized(floor_finalization),
1170 leader_timeout: Duration::from_secs(1),
1171 certification_timeout: Duration::from_secs(2),
1172 timeout_retry: Duration::from_secs(10),
1173 fetch_timeout: Duration::from_secs(1),
1174 activity_timeout,
1175 skip_timeout,
1176 fetch_concurrent: NZUsize!(4),
1177 replay_buffer: NZUsize!(1024 * 1024),
1178 write_buffer: NZUsize!(1024 * 1024),
1179 page_cache: CacheRef::from_pooler(&joiner_context, PAGE_SIZE, PAGE_CACHE_SIZE),
1180 forwarding: ForwardingPolicy::Disabled,
1181 };
1182 let engine = Engine::new(joiner_context.child("engine"), cfg);
1183 let (pending, recovered, resolver) = register_validator(&mut oracle, joiner).await;
1184 engine_handlers.push(engine.start(pending, recovered, resolver));
1185
1186 let (mut joiner_latest, mut joiner_monitor) = joiner_reporter.subscribe().await;
1187 while joiner_latest < tip_at_join {
1188 joiner_latest = joiner_monitor.recv().await.expect("event missing");
1189 }
1190
1191 let post_join_target = tip_at_join.saturating_add(ViewDelta::new(5));
1192 while joiner_latest < post_join_target {
1193 joiner_latest = joiner_monitor.recv().await.expect("event missing");
1194 }
1195
1196 for reporter in reporters.iter() {
1197 reporter.assert_no_faults();
1198 reporter.assert_no_invalid();
1199 }
1200
1201 let blocked = oracle.blocked().await.unwrap();
1202 assert!(blocked.is_empty());
1203 });
1204 }
1205
1206 fn dishonest_leader_certification_rejected<S, F>(mut fixture: F)
1221 where
1222 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1223 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1224 RoundRobin: Elector<S>,
1225 {
1226 let n = 5;
1227 let required_containers = View::new(50);
1228 let activity_timeout = ViewDelta::new(10);
1229 let skip_timeout = ViewDelta::new(5);
1230 let namespace = b"consensus".to_vec();
1231 let executor = deterministic::Runner::timed(Duration::from_secs(300));
1232 executor.start(|mut context| async move {
1233 let Fixture {
1234 participants,
1235 schemes,
1236 ..
1237 } = fixture(&mut context, &namespace, n);
1238 let mut oracle =
1239 start_test_network_with_peers(context.child("network"), participants.clone(), true)
1240 .await;
1241 let mut registrations = register_validators(&mut oracle, &participants).await;
1242
1243 let link = Link {
1244 latency: Duration::from_millis(10),
1245 jitter: Duration::from_millis(1),
1246 success_rate: 1.0,
1247 };
1248 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1249
1250 let elector = RoundRobin::default();
1251 let participants_set: Set<S::PublicKey> = participants.clone().try_into().unwrap();
1252 let built_elector = elector.clone().build(&participants_set);
1253 let relay = Arc::new(mocks::relay::Relay::new());
1254 let mut reporters = Vec::new();
1255 let mut engine_handlers = Vec::new();
1256 let dishonest = Participant::new(0);
1257 for (idx, validator) in participants.iter().enumerate() {
1258 let context = context
1259 .child("validator")
1260 .with_attribute("public_key", validator);
1261 let reporter_config = mocks::reporter::Config {
1262 participants: participants.clone().try_into().unwrap(),
1263 scheme: schemes[idx].clone(),
1264 elector: elector.clone(),
1265 };
1266 let reporter =
1267 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
1268 reporters.push(reporter.clone());
1269
1270 let application_cfg = mocks::application::Config {
1271 hasher: Sha256::default(),
1272 relay: relay.clone(),
1273 me: validator.clone(),
1274 propose_latency: (10.0, 5.0),
1275 verify_latency: (10.0, 5.0),
1276 certify_latency: (10.0, 5.0),
1277 should_certify: mocks::application::Certifier::Custom(Box::new({
1278 let built_elector_clone = built_elector.clone();
1279 move |round, _| built_elector_clone.elect(round, None) != dishonest
1280 })),
1281 };
1282 let (actor, application) = mocks::application::Application::new(
1283 context.child("application"),
1284 application_cfg,
1285 );
1286 actor.start();
1287
1288 let blocker = oracle.control(validator.clone());
1289 let cfg = config::Config {
1290 scheme: schemes[idx].clone(),
1291 elector: elector.clone(),
1292 blocker,
1293 automaton: application.clone(),
1294 relay: application.clone(),
1295 reporter: reporter.clone(),
1296 strategy: Sequential,
1297 partition: validator.to_string(),
1298 mailbox_size: NZUsize!(1024),
1299 epoch: Epoch::new(333),
1300 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
1301 Epoch::new(333),
1302 )),
1303 leader_timeout: Duration::from_secs(1),
1304 certification_timeout: Duration::from_secs(2),
1305 timeout_retry: Duration::from_secs(10),
1306 fetch_timeout: Duration::from_secs(1),
1307 activity_timeout,
1308 skip_timeout,
1309 fetch_concurrent: NZUsize!(4),
1310 replay_buffer: NZUsize!(1024 * 1024),
1311 write_buffer: NZUsize!(1024 * 1024),
1312 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1313 forwarding: ForwardingPolicy::Disabled,
1314 };
1315 let engine = Engine::new(context.child("engine"), cfg);
1316 let (pending, recovered, resolver) = registrations
1317 .remove(validator)
1318 .expect("validator should be registered");
1319 engine_handlers.push(engine.start(pending, recovered, resolver));
1320 }
1321
1322 let mut finalizers = Vec::new();
1323 for reporter in reporters.iter_mut() {
1324 let (mut latest, mut monitor) = reporter.subscribe().await;
1325 finalizers.push(context.child("finalizer").spawn(move |_| async move {
1326 while latest < required_containers {
1327 latest = monitor.recv().await.expect("event missing");
1328 }
1329 }));
1330 }
1331 join_all(finalizers).await;
1332
1333 for reporter in reporters.iter() {
1334 reporter.assert_no_faults();
1335 reporter.assert_no_invalid();
1336 }
1337 });
1338 }
1339
1340 #[test_group("slow")]
1341 #[test_traced]
1342 fn test_dishonest_leader_certification_rejected() {
1343 dishonest_leader_certification_rejected::<_, _>(
1344 bls12381_threshold_vrf::fixture::<MinPk, _>,
1345 );
1346 dishonest_leader_certification_rejected::<_, _>(
1347 bls12381_threshold_vrf::fixture::<MinSig, _>,
1348 );
1349 dishonest_leader_certification_rejected::<_, _>(
1350 bls12381_threshold_std::fixture::<MinPk, _>,
1351 );
1352 dishonest_leader_certification_rejected::<_, _>(
1353 bls12381_threshold_std::fixture::<MinSig, _>,
1354 );
1355 dishonest_leader_certification_rejected::<_, _>(bls12381_multisig::fixture::<MinPk, _>);
1356 dishonest_leader_certification_rejected::<_, _>(bls12381_multisig::fixture::<MinSig, _>);
1357 dishonest_leader_certification_rejected::<_, _>(ed25519::fixture);
1358 dishonest_leader_certification_rejected::<_, _>(secp256r1::fixture);
1359 }
1360
1361 fn observer<S, F, L>(mut fixture: F)
1362 where
1363 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1364 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1365 L: Elector<S>,
1366 {
1367 let n_active = 5;
1369 let required_containers = View::new(100);
1370 let activity_timeout = ViewDelta::new(10);
1371 let skip_timeout = ViewDelta::new(5);
1372 let namespace = b"consensus".to_vec();
1373 let executor = deterministic::Runner::timed(Duration::from_secs(300));
1374 executor.start(|mut context| async move {
1375 let Fixture {
1377 participants,
1378 schemes,
1379 verifier,
1380 ..
1381 } = fixture(&mut context, &namespace, n_active);
1382
1383 let private_key_observer = PrivateKey::from_seed(n_active as u64);
1385 let public_key_observer = private_key_observer.public_key();
1386
1387 let mut oracle = start_test_network_with_split_peers(
1388 context.child("network"),
1389 participants.clone(),
1390 [public_key_observer.clone()],
1391 true,
1392 )
1393 .await;
1394
1395 let mut all_validators = participants.clone();
1397 all_validators.push(public_key_observer.clone());
1398 all_validators.sort();
1399 let mut registrations = register_validators(&mut oracle, &all_validators).await;
1400
1401 let link = Link {
1403 latency: Duration::from_millis(10),
1404 jitter: Duration::from_millis(1),
1405 success_rate: 1.0,
1406 };
1407 link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
1408
1409 let elector = L::default();
1411 let relay = Arc::new(mocks::relay::Relay::new());
1412 let mut reporters = Vec::new();
1413
1414 for (idx, validator) in participants.iter().enumerate() {
1415 let is_observer = *validator == public_key_observer;
1416
1417 let context = context
1419 .child("validator")
1420 .with_attribute("public_key", validator);
1421
1422 let signing = if is_observer {
1424 verifier.clone()
1425 } else {
1426 schemes[idx].clone()
1427 };
1428 let reporter_config = mocks::reporter::Config {
1429 participants: participants.clone().try_into().unwrap(),
1430 scheme: signing.clone(),
1431 elector: elector.clone(),
1432 };
1433 let reporter =
1434 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
1435 reporters.push(reporter.clone());
1436 let application_cfg = mocks::application::Config {
1437 hasher: Sha256::default(),
1438 relay: relay.clone(),
1439 me: validator.clone(),
1440 propose_latency: (10.0, 5.0),
1441 verify_latency: (10.0, 5.0),
1442 certify_latency: (10.0, 5.0),
1443 should_certify: mocks::application::Certifier::Always,
1444 };
1445 let (actor, application) = mocks::application::Application::new(
1446 context.child("application"),
1447 application_cfg,
1448 );
1449 actor.start();
1450 let blocker = oracle.control(validator.clone());
1451 let cfg = config::Config {
1452 scheme: signing.clone(),
1453 elector: elector.clone(),
1454 blocker,
1455 automaton: application.clone(),
1456 relay: application.clone(),
1457 reporter: reporter.clone(),
1458 strategy: Sequential,
1459 partition: validator.to_string(),
1460 mailbox_size: NZUsize!(1024),
1461 epoch: Epoch::new(333),
1462 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
1463 Epoch::new(333),
1464 )),
1465 leader_timeout: Duration::from_secs(1),
1466 certification_timeout: Duration::from_secs(2),
1467 timeout_retry: Duration::from_secs(10),
1468 fetch_timeout: Duration::from_secs(1),
1469 activity_timeout,
1470 skip_timeout,
1471 fetch_concurrent: NZUsize!(4),
1472 replay_buffer: NZUsize!(1024 * 1024),
1473 write_buffer: NZUsize!(1024 * 1024),
1474 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1475 forwarding: ForwardingPolicy::Disabled,
1476 };
1477 let engine = Engine::new(context.child("engine"), cfg);
1478
1479 let (pending, recovered, resolver) = registrations
1481 .remove(validator)
1482 .expect("validator should be registered");
1483 engine.start(pending, recovered, resolver);
1484 }
1485
1486 let mut finalizers = Vec::new();
1488 for reporter in reporters.iter_mut() {
1489 let (mut latest, mut monitor) = reporter.subscribe().await;
1490 finalizers.push(context.child("finalizer").spawn(move |_| async move {
1491 while latest < required_containers {
1492 latest = monitor.recv().await.expect("event missing");
1493 }
1494 }));
1495 }
1496 join_all(finalizers).await;
1497
1498 for reporter in reporters.iter() {
1501 reporter.assert_no_faults();
1503 reporter.assert_no_invalid();
1504
1505 let blocked = oracle.blocked().await.unwrap();
1507 assert!(blocked.is_empty());
1508 }
1509 });
1510 }
1511
1512 #[test_group("slow")]
1513 #[test_traced]
1514 fn test_observer() {
1515 observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1516 observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1517 observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1518 observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1519 observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1520 observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1521 observer::<_, _, RoundRobin>(ed25519::fixture);
1522 observer::<_, _, RoundRobin>(secp256r1::fixture);
1523 }
1524
1525 fn unclean_shutdown<S, F, L>(mut fixture: F)
1526 where
1527 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1528 F: FnMut(&mut StdRng, &[u8], u32) -> Fixture<S>,
1529 L: Elector<S>,
1530 {
1531 let n = 5;
1533 let required_containers = View::new(100);
1534 let activity_timeout = ViewDelta::new(10);
1535 let skip_timeout = ViewDelta::new(5);
1536 let namespace = b"consensus".to_vec();
1537
1538 let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
1540 let supervised = Arc::new(Mutex::new(Vec::new()));
1541 let mut prev_checkpoint = None;
1542
1543 let mut rng = test_rng();
1545 let Fixture {
1546 participants,
1547 schemes,
1548 ..
1549 } = fixture(&mut rng, &namespace, n);
1550
1551 let relay = Arc::new(mocks::relay::Relay::<Sha256Digest, S::PublicKey>::new());
1553
1554 loop {
1555 let rng = rng.clone();
1556 let participants = participants.clone();
1557 let schemes = schemes.clone();
1558 let shutdowns = shutdowns.clone();
1559 let supervised = supervised.clone();
1560 let relay = relay.clone();
1561 relay.deregister_all(); let f = |mut context: deterministic::Context| async move {
1564 let mut oracle = start_test_network_with_peers(
1566 context.child("network"),
1567 participants.clone(),
1568 true,
1569 )
1570 .await;
1571 let mut registrations = register_validators(&mut oracle, &participants).await;
1572
1573 let link = Link {
1575 latency: Duration::from_millis(50),
1576 jitter: Duration::from_millis(50),
1577 success_rate: 1.0,
1578 };
1579 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1580
1581 let elector = L::default();
1583 let relay = Arc::new(mocks::relay::Relay::new());
1584 let mut reporters = HashMap::new();
1585 let mut engine_handlers = Vec::new();
1586 for (idx, validator) in participants.iter().enumerate() {
1587 let context = context
1589 .child("validator")
1590 .with_attribute("public_key", validator);
1591
1592 let reporter_config = mocks::reporter::Config {
1594 participants: participants.clone().try_into().unwrap(),
1595 scheme: schemes[idx].clone(),
1596 elector: elector.clone(),
1597 };
1598 let reporter = mocks::reporter::Reporter::new(rng.clone(), reporter_config);
1599 reporters.insert(validator.clone(), reporter.clone());
1600 let application_cfg = mocks::application::Config {
1601 hasher: Sha256::default(),
1602 relay: relay.clone(),
1603 me: validator.clone(),
1604 propose_latency: (10.0, 5.0),
1605 verify_latency: (10.0, 5.0),
1606 certify_latency: (10.0, 5.0),
1607 should_certify: mocks::application::Certifier::Always,
1608 };
1609 let (actor, application) = mocks::application::Application::new(
1610 context.child("application"),
1611 application_cfg,
1612 );
1613 actor.start();
1614 let blocker = oracle.control(validator.clone());
1615 let cfg = config::Config {
1616 scheme: schemes[idx].clone(),
1617 elector: elector.clone(),
1618 blocker,
1619 automaton: application.clone(),
1620 relay: application.clone(),
1621 reporter: reporter.clone(),
1622 strategy: Sequential,
1623 partition: validator.to_string(),
1624 mailbox_size: NZUsize!(1024),
1625 epoch: Epoch::new(333),
1626 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
1627 Epoch::new(333),
1628 )),
1629 leader_timeout: Duration::from_secs(1),
1630 certification_timeout: Duration::from_secs(2),
1631 timeout_retry: Duration::from_secs(10),
1632 fetch_timeout: Duration::from_secs(1),
1633 activity_timeout,
1634 skip_timeout,
1635 fetch_concurrent: NZUsize!(4),
1636 replay_buffer: NZUsize!(1024 * 1024),
1637 write_buffer: NZUsize!(1024 * 1024),
1638 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1639 forwarding: ForwardingPolicy::Disabled,
1640 };
1641 let engine = Engine::new(context.child("engine"), cfg);
1642
1643 let (pending, recovered, resolver) = registrations
1645 .remove(validator)
1646 .expect("validator should be registered");
1647 engine_handlers.push(engine.start(pending, recovered, resolver));
1648 }
1649
1650 let mut finalizers = Vec::new();
1652 for (_, reporter) in reporters.iter_mut() {
1653 let (mut latest, mut monitor) = reporter.subscribe().await;
1654 finalizers.push(context.child("finalizer").spawn(move |_| async move {
1655 while latest < required_containers {
1656 latest = monitor.recv().await.expect("event missing");
1657 }
1658 }));
1659 }
1660
1661 let wait =
1663 context.gen_range(Duration::from_millis(100)..Duration::from_millis(2_000));
1664 let result = select! {
1665 _ = context.sleep(wait) => {
1666 {
1668 let mut shutdowns = shutdowns.lock();
1669 debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
1670 *shutdowns += 1;
1671 }
1672 supervised.lock().push(reporters);
1673 false
1674 },
1675 _ = join_all(finalizers) => {
1676 let supervised = supervised.lock();
1678 for reporters in supervised.iter() {
1679 for (_, reporter) in reporters.iter() {
1680 reporter.assert_no_faults();
1681 }
1682 }
1683 true
1684 },
1685 };
1686
1687 let blocked = oracle.blocked().await.unwrap();
1689 assert!(blocked.is_empty());
1690
1691 result
1692 };
1693
1694 let (complete, checkpoint) = prev_checkpoint
1695 .map_or_else(
1696 || deterministic::Runner::timed(Duration::from_secs(180)),
1697 deterministic::Runner::from,
1698 )
1699 .start_and_recover(f);
1700
1701 if complete {
1703 break;
1704 }
1705
1706 prev_checkpoint = Some(checkpoint);
1707 }
1708 }
1709
1710 #[test_group("slow")]
1711 #[test_traced]
1712 fn test_unclean_shutdown() {
1713 unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1714 unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1715 unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1716 unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1717 unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1718 unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1719 unclean_shutdown::<_, _, RoundRobin>(ed25519::fixture);
1720 unclean_shutdown::<_, _, RoundRobin>(secp256r1::fixture);
1721 }
1722
1723 fn backfill<S, F, L>(mut fixture: F)
1724 where
1725 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1726 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1727 L: Elector<S>,
1728 {
1729 let n = 4;
1731 let required_containers = View::new(100);
1732 let activity_timeout = ViewDelta::new(10);
1733 let skip_timeout = ViewDelta::new(5);
1734 let namespace = b"consensus".to_vec();
1735 let executor = deterministic::Runner::timed(Duration::from_secs(240));
1736 executor.start(|mut context| async move {
1737 let Fixture {
1739 participants,
1740 schemes,
1741 ..
1742 } = fixture(&mut context, &namespace, n);
1743 let mut oracle =
1744 start_test_network_with_peers(context.child("network"), participants.clone(), true)
1745 .await;
1746 let mut registrations = register_validators(&mut oracle, &participants).await;
1747
1748 let link = Link {
1750 latency: Duration::from_millis(10),
1751 jitter: Duration::from_millis(1),
1752 success_rate: 1.0,
1753 };
1754 link_validators(
1755 &mut oracle,
1756 &participants,
1757 Action::Link(link),
1758 Some(|_, i, j| ![i, j].contains(&0usize)),
1759 )
1760 .await;
1761
1762 let elector = L::default();
1764 let relay = Arc::new(mocks::relay::Relay::new());
1765 let mut reporters = Vec::new();
1766 let mut engine_handlers = Vec::new();
1767 for (idx_scheme, validator) in participants.iter().enumerate() {
1768 if idx_scheme == 0 {
1770 continue;
1771 }
1772
1773 let context = context
1775 .child("validator")
1776 .with_attribute("public_key", validator);
1777
1778 let reporter_config = mocks::reporter::Config {
1780 participants: participants.clone().try_into().unwrap(),
1781 scheme: schemes[idx_scheme].clone(),
1782 elector: elector.clone(),
1783 };
1784 let reporter =
1785 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
1786 reporters.push(reporter.clone());
1787 let application_cfg = mocks::application::Config {
1788 hasher: Sha256::default(),
1789 relay: relay.clone(),
1790 me: validator.clone(),
1791 propose_latency: (10.0, 5.0),
1792 verify_latency: (10.0, 5.0),
1793 certify_latency: (10.0, 5.0),
1794 should_certify: mocks::application::Certifier::Always,
1795 };
1796 let (actor, application) = mocks::application::Application::new(
1797 context.child("application"),
1798 application_cfg,
1799 );
1800 actor.start();
1801 let blocker = oracle.control(validator.clone());
1802 let cfg = config::Config {
1803 scheme: schemes[idx_scheme].clone(),
1804 elector: elector.clone(),
1805 blocker,
1806 automaton: application.clone(),
1807 relay: application.clone(),
1808 reporter: reporter.clone(),
1809 strategy: Sequential,
1810 partition: validator.to_string(),
1811 mailbox_size: NZUsize!(1024),
1812 epoch: Epoch::new(333),
1813 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
1814 Epoch::new(333),
1815 )),
1816 leader_timeout: Duration::from_secs(1),
1817 certification_timeout: Duration::from_secs(2),
1818 timeout_retry: Duration::from_secs(10),
1819 fetch_timeout: Duration::from_secs(1),
1820 activity_timeout,
1821 skip_timeout,
1822 fetch_concurrent: NZUsize!(4),
1823 replay_buffer: NZUsize!(1024 * 1024),
1824 write_buffer: NZUsize!(1024 * 1024),
1825 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1826 forwarding: ForwardingPolicy::Disabled,
1827 };
1828 let engine = Engine::new(context.child("engine"), cfg);
1829
1830 let (pending, recovered, resolver) = registrations
1832 .remove(validator)
1833 .expect("validator should be registered");
1834 engine_handlers.push(engine.start(pending, recovered, resolver));
1835 }
1836
1837 let mut finalizers = Vec::new();
1839 for reporter in reporters.iter_mut() {
1840 let (mut latest, mut monitor) = reporter.subscribe().await;
1841 finalizers.push(context.child("finalizer").spawn(move |_| async move {
1842 while latest < required_containers {
1843 latest = monitor.recv().await.expect("event missing");
1844 }
1845 }));
1846 }
1847 join_all(finalizers).await;
1848
1849 let link = Link {
1851 latency: Duration::from_secs(3),
1852 jitter: Duration::from_millis(0),
1853 success_rate: 1.0,
1854 };
1855 link_validators(
1856 &mut oracle,
1857 &participants,
1858 Action::Update(link.clone()),
1859 Some(|_, i, j| ![i, j].contains(&0usize)),
1860 )
1861 .await;
1862
1863 context.sleep(Duration::from_secs(60)).await;
1865
1866 link_validators(
1868 &mut oracle,
1869 &participants,
1870 Action::Unlink,
1871 Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
1872 )
1873 .await;
1874
1875 let me = participants[0].clone();
1877 let context = context.child("validator").with_attribute("public_key", &me);
1878
1879 link_validators(
1881 &mut oracle,
1882 &participants,
1883 Action::Link(link),
1884 Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
1885 )
1886 .await;
1887
1888 let link = Link {
1890 latency: Duration::from_millis(10),
1891 jitter: Duration::from_millis(3),
1892 success_rate: 1.0,
1893 };
1894 link_validators(
1895 &mut oracle,
1896 &participants,
1897 Action::Update(link),
1898 Some(|_, i, j| ![i, j].contains(&1usize)),
1899 )
1900 .await;
1901
1902 let reporter_config = mocks::reporter::Config {
1904 participants: participants.clone().try_into().unwrap(),
1905 scheme: schemes[0].clone(),
1906 elector: elector.clone(),
1907 };
1908 let mut reporter =
1909 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
1910 reporters.push(reporter.clone());
1911 let application_cfg = mocks::application::Config {
1912 hasher: Sha256::default(),
1913 relay: relay.clone(),
1914 me: me.clone(),
1915 propose_latency: (10.0, 5.0),
1916 verify_latency: (10.0, 5.0),
1917 certify_latency: (10.0, 5.0),
1918 should_certify: mocks::application::Certifier::Always,
1919 };
1920 let (actor, application) =
1921 mocks::application::Application::new(context.child("application"), application_cfg);
1922 actor.start();
1923 let blocker = oracle.control(me.clone());
1924 let cfg = config::Config {
1925 scheme: schemes[0].clone(),
1926 elector: elector.clone(),
1927 blocker,
1928 automaton: application.clone(),
1929 relay: application.clone(),
1930 reporter: reporter.clone(),
1931 strategy: Sequential,
1932 partition: me.to_string(),
1933 mailbox_size: NZUsize!(1024),
1934 epoch: Epoch::new(333),
1935 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(Epoch::new(
1936 333,
1937 ))),
1938 leader_timeout: Duration::from_secs(1),
1939 certification_timeout: Duration::from_secs(2),
1940 timeout_retry: Duration::from_secs(10),
1941 fetch_timeout: Duration::from_secs(1),
1942 activity_timeout,
1943 skip_timeout,
1944 fetch_concurrent: NZUsize!(4),
1945 replay_buffer: NZUsize!(1024 * 1024),
1946 write_buffer: NZUsize!(1024 * 1024),
1947 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1948 forwarding: ForwardingPolicy::Disabled,
1949 };
1950 let engine = Engine::new(context.child("engine"), cfg);
1951
1952 let (pending, recovered, resolver) = registrations
1954 .remove(&me)
1955 .expect("validator should be registered");
1956 engine_handlers.push(engine.start(pending, recovered, resolver));
1957
1958 let (mut latest, mut monitor) = reporter.subscribe().await;
1960 while latest < required_containers {
1961 latest = monitor.recv().await.expect("event missing");
1962 }
1963
1964 let blocked = oracle.blocked().await.unwrap();
1966 assert!(blocked.is_empty());
1967 });
1968 }
1969
1970 #[test_group("slow")]
1971 #[test_traced]
1972 fn test_backfill() {
1973 backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1974 backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1975 backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1976 backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1977 backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1978 backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1979 backfill::<_, _, RoundRobin>(ed25519::fixture);
1980 backfill::<_, _, RoundRobin>(secp256r1::fixture);
1981 }
1982
1983 fn one_offline<S, F, L>(mut fixture: F)
1984 where
1985 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1986 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1987 L: Elector<S>,
1988 {
1989 let n = 5;
1991 let quorum = quorum(n) as usize;
1992 let required_containers = View::new(100);
1993 let activity_timeout = ViewDelta::new(10);
1994 let skip_timeout = ViewDelta::new(5);
1995 let max_exceptions = 10;
1996 let namespace = b"consensus".to_vec();
1997 let executor = deterministic::Runner::timed(Duration::from_secs(300));
1998 executor.start(|mut context| async move {
1999 let Fixture {
2001 participants,
2002 schemes,
2003 ..
2004 } = fixture(&mut context, &namespace, n);
2005 let mut oracle =
2006 start_test_network_with_peers(context.child("network"), participants.clone(), true)
2007 .await;
2008 let mut registrations = register_validators(&mut oracle, &participants).await;
2009
2010 let link = Link {
2012 latency: Duration::from_millis(10),
2013 jitter: Duration::from_millis(1),
2014 success_rate: 1.0,
2015 };
2016 link_validators(
2017 &mut oracle,
2018 &participants,
2019 Action::Link(link),
2020 Some(|_, i, j| ![i, j].contains(&0usize)),
2021 )
2022 .await;
2023
2024 let elector = L::default();
2026 let relay = Arc::new(mocks::relay::Relay::new());
2027 let mut reporters = Vec::new();
2028 let mut engine_handlers = Vec::new();
2029 for (idx_scheme, validator) in participants.iter().enumerate() {
2030 if idx_scheme == 0 {
2032 continue;
2033 }
2034
2035 let context = context
2037 .child("validator")
2038 .with_attribute("public_key", validator);
2039
2040 let reporter_config = mocks::reporter::Config {
2042 participants: participants.clone().try_into().unwrap(),
2043 scheme: schemes[idx_scheme].clone(),
2044 elector: elector.clone(),
2045 };
2046 let reporter =
2047 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
2048 reporters.push(reporter.clone());
2049 let application_cfg = mocks::application::Config {
2050 hasher: Sha256::default(),
2051 relay: relay.clone(),
2052 me: validator.clone(),
2053 propose_latency: (10.0, 5.0),
2054 verify_latency: (10.0, 5.0),
2055 certify_latency: (10.0, 5.0),
2056 should_certify: mocks::application::Certifier::Always,
2057 };
2058 let (actor, application) = mocks::application::Application::new(
2059 context.child("application"),
2060 application_cfg,
2061 );
2062 actor.start();
2063 let blocker = oracle.control(validator.clone());
2064 let cfg = config::Config {
2065 scheme: schemes[idx_scheme].clone(),
2066 elector: elector.clone(),
2067 blocker,
2068 automaton: application.clone(),
2069 relay: application.clone(),
2070 reporter: reporter.clone(),
2071 strategy: Sequential,
2072 partition: validator.to_string(),
2073 mailbox_size: NZUsize!(1024),
2074 epoch: Epoch::new(333),
2075 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
2076 Epoch::new(333),
2077 )),
2078 leader_timeout: Duration::from_secs(1),
2079 certification_timeout: Duration::from_secs(2),
2080 timeout_retry: Duration::from_secs(10),
2081 fetch_timeout: Duration::from_secs(1),
2082 activity_timeout,
2083 skip_timeout,
2084 fetch_concurrent: NZUsize!(4),
2085 replay_buffer: NZUsize!(1024 * 1024),
2086 write_buffer: NZUsize!(1024 * 1024),
2087 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2088 forwarding: ForwardingPolicy::Disabled,
2089 };
2090 let engine = Engine::new(context.child("engine"), cfg);
2091
2092 let (pending, recovered, resolver) = registrations
2094 .remove(validator)
2095 .expect("validator should be registered");
2096 engine_handlers.push(engine.start(pending, recovered, resolver));
2097 }
2098
2099 let mut finalizers = Vec::new();
2101 for reporter in reporters.iter_mut() {
2102 let (mut latest, mut monitor) = reporter.subscribe().await;
2103 finalizers.push(context.child("finalizer").spawn(move |_| async move {
2104 while latest < required_containers {
2105 latest = monitor.recv().await.expect("event missing");
2106 }
2107 }));
2108 }
2109 join_all(finalizers).await;
2110
2111 let exceptions = 0;
2113 let offline = &participants[0];
2114 for reporter in reporters.iter() {
2115 reporter.assert_no_faults();
2117
2118 reporter.assert_no_invalid();
2120
2121 let mut exceptions = 0;
2123 {
2124 let notarizes = reporter.notarizes.lock();
2125 for (view, payloads) in notarizes.iter() {
2126 for (_, participants) in payloads.iter() {
2127 if participants.contains(offline) {
2128 panic!("view: {view}");
2129 }
2130 }
2131 }
2132 }
2133 {
2134 let nullifies = reporter.nullifies.lock();
2135 for (view, participants) in nullifies.iter() {
2136 if participants.contains(offline) {
2137 panic!("view: {view}");
2138 }
2139 }
2140 }
2141 {
2142 let finalizes = reporter.finalizes.lock();
2143 for (view, payloads) in finalizes.iter() {
2144 for (_, finalizers) in payloads.iter() {
2145 if finalizers.contains(offline) {
2146 panic!("view: {view}");
2147 }
2148 }
2149 }
2150 }
2151
2152 let mut offline_views = Vec::new();
2154 {
2155 let leaders = reporter.leaders.lock();
2156 for (view, leader) in leaders.iter() {
2157 if leader == offline {
2158 offline_views.push(*view);
2159 }
2160 }
2161 }
2162 assert!(!offline_views.is_empty());
2163
2164 {
2166 let nullifies = reporter.nullifies.lock();
2167 for view in offline_views.iter() {
2168 let nullifies = nullifies.get(view).map_or(0, |n| n.len());
2169 if nullifies < quorum {
2170 warn!("missing expected view nullifies: {}", view);
2171 exceptions += 1;
2172 }
2173 }
2174 }
2175 {
2176 let nullifications = reporter.nullifications.lock();
2177 for view in offline_views.iter() {
2178 if !nullifications.contains_key(view) {
2179 warn!("missing expected view nullifies: {}", view);
2180 exceptions += 1;
2181 }
2182 }
2183 }
2184
2185 assert!(exceptions <= max_exceptions);
2187 }
2188 assert!(exceptions <= max_exceptions);
2189
2190 let blocked = oracle.blocked().await.unwrap();
2192 assert!(blocked.is_empty());
2193
2194 let encoded = context.encode();
2196 let leader_label = format!("leader=\"{}\"", offline);
2197 assert!(
2198 count_nonzero_metric_lines(&encoded, &["_timeouts", &leader_label]) >= n - 1,
2199 "expected timeout metrics for offline leader"
2200 );
2201 assert_eq!(
2202 count_nonzero_metric_lines(&encoded, &["_nullifications", &leader_label]),
2203 n - 1,
2204 "expected all online nodes to record _nullifications for offline leader"
2205 );
2206 });
2207 }
2208
2209 #[test_group("slow")]
2210 #[test_traced]
2211 fn test_one_offline() {
2212 one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2213 one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2214 one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2215 one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2216 one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2217 one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2218 one_offline::<_, _, RoundRobin>(ed25519::fixture);
2219 one_offline::<_, _, RoundRobin>(secp256r1::fixture);
2220 }
2221
2222 fn slow_validator<S, F, L>(mut fixture: F)
2223 where
2224 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2225 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2226 L: Elector<S>,
2227 {
2228 let n = 5;
2230 let required_containers = View::new(50);
2231 let activity_timeout = ViewDelta::new(10);
2232 let skip_timeout = ViewDelta::new(5);
2233 let namespace = b"consensus".to_vec();
2234 let executor = deterministic::Runner::timed(Duration::from_secs(300));
2235 executor.start(|mut context| async move {
2236 let Fixture {
2238 participants,
2239 schemes,
2240 ..
2241 } = fixture(&mut context, &namespace, n);
2242 let mut oracle =
2243 start_test_network_with_peers(context.child("network"), participants.clone(), true)
2244 .await;
2245 let mut registrations = register_validators(&mut oracle, &participants).await;
2246
2247 let link = Link {
2249 latency: Duration::from_millis(10),
2250 jitter: Duration::from_millis(1),
2251 success_rate: 1.0,
2252 };
2253 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2254
2255 let elector = L::default();
2257 let relay = Arc::new(mocks::relay::Relay::new());
2258 let mut reporters = Vec::new();
2259 let mut engine_handlers = Vec::new();
2260 for (idx_scheme, validator) in participants.iter().enumerate() {
2261 let context = context
2263 .child("validator")
2264 .with_attribute("public_key", validator);
2265
2266 let reporter_config = mocks::reporter::Config {
2268 participants: participants.clone().try_into().unwrap(),
2269 scheme: schemes[idx_scheme].clone(),
2270 elector: elector.clone(),
2271 };
2272 let reporter =
2273 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
2274 reporters.push(reporter.clone());
2275 let application_cfg = if idx_scheme == 0 {
2276 mocks::application::Config {
2277 hasher: Sha256::default(),
2278 relay: relay.clone(),
2279 me: validator.clone(),
2280 propose_latency: (10_000.0, 0.0),
2281 verify_latency: (10_000.0, 5.0),
2282 certify_latency: (10_000.0, 5.0),
2283 should_certify: mocks::application::Certifier::Always,
2284 }
2285 } else {
2286 mocks::application::Config {
2287 hasher: Sha256::default(),
2288 relay: relay.clone(),
2289 me: validator.clone(),
2290 propose_latency: (10.0, 5.0),
2291 verify_latency: (10.0, 5.0),
2292 certify_latency: (10.0, 5.0),
2293 should_certify: mocks::application::Certifier::Always,
2294 }
2295 };
2296 let (actor, application) = mocks::application::Application::new(
2297 context.child("application"),
2298 application_cfg,
2299 );
2300 actor.start();
2301 let blocker = oracle.control(validator.clone());
2302 let cfg = config::Config {
2303 scheme: schemes[idx_scheme].clone(),
2304 elector: elector.clone(),
2305 blocker,
2306 automaton: application.clone(),
2307 relay: application.clone(),
2308 reporter: reporter.clone(),
2309 strategy: Sequential,
2310 partition: validator.to_string(),
2311 mailbox_size: NZUsize!(1024),
2312 epoch: Epoch::new(333),
2313 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
2314 Epoch::new(333),
2315 )),
2316 leader_timeout: Duration::from_secs(1),
2317 certification_timeout: Duration::from_secs(2),
2318 timeout_retry: Duration::from_secs(10),
2319 fetch_timeout: Duration::from_secs(1),
2320 activity_timeout,
2321 skip_timeout,
2322 fetch_concurrent: NZUsize!(4),
2323 replay_buffer: NZUsize!(1024 * 1024),
2324 write_buffer: NZUsize!(1024 * 1024),
2325 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2326 forwarding: ForwardingPolicy::Disabled,
2327 };
2328 let engine = Engine::new(context.child("engine"), cfg);
2329
2330 let (pending, recovered, resolver) = registrations
2332 .remove(validator)
2333 .expect("validator should be registered");
2334 engine_handlers.push(engine.start(pending, recovered, resolver));
2335 }
2336
2337 let mut finalizers = Vec::new();
2339 for reporter in reporters.iter_mut() {
2340 let (mut latest, mut monitor) = reporter.subscribe().await;
2341 finalizers.push(context.child("finalizer").spawn(move |_| async move {
2342 while latest < required_containers {
2343 latest = monitor.recv().await.expect("event missing");
2344 }
2345 }));
2346 }
2347 join_all(finalizers).await;
2348
2349 let slow = &participants[0];
2351 for reporter in reporters.iter() {
2352 reporter.assert_no_faults();
2354
2355 reporter.assert_no_invalid();
2357
2358 {
2361 let notarizes = reporter.notarizes.lock();
2362 assert!(notarizes.values().all(|payloads| {
2363 payloads
2364 .values()
2365 .all(|participants| !participants.contains(slow))
2366 }));
2367 }
2368 {
2369 let finalizes = reporter.finalizes.lock();
2370 assert!(finalizes.values().all(|payloads| {
2371 payloads
2372 .values()
2373 .all(|participants| !participants.contains(slow))
2374 }));
2375 }
2376
2377 {
2379 let finalizations = reporter.finalizations.lock();
2380 assert!(finalizations
2381 .keys()
2382 .any(|view| *view >= required_containers));
2383 }
2384 }
2385
2386 let blocked = oracle.blocked().await.unwrap();
2388 assert!(blocked.is_empty());
2389 });
2390 }
2391
2392 #[test_group("slow")]
2393 #[test_traced]
2394 fn test_slow_validator() {
2395 slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2396 slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2397 slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2398 slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2399 slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2400 slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2401 slow_validator::<_, _, RoundRobin>(ed25519::fixture);
2402 slow_validator::<_, _, RoundRobin>(secp256r1::fixture);
2403 }
2404
2405 fn all_recovery<S, F, L>(mut fixture: F)
2406 where
2407 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2408 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2409 L: Elector<S>,
2410 {
2411 let n = 5;
2413 let required_containers = View::new(100);
2414 let activity_timeout = ViewDelta::new(10);
2415 let skip_timeout = ViewDelta::new(2);
2416 let namespace = b"consensus".to_vec();
2417 let executor = deterministic::Runner::timed(Duration::from_secs(1800));
2418 executor.start(|mut context| async move {
2419 let Fixture {
2421 participants,
2422 schemes,
2423 ..
2424 } = fixture(&mut context, &namespace, n);
2425 let mut oracle =
2426 start_test_network_with_peers(context.child("network"), participants.clone(), true)
2427 .await;
2428 let mut registrations = register_validators(&mut oracle, &participants).await;
2429
2430 let link = Link {
2432 latency: Duration::from_secs(3),
2433 jitter: Duration::from_millis(0),
2434 success_rate: 1.0,
2435 };
2436 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2437
2438 let elector = L::default();
2440 let relay = Arc::new(mocks::relay::Relay::new());
2441 let mut reporters = Vec::new();
2442 let mut engine_handlers = Vec::new();
2443 for (idx, validator) in participants.iter().enumerate() {
2444 let context = context
2446 .child("validator")
2447 .with_attribute("public_key", validator);
2448
2449 let reporter_config = mocks::reporter::Config {
2451 participants: participants.clone().try_into().unwrap(),
2452 scheme: schemes[idx].clone(),
2453 elector: elector.clone(),
2454 };
2455 let reporter =
2456 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
2457 reporters.push(reporter.clone());
2458 let application_cfg = mocks::application::Config {
2459 hasher: Sha256::default(),
2460 relay: relay.clone(),
2461 me: validator.clone(),
2462 propose_latency: (10.0, 5.0),
2463 verify_latency: (10.0, 5.0),
2464 certify_latency: (10.0, 5.0),
2465 should_certify: mocks::application::Certifier::Always,
2466 };
2467 let (actor, application) = mocks::application::Application::new(
2468 context.child("application"),
2469 application_cfg,
2470 );
2471 actor.start();
2472 let blocker = oracle.control(validator.clone());
2473 let cfg = config::Config {
2474 scheme: schemes[idx].clone(),
2475 elector: elector.clone(),
2476 blocker,
2477 automaton: application.clone(),
2478 relay: application.clone(),
2479 reporter: reporter.clone(),
2480 strategy: Sequential,
2481 partition: validator.to_string(),
2482 mailbox_size: NZUsize!(1024),
2483 epoch: Epoch::new(333),
2484 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
2485 Epoch::new(333),
2486 )),
2487 leader_timeout: Duration::from_secs(1),
2488 certification_timeout: Duration::from_secs(2),
2489 timeout_retry: Duration::from_secs(10),
2490 fetch_timeout: Duration::from_secs(1),
2491 activity_timeout,
2492 skip_timeout,
2493 fetch_concurrent: NZUsize!(4),
2494 replay_buffer: NZUsize!(1024 * 1024),
2495 write_buffer: NZUsize!(1024 * 1024),
2496 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2497 forwarding: ForwardingPolicy::Disabled,
2498 };
2499 let engine = Engine::new(context.child("engine"), cfg);
2500
2501 let (pending, recovered, resolver) = registrations
2503 .remove(validator)
2504 .expect("validator should be registered");
2505 engine_handlers.push(engine.start(pending, recovered, resolver));
2506 }
2507
2508 let mut finalizers = Vec::new();
2510 for reporter in reporters.iter_mut() {
2511 let (_, mut monitor) = reporter.subscribe().await;
2512 finalizers.push(context.child("finalizer").spawn(move |context| async move {
2513 select! {
2514 _timeout = context.sleep(Duration::from_secs(60)) => {},
2515 _done = monitor.recv() => {
2516 panic!("engine should not notarize or finalize anything");
2517 },
2518 }
2519 }));
2520 }
2521 join_all(finalizers).await;
2522
2523 link_validators(&mut oracle, &participants, Action::Unlink, None).await;
2525
2526 context.sleep(Duration::from_secs(60)).await;
2528
2529 let mut latest = View::zero();
2531 for reporter in reporters.iter() {
2532 let nullifies = reporter.nullifies.lock();
2533 let max = nullifies.keys().max().unwrap();
2534 if *max > latest {
2535 latest = *max;
2536 }
2537 }
2538
2539 let link = Link {
2541 latency: Duration::from_millis(10),
2542 jitter: Duration::from_millis(1),
2543 success_rate: 1.0,
2544 };
2545 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2546
2547 let mut finalizers = Vec::new();
2549 for reporter in reporters.iter_mut() {
2550 let (mut latest, mut monitor) = reporter.subscribe().await;
2551 finalizers.push(context.child("finalizer").spawn(move |_| async move {
2552 while latest < required_containers {
2553 latest = monitor.recv().await.expect("event missing");
2554 }
2555 }));
2556 }
2557 join_all(finalizers).await;
2558
2559 for reporter in reporters.iter() {
2561 reporter.assert_no_faults();
2563
2564 reporter.assert_no_invalid();
2566
2567 {
2572 let mut found = 0;
2576 let notarizations = reporter.notarizations.lock();
2577 for view in View::range(latest, latest.saturating_add(activity_timeout)) {
2578 if notarizations.contains_key(&view) {
2579 found += 1;
2580 }
2581 }
2582 let tolerated_missing = skip_timeout.get().saturating_add(1);
2583 assert!(
2584 found >= activity_timeout.get().saturating_sub(tolerated_missing),
2585 "found: {found}"
2586 );
2587 }
2588 }
2589
2590 let blocked = oracle.blocked().await.unwrap();
2592 assert!(blocked.is_empty());
2593 });
2594 }
2595
2596 #[test_group("slow")]
2597 #[test_traced]
2598 fn test_all_recovery() {
2599 all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2600 all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2601 all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2602 all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2603 all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2604 all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2605 all_recovery::<_, _, RoundRobin>(ed25519::fixture);
2606 all_recovery::<_, _, RoundRobin>(secp256r1::fixture);
2607 }
2608
2609 fn partition<S, F, L>(mut fixture: F)
2610 where
2611 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2612 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2613 L: Elector<S>,
2614 {
2615 let n = 10;
2617 let required_containers = View::new(50);
2618 let activity_timeout = ViewDelta::new(10);
2619 let skip_timeout = ViewDelta::new(5);
2620 let namespace = b"consensus".to_vec();
2621 let executor = deterministic::Runner::timed(Duration::from_secs(900));
2622 executor.start(|mut context| async move {
2623 let Fixture {
2625 participants,
2626 schemes,
2627 ..
2628 } = fixture(&mut context, &namespace, n);
2629 let mut oracle =
2630 start_test_network_with_peers(context.child("network"), participants.clone(), true)
2631 .await;
2632 let mut registrations = register_validators(&mut oracle, &participants).await;
2633
2634 let link = Link {
2636 latency: Duration::from_millis(10),
2637 jitter: Duration::from_millis(1),
2638 success_rate: 1.0,
2639 };
2640 link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
2641
2642 let elector = L::default();
2644 let relay = Arc::new(mocks::relay::Relay::new());
2645 let mut reporters = Vec::new();
2646 let mut engine_handlers = Vec::new();
2647 for (idx, validator) in participants.iter().enumerate() {
2648 let context = context
2650 .child("validator")
2651 .with_attribute("public_key", validator);
2652
2653 let reporter_config = mocks::reporter::Config {
2655 participants: participants.clone().try_into().unwrap(),
2656 scheme: schemes[idx].clone(),
2657 elector: elector.clone(),
2658 };
2659 let reporter =
2660 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
2661 reporters.push(reporter.clone());
2662 let application_cfg = mocks::application::Config {
2663 hasher: Sha256::default(),
2664 relay: relay.clone(),
2665 me: validator.clone(),
2666 propose_latency: (10.0, 5.0),
2667 verify_latency: (10.0, 5.0),
2668 certify_latency: (10.0, 5.0),
2669 should_certify: mocks::application::Certifier::Always,
2670 };
2671 let (actor, application) = mocks::application::Application::new(
2672 context.child("application"),
2673 application_cfg,
2674 );
2675 actor.start();
2676 let blocker = oracle.control(validator.clone());
2677 let cfg = config::Config {
2678 scheme: schemes[idx].clone(),
2679 elector: elector.clone(),
2680 blocker,
2681 automaton: application.clone(),
2682 relay: application.clone(),
2683 reporter: reporter.clone(),
2684 strategy: Sequential,
2685 partition: validator.to_string(),
2686 mailbox_size: NZUsize!(1024),
2687 epoch: Epoch::new(333),
2688 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
2689 Epoch::new(333),
2690 )),
2691 leader_timeout: Duration::from_secs(1),
2692 certification_timeout: Duration::from_secs(2),
2693 timeout_retry: Duration::from_secs(10),
2694 fetch_timeout: Duration::from_secs(1),
2695 activity_timeout,
2696 skip_timeout,
2697 fetch_concurrent: NZUsize!(4),
2698 replay_buffer: NZUsize!(1024 * 1024),
2699 write_buffer: NZUsize!(1024 * 1024),
2700 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2701 forwarding: ForwardingPolicy::Disabled,
2702 };
2703 let engine = Engine::new(context.child("engine"), cfg);
2704
2705 let (pending, recovered, resolver) = registrations
2707 .remove(validator)
2708 .expect("validator should be registered");
2709 engine_handlers.push(engine.start(pending, recovered, resolver));
2710 }
2711
2712 let mut finalizers = Vec::new();
2714 for reporter in reporters.iter_mut() {
2715 let (mut latest, mut monitor) = reporter.subscribe().await;
2716 finalizers.push(context.child("finalizer").spawn(move |_| async move {
2717 while latest < required_containers {
2718 latest = monitor.recv().await.expect("event missing");
2719 }
2720 }));
2721 }
2722 join_all(finalizers).await;
2723
2724 fn separated(n: usize, a: usize, b: usize) -> bool {
2726 let m = n / 2;
2727 (a < m && b >= m) || (a >= m && b < m)
2728 }
2729 link_validators(&mut oracle, &participants, Action::Unlink, Some(separated)).await;
2730
2731 context.sleep(Duration::from_secs(10)).await;
2733
2734 let mut finalizers = Vec::new();
2736 for reporter in reporters.iter_mut() {
2737 let (_, mut monitor) = reporter.subscribe().await;
2738 finalizers.push(context.child("finalizer").spawn(move |context| async move {
2739 select! {
2740 _timeout = context.sleep(Duration::from_secs(60)) => {},
2741 _done = monitor.recv() => {
2742 panic!("engine should not notarize or finalize anything");
2743 },
2744 }
2745 }));
2746 }
2747 join_all(finalizers).await;
2748
2749 link_validators(
2751 &mut oracle,
2752 &participants,
2753 Action::Link(link),
2754 Some(separated),
2755 )
2756 .await;
2757
2758 let mut finalizers = Vec::new();
2760 for reporter in reporters.iter_mut() {
2761 let (mut latest, mut monitor) = reporter.subscribe().await;
2762 let required = latest.saturating_add(ViewDelta::new(required_containers.get()));
2763 finalizers.push(context.child("finalizer").spawn(move |_| async move {
2764 while latest < required {
2765 latest = monitor.recv().await.expect("event missing");
2766 }
2767 }));
2768 }
2769 join_all(finalizers).await;
2770
2771 for reporter in reporters.iter() {
2773 reporter.assert_no_faults();
2775
2776 reporter.assert_no_invalid();
2778 }
2779
2780 let blocked = oracle.blocked().await.unwrap();
2782 assert!(blocked.is_empty());
2783 });
2784 }
2785
2786 #[test_group("slow")]
2787 #[test_traced]
2788 fn test_partition() {
2789 partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2790 partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2791 partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2792 partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2793 partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2794 partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2795 partition::<_, _, RoundRobin>(ed25519::fixture);
2796 partition::<_, _, RoundRobin>(secp256r1::fixture);
2797 }
2798
2799 fn slow_and_lossy_links<S, F, L>(seed: u64, mut fixture: F) -> String
2800 where
2801 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2802 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2803 L: Elector<S>,
2804 {
2805 let n = 5;
2807 let required_containers = View::new(50);
2808 let activity_timeout = ViewDelta::new(10);
2809 let skip_timeout = ViewDelta::new(5);
2810 let namespace = b"consensus".to_vec();
2811 let cfg = deterministic::Config::new()
2812 .with_seed(seed)
2813 .with_timeout(Some(Duration::from_secs(5_000)));
2814 let executor = deterministic::Runner::new(cfg);
2815 executor.start(|mut context| async move {
2816 let Fixture {
2818 participants,
2819 schemes,
2820 ..
2821 } = fixture(&mut context, &namespace, n);
2822 let mut oracle =
2823 start_test_network_with_peers(context.child("network"), participants.clone(), true)
2824 .await;
2825 let mut registrations = register_validators(&mut oracle, &participants).await;
2826
2827 let degraded_link = Link {
2829 latency: Duration::from_millis(200),
2830 jitter: Duration::from_millis(150),
2831 success_rate: 0.5,
2832 };
2833 link_validators(
2834 &mut oracle,
2835 &participants,
2836 Action::Link(degraded_link),
2837 None,
2838 )
2839 .await;
2840
2841 let elector = L::default();
2843 let relay = Arc::new(mocks::relay::Relay::new());
2844 let mut reporters = Vec::new();
2845 let mut engine_handlers = Vec::new();
2846 for (idx, validator) in participants.iter().enumerate() {
2847 let context = context
2849 .child("validator")
2850 .with_attribute("public_key", validator);
2851
2852 let reporter_config = mocks::reporter::Config {
2854 participants: participants.clone().try_into().unwrap(),
2855 scheme: schemes[idx].clone(),
2856 elector: elector.clone(),
2857 };
2858 let reporter =
2859 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
2860 reporters.push(reporter.clone());
2861 let application_cfg = mocks::application::Config {
2862 hasher: Sha256::default(),
2863 relay: relay.clone(),
2864 me: validator.clone(),
2865 propose_latency: (10.0, 5.0),
2866 verify_latency: (10.0, 5.0),
2867 certify_latency: (10.0, 5.0),
2868 should_certify: mocks::application::Certifier::Always,
2869 };
2870 let (actor, application) = mocks::application::Application::new(
2871 context.child("application"),
2872 application_cfg,
2873 );
2874 actor.start();
2875 let blocker = oracle.control(validator.clone());
2876 let cfg = config::Config {
2877 scheme: schemes[idx].clone(),
2878 elector: elector.clone(),
2879 blocker,
2880 automaton: application.clone(),
2881 relay: application.clone(),
2882 reporter: reporter.clone(),
2883 strategy: Sequential,
2884 partition: validator.to_string(),
2885 mailbox_size: NZUsize!(1024),
2886 epoch: Epoch::new(333),
2887 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
2888 Epoch::new(333),
2889 )),
2890 leader_timeout: Duration::from_secs(1),
2891 certification_timeout: Duration::from_secs(2),
2892 timeout_retry: Duration::from_secs(10),
2893 fetch_timeout: Duration::from_secs(1),
2894 activity_timeout,
2895 skip_timeout,
2896 fetch_concurrent: NZUsize!(4),
2897 replay_buffer: NZUsize!(1024 * 1024),
2898 write_buffer: NZUsize!(1024 * 1024),
2899 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2900 forwarding: ForwardingPolicy::Disabled,
2901 };
2902 let engine = Engine::new(context.child("engine"), cfg);
2903
2904 let (pending, recovered, resolver) = registrations
2906 .remove(validator)
2907 .expect("validator should be registered");
2908 engine_handlers.push(engine.start(pending, recovered, resolver));
2909 }
2910
2911 let mut finalizers = Vec::new();
2913 for reporter in reporters.iter_mut() {
2914 let (mut latest, mut monitor) = reporter.subscribe().await;
2915 finalizers.push(context.child("finalizer").spawn(move |_| async move {
2916 while latest < required_containers {
2917 latest = monitor.recv().await.expect("event missing");
2918 }
2919 }));
2920 }
2921 join_all(finalizers).await;
2922
2923 for reporter in reporters.iter() {
2925 reporter.assert_no_faults();
2927
2928 reporter.assert_no_invalid();
2930 }
2931
2932 let blocked = oracle.blocked().await.unwrap();
2934 assert!(blocked.is_empty());
2935
2936 context.auditor().state()
2937 })
2938 }
2939
2940 #[test_group("slow")]
2941 #[test_traced]
2942 fn test_slow_and_lossy_links() {
2943 slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinPk, _>);
2944 slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinSig, _>);
2945 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinPk, _>);
2946 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinSig, _>);
2947 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinPk, _>);
2948 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinSig, _>);
2949 slow_and_lossy_links::<_, _, RoundRobin>(0, ed25519::fixture);
2950 slow_and_lossy_links::<_, _, RoundRobin>(0, secp256r1::fixture);
2951 }
2952
2953 #[test_group("slow")]
2954 #[test_traced]
2955 fn test_determinism() {
2956 for seed in 1..6 {
2959 let ts_vrf_pk_state_1 = slow_and_lossy_links::<_, _, Random>(
2960 seed,
2961 bls12381_threshold_vrf::fixture::<MinPk, _>,
2962 );
2963 let ts_vrf_pk_state_2 = slow_and_lossy_links::<_, _, Random>(
2964 seed,
2965 bls12381_threshold_vrf::fixture::<MinPk, _>,
2966 );
2967 assert_eq!(ts_vrf_pk_state_1, ts_vrf_pk_state_2);
2968
2969 let ts_vrf_sig_state_1 = slow_and_lossy_links::<_, _, Random>(
2970 seed,
2971 bls12381_threshold_vrf::fixture::<MinSig, _>,
2972 );
2973 let ts_vrf_sig_state_2 = slow_and_lossy_links::<_, _, Random>(
2974 seed,
2975 bls12381_threshold_vrf::fixture::<MinSig, _>,
2976 );
2977 assert_eq!(ts_vrf_sig_state_1, ts_vrf_sig_state_2);
2978
2979 let ts_std_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2980 seed,
2981 bls12381_threshold_std::fixture::<MinPk, _>,
2982 );
2983 let ts_std_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2984 seed,
2985 bls12381_threshold_std::fixture::<MinPk, _>,
2986 );
2987 assert_eq!(ts_std_pk_state_1, ts_std_pk_state_2);
2988
2989 let ts_std_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2990 seed,
2991 bls12381_threshold_std::fixture::<MinSig, _>,
2992 );
2993 let ts_std_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2994 seed,
2995 bls12381_threshold_std::fixture::<MinSig, _>,
2996 );
2997 assert_eq!(ts_std_sig_state_1, ts_std_sig_state_2);
2998
2999 let ms_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
3000 seed,
3001 bls12381_multisig::fixture::<MinPk, _>,
3002 );
3003 let ms_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
3004 seed,
3005 bls12381_multisig::fixture::<MinPk, _>,
3006 );
3007 assert_eq!(ms_pk_state_1, ms_pk_state_2);
3008
3009 let ms_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
3010 seed,
3011 bls12381_multisig::fixture::<MinSig, _>,
3012 );
3013 let ms_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
3014 seed,
3015 bls12381_multisig::fixture::<MinSig, _>,
3016 );
3017 assert_eq!(ms_sig_state_1, ms_sig_state_2);
3018
3019 let ed_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
3020 let ed_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
3021 assert_eq!(ed_state_1, ed_state_2);
3022
3023 let secp_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
3024 let secp_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
3025 assert_eq!(secp_state_1, secp_state_2);
3026
3027 let states = [
3028 ("threshold-vrf-minpk", ts_vrf_pk_state_1),
3029 ("threshold-vrf-minsig", ts_vrf_sig_state_1),
3030 ("threshold-std-minpk", ts_std_pk_state_1),
3031 ("threshold-std-minsig", ts_std_sig_state_1),
3032 ("multisig-minpk", ms_pk_state_1),
3033 ("multisig-minsig", ms_sig_state_1),
3034 ("ed25519", ed_state_1),
3035 ("secp256r1", secp_state_1),
3036 ];
3037
3038 for pair in states.windows(2) {
3040 assert_ne!(
3041 pair[0].1, pair[1].1,
3042 "state {} equals state {}",
3043 pair[0].0, pair[1].0
3044 );
3045 }
3046 }
3047 }
3048
3049 fn conflicter<S, F, L>(seed: u64, mut fixture: F)
3050 where
3051 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3052 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3053 L: Elector<S>,
3054 {
3055 let n = 4;
3057 let required_containers = View::new(50);
3058 let activity_timeout = ViewDelta::new(10);
3059 let skip_timeout = ViewDelta::new(5);
3060 let namespace = b"consensus".to_vec();
3061 let cfg = deterministic::Config::new()
3062 .with_seed(seed)
3063 .with_timeout(Some(Duration::from_secs(30)));
3064 let executor = deterministic::Runner::new(cfg);
3065 executor.start(|mut context| async move {
3066 let Fixture {
3068 participants,
3069 schemes,
3070 ..
3071 } = fixture(&mut context, &namespace, n);
3072 let mut oracle =
3073 start_test_network_with_peers(context.child("network"), participants.clone(), true)
3074 .await;
3075 let mut registrations = register_validators(&mut oracle, &participants).await;
3076
3077 let link = Link {
3079 latency: Duration::from_millis(10),
3080 jitter: Duration::from_millis(1),
3081 success_rate: 1.0,
3082 };
3083 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3084
3085 let elector = L::default();
3087 let relay = Arc::new(mocks::relay::Relay::new());
3088 let mut reporters = Vec::new();
3089 for (idx_scheme, validator) in participants.iter().enumerate() {
3090 let context = context
3092 .child("validator")
3093 .with_attribute("public_key", validator);
3094
3095 let reporter_config = mocks::reporter::Config {
3097 participants: participants.clone().try_into().unwrap(),
3098 scheme: schemes[idx_scheme].clone(),
3099 elector: elector.clone(),
3100 };
3101 let reporter =
3102 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
3103 let (pending, recovered, resolver) = registrations
3104 .remove(validator)
3105 .expect("validator should be registered");
3106 if idx_scheme == 0 {
3107 let cfg = mocks::conflicter::Config {
3108 scheme: schemes[idx_scheme].clone(),
3109 };
3110
3111 let engine: mocks::conflicter::Conflicter<_, _, Sha256> =
3112 mocks::conflicter::Conflicter::new(context.child("byzantine_engine"), cfg);
3113 engine.start(pending);
3114 } else {
3115 reporters.push(reporter.clone());
3116 let application_cfg = mocks::application::Config {
3117 hasher: Sha256::default(),
3118 relay: relay.clone(),
3119 me: validator.clone(),
3120 propose_latency: (10.0, 5.0),
3121 verify_latency: (10.0, 5.0),
3122 certify_latency: (10.0, 5.0),
3123 should_certify: mocks::application::Certifier::Always,
3124 };
3125 let (actor, application) = mocks::application::Application::new(
3126 context.child("application"),
3127 application_cfg,
3128 );
3129 actor.start();
3130 let blocker = oracle.control(validator.clone());
3131 let cfg = config::Config {
3132 scheme: schemes[idx_scheme].clone(),
3133 elector: elector.clone(),
3134 blocker,
3135 automaton: application.clone(),
3136 relay: application.clone(),
3137 reporter: reporter.clone(),
3138 strategy: Sequential,
3139 partition: validator.to_string(),
3140 mailbox_size: NZUsize!(1024),
3141 epoch: Epoch::new(333),
3142 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
3143 Epoch::new(333),
3144 )),
3145 leader_timeout: Duration::from_secs(1),
3146 certification_timeout: Duration::from_secs(2),
3147 timeout_retry: Duration::from_secs(10),
3148 fetch_timeout: Duration::from_secs(1),
3149 activity_timeout,
3150 skip_timeout,
3151 fetch_concurrent: NZUsize!(4),
3152 replay_buffer: NZUsize!(1024 * 1024),
3153 write_buffer: NZUsize!(1024 * 1024),
3154 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3155 forwarding: ForwardingPolicy::Disabled,
3156 };
3157 let engine = Engine::new(context.child("engine"), cfg);
3158 engine.start(pending, recovered, resolver);
3159 }
3160 }
3161
3162 let mut finalizers = Vec::new();
3164 for reporter in reporters.iter_mut() {
3165 let (mut latest, mut monitor) = reporter.subscribe().await;
3166 finalizers.push(context.child("finalizer").spawn(move |_| async move {
3167 while latest < required_containers {
3168 latest = monitor.recv().await.expect("event missing");
3169 }
3170 }));
3171 }
3172 join_all(finalizers).await;
3173
3174 let byz = &participants[0];
3176 let mut count_conflicting = 0;
3177 for reporter in reporters.iter() {
3178 {
3180 let faults = reporter.faults.lock();
3181 assert_eq!(faults.len(), 1);
3182 let faulter = faults.get(byz).expect("byzantine party is not faulter");
3183 for (_, faults) in faulter.iter() {
3184 for fault in faults.iter() {
3185 match fault {
3186 Activity::ConflictingNotarize(_) => {
3187 count_conflicting += 1;
3188 }
3189 Activity::ConflictingFinalize(_) => {
3190 count_conflicting += 1;
3191 }
3192 _ => panic!("unexpected fault: {fault:?}"),
3193 }
3194 }
3195 }
3196 }
3197
3198 reporter.assert_no_invalid();
3200 }
3201 assert!(count_conflicting > 0);
3202
3203 let blocked = oracle.blocked().await.unwrap();
3205 assert!(!blocked.is_empty());
3206 for (a, b) in blocked {
3207 assert_ne!(&a, byz);
3208 assert_eq!(&b, byz);
3209 }
3210 });
3211 }
3212
3213 #[test_group("slow")]
3214 #[test_traced]
3215 fn test_conflicter() {
3216 for seed in 0..5 {
3217 conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3218 conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3219 conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3220 conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3221 conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3222 conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3223 conflicter::<_, _, RoundRobin>(seed, ed25519::fixture);
3224 conflicter::<_, _, RoundRobin>(seed, secp256r1::fixture);
3225 }
3226 }
3227
3228 fn invalid<S, F, L>(seed: u64, mut fixture: F)
3229 where
3230 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3231 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3232 L: Elector<S>,
3233 {
3234 let n = 4;
3236 let required_containers = View::new(50);
3237 let activity_timeout = ViewDelta::new(10);
3238 let skip_timeout = ViewDelta::new(5);
3239 let namespace = b"consensus".to_vec();
3240 let cfg = deterministic::Config::new()
3241 .with_seed(seed)
3242 .with_timeout(Some(Duration::from_secs(30)));
3243 let executor = deterministic::Runner::new(cfg);
3244 executor.start(|mut context| async move {
3245 let Fixture {
3247 participants,
3248 schemes,
3249 ..
3250 } = fixture(&mut context, &namespace, n);
3251
3252 let schemes: Vec<_> = schemes
3253 .into_iter()
3254 .enumerate()
3255 .map(|(idx, scheme)| {
3256 let is_byzantine = idx == 0;
3257 let behavior = if is_byzantine {
3258 wrapped::Behavior::CorruptSignature
3259 } else {
3260 wrapped::Behavior::Honest
3261 };
3262 wrapped::Scheme::new(scheme, behavior)
3263 })
3264 .collect();
3265
3266 let mut oracle =
3267 start_test_network_with_peers(context.child("network"), participants.clone(), true)
3268 .await;
3269 let mut registrations = register_validators(&mut oracle, &participants).await;
3270
3271 let link = Link {
3273 latency: Duration::from_millis(10),
3274 jitter: Duration::from_millis(1),
3275 success_rate: 1.0,
3276 };
3277 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3278
3279 let elector = wrapped::Config(L::default());
3281 let relay = Arc::new(mocks::relay::Relay::new());
3282 let mut reporters = Vec::new();
3283 for (idx_scheme, validator) in participants.iter().enumerate() {
3284 let context = context
3286 .child("validator")
3287 .with_attribute("public_key", validator);
3288
3289 let reporter_config = mocks::reporter::Config {
3290 participants: participants.clone().try_into().unwrap(),
3291 scheme: schemes[idx_scheme].clone(),
3292 elector: elector.clone(),
3293 };
3294 let reporter =
3295 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
3296 reporters.push(reporter.clone());
3297
3298 let application_cfg = mocks::application::Config {
3299 hasher: Sha256::default(),
3300 relay: relay.clone(),
3301 me: validator.clone(),
3302 propose_latency: (10.0, 5.0),
3303 verify_latency: (10.0, 5.0),
3304 certify_latency: (10.0, 5.0),
3305 should_certify: mocks::application::Certifier::Always,
3306 };
3307 let (actor, application) = mocks::application::Application::new(
3308 context.child("application"),
3309 application_cfg,
3310 );
3311 actor.start();
3312 let blocker = oracle.control(validator.clone());
3313 let cfg = config::Config {
3314 scheme: schemes[idx_scheme].clone(),
3315 elector: elector.clone(),
3316 blocker,
3317 automaton: application.clone(),
3318 relay: application.clone(),
3319 reporter: reporter.clone(),
3320 strategy: Sequential,
3321 partition: validator.clone().to_string(),
3322 mailbox_size: NZUsize!(1024),
3323 epoch: Epoch::new(333),
3324 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
3325 Epoch::new(333),
3326 )),
3327 leader_timeout: Duration::from_secs(1),
3328 certification_timeout: Duration::from_secs(2),
3329 timeout_retry: Duration::from_secs(10),
3330 fetch_timeout: Duration::from_secs(1),
3331 activity_timeout,
3332 skip_timeout,
3333 fetch_concurrent: NZUsize!(4),
3334 replay_buffer: NZUsize!(1024 * 1024),
3335 write_buffer: NZUsize!(1024 * 1024),
3336 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3337 forwarding: ForwardingPolicy::Disabled,
3338 };
3339 let engine = Engine::new(context.child("engine"), cfg);
3340 let (pending, recovered, resolver) = registrations
3341 .remove(validator)
3342 .expect("validator should be registered");
3343 engine.start(pending, recovered, resolver);
3344 }
3345
3346 let mut finalizers = Vec::new();
3350 for reporter in reporters.iter_mut().skip(1) {
3351 let (mut latest, mut monitor) = reporter.subscribe().await;
3352 finalizers.push(context.child("finalizer").spawn(move |_| async move {
3353 while latest < required_containers {
3354 latest = monitor.recv().await.expect("event missing");
3355 }
3356 }));
3357 }
3358 join_all(finalizers).await;
3359
3360 for (i, reporter) in reporters.iter().enumerate() {
3362 reporter.assert_no_faults();
3364
3365 assert!(*reporter.invalid_votes.lock() > 0);
3368
3369 let is_byzantine = i == 0;
3372 if is_byzantine {
3373 assert!(*reporter.invalid_certificates.lock() > 0);
3374 } else {
3375 assert_eq!(*reporter.invalid_certificates.lock(), 0);
3376 }
3377 }
3378
3379 let blocked = oracle.blocked().await.unwrap();
3382 assert!(blocked.len() >= participants.len() - 1);
3383 let byz = &participants[0];
3384 for (_, b) in blocked {
3385 assert_eq!(&b, byz);
3387 }
3388 });
3389 }
3390
3391 #[test_group("slow")]
3392 #[test_traced]
3393 fn test_invalid() {
3394 for seed in 0..5 {
3395 invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3396 invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3397 invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3398 invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3399 invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3400 invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3401 invalid::<_, _, RoundRobin>(seed, ed25519::fixture);
3402 invalid::<_, _, RoundRobin>(seed, secp256r1::fixture);
3403 }
3404 }
3405
3406 fn received_certificates_are_reported<S, F, L>(seed: u64, mut fixture: F)
3407 where
3408 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3409 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3410 L: Elector<S>,
3411 {
3412 let n = 4;
3413 let required_containers = View::new(10);
3414 let activity_timeout = ViewDelta::new(10);
3415 let skip_timeout = ViewDelta::new(5);
3416 let namespace = b"consensus".to_vec();
3417 let cfg = deterministic::Config::new()
3418 .with_seed(seed)
3419 .with_timeout(Some(Duration::from_secs(30)));
3420 let executor = deterministic::Runner::new(cfg);
3421 executor.start(|mut context| async move {
3422 let Fixture {
3423 participants,
3424 schemes,
3425 ..
3426 } = fixture(&mut context, &namespace, n);
3427
3428 let mut oracle = start_test_network_with_peers(
3429 context.child("network"),
3430 participants.clone(),
3431 false,
3432 )
3433 .await;
3434 let mut registrations = register_validators(&mut oracle, &participants).await;
3435
3436 let link = Link {
3441 latency: Duration::from_millis(100),
3442 jitter: Duration::from_millis(1),
3443 success_rate: 1.0,
3444 };
3445 fn link_graph(_: usize, i: usize, j: usize) -> bool {
3446 if i == 0 || j == 0 {
3447 return i == 1 || j == 1;
3448 }
3449 true
3450 }
3451 link_validators(
3452 &mut oracle,
3453 &participants,
3454 Action::Link(link),
3455 Some(link_graph),
3456 )
3457 .await;
3458
3459 let elector = L::default();
3460 let relay = Arc::new(mocks::relay::Relay::new());
3461 let mut reporters = Vec::new();
3462 for (idx_scheme, validator) in participants.iter().enumerate() {
3463 let context = context
3464 .child("validator")
3465 .with_attribute("public_key", validator);
3466 let reporter_config = mocks::reporter::Config {
3467 participants: participants.clone().try_into().unwrap(),
3468 scheme: schemes[idx_scheme].clone(),
3469 elector: elector.clone(),
3470 };
3471 let reporter =
3472 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
3473 reporters.push(reporter.clone());
3474
3475 let application_cfg = mocks::application::Config {
3476 hasher: Sha256::default(),
3477 relay: relay.clone(),
3478 me: validator.clone(),
3479 propose_latency: (10.0, 5.0),
3480 verify_latency: (10.0, 5.0),
3481 certify_latency: (10.0, 5.0),
3482 should_certify: mocks::application::Certifier::Always,
3483 };
3484 let (actor, application) = mocks::application::Application::new(
3485 context.child("application"),
3486 application_cfg,
3487 );
3488 actor.start();
3489 let blocker = oracle.control(validator.clone());
3490 let cfg = config::Config {
3491 scheme: schemes[idx_scheme].clone(),
3492 elector: elector.clone(),
3493 blocker,
3494 automaton: application.clone(),
3495 relay: application.clone(),
3496 reporter: reporter.clone(),
3497 strategy: Sequential,
3498 partition: validator.clone().to_string(),
3499 mailbox_size: NZUsize!(1024),
3500 epoch: Epoch::new(333),
3501 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
3502 Epoch::new(333),
3503 )),
3504 leader_timeout: Duration::from_secs(1),
3505 certification_timeout: Duration::from_secs(2),
3506 timeout_retry: Duration::from_secs(10),
3507 fetch_timeout: Duration::from_secs(1),
3508 activity_timeout,
3509 skip_timeout,
3510 fetch_concurrent: NZUsize!(4),
3511 replay_buffer: NZUsize!(1024 * 1024),
3512 write_buffer: NZUsize!(1024 * 1024),
3513 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3514 forwarding: ForwardingPolicy::Disabled,
3515 };
3516 let engine = Engine::new(context.child("engine"), cfg);
3517 let (pending, recovered, resolver) = registrations
3518 .remove(validator)
3519 .expect("validator should be registered");
3520 engine.start(pending, recovered, resolver);
3521 }
3522 let excluded_reporter = reporters[0].clone();
3524 let mut honest_reporter = reporters[1].clone();
3525 let (mut honest_latest, mut honest_monitor) = honest_reporter.subscribe().await;
3526 while honest_latest < required_containers {
3527 honest_latest = honest_monitor.recv().await.expect("event missing");
3528 }
3529
3530 context.sleep(Duration::from_secs(1)).await;
3532
3533 let honest_notarized = {
3536 let notarizations = honest_reporter.notarizations.lock();
3537 View::range(View::new(1), required_containers.next())
3538 .filter(|view| notarizations.contains_key(view))
3539 .count()
3540 };
3541 let excluded_notarized = {
3542 let notarizations = excluded_reporter.notarizations.lock();
3543 View::range(View::new(1), required_containers.next())
3544 .filter(|view| notarizations.contains_key(view))
3545 .count()
3546 };
3547 assert!(
3548 excluded_notarized >= honest_notarized.saturating_sub(2),
3549 "honest_notarized: {honest_notarized}, excluded_notarized: {excluded_notarized}"
3550 );
3551
3552 let honest_finalized = {
3553 let finalizations = honest_reporter.finalizations.lock();
3554 View::range(View::new(1), required_containers.next())
3555 .filter(|view| finalizations.contains_key(view))
3556 .count()
3557 };
3558 let excluded_finalized = {
3559 let finalizations = excluded_reporter.finalizations.lock();
3560 View::range(View::new(1), required_containers.next())
3561 .filter(|view| finalizations.contains_key(view))
3562 .count()
3563 };
3564 assert!(
3565 excluded_finalized >= honest_finalized.saturating_sub(2),
3566 "honest_finalized: {honest_finalized}, excluded_finalized: {excluded_finalized}"
3567 );
3568 });
3569 }
3570
3571 #[test_group("slow")]
3573 #[test_traced]
3574 fn test_received_certificates_are_reported() {
3575 received_certificates_are_reported::<_, _, Random>(
3576 0,
3577 bls12381_threshold_vrf::fixture::<MinPk, _>,
3578 );
3579 received_certificates_are_reported::<_, _, Random>(
3580 0,
3581 bls12381_threshold_vrf::fixture::<MinSig, _>,
3582 );
3583 received_certificates_are_reported::<_, _, RoundRobin>(
3584 0,
3585 bls12381_threshold_std::fixture::<MinPk, _>,
3586 );
3587 received_certificates_are_reported::<_, _, RoundRobin>(
3588 0,
3589 bls12381_threshold_std::fixture::<MinSig, _>,
3590 );
3591 received_certificates_are_reported::<_, _, RoundRobin>(
3592 0,
3593 bls12381_multisig::fixture::<MinPk, _>,
3594 );
3595 received_certificates_are_reported::<_, _, RoundRobin>(
3596 0,
3597 bls12381_multisig::fixture::<MinSig, _>,
3598 );
3599 received_certificates_are_reported::<_, _, RoundRobin>(0, ed25519::fixture);
3600 received_certificates_are_reported::<_, _, RoundRobin>(0, secp256r1::fixture);
3601 }
3602
/// Starts a single engine configured with `mailbox_size` of 1 after four
/// certificates have already been sent to it, then waits until its reporter
/// announces view 3.
///
/// Only the first participant (`me`) runs an engine; the certificates are
/// signed with the first `quorum(n)` fixture schemes and delivered by a
/// separate "injector" peer.
#[test_traced]
fn test_survives_burst() {
    let n = 4;
    let epoch = Epoch::new(333);
    let namespace = b"mailbox_size_one_certificate_burst".to_vec();
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Build the participant set and register only the first participant.
        let Fixture {
            participants,
            schemes,
            ..
        } = scheme_mocks::fixture(&mut context, &namespace, n);
        let me = participants[0].clone();
        let mut oracle =
            start_test_network_with_peers(context.child("network"), participants.clone(), true)
                .await;
        let (pending, recovered, resolver) = register_validator(&mut oracle, me.clone()).await;

        // Create an extra peer used solely to push certificates at `me`.
        // NOTE(review): the `1` passed to `register` and `track` below is presumably a
        // channel / peer-set identifier -- confirm against the p2p mock API.
        let injector_pk = PrivateKey::from_seed(9_000_000).public_key();
        let (mut injector_sender, _injector_receiver) = oracle
            .control(injector_pk.clone())
            .register(1, TEST_QUOTA)
            .await
            .unwrap();
        // Zero latency, zero jitter, 100% success: injected messages are never delayed or lost.
        let link = Link {
            latency: Duration::from_millis(0),
            jitter: Duration::from_millis(0),
            success_rate: 1.0,
        };
        oracle
            .add_link(injector_pk.clone(), me.clone(), link)
            .await
            .unwrap();
        oracle.manager().track(
            1,
            TrackedPeers::new(
                Set::from_iter_dedup(std::iter::once(me.clone())),
                Set::from_iter_dedup(std::iter::once(injector_pk.clone())),
            ),
        );
        context.sleep(Duration::from_millis(1)).await;

        // Helpers that assemble certificates from the votes of the first `quorum` schemes.
        // The local `quorum` shadows the `quorum` function from here on.
        let quorum = quorum(n) as usize;
        let notarization = |view: View, parent: View, payload: &[u8]| {
            let proposal =
                Proposal::new(Round::new(epoch, view), parent, Sha256::hash(payload));
            let votes: Vec<_> = schemes
                .iter()
                .take(quorum)
                .map(|scheme| TNotarize::sign(scheme, proposal.clone()).unwrap())
                .collect();
            TNotarization::from_notarizes(&schemes[0], &votes, &Sequential)
                .expect("notarization requires quorum")
        };
        let finalization = |view: View, parent: View, payload: &[u8]| {
            let proposal =
                Proposal::new(Round::new(epoch, view), parent, Sha256::hash(payload));
            let votes: Vec<_> = schemes
                .iter()
                .take(quorum)
                .map(|scheme| TFinalize::sign(scheme, proposal.clone()).unwrap())
                .collect();
            TFinalization::from_finalizes(&schemes[0], &votes, &Sequential)
                .expect("finalization requires quorum")
        };

        // Send the whole burst before the engine exists: notarizations for views 1..=3
        // (each naming the previous view as parent) followed by a finalization for view 3.
        for certificate in [
            Certificate::Notarization(notarization(View::new(1), View::zero(), b"payload-1")),
            Certificate::Notarization(notarization(View::new(2), View::new(1), b"payload-2")),
            Certificate::Notarization(notarization(View::new(3), View::new(2), b"payload-3")),
            Certificate::Finalization(finalization(View::new(3), View::new(2), b"payload-3")),
        ] {
            injector_sender.send(Recipients::One(me.clone()), certificate.encode(), true);
        }

        // Subscribe to the reporter before the engine starts so no update is missed.
        let elector = RoundRobin::<Sha256>::default();
        let reporter_config = mocks::reporter::Config {
            participants: participants.clone().try_into().unwrap(),
            scheme: schemes[0].clone(),
            elector: elector.clone(),
        };
        let reporter =
            mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
        let mut monitor_reporter = reporter.clone();
        let (mut latest, mut monitor) = monitor_reporter.subscribe().await;

        // Mock application with all latencies set to zero and proposals stalled.
        let relay = Arc::new(mocks::relay::Relay::new());
        let application_cfg = mocks::application::Config {
            hasher: Sha256::default(),
            relay: relay.clone(),
            me: me.clone(),
            propose_latency: (0.0, 0.0),
            verify_latency: (0.0, 0.0),
            certify_latency: (0.0, 0.0),
            should_certify: mocks::application::Certifier::Always,
        };
        let (mut application_actor, application) =
            mocks::application::Application::new(context.child("application"), application_cfg);
        application_actor.set_stall_proposals(true);
        application_actor.start();

        // The engine under test: note the mailbox size of 1.
        let cfg = config::Config {
            scheme: schemes[0].clone(),
            elector,
            blocker: oracle.control(me.clone()),
            automaton: application.clone(),
            relay: application,
            reporter,
            strategy: Sequential,
            partition: me.to_string(),
            mailbox_size: NZUsize!(1),
            epoch,
            floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(epoch)),
            leader_timeout: Duration::from_secs(1),
            certification_timeout: Duration::from_secs(2),
            timeout_retry: Duration::from_secs(10),
            fetch_timeout: Duration::from_secs(1),
            activity_timeout: ViewDelta::new(10),
            skip_timeout: ViewDelta::new(5),
            fetch_concurrent: NZUsize!(4),
            replay_buffer: NZUsize!(1024 * 1024),
            write_buffer: NZUsize!(1024 * 1024),
            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
            forwarding: ForwardingPolicy::Disabled,
        };
        let engine = Engine::new(context.child("engine"), cfg);
        engine.start(pending, recovered, resolver);

        // Block until the reporter's monitor announces view 3 or later; the test fails if
        // the monitor channel closes first.
        while latest < View::new(3) {
            latest = monitor.recv().await.expect("finalization event missing");
        }
    });
}
3737
3738 fn impersonator<S, F, L>(seed: u64, mut fixture: F)
3739 where
3740 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3741 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3742 L: Elector<S>,
3743 {
3744 let n = 4;
3746 let required_containers = View::new(50);
3747 let activity_timeout = ViewDelta::new(10);
3748 let skip_timeout = ViewDelta::new(5);
3749 let namespace = b"consensus".to_vec();
3750 let cfg = deterministic::Config::new()
3751 .with_seed(seed)
3752 .with_timeout(Some(Duration::from_secs(30)));
3753 let executor = deterministic::Runner::new(cfg);
3754 executor.start(|mut context| async move {
3755 let Fixture {
3757 participants,
3758 schemes,
3759 ..
3760 } = fixture(&mut context, &namespace, n);
3761 let mut oracle =
3762 start_test_network_with_peers(context.child("network"), participants.clone(), true)
3763 .await;
3764 let mut registrations = register_validators(&mut oracle, &participants).await;
3765
3766 let link = Link {
3768 latency: Duration::from_millis(10),
3769 jitter: Duration::from_millis(1),
3770 success_rate: 1.0,
3771 };
3772 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3773
3774 let elector = L::default();
3776 let relay = Arc::new(mocks::relay::Relay::new());
3777 let mut reporters = Vec::new();
3778 for (idx_scheme, validator) in participants.iter().enumerate() {
3779 let context = context
3781 .child("validator")
3782 .with_attribute("public_key", validator);
3783
3784 let reporter_config = mocks::reporter::Config {
3786 participants: participants.clone().try_into().unwrap(),
3787 scheme: schemes[idx_scheme].clone(),
3788 elector: elector.clone(),
3789 };
3790 let reporter =
3791 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
3792 let (pending, recovered, resolver) = registrations
3793 .remove(validator)
3794 .expect("validator should be registered");
3795 if idx_scheme == 0 {
3796 let cfg = mocks::impersonator::Config {
3797 scheme: schemes[idx_scheme].clone(),
3798 };
3799
3800 let engine: mocks::impersonator::Impersonator<_, _, Sha256> =
3801 mocks::impersonator::Impersonator::new(
3802 context.child("byzantine_engine"),
3803 cfg,
3804 );
3805 engine.start(pending);
3806 } else {
3807 reporters.push(reporter.clone());
3808 let application_cfg = mocks::application::Config {
3809 hasher: Sha256::default(),
3810 relay: relay.clone(),
3811 me: validator.clone(),
3812 propose_latency: (10.0, 5.0),
3813 verify_latency: (10.0, 5.0),
3814 certify_latency: (10.0, 5.0),
3815 should_certify: mocks::application::Certifier::Always,
3816 };
3817 let (actor, application) = mocks::application::Application::new(
3818 context.child("application"),
3819 application_cfg,
3820 );
3821 actor.start();
3822 let blocker = oracle.control(validator.clone());
3823 let cfg = config::Config {
3824 scheme: schemes[idx_scheme].clone(),
3825 elector: elector.clone(),
3826 blocker,
3827 automaton: application.clone(),
3828 relay: application.clone(),
3829 reporter: reporter.clone(),
3830 strategy: Sequential,
3831 partition: validator.clone().to_string(),
3832 mailbox_size: NZUsize!(1024),
3833 epoch: Epoch::new(333),
3834 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
3835 Epoch::new(333),
3836 )),
3837 leader_timeout: Duration::from_secs(1),
3838 certification_timeout: Duration::from_secs(2),
3839 timeout_retry: Duration::from_secs(10),
3840 fetch_timeout: Duration::from_secs(1),
3841 activity_timeout,
3842 skip_timeout,
3843 fetch_concurrent: NZUsize!(4),
3844 replay_buffer: NZUsize!(1024 * 1024),
3845 write_buffer: NZUsize!(1024 * 1024),
3846 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3847 forwarding: ForwardingPolicy::Disabled,
3848 };
3849 let engine = Engine::new(context.child("engine"), cfg);
3850 engine.start(pending, recovered, resolver);
3851 }
3852 }
3853
3854 let mut finalizers = Vec::new();
3856 for reporter in reporters.iter_mut() {
3857 let (mut latest, mut monitor) = reporter.subscribe().await;
3858 finalizers.push(context.child("finalizer").spawn(move |_| async move {
3859 while latest < required_containers {
3860 latest = monitor.recv().await.expect("event missing");
3861 }
3862 }));
3863 }
3864 join_all(finalizers).await;
3865
3866 let byz = &participants[0];
3868 for reporter in reporters.iter() {
3869 reporter.assert_no_faults();
3871
3872 reporter.assert_no_invalid();
3874 }
3875
3876 let blocked = oracle.blocked().await.unwrap();
3878 assert!(!blocked.is_empty());
3879 for (a, b) in blocked {
3880 assert_ne!(&a, byz);
3881 assert_eq!(&b, byz);
3882 }
3883 });
3884 }
3885
3886 #[test_group("slow")]
3887 #[test_traced]
3888 fn test_impersonator() {
3889 for seed in 0..5 {
3890 impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3891 impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3892 impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3893 impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3894 impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3895 impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3896 impersonator::<_, _, RoundRobin>(seed, ed25519::fixture);
3897 impersonator::<_, _, RoundRobin>(seed, secp256r1::fixture);
3898 }
3899 }
3900
/// Runs a seven-participant network in which the first participant is driven by
/// the mock `Equivocator` and every other participant runs the real `Engine`,
/// aborting and later restarting one randomly chosen engine along the way.
///
/// * `seed` - seed for the deterministic runtime (run is capped at a 60 second timeout).
/// * `fixture` - builds the participants and signing schemes for the run.
/// * `L` - elector type, constructed via `L::default()`.
///
/// Returns `true` when the oracle's blocked list is non-empty at the end of the
/// run. Every blocked entry is asserted to have the first participant's key as
/// its second element and a different key as its first.
fn equivocator<S, F, L>(seed: u64, mut fixture: F) -> bool
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
    L: Elector<S>,
{
    let n = 7;
    let required_containers = View::new(50);
    let activity_timeout = ViewDelta::new(10);
    let skip_timeout = ViewDelta::new(5);
    let namespace = b"consensus".to_vec();
    let cfg = deterministic::Config::new()
        .with_seed(seed)
        .with_timeout(Some(Duration::from_secs(60)));
    let executor = deterministic::Runner::new(cfg);
    executor.start(|mut context| async move {
        // Build participants and register each of them with the simulated network.
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);
        let mut oracle =
            start_test_network_with_peers(context.child("network"), participants.clone(), true)
                .await;
        let mut registrations = register_validators(&mut oracle, &participants).await;

        // Link all validators with a lossless 10ms (+/- 1ms jitter) link.
        let link = Link {
            latency: Duration::from_millis(10),
            jitter: Duration::from_millis(1),
            success_rate: 1.0,
        };
        link_validators(&mut oracle, &participants, Action::Link(link), None).await;

        // Start one engine per participant. `engines` and `reporters` both receive an
        // entry on every iteration, so their indices line up with `participants`.
        let elector = L::default();
        let mut engines = Vec::new();
        let relay = Arc::new(mocks::relay::Relay::new());
        let mut reporters = Vec::new();
        for (idx_scheme, validator) in participants.iter().enumerate() {
            let context = context
                .child("validator")
                .with_attribute("public_key", validator);

            let reporter_config = mocks::reporter::Config {
                participants: participants.clone().try_into().unwrap(),
                scheme: schemes[idx_scheme].clone(),
                elector: elector.clone(),
            };
            let reporter =
                mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
            reporters.push(reporter.clone());
            let (pending, recovered, resolver) = registrations
                .remove(validator)
                .expect("validator should be registered");
            if idx_scheme == 0 {
                // The first participant runs the mock equivocator (no resolver channel).
                let cfg = mocks::equivocator::Config {
                    scheme: schemes[idx_scheme].clone(),
                    epoch: Epoch::new(333),
                    relay: relay.clone(),
                    hasher: Sha256::default(),
                    elector: elector.clone(),
                };

                let engine = mocks::equivocator::Equivocator::new(
                    context.child("byzantine_engine"),
                    cfg,
                );
                engines.push(engine.start(pending, recovered));
            } else {
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                    certify_latency: (10.0, 5.0),
                    should_certify: mocks::application::Certifier::Always,
                };
                let (actor, application) = mocks::application::Application::new(
                    context.child("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx_scheme].clone(),
                    elector: elector.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    strategy: Sequential,
                    partition: validator.to_string(),
                    mailbox_size: NZUsize!(1024),
                    epoch: Epoch::new(333),
                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
                        Epoch::new(333),
                    )),
                    leader_timeout: Duration::from_secs(1),
                    certification_timeout: Duration::from_secs(2),
                    timeout_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    fetch_concurrent: NZUsize!(4),
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                    forwarding: ForwardingPolicy::Disabled,
                };
                let engine = Engine::new(context.child("engine"), cfg);
                engines.push(engine.start(pending, recovered, resolver));
            }
        }

        // Phase 1: wait for every reporter except the first participant's (`skip(1)`)
        // to announce `required_containers`.
        let mut finalizers = Vec::new();
        for reporter in reporters.iter_mut().skip(1) {
            let (mut latest, mut monitor) = reporter.subscribe().await;
            finalizers.push(context.child("finalizer").spawn(move |_| async move {
                while latest < required_containers {
                    latest = monitor.recv().await.expect("event missing");
                }
            }));
        }
        join_all(finalizers).await;

        // Pick a random engine other than index 0, abort it, wait for the task to end,
        // and stop tracking its reporter.
        let idx = context.gen_range(1..engines.len());
        let validator = &participants[idx];
        let handle = engines.remove(idx);
        handle.abort();
        let _ = handle.await;
        reporters.remove(idx);
        info!(idx, ?validator, "aborted validator");

        // Phase 2: the remaining engine-backed reporters must reach twice
        // `required_containers` while the aborted validator is down.
        let mut finalizers = Vec::new();
        for reporter in reporters.iter_mut().skip(1) {
            let (mut latest, mut monitor) = reporter.subscribe().await;
            finalizers.push(context.child("finalizer").spawn(move |_| async move {
                while latest < View::new(required_containers.get() * 2) {
                    latest = monitor.recv().await.expect("event missing");
                }
            }));
        }
        join_all(finalizers).await;

        // Restart the aborted validator under a fresh context label, with a new reporter
        // and a new network registration. It reuses the same `partition` string as before.
        // NOTE(review): presumably this lets the engine recover prior state -- confirm.
        info!(idx, ?validator, "restarting validator");
        let context = context
            .child("validator_restarted")
            .with_attribute("public_key", validator);

        let reporter_config = mocks::reporter::Config {
            participants: participants.clone().try_into().unwrap(),
            scheme: schemes[idx].clone(),
            elector: elector.clone(),
        };
        let reporter =
            mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
        let (pending, recovered, resolver) =
            register_validator(&mut oracle, validator.clone()).await;
        reporters.push(reporter.clone());
        let application_cfg = mocks::application::Config {
            hasher: Sha256::default(),
            relay: relay.clone(),
            me: validator.clone(),
            propose_latency: (10.0, 5.0),
            verify_latency: (10.0, 5.0),
            certify_latency: (10.0, 5.0),
            should_certify: mocks::application::Certifier::Always,
        };
        let (actor, application) =
            mocks::application::Application::new(context.child("application"), application_cfg);
        actor.start();
        let blocker = oracle.control(validator.clone());
        let cfg = config::Config {
            scheme: schemes[idx].clone(),
            elector: elector.clone(),
            blocker,
            automaton: application.clone(),
            relay: application.clone(),
            reporter: reporter.clone(),
            strategy: Sequential,
            partition: validator.to_string(),
            mailbox_size: NZUsize!(1024),
            epoch: Epoch::new(333),
            floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(Epoch::new(
                333,
            ))),
            leader_timeout: Duration::from_secs(1),
            certification_timeout: Duration::from_secs(2),
            timeout_retry: Duration::from_secs(10),
            fetch_timeout: Duration::from_secs(1),
            activity_timeout,
            skip_timeout,
            fetch_concurrent: NZUsize!(4),
            replay_buffer: NZUsize!(1024 * 1024),
            write_buffer: NZUsize!(1024 * 1024),
            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
            forwarding: ForwardingPolicy::Disabled,
        };
        let engine = Engine::new(context.child("engine"), cfg);
        engine.start(pending, recovered, resolver);

        // Phase 3: every tracked reporter after the first -- now including the restarted
        // validator's -- must reach three times `required_containers`.
        let mut finalizers = Vec::new();
        for reporter in reporters.iter_mut().skip(1) {
            let (mut latest, mut monitor) = reporter.subscribe().await;
            finalizers.push(context.child("finalizer").spawn(move |_| async move {
                while latest < View::new(required_containers.get() * 3) {
                    latest = monitor.recv().await.expect("event missing");
                }
            }));
        }
        join_all(finalizers).await;

        // Any blocked entry must pair a non-byzantine key (first) with the byzantine key
        // (second). An empty list is allowed here; the result tells the caller whether
        // anything was blocked in this run.
        let byz = &participants[0];
        let blocked = oracle.blocked().await.unwrap();
        for (a, b) in &blocked {
            assert_ne!(a, byz);
            assert_eq!(b, byz);
        }
        !blocked.is_empty()
    })
}
4137
4138 #[test_group("slow")]
4139 #[test_traced]
4140 fn test_equivocator_bls12381_threshold_vrf_min_pk() {
4141 let detected = (0..5).any(|seed| {
4142 equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>)
4143 });
4144 assert!(
4145 detected,
4146 "expected at least one seed to detect equivocation"
4147 );
4148 }
4149
4150 #[test_group("slow")]
4151 #[test_traced]
4152 fn test_equivocator_bls12381_threshold_vrf_min_sig() {
4153 let detected = (0..5).any(|seed| {
4154 equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>)
4155 });
4156 assert!(
4157 detected,
4158 "expected at least one seed to detect equivocation"
4159 );
4160 }
4161
4162 #[test_group("slow")]
4163 #[test_traced]
4164 fn test_equivocator_bls12381_threshold_std_min_pk() {
4165 let detected = (0..5).any(|seed| {
4166 equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>)
4167 });
4168 assert!(
4169 detected,
4170 "expected at least one seed to detect equivocation"
4171 );
4172 }
4173
4174 #[test_group("slow")]
4175 #[test_traced]
4176 fn test_equivocator_bls12381_threshold_std_min_sig() {
4177 let detected = (0..5).any(|seed| {
4178 equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>)
4179 });
4180 assert!(
4181 detected,
4182 "expected at least one seed to detect equivocation"
4183 );
4184 }
4185
4186 #[test_group("slow")]
4187 #[test_traced]
4188 fn test_equivocator_bls12381_multisig_min_pk() {
4189 let detected = (0..5).any(|seed| {
4190 equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>)
4191 });
4192 assert!(
4193 detected,
4194 "expected at least one seed to detect equivocation"
4195 );
4196 }
4197
4198 #[test_group("slow")]
4199 #[test_traced]
4200 fn test_equivocator_bls12381_multisig_min_sig() {
4201 let detected = (0..5).any(|seed| {
4202 equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>)
4203 });
4204 assert!(
4205 detected,
4206 "expected at least one seed to detect equivocation"
4207 );
4208 }
4209
4210 #[test_group("slow")]
4211 #[test_traced]
4212 fn test_equivocator_ed25519() {
4213 let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, ed25519::fixture));
4214 assert!(
4215 detected,
4216 "expected at least one seed to detect equivocation"
4217 );
4218 }
4219
4220 #[test_group("slow")]
4221 #[test_traced]
4222 fn test_equivocator_secp256r1() {
4223 let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, secp256r1::fixture));
4224 assert!(
4225 detected,
4226 "expected at least one seed to detect equivocation"
4227 );
4228 }
4229
4230 fn reconfigurer<S, F, L>(seed: u64, mut fixture: F)
4231 where
4232 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4233 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4234 L: Elector<S>,
4235 {
4236 let n = 4;
4238 let required_containers = View::new(50);
4239 let activity_timeout = ViewDelta::new(10);
4240 let skip_timeout = ViewDelta::new(5);
4241 let namespace = b"consensus".to_vec();
4242 let cfg = deterministic::Config::new()
4243 .with_seed(seed)
4244 .with_timeout(Some(Duration::from_secs(30)));
4245 let executor = deterministic::Runner::new(cfg);
4246 executor.start(|mut context| async move {
4247 let Fixture {
4249 participants,
4250 schemes,
4251 ..
4252 } = fixture(&mut context, &namespace, n);
4253 let mut oracle =
4254 start_test_network_with_peers(context.child("network"), participants.clone(), true)
4255 .await;
4256 let mut registrations = register_validators(&mut oracle, &participants).await;
4257
4258 let link = Link {
4260 latency: Duration::from_millis(10),
4261 jitter: Duration::from_millis(1),
4262 success_rate: 1.0,
4263 };
4264 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4265
4266 let elector = L::default();
4268 let relay = Arc::new(mocks::relay::Relay::new());
4269 let mut reporters = Vec::new();
4270 for (idx_scheme, validator) in participants.iter().enumerate() {
4271 let context = context
4273 .child("validator")
4274 .with_attribute("public_key", validator);
4275
4276 let reporter_config = mocks::reporter::Config {
4278 participants: participants.clone().try_into().unwrap(),
4279 scheme: schemes[idx_scheme].clone(),
4280 elector: elector.clone(),
4281 };
4282 let reporter =
4283 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
4284 let (pending, recovered, resolver) = registrations
4285 .remove(validator)
4286 .expect("validator should be registered");
4287 if idx_scheme == 0 {
4288 let cfg = mocks::reconfigurer::Config {
4289 scheme: schemes[idx_scheme].clone(),
4290 };
4291 let engine: mocks::reconfigurer::Reconfigurer<_, _, Sha256> =
4292 mocks::reconfigurer::Reconfigurer::new(
4293 context.child("byzantine_engine"),
4294 cfg,
4295 );
4296 engine.start(pending);
4297 } else {
4298 reporters.push(reporter.clone());
4299 let application_cfg = mocks::application::Config {
4300 hasher: Sha256::default(),
4301 relay: relay.clone(),
4302 me: validator.clone(),
4303 propose_latency: (10.0, 5.0),
4304 verify_latency: (10.0, 5.0),
4305 certify_latency: (10.0, 5.0),
4306 should_certify: mocks::application::Certifier::Always,
4307 };
4308 let (actor, application) = mocks::application::Application::new(
4309 context.child("application"),
4310 application_cfg,
4311 );
4312 actor.start();
4313 let blocker = oracle.control(validator.clone());
4314 let cfg = config::Config {
4315 scheme: schemes[idx_scheme].clone(),
4316 elector: elector.clone(),
4317 blocker,
4318 automaton: application.clone(),
4319 relay: application.clone(),
4320 reporter: reporter.clone(),
4321 strategy: Sequential,
4322 partition: validator.to_string(),
4323 mailbox_size: NZUsize!(1024),
4324 epoch: Epoch::new(333),
4325 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
4326 Epoch::new(333),
4327 )),
4328 leader_timeout: Duration::from_secs(1),
4329 certification_timeout: Duration::from_secs(2),
4330 timeout_retry: Duration::from_secs(10),
4331 fetch_timeout: Duration::from_secs(1),
4332 activity_timeout,
4333 skip_timeout,
4334 fetch_concurrent: NZUsize!(4),
4335 replay_buffer: NZUsize!(1024 * 1024),
4336 write_buffer: NZUsize!(1024 * 1024),
4337 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4338 forwarding: ForwardingPolicy::Disabled,
4339 };
4340 let engine = Engine::new(context.child("engine"), cfg);
4341 engine.start(pending, recovered, resolver);
4342 }
4343 }
4344
4345 let mut finalizers = Vec::new();
4347 for reporter in reporters.iter_mut() {
4348 let (mut latest, mut monitor) = reporter.subscribe().await;
4349 finalizers.push(context.child("finalizer").spawn(move |_| async move {
4350 while latest < required_containers {
4351 latest = monitor.recv().await.expect("event missing");
4352 }
4353 }));
4354 }
4355 join_all(finalizers).await;
4356
4357 let byz = &participants[0];
4359 for reporter in reporters.iter() {
4360 reporter.assert_no_faults();
4362
4363 reporter.assert_no_invalid();
4365 }
4366
4367 let blocked = oracle.blocked().await.unwrap();
4369 assert!(!blocked.is_empty());
4370 for (a, b) in blocked {
4371 assert_ne!(&a, byz);
4372 assert_eq!(&b, byz);
4373 }
4374 });
4375 }
4376
4377 #[test_group("slow")]
4378 #[test_traced]
4379 fn test_reconfigurer() {
4380 for seed in 0..5 {
4381 reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
4382 reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
4383 reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
4384 reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
4385 reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
4386 reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
4387 reconfigurer::<_, _, RoundRobin>(seed, ed25519::fixture);
4388 reconfigurer::<_, _, RoundRobin>(seed, secp256r1::fixture);
4389 }
4390 }
4391
4392 fn nuller<S, F, L>(seed: u64, mut fixture: F)
4393 where
4394 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4395 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4396 L: Elector<S>,
4397 {
4398 let n = 4;
4400 let required_containers = View::new(50);
4401 let activity_timeout = ViewDelta::new(10);
4402 let skip_timeout = ViewDelta::new(5);
4403 let namespace = b"consensus".to_vec();
4404 let cfg = deterministic::Config::new()
4405 .with_seed(seed)
4406 .with_timeout(Some(Duration::from_secs(30)));
4407 let executor = deterministic::Runner::new(cfg);
4408 executor.start(|mut context| async move {
4409 let Fixture {
4411 participants,
4412 schemes,
4413 ..
4414 } = fixture(&mut context, &namespace, n);
4415 let mut oracle =
4416 start_test_network_with_peers(context.child("network"), participants.clone(), true)
4417 .await;
4418 let mut registrations = register_validators(&mut oracle, &participants).await;
4419
4420 let link = Link {
4422 latency: Duration::from_millis(10),
4423 jitter: Duration::from_millis(1),
4424 success_rate: 1.0,
4425 };
4426 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4427
4428 let elector = L::default();
4430 let relay = Arc::new(mocks::relay::Relay::new());
4431 let mut reporters = Vec::new();
4432 for (idx_scheme, validator) in participants.iter().enumerate() {
4433 let context = context
4435 .child("validator")
4436 .with_attribute("public_key", validator);
4437
4438 let reporter_config = mocks::reporter::Config {
4440 participants: participants.clone().try_into().unwrap(),
4441 scheme: schemes[idx_scheme].clone(),
4442 elector: elector.clone(),
4443 };
4444 let reporter =
4445 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
4446 let (pending, recovered, resolver) = registrations
4447 .remove(validator)
4448 .expect("validator should be registered");
4449 if idx_scheme == 0 {
4450 let cfg = mocks::nuller::Config {
4451 scheme: schemes[idx_scheme].clone(),
4452 };
4453 let engine: mocks::nuller::Nuller<_, _, Sha256> =
4454 mocks::nuller::Nuller::new(context.child("byzantine_engine"), cfg);
4455 engine.start(pending);
4456 } else {
4457 reporters.push(reporter.clone());
4458 let application_cfg = mocks::application::Config {
4459 hasher: Sha256::default(),
4460 relay: relay.clone(),
4461 me: validator.clone(),
4462 propose_latency: (10.0, 5.0),
4463 verify_latency: (10.0, 5.0),
4464 certify_latency: (10.0, 5.0),
4465 should_certify: mocks::application::Certifier::Always,
4466 };
4467 let (actor, application) = mocks::application::Application::new(
4468 context.child("application"),
4469 application_cfg,
4470 );
4471 actor.start();
4472 let blocker = oracle.control(validator.clone());
4473 let cfg = config::Config {
4474 scheme: schemes[idx_scheme].clone(),
4475 elector: elector.clone(),
4476 blocker,
4477 automaton: application.clone(),
4478 relay: application.clone(),
4479 reporter: reporter.clone(),
4480 strategy: Sequential,
4481 partition: validator.clone().to_string(),
4482 mailbox_size: NZUsize!(1024),
4483 epoch: Epoch::new(333),
4484 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
4485 Epoch::new(333),
4486 )),
4487 leader_timeout: Duration::from_secs(1),
4488 certification_timeout: Duration::from_secs(2),
4489 timeout_retry: Duration::from_secs(10),
4490 fetch_timeout: Duration::from_secs(1),
4491 activity_timeout,
4492 skip_timeout,
4493 fetch_concurrent: NZUsize!(4),
4494 replay_buffer: NZUsize!(1024 * 1024),
4495 write_buffer: NZUsize!(1024 * 1024),
4496 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4497 forwarding: ForwardingPolicy::Disabled,
4498 };
4499 let engine = Engine::new(context.child("engine"), cfg);
4500 engine.start(pending, recovered, resolver);
4501 }
4502 }
4503
4504 let mut finalizers = Vec::new();
4506 for reporter in reporters.iter_mut() {
4507 let (mut latest, mut monitor) = reporter.subscribe().await;
4508 finalizers.push(context.child("finalizer").spawn(move |_| async move {
4509 while latest < required_containers {
4510 latest = monitor.recv().await.expect("event missing");
4511 }
4512 }));
4513 }
4514 join_all(finalizers).await;
4515
4516 let byz = &participants[0];
4518 let mut count_nullify_and_finalize = 0;
4519 for reporter in reporters.iter() {
4520 {
4522 let faults = reporter.faults.lock();
4523 assert_eq!(faults.len(), 1);
4524 let faulter = faults.get(byz).expect("byzantine party is not faulter");
4525 for (_, faults) in faulter.iter() {
4526 for fault in faults.iter() {
4527 match fault {
4528 Activity::NullifyFinalize(_) => {
4529 count_nullify_and_finalize += 1;
4530 }
4531 _ => panic!("unexpected fault: {fault:?}"),
4532 }
4533 }
4534 }
4535 }
4536
4537 reporter.assert_no_invalid();
4539 }
4540 assert!(count_nullify_and_finalize > 0);
4541
4542 let blocked = oracle.blocked().await.unwrap();
4544 assert!(!blocked.is_empty());
4545 for (a, b) in blocked {
4546 assert_ne!(&a, byz);
4547 assert_eq!(&b, byz);
4548 }
4549 });
4550 }
4551
4552 #[test_group("slow")]
4553 #[test_traced]
4554 fn test_nuller() {
4555 for seed in 0..5 {
4556 nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
4557 nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
4558 nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
4559 nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
4560 nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
4561 nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
4562 nuller::<_, _, RoundRobin>(seed, ed25519::fixture);
4563 nuller::<_, _, RoundRobin>(seed, secp256r1::fixture);
4564 }
4565 }
4566
4567 fn outdated<S, F, L>(seed: u64, mut fixture: F)
4568 where
4569 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4570 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4571 L: Elector<S>,
4572 {
4573 let n = 4;
4575 let required_containers = View::new(100);
4576 let activity_timeout = ViewDelta::new(10);
4577 let skip_timeout = ViewDelta::new(5);
4578 let namespace = b"consensus".to_vec();
4579 let cfg = deterministic::Config::new()
4580 .with_seed(seed)
4581 .with_timeout(Some(Duration::from_secs(30)));
4582 let executor = deterministic::Runner::new(cfg);
4583 executor.start(|mut context| async move {
4584 let Fixture {
4586 participants,
4587 schemes,
4588 ..
4589 } = fixture(&mut context, &namespace, n);
4590 let mut oracle =
4591 start_test_network_with_peers(context.child("network"), participants.clone(), true)
4592 .await;
4593 let mut registrations = register_validators(&mut oracle, &participants).await;
4594
4595 let link = Link {
4597 latency: Duration::from_millis(10),
4598 jitter: Duration::from_millis(1),
4599 success_rate: 1.0,
4600 };
4601 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4602
4603 let elector = L::default();
4605 let relay = Arc::new(mocks::relay::Relay::new());
4606 let mut reporters = Vec::new();
4607 for (idx_scheme, validator) in participants.iter().enumerate() {
4608 let context = context
4610 .child("validator")
4611 .with_attribute("public_key", validator);
4612
4613 let reporter_config = mocks::reporter::Config {
4615 participants: participants.clone().try_into().unwrap(),
4616 scheme: schemes[idx_scheme].clone(),
4617 elector: elector.clone(),
4618 };
4619 let reporter =
4620 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
4621 let (pending, recovered, resolver) = registrations
4622 .remove(validator)
4623 .expect("validator should be registered");
4624 if idx_scheme == 0 {
4625 let cfg = mocks::outdated::Config {
4626 scheme: schemes[idx_scheme].clone(),
4627 view_delta: ViewDelta::new(activity_timeout.get().saturating_mul(4)),
4628 };
4629 let engine: mocks::outdated::Outdated<_, _, Sha256> =
4630 mocks::outdated::Outdated::new(context.child("byzantine_engine"), cfg);
4631 engine.start(pending);
4632 } else {
4633 reporters.push(reporter.clone());
4634 let application_cfg = mocks::application::Config {
4635 hasher: Sha256::default(),
4636 relay: relay.clone(),
4637 me: validator.clone(),
4638 propose_latency: (10.0, 5.0),
4639 verify_latency: (10.0, 5.0),
4640 certify_latency: (10.0, 5.0),
4641 should_certify: mocks::application::Certifier::Always,
4642 };
4643 let (actor, application) = mocks::application::Application::new(
4644 context.child("application"),
4645 application_cfg,
4646 );
4647 actor.start();
4648 let blocker = oracle.control(validator.clone());
4649 let cfg = config::Config {
4650 scheme: schemes[idx_scheme].clone(),
4651 elector: elector.clone(),
4652 blocker,
4653 automaton: application.clone(),
4654 relay: application.clone(),
4655 reporter: reporter.clone(),
4656 strategy: Sequential,
4657 partition: validator.clone().to_string(),
4658 mailbox_size: NZUsize!(1024),
4659 epoch: Epoch::new(333),
4660 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
4661 Epoch::new(333),
4662 )),
4663 leader_timeout: Duration::from_secs(1),
4664 certification_timeout: Duration::from_secs(2),
4665 timeout_retry: Duration::from_secs(10),
4666 fetch_timeout: Duration::from_secs(1),
4667 activity_timeout,
4668 skip_timeout,
4669 fetch_concurrent: NZUsize!(4),
4670 replay_buffer: NZUsize!(1024 * 1024),
4671 write_buffer: NZUsize!(1024 * 1024),
4672 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4673 forwarding: ForwardingPolicy::Disabled,
4674 };
4675 let engine = Engine::new(context.child("engine"), cfg);
4676 engine.start(pending, recovered, resolver);
4677 }
4678 }
4679
4680 let mut finalizers = Vec::new();
4682 for reporter in reporters.iter_mut() {
4683 let (mut latest, mut monitor) = reporter.subscribe().await;
4684 finalizers.push(context.child("finalizer").spawn(move |_| async move {
4685 while latest < required_containers {
4686 latest = monitor.recv().await.expect("event missing");
4687 }
4688 }));
4689 }
4690 join_all(finalizers).await;
4691
4692 for reporter in reporters.iter() {
4694 reporter.assert_no_faults();
4696
4697 reporter.assert_no_invalid();
4699 }
4700
4701 let blocked = oracle.blocked().await.unwrap();
4703 assert!(blocked.is_empty());
4704 });
4705 }
4706
4707 #[test_group("slow")]
4708 #[test_traced]
4709 fn test_outdated() {
4710 for seed in 0..5 {
4711 outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
4712 outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
4713 outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
4714 outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
4715 outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
4716 outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
4717 outdated::<_, _, RoundRobin>(seed, ed25519::fixture);
4718 outdated::<_, _, RoundRobin>(seed, secp256r1::fixture);
4719 }
4720 }
4721
4722 fn run_1k<S, F, L>(mut fixture: F)
4723 where
4724 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4725 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4726 L: Elector<S>,
4727 {
4728 let n = 10;
4730 let required_containers = View::new(1_000);
4731 let activity_timeout = ViewDelta::new(10);
4732 let skip_timeout = ViewDelta::new(5);
4733 let namespace = b"consensus".to_vec();
4734 let cfg = deterministic::Config::new();
4735 let executor = deterministic::Runner::new(cfg);
4736 executor.start(|mut context| async move {
4737 let Fixture {
4739 participants,
4740 schemes,
4741 ..
4742 } = fixture(&mut context, &namespace, n);
4743 let mut oracle =
4744 start_test_network_with_peers(context.child("network"), participants.clone(), true)
4745 .await;
4746 let mut registrations = register_validators(&mut oracle, &participants).await;
4747
4748 let link = Link {
4750 latency: Duration::from_millis(80),
4751 jitter: Duration::from_millis(10),
4752 success_rate: 0.98,
4753 };
4754 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4755
4756 let elector = L::default();
4758 let relay = Arc::new(mocks::relay::Relay::new());
4759 let mut reporters = Vec::new();
4760 let mut engine_handlers = Vec::new();
4761 for (idx, validator) in participants.iter().enumerate() {
4762 let context = context
4764 .child("validator")
4765 .with_attribute("public_key", validator);
4766
4767 let reporter_config = mocks::reporter::Config {
4769 participants: participants.clone().try_into().unwrap(),
4770 scheme: schemes[idx].clone(),
4771 elector: elector.clone(),
4772 };
4773 let reporter =
4774 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
4775 reporters.push(reporter.clone());
4776 let application_cfg = mocks::application::Config {
4777 hasher: Sha256::default(),
4778 relay: relay.clone(),
4779 me: validator.clone(),
4780 propose_latency: (100.0, 50.0),
4781 verify_latency: (50.0, 40.0),
4782 certify_latency: (50.0, 40.0),
4783 should_certify: mocks::application::Certifier::Always,
4784 };
4785 let (actor, application) = mocks::application::Application::new(
4786 context.child("application"),
4787 application_cfg,
4788 );
4789 actor.start();
4790 let blocker = oracle.control(validator.clone());
4791 let cfg = config::Config {
4792 scheme: schemes[idx].clone(),
4793 elector: elector.clone(),
4794 blocker,
4795 automaton: application.clone(),
4796 relay: application.clone(),
4797 reporter: reporter.clone(),
4798 strategy: Sequential,
4799 partition: validator.to_string(),
4800 mailbox_size: NZUsize!(1024),
4801 epoch: Epoch::new(333),
4802 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
4803 Epoch::new(333),
4804 )),
4805 leader_timeout: Duration::from_secs(1),
4806 certification_timeout: Duration::from_secs(2),
4807 timeout_retry: Duration::from_secs(10),
4808 fetch_timeout: Duration::from_secs(1),
4809 activity_timeout,
4810 skip_timeout,
4811 fetch_concurrent: NZUsize!(4),
4812 replay_buffer: NZUsize!(1024 * 1024),
4813 write_buffer: NZUsize!(1024 * 1024),
4814 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4815 forwarding: ForwardingPolicy::Disabled,
4816 };
4817 let engine = Engine::new(context.child("engine"), cfg);
4818
4819 let (pending, recovered, resolver) = registrations
4821 .remove(validator)
4822 .expect("validator should be registered");
4823 engine_handlers.push(engine.start(pending, recovered, resolver));
4824 }
4825
4826 let mut finalizers = Vec::new();
4828 for reporter in reporters.iter_mut() {
4829 let (mut latest, mut monitor) = reporter.subscribe().await;
4830 finalizers.push(context.child("finalizer").spawn(move |_| async move {
4831 while latest < required_containers {
4832 latest = monitor.recv().await.expect("event missing");
4833 }
4834 }));
4835 }
4836 join_all(finalizers).await;
4837
4838 for reporter in reporters.iter() {
4840 reporter.assert_no_faults();
4842
4843 reporter.assert_no_invalid();
4845 }
4846
4847 let blocked = oracle.blocked().await.unwrap();
4849 assert!(blocked.is_empty());
4850 })
4851 }
4852
4853 #[test_group("slow")]
4854 #[test_traced]
4855 fn test_1k_bls12381_threshold_vrf_min_pk() {
4856 run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
4857 }
4858
4859 #[test_group("slow")]
4860 #[test_traced]
4861 fn test_1k_bls12381_threshold_vrf_min_sig() {
4862 run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
4863 }
4864
4865 #[test_group("slow")]
4866 #[test_traced]
4867 fn test_1k_bls12381_threshold_std_min_pk() {
4868 run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
4869 }
4870
4871 #[test_group("slow")]
4872 #[test_traced]
4873 fn test_1k_bls12381_threshold_std_min_sig() {
4874 run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
4875 }
4876
4877 #[test_group("slow")]
4878 #[test_traced]
4879 fn test_1k_bls12381_multisig_min_pk() {
4880 run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4881 }
4882
4883 #[test_group("slow")]
4884 #[test_traced]
4885 fn test_1k_bls12381_multisig_min_sig() {
4886 run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
4887 }
4888
4889 #[test_group("slow")]
4890 #[test_traced]
4891 fn test_1k_ed25519() {
4892 run_1k::<_, _, RoundRobin>(ed25519::fixture);
4893 }
4894
4895 #[test_group("slow")]
4896 #[test_traced]
4897 fn test_1k_secp256r1() {
4898 run_1k::<_, _, RoundRobin>(secp256r1::fixture);
4899 }
4900
4901 fn engine_shutdown<S, F, L>(seed: u64, mut fixture: F, graceful: bool)
4902 where
4903 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4904 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4905 L: Elector<S>,
4906 {
4907 let n = 1;
4908 let namespace = b"consensus".to_vec();
4909 let cfg = deterministic::Config::default()
4910 .with_seed(seed)
4911 .with_timeout(Some(Duration::from_secs(10)));
4912 let executor = deterministic::Runner::new(cfg);
4913 executor.start(|mut context| async move {
4914 let Fixture {
4916 participants,
4917 schemes,
4918 ..
4919 } = fixture(&mut context, &namespace, n);
4920 let mut oracle =
4921 start_test_network_with_peers(context.child("network"), participants.clone(), true)
4922 .await;
4923 let mut registrations = register_validators(&mut oracle, &participants).await;
4924
4925 let link = Link {
4927 latency: Duration::from_millis(1),
4928 jitter: Duration::from_millis(0),
4929 success_rate: 1.0,
4930 };
4931 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4932
4933 let elector = L::default();
4935 let reporter_config = mocks::reporter::Config {
4936 participants: participants.clone().try_into().unwrap(),
4937 scheme: schemes[0].clone(),
4938 elector: elector.clone(),
4939 };
4940 let reporter =
4941 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
4942 let relay = Arc::new(mocks::relay::Relay::new());
4943 let application_cfg = mocks::application::Config {
4944 hasher: Sha256::default(),
4945 relay: relay.clone(),
4946 me: participants[0].clone(),
4947 propose_latency: (1.0, 0.0),
4948 verify_latency: (1.0, 0.0),
4949 certify_latency: (1.0, 0.0),
4950 should_certify: mocks::application::Certifier::Always,
4951 };
4952 let (actor, application) =
4953 mocks::application::Application::new(context.child("application"), application_cfg);
4954 actor.start();
4955 let blocker = oracle.control(participants[0].clone());
4956 let cfg = config::Config {
4957 scheme: schemes[0].clone(),
4958 elector: elector.clone(),
4959 blocker,
4960 automaton: application.clone(),
4961 relay: application.clone(),
4962 reporter: reporter.clone(),
4963 strategy: Sequential,
4964 partition: participants[0].clone().to_string(),
4965 mailbox_size: NZUsize!(64),
4966 epoch: Epoch::new(333),
4967 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(Epoch::new(
4968 333,
4969 ))),
4970 leader_timeout: Duration::from_millis(50),
4971 certification_timeout: Duration::from_millis(100),
4972 timeout_retry: Duration::from_millis(250),
4973 fetch_timeout: Duration::from_millis(50),
4974 activity_timeout: ViewDelta::new(4),
4975 skip_timeout: ViewDelta::new(2),
4976 fetch_concurrent: NZUsize!(4),
4977 replay_buffer: NZUsize!(1024 * 16),
4978 write_buffer: NZUsize!(1024 * 16),
4979 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4980 forwarding: ForwardingPolicy::Disabled,
4981 };
4982 let engine = Engine::new(context.child("engine"), cfg);
4983
4984 let (pending, recovered, resolver) = registrations
4986 .remove(&participants[0])
4987 .expect("validator should be registered");
4988 let handle = engine.start(pending, recovered, resolver);
4989
4990 context.sleep(Duration::from_millis(1000)).await;
4992
4993 let running_before = count_running_tasks(&context, "engine");
4995 assert!(
4996 running_before > 0,
4997 "at least one engine task should be running"
4998 );
4999
5000 context.sleep(Duration::from_millis(1500)).await;
5002 assert!(
5003 count_running_tasks(&context, "engine") > 0,
5004 "engine tasks should still be running"
5005 );
5006
5007 let running_after = if graceful {
5009 let result = context
5010 .child("stop")
5011 .stop(0, Some(Duration::from_secs(5)))
5012 .await;
5013 assert!(
5014 result.is_ok(),
5015 "graceful shutdown should complete: {result:?}"
5016 );
5017 count_running_tasks(&context, "engine")
5018 } else {
5019 handle.abort();
5020 let _ = handle.await; context.sleep(Duration::from_millis(1000)).await;
5024 count_running_tasks(&context, "engine")
5025 };
5026 assert_eq!(
5027 running_after, 0,
5028 "all engine tasks should be stopped, but {running_after} still running"
5029 );
5030 });
5031 }
5032
5033 #[test_group("slow")]
5034 #[test_traced]
5035 fn test_children_shutdown_on_engine_abort() {
5036 for seed in 0..10 {
5037 engine_shutdown::<_, _, Random>(
5038 seed,
5039 bls12381_threshold_vrf::fixture::<MinPk, _>,
5040 false,
5041 );
5042 engine_shutdown::<_, _, Random>(
5043 seed,
5044 bls12381_threshold_vrf::fixture::<MinSig, _>,
5045 false,
5046 );
5047 engine_shutdown::<_, _, RoundRobin>(
5048 seed,
5049 bls12381_threshold_std::fixture::<MinPk, _>,
5050 false,
5051 );
5052 engine_shutdown::<_, _, RoundRobin>(
5053 seed,
5054 bls12381_threshold_std::fixture::<MinSig, _>,
5055 false,
5056 );
5057 engine_shutdown::<_, _, RoundRobin>(
5058 seed,
5059 bls12381_multisig::fixture::<MinPk, _>,
5060 false,
5061 );
5062 engine_shutdown::<_, _, RoundRobin>(
5063 seed,
5064 bls12381_multisig::fixture::<MinSig, _>,
5065 false,
5066 );
5067 engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, false);
5068 engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, false);
5069 }
5070 }
5071
5072 #[test_group("slow")]
5073 #[test_traced]
5074 fn test_graceful_shutdown() {
5075 for seed in 0..10 {
5076 engine_shutdown::<_, _, Random>(
5077 seed,
5078 bls12381_threshold_vrf::fixture::<MinPk, _>,
5079 true,
5080 );
5081 engine_shutdown::<_, _, Random>(
5082 seed,
5083 bls12381_threshold_vrf::fixture::<MinSig, _>,
5084 true,
5085 );
5086 engine_shutdown::<_, _, RoundRobin>(
5087 seed,
5088 bls12381_threshold_std::fixture::<MinPk, _>,
5089 true,
5090 );
5091 engine_shutdown::<_, _, RoundRobin>(
5092 seed,
5093 bls12381_threshold_std::fixture::<MinSig, _>,
5094 true,
5095 );
5096 engine_shutdown::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>, true);
5097 engine_shutdown::<_, _, RoundRobin>(
5098 seed,
5099 bls12381_multisig::fixture::<MinSig, _>,
5100 true,
5101 );
5102 engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, true);
5103 engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, true);
5104 }
5105 }
5106
5107 fn attributable_reporter_filtering<S, F, L>(mut fixture: F)
5108 where
5109 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5110 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5111 L: Elector<S>,
5112 {
5113 let n = 3;
5114 let required_containers = View::new(10);
5115 let activity_timeout = ViewDelta::new(10);
5116 let skip_timeout = ViewDelta::new(5);
5117 let namespace = b"consensus".to_vec();
5118 let executor = deterministic::Runner::timed(Duration::from_secs(30));
5119 executor.start(|mut context| async move {
5120 let Fixture {
5122 participants,
5123 schemes,
5124 ..
5125 } = fixture(&mut context, &namespace, n);
5126 let mut oracle = start_test_network_with_peers(
5127 context.child("network"),
5128 participants.clone(),
5129 false,
5130 )
5131 .await;
5132 let mut registrations = register_validators(&mut oracle, &participants).await;
5133
5134 let link = Link {
5136 latency: Duration::from_millis(10),
5137 jitter: Duration::from_millis(1),
5138 success_rate: 1.0,
5139 };
5140 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5141
5142 let elector = L::default();
5144 let relay = Arc::new(mocks::relay::Relay::new());
5145 let mut reporters = Vec::new();
5146 for (idx, validator) in participants.iter().enumerate() {
5147 let context = context
5148 .child("validator")
5149 .with_attribute("public_key", validator);
5150
5151 let reporter_config = mocks::reporter::Config {
5152 participants: participants.clone().try_into().unwrap(),
5153 scheme: schemes[idx].clone(),
5154 elector: elector.clone(),
5155 };
5156 let mock_reporter =
5157 mocks::reporter::Reporter::new(context.child("mock_reporter"), reporter_config);
5158
5159 let attributable_reporter = scheme::reporter::AttributableReporter::new(
5161 context.child("rng"),
5162 schemes[idx].clone(),
5163 mock_reporter.clone(),
5164 Sequential,
5165 true, );
5167 reporters.push(mock_reporter.clone());
5168
5169 let application_cfg = mocks::application::Config {
5170 hasher: Sha256::default(),
5171 relay: relay.clone(),
5172 me: validator.clone(),
5173 propose_latency: (10.0, 5.0),
5174 verify_latency: (10.0, 5.0),
5175 certify_latency: (10.0, 5.0),
5176 should_certify: mocks::application::Certifier::Always,
5177 };
5178 let (actor, application) = mocks::application::Application::new(
5179 context.child("application"),
5180 application_cfg,
5181 );
5182 actor.start();
5183 let blocker = oracle.control(validator.clone());
5184 let cfg = config::Config {
5185 scheme: schemes[idx].clone(),
5186 elector: elector.clone(),
5187 blocker,
5188 automaton: application.clone(),
5189 relay: application.clone(),
5190 reporter: attributable_reporter,
5191 strategy: Sequential,
5192 partition: validator.to_string(),
5193 mailbox_size: NZUsize!(1024),
5194 epoch: Epoch::new(333),
5195 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
5196 Epoch::new(333),
5197 )),
5198 leader_timeout: Duration::from_secs(1),
5199 certification_timeout: Duration::from_secs(2),
5200 timeout_retry: Duration::from_secs(10),
5201 fetch_timeout: Duration::from_secs(1),
5202 activity_timeout,
5203 skip_timeout,
5204 fetch_concurrent: NZUsize!(4),
5205 replay_buffer: NZUsize!(1024 * 1024),
5206 write_buffer: NZUsize!(1024 * 1024),
5207 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5208 forwarding: ForwardingPolicy::Disabled,
5209 };
5210 let engine = Engine::new(context.child("engine"), cfg);
5211
5212 let (pending, recovered, resolver) = registrations
5214 .remove(validator)
5215 .expect("validator should be registered");
5216 engine.start(pending, recovered, resolver);
5217 }
5218
5219 let mut finalizers = Vec::new();
5221 for reporter in reporters.iter_mut() {
5222 let (mut latest, mut monitor) = reporter.subscribe().await;
5223 finalizers.push(context.child("finalizer").spawn(move |_| async move {
5224 while latest < required_containers {
5225 latest = monitor.recv().await.expect("event missing");
5226 }
5227 }));
5228 }
5229 join_all(finalizers).await;
5230
5231 for reporter in reporters.iter() {
5233 reporter.assert_no_faults();
5235
5236 reporter.assert_no_invalid();
5238
5239 {
5241 let notarizations = reporter.notarizations.lock();
5242 let finalizations = reporter.finalizations.lock();
5243 assert!(
5244 !notarizations.is_empty() || !finalizations.is_empty(),
5245 "Certificates should be reported"
5246 );
5247 }
5248
5249 let notarizes = reporter.notarizes.lock();
5251 let last_view = notarizes.keys().max().cloned().unwrap_or_default();
5252 for (view, payloads) in notarizes.iter() {
5253 if *view == last_view {
5254 continue; }
5256
5257 let signers: usize = payloads.values().map(|signers| signers.len()).sum();
5258
5259 if S::is_attributable() {
5261 assert!(signers > 1, "view {view}: {signers}");
5262 } else {
5263 assert_eq!(signers, 0);
5265 }
5266 }
5267
5268 let finalizes = reporter.finalizes.lock();
5270 for (_, payloads) in finalizes.iter() {
5271 let signers: usize = payloads.values().map(|signers| signers.len()).sum();
5272
5273 if S::is_attributable() {
5275 assert!(signers > 1);
5276 } else {
5277 assert_eq!(signers, 0);
5279 }
5280 }
5281 }
5282
5283 let blocked = oracle.blocked().await.unwrap();
5285 assert!(blocked.is_empty());
5286 });
5287 }
5288
5289 #[test_traced]
5290 fn test_attributable_reporter_filtering() {
5291 attributable_reporter_filtering::<_, _, Random>(
5292 bls12381_threshold_vrf::fixture::<MinPk, _>,
5293 );
5294 attributable_reporter_filtering::<_, _, Random>(
5295 bls12381_threshold_vrf::fixture::<MinSig, _>,
5296 );
5297 attributable_reporter_filtering::<_, _, RoundRobin>(
5298 bls12381_threshold_std::fixture::<MinPk, _>,
5299 );
5300 attributable_reporter_filtering::<_, _, RoundRobin>(
5301 bls12381_threshold_std::fixture::<MinSig, _>,
5302 );
5303 attributable_reporter_filtering::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
5304 attributable_reporter_filtering::<_, _, RoundRobin>(
5305 bls12381_multisig::fixture::<MinSig, _>,
5306 );
5307 attributable_reporter_filtering::<_, _, RoundRobin>(ed25519::fixture);
5308 attributable_reporter_filtering::<_, _, RoundRobin>(secp256r1::fixture);
5309 }
5310
5311 fn split_views_no_lockup<S, F, L>(mut fixture: F)
5312 where
5313 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5314 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5315 L: Elector<S>,
5316 {
5317 enum ParticipantType {
5330 Group1, Group2, Ignorant, Byzantine, }
5335 let get_type = |idx: usize| -> ParticipantType {
5336 match idx {
5337 0..3 => ParticipantType::Group1,
5338 3..6 => ParticipantType::Group2,
5339 6 => ParticipantType::Ignorant,
5340 7..10 => ParticipantType::Byzantine,
5341 _ => unreachable!(),
5342 }
5343 };
5344
5345 let n = 10;
5347 let quorum = quorum(n) as usize;
5348 assert_eq!(quorum, 7);
5349 let activity_timeout = ViewDelta::new(10);
5350 let skip_timeout = ViewDelta::new(5);
5351 let namespace = b"consensus".to_vec();
5352 let executor = deterministic::Runner::timed(Duration::from_secs(300));
5353 executor.start(|mut context| async move {
5354 let Fixture {
5356 participants,
5357 schemes,
5358 ..
5359 } = fixture(&mut context, &namespace, n);
5360 let mut oracle = start_test_network_with_peers(
5361 context.child("network"),
5362 participants.clone(),
5363 false,
5364 )
5365 .await;
5366 let mut registrations = register_validators(&mut oracle, &participants).await;
5367
5368 let build_finalization = |proposal: &Proposal<D>| -> TFinalization<_, D> {
5372 let votes: Vec<_> = (0..=quorum)
5373 .map(|i| TFinalize::sign(&schemes[i], proposal.clone()).unwrap())
5374 .collect();
5375 TFinalization::from_finalizes(&schemes[0], &votes, &Sequential)
5376 .expect("finalization quorum")
5377 };
5378 let build_notarization = |proposal: &Proposal<D>| -> TNotarization<_, D> {
5380 let votes: Vec<_> = (0..=quorum)
5381 .map(|i| TNotarize::sign(&schemes[i], proposal.clone()).unwrap())
5382 .collect();
5383 TNotarization::from_notarizes(&schemes[0], &votes, &Sequential)
5384 .expect("notarization quorum")
5385 };
5386 let build_nullification = |round: Round| -> TNullification<_> {
5387 let votes: Vec<_> = (0..=quorum)
5388 .map(|i| TNullify::sign::<D>(&schemes[i], round).unwrap())
5389 .collect();
5390 TNullification::from_nullifies(&schemes[0], &votes, &Sequential)
5391 .expect("nullification quorum")
5392 };
5393 let f_view = 1;
5395 let round_f = Round::new(Epoch::new(333), View::new(f_view));
5396 let payload_b0 = Sha256::hash(b"B_F");
5397 let proposal_b0 = Proposal::new(round_f, View::new(f_view - 1), payload_b0);
5398 let payload_b1a = Sha256::hash(b"B_G1");
5399 let proposal_b1a = Proposal::new(
5400 Round::new(Epoch::new(333), View::new(f_view + 1)),
5401 View::new(f_view),
5402 payload_b1a,
5403 );
5404 let payload_b1b = Sha256::hash(b"B_G2");
5405 let proposal_b1b = Proposal::new(
5406 Round::new(Epoch::new(333), View::new(f_view + 2)),
5407 View::new(f_view),
5408 payload_b1b,
5409 );
5410
5411 let b0_notarization = build_notarization(&proposal_b0);
5413 let b0_finalization = build_finalization(&proposal_b0);
5414 let b1a_notarization = build_notarization(&proposal_b1a);
5416 let b1b_notarization = build_notarization(&proposal_b1b);
5417 let null_a = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 1)));
5419 let null_b = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 2)));
5420
5421 let injector_pk = PrivateKey::from_seed(1_000_000).public_key();
5423 let (mut injector_sender, _inj_certificate_receiver) = oracle
5424 .control(injector_pk.clone())
5425 .register(1, TEST_QUOTA)
5426 .await
5427 .unwrap();
5428
5429 let link = Link {
5431 latency: Duration::from_millis(10),
5432 jitter: Duration::from_millis(0),
5433 success_rate: 1.0,
5434 };
5435 for p in participants.iter() {
5436 oracle
5437 .add_link(injector_pk.clone(), p.clone(), link.clone())
5438 .await
5439 .unwrap();
5440 }
5441 oracle.manager().track(
5442 1,
5443 TrackedPeers::new(
5444 Set::from_iter_dedup(participants.iter().cloned()),
5445 Set::from_iter_dedup(std::slice::from_ref(&injector_pk).iter().cloned()),
5446 ),
5447 );
5448 context.sleep(Duration::from_millis(10)).await;
5449
5450 let msg = Certificate::<_, D>::Notarization(b0_notarization).encode();
5454 injector_sender.send(Recipients::All, msg, true);
5455 let msg = Certificate::<_, D>::Finalization(b0_finalization).encode();
5456 injector_sender.send(Recipients::All, msg, true);
5457 let notarization_msg = Certificate::<_, D>::Notarization(b1a_notarization);
5459 let nullification_msg = Certificate::<_, D>::Nullification(null_a.clone());
5460 for (i, participant) in participants.iter().enumerate() {
5461 let recipient = Recipients::One(participant.clone());
5462 let msg = match get_type(i) {
5463 ParticipantType::Group1 => notarization_msg.encode(),
5464 _ => nullification_msg.encode(),
5465 };
5466 injector_sender.send(recipient, msg, true);
5467 }
5468 let notarization_msg = Certificate::<_, D>::Notarization(b1b_notarization);
5470 let nullification_msg = Certificate::<_, D>::Nullification(null_b.clone());
5471 for (i, participant) in participants.iter().enumerate() {
5472 let recipient = Recipients::One(participant.clone());
5473 let msg = match get_type(i) {
5474 ParticipantType::Group2 => notarization_msg.encode(),
5475 _ => nullification_msg.encode(),
5476 };
5477 injector_sender.send(recipient, msg, true);
5478 }
5479
5480 let elector = L::default();
5486 let relay = Arc::new(mocks::relay::Relay::new());
5487 let mut honest_reporters = Vec::new();
5488 for (idx, validator) in participants.iter().enumerate() {
5489 let (pending, recovered, resolver) = registrations
5490 .remove(validator)
5491 .expect("validator should be registered");
5492 let participant_type = get_type(idx);
5493 if matches!(participant_type, ParticipantType::Byzantine) {
5494 let cfg = mocks::nullify_only::Config {
5496 scheme: schemes[idx].clone(),
5497 };
5498 let engine: mocks::nullify_only::NullifyOnly<_, _, Sha256> =
5499 mocks::nullify_only::NullifyOnly::new(
5500 context
5501 .child("byzantine")
5502 .with_attribute("public_key", validator),
5503 cfg,
5504 );
5505 engine.start(pending);
5506 drop(recovered);
5508 drop(resolver);
5509 } else {
5510 let reporter_config = mocks::reporter::Config {
5512 participants: participants.clone().try_into().unwrap(),
5513 scheme: schemes[idx].clone(),
5514 elector: elector.clone(),
5515 };
5516 let reporter = mocks::reporter::Reporter::new(
5517 context
5518 .child("reporter")
5519 .with_attribute("public_key", validator),
5520 reporter_config,
5521 );
5522 honest_reporters.push(reporter.clone());
5523
5524 let application_cfg = mocks::application::Config {
5525 hasher: Sha256::default(),
5526 relay: relay.clone(),
5527 me: validator.clone(),
5528 propose_latency: (250.0, 50.0), verify_latency: (10.0, 5.0),
5530 certify_latency: (10.0, 5.0),
5531 should_certify: mocks::application::Certifier::Always,
5532 };
5533 let (actor, application) = mocks::application::Application::new(
5534 context
5535 .child("application")
5536 .with_attribute("public_key", validator),
5537 application_cfg,
5538 );
5539 actor.start();
5540 let blocker = oracle.control(validator.clone());
5541 let cfg = config::Config {
5542 scheme: schemes[idx].clone(),
5543 elector: elector.clone(),
5544 blocker,
5545 automaton: application.clone(),
5546 relay: application.clone(),
5547 reporter: reporter.clone(),
5548 strategy: Sequential,
5549 partition: validator.to_string(),
5550 mailbox_size: NZUsize!(1024),
5551 epoch: Epoch::new(333),
5552 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
5553 Epoch::new(333),
5554 )),
5555 leader_timeout: Duration::from_secs(10),
5556 certification_timeout: Duration::from_secs(10),
5557 timeout_retry: Duration::from_secs(10),
5558 fetch_timeout: Duration::from_secs(1),
5559 activity_timeout,
5560 skip_timeout,
5561 fetch_concurrent: NZUsize!(4),
5562 replay_buffer: NZUsize!(1024 * 1024),
5563 write_buffer: NZUsize!(1024 * 1024),
5564 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5565 forwarding: ForwardingPolicy::Disabled,
5566 };
5567 let engine = Engine::new(
5568 context
5569 .child("engine")
5570 .with_attribute("public_key", validator),
5571 cfg,
5572 );
5573 engine.start(pending, recovered, resolver);
5574 }
5575 }
5576
5577 context.sleep(Duration::from_secs(2)).await;
5579
5580 let view = View::new(f_view);
5585 for reporter in honest_reporters.iter() {
5586 let finalizations = reporter.finalizations.lock();
5587 assert!(finalizations.contains_key(&view));
5588 }
5589
5590 let view = View::new(f_view + 1);
5594 for (i, reporter) in honest_reporters.iter().enumerate() {
5595 let finalizations = reporter.finalizations.lock();
5596 assert!(!finalizations.contains_key(&view));
5597 let nullifications = reporter.nullifications.lock();
5598 let notarizations = reporter.notarizations.lock();
5599 match get_type(i) {
5600 ParticipantType::Group1 => {
5601 assert!(notarizations.contains_key(&view));
5602 assert!(!nullifications.contains_key(&view));
5603 }
5604 _ => {
5605 assert!(nullifications.contains_key(&view));
5606 assert!(!notarizations.contains_key(&view));
5607 }
5608 }
5609 }
5610
5611 let view = View::new(f_view + 2);
5615 for (i, reporter) in honest_reporters.iter().enumerate() {
5616 let finalizations = reporter.finalizations.lock();
5617 assert!(!finalizations.contains_key(&view));
5618 let nullifications = reporter.nullifications.lock();
5619 let notarizations = reporter.notarizations.lock();
5620 match get_type(i) {
5621 ParticipantType::Group2 => {
5622 assert!(notarizations.contains_key(&view));
5623 assert!(!nullifications.contains_key(&view));
5624 }
5625 _ => {
5626 assert!(nullifications.contains_key(&view));
5627 assert!(!notarizations.contains_key(&view));
5628 }
5629 }
5630 }
5631
5632 let next_view = View::new(f_view + 3);
5634 for (i, reporter) in honest_reporters.iter().enumerate() {
5635 let nullifies = reporter.nullifies.lock();
5636 assert!(!nullifies.contains_key(&next_view), "reporter {i}");
5637 }
5638
5639 link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
5643
5644 {
5646 let target = View::new(f_view + 3);
5647 let mut finalizers = Vec::new();
5648 for reporter in honest_reporters.iter_mut() {
5649 let (mut latest, mut monitor) = reporter.subscribe().await;
5650 finalizers.push(
5651 context
5652 .child("resume_finalizer")
5653 .spawn(move |_| async move {
5654 while latest < target {
5655 latest = monitor.recv().await.expect("event missing");
5656 }
5657 }),
5658 );
5659 }
5660 join_all(finalizers).await;
5661 }
5662
5663 for reporter in honest_reporters.iter() {
5665 reporter.assert_no_faults();
5666 reporter.assert_no_invalid();
5667 }
5668 let blocked = oracle.blocked().await.unwrap();
5669 assert!(blocked.is_empty(), "blocked peers: {blocked:?}");
5670 });
5671 }
5672
5673 #[test_group("slow")]
5674 #[test_traced]
5675 fn test_split_views_no_lockup() {
5676 split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
5677 split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
5678 split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
5679 split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
5680 split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
5681 split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
5682 split_views_no_lockup::<_, _, RoundRobin>(ed25519::fixture);
5683 split_views_no_lockup::<_, _, RoundRobin>(secp256r1::fixture);
5684 }
5685
5686 fn tle<V, L>()
5687 where
5688 V: Variant,
5689 L: Elector<bls12381_threshold_vrf::Scheme<PublicKey, V>>,
5690 {
5691 let n = 4;
5693 let namespace = b"consensus".to_vec();
5694 let activity_timeout = ViewDelta::new(100);
5695 let skip_timeout = ViewDelta::new(50);
5696 let executor = deterministic::Runner::timed(Duration::from_secs(30));
5697 executor.start(|mut context| async move {
5698 let Fixture {
5700 participants,
5701 schemes,
5702 ..
5703 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, &namespace, n);
5704 let mut oracle =
5705 start_test_network_with_peers(context.child("network"), participants.clone(), true)
5706 .await;
5707 let mut registrations = register_validators(&mut oracle, &participants).await;
5708
5709 let link = Link {
5711 latency: Duration::from_millis(10),
5712 jitter: Duration::from_millis(5),
5713 success_rate: 1.0,
5714 };
5715 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5716
5717 let elector = L::default();
5719 let relay = Arc::new(mocks::relay::Relay::new());
5720 let mut reporters = Vec::new();
5721 let mut engine_handlers = Vec::new();
5722 let monitor_reporter = Arc::new(Mutex::new(None));
5723 for (idx, validator) in participants.iter().enumerate() {
5724 let context = context
5726 .child("validator")
5727 .with_attribute("public_key", validator);
5728
5729 let reporter_config = mocks::reporter::Config {
5731 participants: participants.clone().try_into().unwrap(),
5732 scheme: schemes[idx].clone(),
5733 elector: elector.clone(),
5734 };
5735 let reporter =
5736 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
5737 reporters.push(reporter.clone());
5738 if idx == 0 {
5739 *monitor_reporter.lock() = Some(reporter.clone());
5740 }
5741
5742 let application_cfg = mocks::application::Config {
5744 hasher: Sha256::default(),
5745 relay: relay.clone(),
5746 me: validator.clone(),
5747 propose_latency: (10.0, 5.0),
5748 verify_latency: (10.0, 5.0),
5749 certify_latency: (10.0, 5.0),
5750 should_certify: mocks::application::Certifier::Always,
5751 };
5752 let (actor, application) = mocks::application::Application::new(
5753 context.child("application"),
5754 application_cfg,
5755 );
5756 actor.start();
5757 let blocker = oracle.control(validator.clone());
5758 let cfg = config::Config {
5759 scheme: schemes[idx].clone(),
5760 elector: elector.clone(),
5761 blocker,
5762 automaton: application.clone(),
5763 relay: application.clone(),
5764 reporter: reporter.clone(),
5765 strategy: Sequential,
5766 partition: validator.to_string(),
5767 mailbox_size: NZUsize!(1024),
5768 epoch: Epoch::new(333),
5769 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
5770 Epoch::new(333),
5771 )),
5772 leader_timeout: Duration::from_millis(100),
5773 certification_timeout: Duration::from_millis(200),
5774 timeout_retry: Duration::from_millis(500),
5775 fetch_timeout: Duration::from_millis(100),
5776 activity_timeout,
5777 skip_timeout,
5778 fetch_concurrent: NZUsize!(4),
5779 replay_buffer: NZUsize!(1024 * 1024),
5780 write_buffer: NZUsize!(1024 * 1024),
5781 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5782 forwarding: ForwardingPolicy::Disabled,
5783 };
5784 let engine = Engine::new(context.child("engine"), cfg);
5785
5786 let (pending, recovered, resolver) = registrations
5788 .remove(validator)
5789 .expect("validator should be registered");
5790 engine_handlers.push(engine.start(pending, recovered, resolver));
5791 }
5792
5793 let target = Round::new(Epoch::new(333), View::new(10)); let message = b"Secret message for future view10"; let ciphertext = schemes[0].encrypt(&mut context, target, *message);
5799
5800 let reporter = monitor_reporter.lock().clone().unwrap();
5802 loop {
5803 context.sleep(Duration::from_millis(100)).await;
5805 let notarizations = reporter.notarizations.lock();
5806 let Some(notarization) = notarizations.get(&target.view()) else {
5807 continue;
5808 };
5809
5810 let seed = notarization.seed();
5812 let decrypted = seed
5813 .decrypt(&ciphertext)
5814 .expect("Decryption should succeed with valid seed signature");
5815 assert_eq!(
5816 message,
5817 decrypted.as_ref(),
5818 "Decrypted message should match original message"
5819 );
5820 break;
5821 }
5822 });
5823 }
5824
5825 #[test_traced]
5826 fn test_tle() {
5827 tle::<MinPk, Random>();
5828 tle::<MinSig, Random>();
5829 }
5830
5831 fn hailstorm<S, F, L>(
5832 seed: u64,
5833 shutdowns: usize,
5834 interval: ViewDelta,
5835 mut fixture: F,
5836 ) -> String
5837 where
5838 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5839 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5840 L: Elector<S>,
5841 {
5842 let n = 5;
5844 let activity_timeout = ViewDelta::new(10);
5845 let skip_timeout = ViewDelta::new(5);
5846 let namespace = b"consensus".to_vec();
5847 let cfg = deterministic::Config::new().with_seed(seed);
5848 let executor = deterministic::Runner::new(cfg);
5849 executor.start(|mut context| async move {
5850 let Fixture {
5852 participants,
5853 schemes,
5854 ..
5855 } = fixture(&mut context, &namespace, n);
5856 let mut oracle =
5857 start_test_network_with_peers(context.child("network"), participants.clone(), true)
5858 .await;
5859 let mut registrations = register_validators(&mut oracle, &participants).await;
5860
5861 let link = Link {
5863 latency: Duration::from_millis(10),
5864 jitter: Duration::from_millis(1),
5865 success_rate: 1.0,
5866 };
5867 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5868
5869 let elector = L::default();
5871 let relay = Arc::new(mocks::relay::Relay::new());
5872 let mut reporters = BTreeMap::new();
5873 let mut engine_handlers = BTreeMap::new();
5874 for (idx, validator) in participants.iter().enumerate() {
5875 let context = context
5877 .child("validator")
5878 .with_attribute("public_key", validator);
5879
5880 let reporter_config = mocks::reporter::Config {
5882 participants: participants.clone().try_into().unwrap(),
5883 scheme: schemes[idx].clone(),
5884 elector: elector.clone(),
5885 };
5886 let reporter =
5887 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
5888 reporters.insert(idx, reporter.clone());
5889 let application_cfg = mocks::application::Config {
5890 hasher: Sha256::default(),
5891 relay: relay.clone(),
5892 me: validator.clone(),
5893 propose_latency: (10.0, 5.0),
5894 verify_latency: (10.0, 5.0),
5895 certify_latency: (10.0, 5.0),
5896 should_certify: mocks::application::Certifier::Always,
5897 };
5898 let (actor, application) = mocks::application::Application::new(
5899 context.child("application"),
5900 application_cfg,
5901 );
5902 actor.start();
5903 let blocker = oracle.control(validator.clone());
5904 let cfg = config::Config {
5905 scheme: schemes[idx].clone(),
5906 elector: elector.clone(),
5907 blocker,
5908 automaton: application.clone(),
5909 relay: application.clone(),
5910 reporter: reporter.clone(),
5911 strategy: Sequential,
5912 partition: validator.to_string(),
5913 mailbox_size: NZUsize!(1024),
5914 epoch: Epoch::new(333),
5915 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
5916 Epoch::new(333),
5917 )),
5918 leader_timeout: Duration::from_secs(1),
5919 certification_timeout: Duration::from_secs(2),
5920 timeout_retry: Duration::from_secs(10),
5921 fetch_timeout: Duration::from_secs(1),
5922 activity_timeout,
5923 skip_timeout,
5924 fetch_concurrent: NZUsize!(4),
5925 replay_buffer: NZUsize!(1024 * 1024),
5926 write_buffer: NZUsize!(1024 * 1024),
5927 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5928 forwarding: ForwardingPolicy::Disabled,
5929 };
5930 let engine = Engine::new(context.child("engine"), cfg);
5931
5932 let (pending, recovered, resolver) = registrations
5934 .remove(validator)
5935 .expect("validator should be registered");
5936 engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5937 }
5938
5939 let mut target = View::zero();
5941 for i in 0..shutdowns {
5942 target = target.saturating_add(interval);
5944
5945 let mut finalizers = Vec::new();
5947 for (_, reporter) in reporters.iter_mut() {
5948 let (mut latest, mut monitor) = reporter.subscribe().await;
5949 finalizers.push(context.child("finalizer").spawn(move |_| async move {
5950 while latest < target {
5951 latest = monitor.recv().await.expect("event missing");
5952 }
5953 }));
5954 }
5955 join_all(finalizers).await;
5956 target = target.saturating_add(interval);
5957
5958 let idx = context.gen_range(0..engine_handlers.len());
5960 let validator = &participants[idx];
5961 let handle = engine_handlers.remove(&idx).unwrap();
5962 handle.abort();
5963 let _ = handle.await;
5964 let selected_reporter = reporters.remove(&idx).unwrap();
5965 info!(idx, ?validator, "shutdown validator");
5966
5967 let mut finalizers = Vec::new();
5969 for (_, reporter) in reporters.iter_mut() {
5970 let (mut latest, mut monitor) = reporter.subscribe().await;
5971 finalizers.push(context.child("finalizer").spawn(move |_| async move {
5972 while latest < target {
5973 latest = monitor.recv().await.expect("event missing");
5974 }
5975 }));
5976 }
5977 join_all(finalizers).await;
5978 target = target.saturating_add(interval);
5979
5980 info!(idx, ?validator, "restarting validator");
5982 let context = context
5983 .child("validator_restarted")
5984 .with_attribute("public_key", validator)
5985 .with_attribute("restart", i);
5986
5987 let (pending, recovered, resolver) =
5989 register_validator(&mut oracle, validator.clone()).await;
5990 let application_cfg = mocks::application::Config {
5991 hasher: Sha256::default(),
5992 relay: relay.clone(),
5993 me: validator.clone(),
5994 propose_latency: (10.0, 5.0),
5995 verify_latency: (10.0, 5.0),
5996 certify_latency: (10.0, 5.0),
5997 should_certify: mocks::application::Certifier::Always,
5998 };
5999 let (actor, application) = mocks::application::Application::new(
6000 context.child("application"),
6001 application_cfg,
6002 );
6003 actor.start();
6004 reporters.insert(idx, selected_reporter.clone());
6005 let blocker = oracle.control(validator.clone());
6006 let cfg = config::Config {
6007 scheme: schemes[idx].clone(),
6008 elector: elector.clone(),
6009 blocker,
6010 automaton: application.clone(),
6011 relay: application.clone(),
6012 reporter: selected_reporter,
6013 strategy: Sequential,
6014 partition: validator.to_string(),
6015 mailbox_size: NZUsize!(1024),
6016 epoch: Epoch::new(333),
6017 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
6018 Epoch::new(333),
6019 )),
6020 leader_timeout: Duration::from_secs(1),
6021 certification_timeout: Duration::from_secs(2),
6022 timeout_retry: Duration::from_secs(10),
6023 fetch_timeout: Duration::from_secs(1),
6024 activity_timeout,
6025 skip_timeout,
6026 fetch_concurrent: NZUsize!(4),
6027 replay_buffer: NZUsize!(1024 * 1024),
6028 write_buffer: NZUsize!(1024 * 1024),
6029 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
6030 forwarding: ForwardingPolicy::Disabled,
6031 };
6032 let engine = Engine::new(context.child("engine"), cfg);
6033 engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
6034
6035 let mut finalizers = Vec::new();
6037 for (_, reporter) in reporters.iter_mut() {
6038 let (mut latest, mut monitor) = reporter.subscribe().await;
6039 finalizers.push(context.child("finalizer").spawn(move |_| async move {
6040 while latest < target {
6041 latest = monitor.recv().await.expect("event missing");
6042 }
6043 }));
6044 }
6045 join_all(finalizers).await;
6046 info!(idx, ?validator, "validator recovered");
6047 }
6048
6049 let latest_complete = target.saturating_sub(activity_timeout);
6051 for (_, reporter) in reporters.iter() {
6052 reporter.assert_no_faults();
6054
6055 reporter.assert_no_invalid();
6057
6058 let mut notarized = HashMap::new();
6060 let mut finalized = HashMap::new();
6061 {
6062 let notarizes = reporter.notarizes.lock();
6063 for view in View::range(View::new(1), latest_complete) {
6064 let Some(payloads) = notarizes.get(&view) else {
6066 continue;
6067 };
6068 if payloads.len() > 1 {
6069 panic!("view: {view}");
6070 }
6071 let (digest, _) = payloads.iter().next().unwrap();
6072 notarized.insert(view, *digest);
6073 }
6074 }
6075 {
6076 let notarizations = reporter.notarizations.lock();
6077 for view in View::range(View::new(1), latest_complete) {
6078 let Some(notarization) = notarizations.get(&view) else {
6080 continue;
6081 };
6082 let Some(digest) = notarized.get(&view) else {
6083 continue;
6084 };
6085 assert_eq!(¬arization.proposal.payload, digest);
6086 }
6087 }
6088 {
6089 let finalizes = reporter.finalizes.lock();
6090 for view in View::range(View::new(1), latest_complete) {
6091 let Some(payloads) = finalizes.get(&view) else {
6093 continue;
6094 };
6095 if payloads.len() > 1 {
6096 panic!("view: {view}");
6097 }
6098 let (digest, _) = payloads.iter().next().unwrap();
6099 finalized.insert(view, *digest);
6100
6101 if view > latest_complete {
6103 continue;
6104 }
6105
6106 let nullifies = reporter.nullifies.lock();
6108 let Some(nullifies) = nullifies.get(&view) else {
6109 continue;
6110 };
6111 for (_, finalizers) in payloads.iter() {
6112 for finalizer in finalizers.iter() {
6113 if nullifies.contains(finalizer) {
6114 panic!("should not nullify and finalize at same view");
6115 }
6116 }
6117 }
6118 }
6119 }
6120 {
6121 let finalizations = reporter.finalizations.lock();
6122 for view in View::range(View::new(1), latest_complete) {
6123 let Some(finalization) = finalizations.get(&view) else {
6125 continue;
6126 };
6127 let Some(digest) = finalized.get(&view) else {
6128 continue;
6129 };
6130 assert_eq!(&finalization.proposal.payload, digest);
6131 }
6132 }
6133 }
6134
6135 let blocked = oracle.blocked().await.unwrap();
6137 assert!(blocked.is_empty());
6138
6139 context.auditor().state()
6141 })
6142 }
6143
6144 #[test_group("slow")]
6145 #[test_traced]
6146 fn test_hailstorm_bls12381_threshold_vrf_min_pk() {
6147 assert_eq!(
6148 hailstorm::<_, _, Random>(
6149 0,
6150 10,
6151 ViewDelta::new(15),
6152 bls12381_threshold_vrf::fixture::<MinPk, _>
6153 ),
6154 hailstorm::<_, _, Random>(
6155 0,
6156 10,
6157 ViewDelta::new(15),
6158 bls12381_threshold_vrf::fixture::<MinPk, _>
6159 ),
6160 );
6161 }
6162
6163 #[test_group("slow")]
6164 #[test_traced]
6165 fn test_hailstorm_bls12381_threshold_vrf_min_sig() {
6166 assert_eq!(
6167 hailstorm::<_, _, Random>(
6168 0,
6169 10,
6170 ViewDelta::new(15),
6171 bls12381_threshold_vrf::fixture::<MinSig, _>
6172 ),
6173 hailstorm::<_, _, Random>(
6174 0,
6175 10,
6176 ViewDelta::new(15),
6177 bls12381_threshold_vrf::fixture::<MinSig, _>
6178 ),
6179 );
6180 }
6181
6182 #[test_group("slow")]
6183 #[test_traced]
6184 fn test_hailstorm_bls12381_threshold_std_min_pk() {
6185 assert_eq!(
6186 hailstorm::<_, _, RoundRobin>(
6187 0,
6188 10,
6189 ViewDelta::new(15),
6190 bls12381_threshold_std::fixture::<MinPk, _>
6191 ),
6192 hailstorm::<_, _, RoundRobin>(
6193 0,
6194 10,
6195 ViewDelta::new(15),
6196 bls12381_threshold_std::fixture::<MinPk, _>
6197 ),
6198 );
6199 }
6200
6201 #[test_group("slow")]
6202 #[test_traced]
6203 fn test_hailstorm_bls12381_threshold_std_min_sig() {
6204 assert_eq!(
6205 hailstorm::<_, _, RoundRobin>(
6206 0,
6207 10,
6208 ViewDelta::new(15),
6209 bls12381_threshold_std::fixture::<MinSig, _>
6210 ),
6211 hailstorm::<_, _, RoundRobin>(
6212 0,
6213 10,
6214 ViewDelta::new(15),
6215 bls12381_threshold_std::fixture::<MinSig, _>
6216 ),
6217 );
6218 }
6219
6220 #[test_group("slow")]
6221 #[test_traced]
6222 fn test_hailstorm_bls12381_multisig_min_pk() {
6223 assert_eq!(
6224 hailstorm::<_, _, RoundRobin>(
6225 0,
6226 10,
6227 ViewDelta::new(15),
6228 bls12381_multisig::fixture::<MinPk, _>
6229 ),
6230 hailstorm::<_, _, RoundRobin>(
6231 0,
6232 10,
6233 ViewDelta::new(15),
6234 bls12381_multisig::fixture::<MinPk, _>
6235 ),
6236 );
6237 }
6238
6239 #[test_group("slow")]
6240 #[test_traced]
6241 fn test_hailstorm_bls12381_multisig_min_sig() {
6242 assert_eq!(
6243 hailstorm::<_, _, RoundRobin>(
6244 0,
6245 10,
6246 ViewDelta::new(15),
6247 bls12381_multisig::fixture::<MinSig, _>
6248 ),
6249 hailstorm::<_, _, RoundRobin>(
6250 0,
6251 10,
6252 ViewDelta::new(15),
6253 bls12381_multisig::fixture::<MinSig, _>
6254 ),
6255 );
6256 }
6257
6258 #[test_group("slow")]
6259 #[test_traced]
6260 fn test_hailstorm_ed25519() {
6261 assert_eq!(
6262 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture),
6263 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture)
6264 );
6265 }
6266
6267 #[test_group("slow")]
6268 #[test_traced]
6269 fn test_hailstorm_secp256r1() {
6270 assert_eq!(
6271 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture),
6272 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture)
6273 );
6274 }
6275
6276 #[derive(Clone, Copy, Debug)]
6313 struct TwinsCampaign {
6314 n: u32,
6315 rounds: usize,
6316 mode: twins::Mode,
6317 max_cases: usize,
6318 trailing_finalizations: usize,
6319 }
6320
6321 fn twins_campaign<S, F, L>(
6322 rng: &mut StdRng,
6323 campaign: TwinsCampaign,
6324 link: Link,
6325 mut fixture: F,
6326 ) where
6327 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
6328 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
6329 L: Elector<S>,
6330 {
6331 let n = campaign.n;
6332 let faults = N3f1::max_faults(n) as usize;
6333 let cases = twins::cases(
6334 rng,
6335 twins::Framework {
6336 participants: n as usize,
6337 faults,
6338 rounds: campaign.rounds,
6339 mode: campaign.mode,
6340 max_cases: campaign.max_cases,
6341 },
6342 );
6343 assert!(
6344 !cases.is_empty(),
6345 "twins campaign should generate at least one case"
6346 );
6347 for case in cases {
6348 let scenario = case.scenario.clone();
6349 let twin_indices = case.compromised.clone();
6350 assert_eq!(
6351 twin_indices.len(),
6352 faults,
6353 "unexpected twins count for n={n} (expected f={faults})",
6354 );
6355
6356 let activity_timeout = ViewDelta::new(10);
6357 let skip_timeout = ViewDelta::new(5);
6358 let namespace = b"consensus".to_vec();
6359 let link = link.clone();
6360 let trailing_finalizations = campaign.trailing_finalizations;
6361 let mut case_fixture =
6362 |ctx: &mut deterministic::Context, ns: &[u8], n: u32| fixture(ctx, ns, n);
6363 let cfg = deterministic::Config::new()
6364 .with_rng(Box::new(StdRng::from_rng(&mut *rng).unwrap()));
6365 let executor = deterministic::Runner::new(cfg);
6366 executor.start(|mut context| async move {
6367 let Fixture {
6368 participants,
6369 schemes,
6370 ..
6371 } = case_fixture(&mut context, &namespace, n);
6372 let participants: Arc<[_]> = participants.into();
6373 let mut oracle = start_test_network_with_peers(
6374 context.child("network"),
6375 participants.iter().cloned(),
6376 false,
6377 )
6378 .await;
6379 let mut registrations = register_validators(&mut oracle, &participants).await;
6380 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
6381
6382 let elector = TwinsElector::new(L::default(), &scenario, n as usize);
6383 let relay = Arc::new(mocks::relay::Relay::new());
6384 let mut reporters = Vec::new();
6385 let mut engine_handlers = Vec::new();
6386 let twin_index_set: HashSet<usize> = twin_indices.iter().copied().collect();
6387
6388 for idx in twin_indices.iter().copied() {
6390 let validator = &participants[idx];
6391 let (
6392 (vote_sender, vote_receiver),
6393 (certificate_sender, certificate_receiver),
6394 (_resolver_sender, _resolver_receiver),
6395 ) = registrations
6396 .remove(validator)
6397 .expect("validator should be registered");
6398
6399 let make_vote_forwarder = || {
6400 let participants = participants.clone();
6401 let scenario = scenario.clone();
6402 move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
6403 let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
6404 let (primary, secondary) =
6405 scenario.partitions(msg.view(), participants.as_ref());
6406 match origin {
6407 SplitOrigin::Primary => Some(Recipients::Some(primary)),
6408 SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
6409 }
6410 }
6411 };
6412 let make_certificate_forwarder = || {
6413 let codec = schemes[idx].certificate_codec_config();
6414 let participants = participants.clone();
6415 let scenario = scenario.clone();
6416 move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
6417 let msg: Certificate<S, D> =
6418 Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
6419 let (primary, secondary) =
6420 scenario.partitions(msg.view(), participants.as_ref());
6421 match origin {
6422 SplitOrigin::Primary => Some(Recipients::Some(primary)),
6423 SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
6424 }
6425 }
6426 };
6427 let make_vote_router = || {
6428 let participants = participants.clone();
6429 let scenario = scenario.clone();
6430 move |(sender, message): &(_, IoBuf)| {
6431 let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
6432 scenario.route(msg.view(), sender, participants.as_ref())
6433 }
6434 };
6435 let make_certificate_router = || {
6436 let codec = schemes[idx].certificate_codec_config();
6437 let participants = participants.clone();
6438 let scenario = scenario.clone();
6439 move |(sender, message): &(_, IoBuf)| {
6440 let msg: Certificate<S, D> =
6441 Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
6442 scenario.route(msg.view(), sender, participants.as_ref())
6443 }
6444 };
6445 let (vote_sender_primary, vote_sender_secondary) =
6446 vote_sender.split_with(make_vote_forwarder());
6447 let (vote_receiver_primary, vote_receiver_secondary) = vote_receiver
6448 .split_with(
6449 context.child("pending_split").with_attribute("index", idx),
6450 make_vote_router(),
6451 );
6452 let (certificate_sender_primary, certificate_sender_secondary) =
6453 certificate_sender.split_with(make_certificate_forwarder());
6454 let (certificate_receiver_primary, certificate_receiver_secondary) =
6455 certificate_receiver.split_with(
6456 context
6457 .child("recovered_split")
6458 .with_attribute("index", idx),
6459 make_certificate_router(),
6460 );
6461
6462 for (twin_label, pending, recovered) in [
6463 (
6464 "primary",
6465 (vote_sender_primary, vote_receiver_primary),
6466 (certificate_sender_primary, certificate_receiver_primary),
6467 ),
6468 (
6469 "secondary",
6470 (vote_sender_secondary, vote_receiver_secondary),
6471 (certificate_sender_secondary, certificate_receiver_secondary),
6472 ),
6473 ] {
6474 let partition = format!("twin_{idx}_{twin_label}");
6475 let context = context
6476 .child("twin")
6477 .with_attribute("index", idx)
6478 .with_attribute("side", twin_label);
6479
6480 let reporter_config = mocks::reporter::Config {
6481 participants: participants.as_ref().try_into().unwrap(),
6482 scheme: schemes[idx].clone(),
6483 elector: elector.clone(),
6484 };
6485 let reporter = mocks::reporter::Reporter::new(
6486 context.child("reporter"),
6487 reporter_config,
6488 );
6489 reporters.push(reporter.clone());
6490
6491 let application_cfg = mocks::application::Config {
6492 hasher: Sha256::default(),
6493 relay: relay.clone(),
6494 me: validator.clone(),
6495 propose_latency: (10.0, 5.0),
6496 verify_latency: (10.0, 5.0),
6497 certify_latency: (10.0, 5.0),
6498 should_certify: mocks::application::Certifier::Always,
6499 };
6500 let (actor, application) = mocks::application::Application::new(
6501 context.child("application"),
6502 application_cfg,
6503 );
6504 actor.start();
6505
6506 let blocker = oracle.control(validator.clone());
6507 let cfg = config::Config {
6508 scheme: schemes[idx].clone(),
6509 elector: elector.clone(),
6510 blocker,
6511 automaton: application.clone(),
6512 relay: application.clone(),
6513 reporter: reporter.clone(),
6514 strategy: Sequential,
6515 partition,
6516 mailbox_size: NZUsize!(1024),
6517 epoch: Epoch::new(333),
6518 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
6519 Epoch::new(333),
6520 )),
6521 leader_timeout: Duration::from_secs(1),
6522 certification_timeout: Duration::from_millis(1_500),
6523 timeout_retry: Duration::from_secs(10),
6524 fetch_timeout: Duration::from_secs(1),
6525 activity_timeout,
6526 skip_timeout,
6527 fetch_concurrent: NZUsize!(4),
6528 replay_buffer: NZUsize!(1024 * 1024),
6529 write_buffer: NZUsize!(1024 * 1024),
6530 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
6531 forwarding: ForwardingPolicy::Disabled,
6532 };
6533 let engine = Engine::new(context.child("engine"), cfg);
6534 engine_handlers.push(engine.start(
6535 pending,
6536 recovered,
6537 inert_channel(participants.as_ref()),
6538 ));
6539 }
6540 }
6541
6542 let honest_start = reporters.len();
6544 for (idx, validator) in participants.iter().enumerate() {
6545 if twin_index_set.contains(&idx) {
6546 continue;
6547 }
6548
6549 let partition = format!("honest_{idx}");
6550 let context = context.child("honest").with_attribute("index", idx);
6551
6552 let reporter_config = mocks::reporter::Config {
6553 participants: participants.as_ref().try_into().unwrap(),
6554 scheme: schemes[idx].clone(),
6555 elector: elector.clone(),
6556 };
6557 let reporter =
6558 mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
6559 reporters.push(reporter.clone());
6560
6561 let application_cfg = mocks::application::Config {
6562 hasher: Sha256::default(),
6563 relay: relay.clone(),
6564 me: validator.clone(),
6565 propose_latency: (10.0, 5.0),
6566 verify_latency: (10.0, 5.0),
6567 certify_latency: (10.0, 5.0),
6568 should_certify: mocks::application::Certifier::Always,
6569 };
6570 let (actor, application) = mocks::application::Application::new(
6571 context.child("application"),
6572 application_cfg,
6573 );
6574 actor.start();
6575
6576 let blocker = oracle.control(validator.clone());
6577 let cfg = config::Config {
6578 scheme: schemes[idx].clone(),
6579 elector: elector.clone(),
6580 blocker,
6581 automaton: application.clone(),
6582 relay: application.clone(),
6583 reporter: reporter.clone(),
6584 strategy: Sequential,
6585 partition,
6586 mailbox_size: NZUsize!(1024),
6587 epoch: Epoch::new(333),
6588 floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
6589 Epoch::new(333),
6590 )),
6591 leader_timeout: Duration::from_secs(1),
6592 certification_timeout: Duration::from_millis(1_500),
6593 timeout_retry: Duration::from_secs(10),
6594 fetch_timeout: Duration::from_secs(1),
6595 activity_timeout,
6596 skip_timeout,
6597 fetch_concurrent: NZUsize!(4),
6598 replay_buffer: NZUsize!(1024 * 1024),
6599 write_buffer: NZUsize!(1024 * 1024),
6600 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
6601 forwarding: ForwardingPolicy::Disabled,
6602 };
6603 let engine = Engine::new(context.child("engine"), cfg);
6604
6605 let (
6606 (pending_sender, pending_receiver),
6607 (recovered_sender, recovered_receiver),
6608 _,
6609 ) = registrations
6610 .remove(validator)
6611 .expect("validator should be registered");
6612 engine_handlers.push(engine.start(
6613 (pending_sender, pending_receiver),
6614 (recovered_sender, recovered_receiver),
6615 inert_channel(participants.as_ref()),
6616 ));
6617 }
6618
6619 let prefix_end = View::new(scenario.rounds().len() as u64);
6629 let mut finalizers = Vec::new();
6630 for (i, reporter) in reporters.iter_mut().skip(honest_start).enumerate() {
6631 let (_latest, mut monitor) = reporter.subscribe().await;
6632 let required = trailing_finalizations;
6633 finalizers.push(context.child("finalizer").with_attribute("index", i).spawn(
6634 move |_| async move {
6635 let mut count = 0usize;
6636 while count < required {
6637 let view = monitor.recv().await.expect("event missing");
6638 if view > prefix_end {
6639 count += 1;
6640 }
6641 }
6642 },
6643 ));
6644 }
6645 join_all(finalizers).await;
6646
6647 let mut finalized_at_view: BTreeMap<View, D> = BTreeMap::new();
6649 for reporter in reporters.iter().skip(honest_start) {
6650 let finalizations = reporter.finalizations.lock();
6651 for (view, finalization) in finalizations.iter() {
6652 let digest = finalization.proposal.payload;
6653 if let Some(existing) = finalized_at_view.get(view) {
6654 assert_eq!(
6655 existing, &digest,
6656 "safety violation: conflicting finalizations at view {view}"
6657 );
6658 } else {
6659 finalized_at_view.insert(*view, digest);
6660 }
6661 }
6662 }
6663
6664 for reporter in reporters.iter().skip(honest_start) {
6666 reporter.assert_no_invalid();
6667 }
6668
6669 let twin_identities: HashSet<_> = twin_indices
6671 .iter()
6672 .map(|idx| participants[*idx].clone())
6673 .collect();
6674 let mut notarized_by_honest_signer: BTreeMap<View, HashMap<PublicKey, D>> =
6675 BTreeMap::new();
6676 let mut finalized_by_honest_signer: BTreeMap<View, HashMap<PublicKey, D>> =
6677 BTreeMap::new();
6678 for reporter in reporters.iter().skip(honest_start) {
6679 let notarizes = reporter.notarizes.lock();
6680 for (view, payloads) in notarizes.iter() {
6681 let signers = notarized_by_honest_signer.entry(*view).or_default();
6682 for (digest, payload_signers) in payloads.iter() {
6683 for signer in payload_signers.iter() {
6684 if twin_identities.contains(signer) {
6685 continue;
6686 }
6687 if let Some(existing) = signers.insert(signer.clone(), *digest) {
6688 assert_eq!(
6689 existing, *digest,
6690 "honest signer produced conflicting notarizes at view {view}"
6691 );
6692 }
6693 }
6694 }
6695 }
6696
6697 let finalizes = reporter.finalizes.lock();
6698 for (view, payloads) in finalizes.iter() {
6699 let signers = finalized_by_honest_signer.entry(*view).or_default();
6700 for (digest, payload_signers) in payloads.iter() {
6701 for signer in payload_signers.iter() {
6702 if twin_identities.contains(signer) {
6703 continue;
6704 }
6705 if let Some(existing) = signers.insert(signer.clone(), *digest) {
6706 assert_eq!(
6707 existing, *digest,
6708 "honest signer produced conflicting finalizes at view {view}"
6709 );
6710 }
6711 }
6712 }
6713 }
6714 }
6715
6716 for reporter in reporters.iter().skip(honest_start) {
6718 let faults = reporter.faults.lock();
6719 for (faulter, _) in faults.iter() {
6720 assert!(
6721 twin_identities.contains(faulter),
6722 "fault from non-twin participant"
6723 );
6724 }
6725 }
6726
6727 let blocked = oracle.blocked().await.unwrap();
6728 for (_, faulter) in blocked {
6729 assert!(
6730 twin_identities.contains(&faulter),
6731 "blocked peer attributed to non-twin participant"
6732 );
6733 }
6734 });
6735 }
6736 }
6737
6738 const TWINS_CAMPAIGN: TwinsCampaign = TwinsCampaign {
6739 n: 5,
6740 rounds: 3,
6741 mode: twins::Mode::Sampled,
6742 max_cases: 20,
6743 trailing_finalizations: 10,
6744 };
6745
6746 const TWINS_LINK: Link = Link {
6747 latency: Duration::from_millis(500),
6748 jitter: Duration::from_millis(500),
6749 success_rate: 1.0,
6750 };
6751
6752 #[test_group("slow")]
6753 #[test_traced("INFO")]
6754 fn test_twins_sampled() {
6755 for link in [
6756 Link {
6757 latency: Duration::from_millis(10),
6758 jitter: Duration::from_millis(10),
6759 success_rate: 1.0,
6760 },
6761 TWINS_LINK,
6762 ] {
6763 twins_campaign::<_, _, RoundRobin>(
6764 &mut test_rng(),
6765 TWINS_CAMPAIGN,
6766 link,
6767 scheme_mocks::fixture,
6768 );
6769 }
6770 }
6771
6772 #[test_group("slow")]
6773 #[test_traced("INFO")]
6774 fn test_twins_sustained() {
6775 let campaign = TwinsCampaign {
6776 mode: twins::Mode::Sustained,
6777 ..TWINS_CAMPAIGN
6778 };
6779 for link in [
6780 Link {
6781 latency: Duration::from_millis(10),
6782 jitter: Duration::from_millis(10),
6783 success_rate: 1.0,
6784 },
6785 TWINS_LINK,
6786 ] {
6787 twins_campaign::<_, _, RoundRobin>(
6788 &mut test_rng(),
6789 campaign,
6790 link,
6791 scheme_mocks::fixture,
6792 );
6793 }
6794 }
6795
6796 #[test_group("slow")]
6797 #[test_traced("INFO")]
6798 fn test_twins_large_sampled() {
6799 let campaign = TwinsCampaign {
6800 n: 10,
6801 rounds: 5,
6802 ..TWINS_CAMPAIGN
6803 };
6804 twins_campaign::<_, _, RoundRobin>(
6805 &mut test_rng(),
6806 campaign,
6807 TWINS_LINK,
6808 scheme_mocks::fixture,
6809 );
6810 }
6811
6812 #[test_group("slow")]
6813 #[test_traced("INFO")]
6814 fn test_twins_large_sustained() {
6815 let campaign = TwinsCampaign {
6816 n: 10,
6817 rounds: 5,
6818 mode: twins::Mode::Sustained,
6819 ..TWINS_CAMPAIGN
6820 };
6821 twins_campaign::<_, _, RoundRobin>(
6822 &mut test_rng(),
6823 campaign,
6824 TWINS_LINK,
6825 scheme_mocks::fixture,
6826 );
6827 }
6828
6829 #[test_group("slow")]
6830 #[test_traced("INFO")]
6831 fn test_twins_multisig_min_pk() {
6832 twins_campaign::<_, _, RoundRobin>(
6833 &mut test_rng(),
6834 TWINS_CAMPAIGN,
6835 TWINS_LINK,
6836 bls12381_multisig::fixture::<MinPk, _>,
6837 );
6838 }
6839
6840 #[test_group("slow")]
6841 #[test_traced("INFO")]
6842 fn test_twins_multisig_min_sig() {
6843 twins_campaign::<_, _, RoundRobin>(
6844 &mut test_rng(),
6845 TWINS_CAMPAIGN,
6846 TWINS_LINK,
6847 bls12381_multisig::fixture::<MinSig, _>,
6848 );
6849 }
6850
6851 #[test_group("slow")]
6852 #[test_traced("INFO")]
6853 fn test_twins_threshold_vrf_min_pk() {
6854 twins_campaign::<_, _, Random>(
6855 &mut test_rng(),
6856 TWINS_CAMPAIGN,
6857 TWINS_LINK,
6858 bls12381_threshold_vrf::fixture::<MinPk, _>,
6859 );
6860 }
6861
6862 #[test_group("slow")]
6863 #[test_traced("INFO")]
6864 fn test_twins_threshold_vrf_min_sig() {
6865 twins_campaign::<_, _, Random>(
6866 &mut test_rng(),
6867 TWINS_CAMPAIGN,
6868 TWINS_LINK,
6869 bls12381_threshold_vrf::fixture::<MinSig, _>,
6870 );
6871 }
6872
6873 #[test_group("slow")]
6874 #[test_traced("INFO")]
6875 fn test_twins_threshold_std_min_pk() {
6876 twins_campaign::<_, _, RoundRobin>(
6877 &mut test_rng(),
6878 TWINS_CAMPAIGN,
6879 TWINS_LINK,
6880 bls12381_threshold_std::fixture::<MinPk, _>,
6881 );
6882 }
6883
6884 #[test_group("slow")]
6885 #[test_traced("INFO")]
6886 fn test_twins_threshold_std_min_sig() {
6887 twins_campaign::<_, _, RoundRobin>(
6888 &mut test_rng(),
6889 TWINS_CAMPAIGN,
6890 TWINS_LINK,
6891 bls12381_threshold_std::fixture::<MinSig, _>,
6892 );
6893 }
6894
6895 #[test_group("slow")]
6896 #[test_traced("INFO")]
6897 fn test_twins_ed25519() {
6898 twins_campaign::<_, _, RoundRobin>(
6899 &mut test_rng(),
6900 TWINS_CAMPAIGN,
6901 TWINS_LINK,
6902 ed25519::fixture,
6903 );
6904 }
6905
6906 #[test_group("slow")]
6907 #[test_traced("INFO")]
6908 fn test_twins_secp256r1() {
6909 twins_campaign::<_, _, RoundRobin>(
6910 &mut test_rng(),
6911 TWINS_CAMPAIGN,
6912 TWINS_LINK,
6913 secp256r1::fixture,
6914 );
6915 }
6916}