1pub mod elector;
331pub mod scheme;
332pub mod types;
333
334cfg_if::cfg_if! {
335 if #[cfg(not(target_arch = "wasm32"))] {
336 mod actors;
337 pub mod config;
338 pub use config::Config;
339 mod engine;
340 pub use engine::Engine;
341 mod metrics;
342 }
343}
344
345#[cfg(any(test, feature = "mocks"))]
346pub mod mocks;
347
348#[cfg(not(target_arch = "wasm32"))]
349use crate::types::{View, ViewDelta};
350
351#[cfg(not(target_arch = "wasm32"))]
353pub(crate) const fn min_active(activity_timeout: ViewDelta, last_finalized: View) -> View {
354 last_finalized.saturating_sub(activity_timeout)
355}
356
357#[cfg(not(target_arch = "wasm32"))]
361pub(crate) fn interesting(
362 activity_timeout: ViewDelta,
363 last_finalized: View,
364 current: View,
365 pending: View,
366 allow_future: bool,
367) -> bool {
368 if pending.is_zero() {
370 return false;
371 }
372 if pending < min_active(activity_timeout, last_finalized) {
373 return false;
374 }
375 if !allow_future && pending > current.next() {
376 return false;
377 }
378 true
379}
380
381#[cfg(test)]
383pub(crate) fn quorum(n: u32) -> u32 {
384 use commonware_utils::{Faults, N3f1};
385
386 N3f1::quorum(n)
387}
388
389#[cfg(test)]
390mod tests {
391 use super::*;
392 use crate::{
393 simplex::{
394 elector::{Config as Elector, Random, RoundRobin},
395 mocks::twins::Strategy,
396 scheme::{
397 bls12381_multisig,
398 bls12381_threshold::{
399 standard as bls12381_threshold_std,
400 vrf::{self as bls12381_threshold_vrf, Seedable},
401 },
402 ed25519, secp256r1, Scheme,
403 },
404 types::{
405 Certificate, Finalization as TFinalization, Finalize as TFinalize,
406 Notarization as TNotarization, Notarize as TNotarize,
407 Nullification as TNullification, Nullify as TNullify, Proposal, Vote,
408 },
409 },
410 types::{Epoch, Round},
411 Monitor, Viewable,
412 };
413 use commonware_codec::{Decode, DecodeExt, Encode};
414 use commonware_cryptography::{
415 bls12381::primitives::variant::{MinPk, MinSig, Variant},
416 certificate::mocks::Fixture,
417 ed25519::{PrivateKey, PublicKey},
418 sha256::{Digest as Sha256Digest, Digest as D},
419 Hasher as _, Sha256, Signer as _,
420 };
421 use commonware_macros::{select, test_group, test_traced};
422 use commonware_p2p::{
423 simulated::{Config, Link, Network, Oracle, Receiver, Sender, SplitOrigin, SplitTarget},
424 Recipients, Sender as _,
425 };
426 use commonware_parallel::Sequential;
427 use commonware_runtime::{
428 buffer::paged::CacheRef, count_running_tasks, deterministic, Clock, IoBuf, Metrics, Quota,
429 Runner, Spawner,
430 };
431 use commonware_utils::{sync::Mutex, test_rng, Faults, N3f1, NZUsize, NZU16};
432 use engine::Engine;
433 use futures::future::join_all;
434 use rand::{rngs::StdRng, Rng as _};
435 use std::{
436 collections::{BTreeMap, HashMap},
437 num::{NonZeroU16, NonZeroU32, NonZeroUsize},
438 sync::Arc,
439 time::Duration,
440 };
441 use tracing::{debug, info, warn};
442 use types::Activity;
443
444 const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
445 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
446 const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
447
448 #[test]
449 fn test_interesting() {
450 let activity_timeout = ViewDelta::new(10);
451
452 assert!(!interesting(
454 activity_timeout,
455 View::zero(),
456 View::zero(),
457 View::zero(),
458 false
459 ));
460 assert!(!interesting(
461 activity_timeout,
462 View::zero(),
463 View::new(1),
464 View::zero(),
465 true
466 ));
467
468 assert!(!interesting(
470 activity_timeout,
471 View::new(20),
472 View::new(25),
473 View::new(5), false
475 ));
476
477 assert!(interesting(
479 activity_timeout,
480 View::new(20),
481 View::new(25),
482 View::new(10), false
484 ));
485
486 assert!(!interesting(
488 activity_timeout,
489 View::new(20),
490 View::new(25),
491 View::new(27),
492 false
493 ));
494
495 assert!(interesting(
497 activity_timeout,
498 View::new(20),
499 View::new(25),
500 View::new(27),
501 true
502 ));
503
504 assert!(interesting(
506 activity_timeout,
507 View::new(20),
508 View::new(25),
509 View::new(26),
510 false
511 ));
512
513 assert!(interesting(
515 activity_timeout,
516 View::new(20),
517 View::new(25),
518 View::new(22),
519 false
520 ));
521
522 assert!(interesting(
525 activity_timeout,
526 View::zero(),
527 View::new(5),
528 View::new(1),
529 false
530 ));
531 }
532
533 async fn register_validator(
535 oracle: &mut Oracle<PublicKey, deterministic::Context>,
536 validator: PublicKey,
537 ) -> (
538 (
539 Sender<PublicKey, deterministic::Context>,
540 Receiver<PublicKey>,
541 ),
542 (
543 Sender<PublicKey, deterministic::Context>,
544 Receiver<PublicKey>,
545 ),
546 (
547 Sender<PublicKey, deterministic::Context>,
548 Receiver<PublicKey>,
549 ),
550 ) {
551 let control = oracle.control(validator.clone());
552 let (vote_sender, vote_receiver) = control.register(0, TEST_QUOTA).await.unwrap();
553 let (certificate_sender, certificate_receiver) =
554 control.register(1, TEST_QUOTA).await.unwrap();
555 let (resolver_sender, resolver_receiver) = control.register(2, TEST_QUOTA).await.unwrap();
556 (
557 (vote_sender, vote_receiver),
558 (certificate_sender, certificate_receiver),
559 (resolver_sender, resolver_receiver),
560 )
561 }
562
563 async fn register_validators(
565 oracle: &mut Oracle<PublicKey, deterministic::Context>,
566 validators: &[PublicKey],
567 ) -> HashMap<
568 PublicKey,
569 (
570 (
571 Sender<PublicKey, deterministic::Context>,
572 Receiver<PublicKey>,
573 ),
574 (
575 Sender<PublicKey, deterministic::Context>,
576 Receiver<PublicKey>,
577 ),
578 (
579 Sender<PublicKey, deterministic::Context>,
580 Receiver<PublicKey>,
581 ),
582 ),
583 > {
584 let mut registrations = HashMap::new();
585 for validator in validators.iter() {
586 let registration = register_validator(oracle, validator.clone()).await;
587 registrations.insert(validator.clone(), registration);
588 }
589 registrations
590 }
591
592 enum Action {
594 Link(Link),
595 Update(Link), Unlink,
597 }
598
599 async fn link_validators(
605 oracle: &mut Oracle<PublicKey, deterministic::Context>,
606 validators: &[PublicKey],
607 action: Action,
608 restrict_to: Option<fn(usize, usize, usize) -> bool>,
609 ) {
610 for (i1, v1) in validators.iter().enumerate() {
611 for (i2, v2) in validators.iter().enumerate() {
612 if v2 == v1 {
614 continue;
615 }
616
617 if let Some(f) = restrict_to {
619 if !f(validators.len(), i1, i2) {
620 continue;
621 }
622 }
623
624 match action {
626 Action::Update(_) | Action::Unlink => {
627 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
628 }
629 _ => {}
630 }
631
632 match action {
634 Action::Link(ref link) | Action::Update(ref link) => {
635 oracle
636 .add_link(v1.clone(), v2.clone(), link.clone())
637 .await
638 .unwrap();
639 }
640 _ => {}
641 }
642 }
643 }
644 }
645
646 fn count_nonzero_metric_lines(encoded: &str, patterns: &[&str]) -> u32 {
648 encoded
649 .lines()
650 .filter(|line| patterns.iter().all(|p| line.contains(p)))
651 .filter(|line| {
652 line.split_whitespace()
653 .last()
654 .and_then(|s| s.parse::<u64>().ok())
655 .is_some_and(|n| n > 0)
656 })
657 .count() as u32
658 }
659
660 fn all_online<S, F, L>(mut fixture: F)
661 where
662 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
663 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
664 L: Elector<S>,
665 {
666 let n = 5;
668 let quorum = quorum(n) as usize;
669 let required_containers = View::new(100);
670 let activity_timeout = ViewDelta::new(10);
671 let skip_timeout = ViewDelta::new(5);
672 let namespace = b"consensus".to_vec();
673 let executor = deterministic::Runner::timed(Duration::from_secs(300));
674 executor.start(|mut context| async move {
675 let (network, mut oracle) = Network::new(
677 context.with_label("network"),
678 Config {
679 max_size: 1024 * 1024,
680 disconnect_on_block: true,
681 tracked_peer_sets: None,
682 },
683 );
684
685 network.start();
687
688 let Fixture {
690 participants,
691 schemes,
692 ..
693 } = fixture(&mut context, &namespace, n);
694 let mut registrations = register_validators(&mut oracle, &participants).await;
695
696 let link = Link {
698 latency: Duration::from_millis(10),
699 jitter: Duration::from_millis(1),
700 success_rate: 1.0,
701 };
702 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
703
704 let elector = L::default();
706 let relay = Arc::new(mocks::relay::Relay::new());
707 let mut reporters = Vec::new();
708 let mut engine_handlers = Vec::new();
709 for (idx, validator) in participants.iter().enumerate() {
710 let context = context.with_label(&format!("validator_{}", *validator));
712
713 let reporter_config = mocks::reporter::Config {
715 participants: participants.clone().try_into().unwrap(),
716 scheme: schemes[idx].clone(),
717 elector: elector.clone(),
718 };
719 let reporter =
720 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
721 reporters.push(reporter.clone());
722 let application_cfg = mocks::application::Config {
723 hasher: Sha256::default(),
724 relay: relay.clone(),
725 me: validator.clone(),
726 propose_latency: (10.0, 5.0),
727 verify_latency: (10.0, 5.0),
728 certify_latency: (10.0, 5.0),
729 should_certify: mocks::application::Certifier::Sometimes,
730 };
731 let (actor, application) = mocks::application::Application::new(
732 context.with_label("application"),
733 application_cfg,
734 );
735 actor.start();
736 let blocker = oracle.control(validator.clone());
737 let cfg = config::Config {
738 scheme: schemes[idx].clone(),
739 elector: elector.clone(),
740 blocker,
741 automaton: application.clone(),
742 relay: application.clone(),
743 reporter: reporter.clone(),
744 strategy: Sequential,
745 partition: validator.to_string(),
746 mailbox_size: 1024,
747 epoch: Epoch::new(333),
748 leader_timeout: Duration::from_secs(1),
749 certification_timeout: Duration::from_secs(2),
750 timeout_retry: Duration::from_secs(10),
751 fetch_timeout: Duration::from_secs(1),
752 activity_timeout,
753 skip_timeout,
754 fetch_concurrent: 4,
755 replay_buffer: NZUsize!(1024 * 1024),
756 write_buffer: NZUsize!(1024 * 1024),
757 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
758 };
759 let engine = Engine::new(context.with_label("engine"), cfg);
760
761 let (pending, recovered, resolver) = registrations
763 .remove(validator)
764 .expect("validator should be registered");
765 engine_handlers.push(engine.start(pending, recovered, resolver));
766 }
767
768 let mut finalizers = Vec::new();
770 for reporter in reporters.iter_mut() {
771 let (mut latest, mut monitor) = reporter.subscribe().await;
772 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
773 while latest < required_containers {
774 latest = monitor.recv().await.expect("event missing");
775 }
776 }));
777 }
778 join_all(finalizers).await;
779
780 let latest_complete = required_containers.saturating_sub(activity_timeout);
782 for reporter in reporters.iter() {
783 {
785 let faults = reporter.faults.lock();
786 assert!(faults.is_empty());
787 }
788
789 {
791 let invalid = reporter.invalid.lock();
792 assert_eq!(*invalid, 0);
793 }
794
795 {
797 let certified = reporter.certified.lock();
798 for view in View::range(View::new(1), latest_complete) {
799 if !certified.contains(&view) {
801 panic!("view: {view}");
802 }
803 }
804 }
805
806 let mut notarized = HashMap::new();
808 let mut finalized = HashMap::new();
809 {
810 let notarizes = reporter.notarizes.lock();
811 for view in View::range(View::new(1), latest_complete) {
812 let Some(payloads) = notarizes.get(&view) else {
814 continue;
815 };
816 if payloads.len() > 1 {
817 panic!("view: {view}");
818 }
819 let (digest, notarizers) = payloads.iter().next().unwrap();
820 notarized.insert(view, *digest);
821
822 if notarizers.len() < quorum {
823 panic!("view: {view}");
826 }
827 }
828 }
829 {
830 let notarizations = reporter.notarizations.lock();
831 for view in View::range(View::new(1), latest_complete) {
832 let Some(notarization) = notarizations.get(&view) else {
834 continue;
835 };
836 let Some(digest) = notarized.get(&view) else {
837 continue;
838 };
839 assert_eq!(¬arization.proposal.payload, digest);
840 }
841 }
842 {
843 let finalizes = reporter.finalizes.lock();
844 for view in View::range(View::new(1), latest_complete) {
845 let Some(payloads) = finalizes.get(&view) else {
847 continue;
848 };
849 if payloads.len() > 1 {
850 panic!("view: {view}");
851 }
852 let (digest, finalizers) = payloads.iter().next().unwrap();
853 finalized.insert(view, *digest);
854
855 if view > latest_complete {
857 continue;
858 }
859
860 if finalizers.len() < quorum {
862 panic!("view: {view}");
865 }
866
867 let nullifies = reporter.nullifies.lock();
869 let Some(nullifies) = nullifies.get(&view) else {
870 continue;
871 };
872 for (_, finalizers) in payloads.iter() {
873 for finalizer in finalizers.iter() {
874 if nullifies.contains(finalizer) {
875 panic!("should not nullify and finalize at same view");
876 }
877 }
878 }
879 }
880 }
881 {
882 let finalizations = reporter.finalizations.lock();
883 for view in View::range(View::new(1), latest_complete) {
884 let Some(finalization) = finalizations.get(&view) else {
886 continue;
887 };
888 let Some(digest) = finalized.get(&view) else {
889 continue;
890 };
891 assert_eq!(&finalization.proposal.payload, digest);
892 }
893 }
894 }
895
896 let blocked = oracle.blocked().await.unwrap();
898 assert!(blocked.is_empty());
899 });
900 }
901
902 #[test_group("slow")]
903 #[test_traced]
904 fn test_all_online() {
905 all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
906 all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
907 all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
908 all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
909 all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
910 all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
911 all_online::<_, _, RoundRobin>(ed25519::fixture);
912 all_online::<_, _, RoundRobin>(secp256r1::fixture);
913 }
914
915 fn observer<S, F, L>(mut fixture: F)
916 where
917 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
918 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
919 L: Elector<S>,
920 {
921 let n_active = 5;
923 let required_containers = View::new(100);
924 let activity_timeout = ViewDelta::new(10);
925 let skip_timeout = ViewDelta::new(5);
926 let namespace = b"consensus".to_vec();
927 let executor = deterministic::Runner::timed(Duration::from_secs(300));
928 executor.start(|mut context| async move {
929 let (network, mut oracle) = Network::new(
931 context.with_label("network"),
932 Config {
933 max_size: 1024 * 1024,
934 disconnect_on_block: true,
935 tracked_peer_sets: None,
936 },
937 );
938
939 network.start();
941
942 let Fixture {
944 participants,
945 schemes,
946 verifier,
947 ..
948 } = fixture(&mut context, &namespace, n_active);
949
950 let private_key_observer = PrivateKey::from_seed(n_active as u64);
952 let public_key_observer = private_key_observer.public_key();
953
954 let mut all_validators = participants.clone();
956 all_validators.push(public_key_observer.clone());
957 all_validators.sort();
958 let mut registrations = register_validators(&mut oracle, &all_validators).await;
959
960 let link = Link {
962 latency: Duration::from_millis(10),
963 jitter: Duration::from_millis(1),
964 success_rate: 1.0,
965 };
966 link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
967
968 let elector = L::default();
970 let relay = Arc::new(mocks::relay::Relay::new());
971 let mut reporters = Vec::new();
972
973 for (idx, validator) in participants.iter().enumerate() {
974 let is_observer = *validator == public_key_observer;
975
976 let context = context.with_label(&format!("validator_{}", *validator));
978
979 let signing = if is_observer {
981 verifier.clone()
982 } else {
983 schemes[idx].clone()
984 };
985 let reporter_config = mocks::reporter::Config {
986 participants: participants.clone().try_into().unwrap(),
987 scheme: signing.clone(),
988 elector: elector.clone(),
989 };
990 let reporter =
991 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
992 reporters.push(reporter.clone());
993 let application_cfg = mocks::application::Config {
994 hasher: Sha256::default(),
995 relay: relay.clone(),
996 me: validator.clone(),
997 propose_latency: (10.0, 5.0),
998 verify_latency: (10.0, 5.0),
999 certify_latency: (10.0, 5.0),
1000 should_certify: mocks::application::Certifier::Sometimes,
1001 };
1002 let (actor, application) = mocks::application::Application::new(
1003 context.with_label("application"),
1004 application_cfg,
1005 );
1006 actor.start();
1007 let blocker = oracle.control(validator.clone());
1008 let cfg = config::Config {
1009 scheme: signing.clone(),
1010 elector: elector.clone(),
1011 blocker,
1012 automaton: application.clone(),
1013 relay: application.clone(),
1014 reporter: reporter.clone(),
1015 strategy: Sequential,
1016 partition: validator.to_string(),
1017 mailbox_size: 1024,
1018 epoch: Epoch::new(333),
1019 leader_timeout: Duration::from_secs(1),
1020 certification_timeout: Duration::from_secs(2),
1021 timeout_retry: Duration::from_secs(10),
1022 fetch_timeout: Duration::from_secs(1),
1023 activity_timeout,
1024 skip_timeout,
1025 fetch_concurrent: 4,
1026 replay_buffer: NZUsize!(1024 * 1024),
1027 write_buffer: NZUsize!(1024 * 1024),
1028 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1029 };
1030 let engine = Engine::new(context.with_label("engine"), cfg);
1031
1032 let (pending, recovered, resolver) = registrations
1034 .remove(validator)
1035 .expect("validator should be registered");
1036 engine.start(pending, recovered, resolver);
1037 }
1038
1039 let mut finalizers = Vec::new();
1041 for reporter in reporters.iter_mut() {
1042 let (mut latest, mut monitor) = reporter.subscribe().await;
1043 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1044 while latest < required_containers {
1045 latest = monitor.recv().await.expect("event missing");
1046 }
1047 }));
1048 }
1049 join_all(finalizers).await;
1050
1051 for reporter in reporters.iter() {
1053 {
1055 let faults = reporter.faults.lock();
1056 assert!(faults.is_empty());
1057 }
1058 {
1059 let invalid = reporter.invalid.lock();
1060 assert_eq!(*invalid, 0);
1061 }
1062
1063 let blocked = oracle.blocked().await.unwrap();
1065 assert!(blocked.is_empty());
1066 }
1067 });
1068 }
1069
1070 #[test_group("slow")]
1071 #[test_traced]
1072 fn test_observer() {
1073 observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1074 observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1075 observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1076 observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1077 observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1078 observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1079 observer::<_, _, RoundRobin>(ed25519::fixture);
1080 observer::<_, _, RoundRobin>(secp256r1::fixture);
1081 }
1082
1083 fn unclean_shutdown<S, F, L>(mut fixture: F)
1084 where
1085 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1086 F: FnMut(&mut StdRng, &[u8], u32) -> Fixture<S>,
1087 L: Elector<S>,
1088 {
1089 let n = 5;
1091 let required_containers = View::new(100);
1092 let activity_timeout = ViewDelta::new(10);
1093 let skip_timeout = ViewDelta::new(5);
1094 let namespace = b"consensus".to_vec();
1095
1096 let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
1098 let supervised = Arc::new(Mutex::new(Vec::new()));
1099 let mut prev_checkpoint = None;
1100
1101 let mut rng = test_rng();
1103 let Fixture {
1104 participants,
1105 schemes,
1106 ..
1107 } = fixture(&mut rng, &namespace, n);
1108
1109 let relay = Arc::new(mocks::relay::Relay::<Sha256Digest, S::PublicKey>::new());
1111
1112 loop {
1113 let rng = rng.clone();
1114 let participants = participants.clone();
1115 let schemes = schemes.clone();
1116 let shutdowns = shutdowns.clone();
1117 let supervised = supervised.clone();
1118 let relay = relay.clone();
1119 relay.deregister_all(); let f = |mut context: deterministic::Context| async move {
1122 let (network, mut oracle) = Network::new(
1124 context.with_label("network"),
1125 Config {
1126 max_size: 1024 * 1024,
1127 disconnect_on_block: true,
1128 tracked_peer_sets: None,
1129 },
1130 );
1131
1132 network.start();
1134
1135 let mut registrations = register_validators(&mut oracle, &participants).await;
1137
1138 let link = Link {
1140 latency: Duration::from_millis(50),
1141 jitter: Duration::from_millis(50),
1142 success_rate: 1.0,
1143 };
1144 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1145
1146 let elector = L::default();
1148 let relay = Arc::new(mocks::relay::Relay::new());
1149 let mut reporters = HashMap::new();
1150 let mut engine_handlers = Vec::new();
1151 for (idx, validator) in participants.iter().enumerate() {
1152 let context = context.with_label(&format!("validator_{}", *validator));
1154
1155 let reporter_config = mocks::reporter::Config {
1157 participants: participants.clone().try_into().unwrap(),
1158 scheme: schemes[idx].clone(),
1159 elector: elector.clone(),
1160 };
1161 let reporter = mocks::reporter::Reporter::new(rng.clone(), reporter_config);
1162 reporters.insert(validator.clone(), reporter.clone());
1163 let application_cfg = mocks::application::Config {
1164 hasher: Sha256::default(),
1165 relay: relay.clone(),
1166 me: validator.clone(),
1167 propose_latency: (10.0, 5.0),
1168 verify_latency: (10.0, 5.0),
1169 certify_latency: (10.0, 5.0),
1170 should_certify: mocks::application::Certifier::Sometimes,
1171 };
1172 let (actor, application) = mocks::application::Application::new(
1173 context.with_label("application"),
1174 application_cfg,
1175 );
1176 actor.start();
1177 let blocker = oracle.control(validator.clone());
1178 let cfg = config::Config {
1179 scheme: schemes[idx].clone(),
1180 elector: elector.clone(),
1181 blocker,
1182 automaton: application.clone(),
1183 relay: application.clone(),
1184 reporter: reporter.clone(),
1185 strategy: Sequential,
1186 partition: validator.to_string(),
1187 mailbox_size: 1024,
1188 epoch: Epoch::new(333),
1189 leader_timeout: Duration::from_secs(1),
1190 certification_timeout: Duration::from_secs(2),
1191 timeout_retry: Duration::from_secs(10),
1192 fetch_timeout: Duration::from_secs(1),
1193 activity_timeout,
1194 skip_timeout,
1195 fetch_concurrent: 4,
1196 replay_buffer: NZUsize!(1024 * 1024),
1197 write_buffer: NZUsize!(1024 * 1024),
1198 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1199 };
1200 let engine = Engine::new(context.with_label("engine"), cfg);
1201
1202 let (pending, recovered, resolver) = registrations
1204 .remove(validator)
1205 .expect("validator should be registered");
1206 engine_handlers.push(engine.start(pending, recovered, resolver));
1207 }
1208
1209 let mut finalizers = Vec::new();
1211 for (_, reporter) in reporters.iter_mut() {
1212 let (mut latest, mut monitor) = reporter.subscribe().await;
1213 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1214 while latest < required_containers {
1215 latest = monitor.recv().await.expect("event missing");
1216 }
1217 }));
1218 }
1219
1220 let wait =
1222 context.gen_range(Duration::from_millis(100)..Duration::from_millis(2_000));
1223 let result = select! {
1224 _ = context.sleep(wait) => {
1225 {
1227 let mut shutdowns = shutdowns.lock();
1228 debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
1229 *shutdowns += 1;
1230 }
1231 supervised.lock().push(reporters);
1232 false
1233 },
1234 _ = join_all(finalizers) => {
1235 let supervised = supervised.lock();
1237 for reporters in supervised.iter() {
1238 for (_, reporter) in reporters.iter() {
1239 let faults = reporter.faults.lock();
1240 assert!(faults.is_empty());
1241 }
1242 }
1243 true
1244 },
1245 };
1246
1247 let blocked = oracle.blocked().await.unwrap();
1249 assert!(blocked.is_empty());
1250
1251 result
1252 };
1253
1254 let (complete, checkpoint) = prev_checkpoint
1255 .map_or_else(
1256 || deterministic::Runner::timed(Duration::from_secs(180)),
1257 deterministic::Runner::from,
1258 )
1259 .start_and_recover(f);
1260
1261 if complete {
1263 break;
1264 }
1265
1266 prev_checkpoint = Some(checkpoint);
1267 }
1268 }
1269
1270 #[test_group("slow")]
1271 #[test_traced]
1272 fn test_unclean_shutdown() {
1273 unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1274 unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1275 unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1276 unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1277 unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1278 unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1279 unclean_shutdown::<_, _, RoundRobin>(ed25519::fixture);
1280 unclean_shutdown::<_, _, RoundRobin>(secp256r1::fixture);
1281 }
1282
1283 fn backfill<S, F, L>(mut fixture: F)
1284 where
1285 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1286 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1287 L: Elector<S>,
1288 {
1289 let n = 4;
1291 let required_containers = View::new(100);
1292 let activity_timeout = ViewDelta::new(10);
1293 let skip_timeout = ViewDelta::new(5);
1294 let namespace = b"consensus".to_vec();
1295 let executor = deterministic::Runner::timed(Duration::from_secs(240));
1296 executor.start(|mut context| async move {
1297 let (network, mut oracle) = Network::new(
1299 context.with_label("network"),
1300 Config {
1301 max_size: 1024 * 1024,
1302 disconnect_on_block: true,
1303 tracked_peer_sets: None,
1304 },
1305 );
1306
1307 network.start();
1309
1310 let Fixture {
1312 participants,
1313 schemes,
1314 ..
1315 } = fixture(&mut context, &namespace, n);
1316 let mut registrations = register_validators(&mut oracle, &participants).await;
1317
1318 let link = Link {
1320 latency: Duration::from_millis(10),
1321 jitter: Duration::from_millis(1),
1322 success_rate: 1.0,
1323 };
1324 link_validators(
1325 &mut oracle,
1326 &participants,
1327 Action::Link(link),
1328 Some(|_, i, j| ![i, j].contains(&0usize)),
1329 )
1330 .await;
1331
1332 let elector = L::default();
1334 let relay = Arc::new(mocks::relay::Relay::new());
1335 let mut reporters = Vec::new();
1336 let mut engine_handlers = Vec::new();
1337 for (idx_scheme, validator) in participants.iter().enumerate() {
1338 if idx_scheme == 0 {
1340 continue;
1341 }
1342
1343 let context = context.with_label(&format!("validator_{}", *validator));
1345
1346 let reporter_config = mocks::reporter::Config {
1348 participants: participants.clone().try_into().unwrap(),
1349 scheme: schemes[idx_scheme].clone(),
1350 elector: elector.clone(),
1351 };
1352 let reporter =
1353 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1354 reporters.push(reporter.clone());
1355 let application_cfg = mocks::application::Config {
1356 hasher: Sha256::default(),
1357 relay: relay.clone(),
1358 me: validator.clone(),
1359 propose_latency: (10.0, 5.0),
1360 verify_latency: (10.0, 5.0),
1361 certify_latency: (10.0, 5.0),
1362 should_certify: mocks::application::Certifier::Sometimes,
1363 };
1364 let (actor, application) = mocks::application::Application::new(
1365 context.with_label("application"),
1366 application_cfg,
1367 );
1368 actor.start();
1369 let blocker = oracle.control(validator.clone());
1370 let cfg = config::Config {
1371 scheme: schemes[idx_scheme].clone(),
1372 elector: elector.clone(),
1373 blocker,
1374 automaton: application.clone(),
1375 relay: application.clone(),
1376 reporter: reporter.clone(),
1377 strategy: Sequential,
1378 partition: validator.to_string(),
1379 mailbox_size: 1024,
1380 epoch: Epoch::new(333),
1381 leader_timeout: Duration::from_secs(1),
1382 certification_timeout: Duration::from_secs(2),
1383 timeout_retry: Duration::from_secs(10),
1384 fetch_timeout: Duration::from_secs(1),
1385 activity_timeout,
1386 skip_timeout,
1387 fetch_concurrent: 4,
1388 replay_buffer: NZUsize!(1024 * 1024),
1389 write_buffer: NZUsize!(1024 * 1024),
1390 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1391 };
1392 let engine = Engine::new(context.with_label("engine"), cfg);
1393
1394 let (pending, recovered, resolver) = registrations
1396 .remove(validator)
1397 .expect("validator should be registered");
1398 engine_handlers.push(engine.start(pending, recovered, resolver));
1399 }
1400
1401 let mut finalizers = Vec::new();
1403 for reporter in reporters.iter_mut() {
1404 let (mut latest, mut monitor) = reporter.subscribe().await;
1405 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1406 while latest < required_containers {
1407 latest = monitor.recv().await.expect("event missing");
1408 }
1409 }));
1410 }
1411 join_all(finalizers).await;
1412
1413 let link = Link {
1415 latency: Duration::from_secs(3),
1416 jitter: Duration::from_millis(0),
1417 success_rate: 1.0,
1418 };
1419 link_validators(
1420 &mut oracle,
1421 &participants,
1422 Action::Update(link.clone()),
1423 Some(|_, i, j| ![i, j].contains(&0usize)),
1424 )
1425 .await;
1426
1427 context.sleep(Duration::from_secs(60)).await;
1429
1430 link_validators(
1432 &mut oracle,
1433 &participants,
1434 Action::Unlink,
1435 Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
1436 )
1437 .await;
1438
1439 let me = participants[0].clone();
1441 let context = context.with_label(&format!("validator_{me}"));
1442
1443 link_validators(
1445 &mut oracle,
1446 &participants,
1447 Action::Link(link),
1448 Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
1449 )
1450 .await;
1451
1452 let link = Link {
1454 latency: Duration::from_millis(10),
1455 jitter: Duration::from_millis(3),
1456 success_rate: 1.0,
1457 };
1458 link_validators(
1459 &mut oracle,
1460 &participants,
1461 Action::Update(link),
1462 Some(|_, i, j| ![i, j].contains(&1usize)),
1463 )
1464 .await;
1465
1466 let reporter_config = mocks::reporter::Config {
1468 participants: participants.clone().try_into().unwrap(),
1469 scheme: schemes[0].clone(),
1470 elector: elector.clone(),
1471 };
1472 let mut reporter =
1473 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1474 reporters.push(reporter.clone());
1475 let application_cfg = mocks::application::Config {
1476 hasher: Sha256::default(),
1477 relay: relay.clone(),
1478 me: me.clone(),
1479 propose_latency: (10.0, 5.0),
1480 verify_latency: (10.0, 5.0),
1481 certify_latency: (10.0, 5.0),
1482 should_certify: mocks::application::Certifier::Sometimes,
1483 };
1484 let (actor, application) = mocks::application::Application::new(
1485 context.with_label("application"),
1486 application_cfg,
1487 );
1488 actor.start();
1489 let blocker = oracle.control(me.clone());
1490 let cfg = config::Config {
1491 scheme: schemes[0].clone(),
1492 elector: elector.clone(),
1493 blocker,
1494 automaton: application.clone(),
1495 relay: application.clone(),
1496 reporter: reporter.clone(),
1497 strategy: Sequential,
1498 partition: me.to_string(),
1499 mailbox_size: 1024,
1500 epoch: Epoch::new(333),
1501 leader_timeout: Duration::from_secs(1),
1502 certification_timeout: Duration::from_secs(2),
1503 timeout_retry: Duration::from_secs(10),
1504 fetch_timeout: Duration::from_secs(1),
1505 activity_timeout,
1506 skip_timeout,
1507 fetch_concurrent: 4,
1508 replay_buffer: NZUsize!(1024 * 1024),
1509 write_buffer: NZUsize!(1024 * 1024),
1510 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1511 };
1512 let engine = Engine::new(context.with_label("engine"), cfg);
1513
1514 let (pending, recovered, resolver) = registrations
1516 .remove(&me)
1517 .expect("validator should be registered");
1518 engine_handlers.push(engine.start(pending, recovered, resolver));
1519
1520 let (mut latest, mut monitor) = reporter.subscribe().await;
1522 while latest < required_containers {
1523 latest = monitor.recv().await.expect("event missing");
1524 }
1525
1526 let blocked = oracle.blocked().await.unwrap();
1528 assert!(blocked.is_empty());
1529 });
1530 }
1531
1532 #[test_group("slow")]
1533 #[test_traced]
1534 fn test_backfill() {
1535 backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1536 backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1537 backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1538 backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1539 backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1540 backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1541 backfill::<_, _, RoundRobin>(ed25519::fixture);
1542 backfill::<_, _, RoundRobin>(secp256r1::fixture);
1543 }
1544
1545 fn one_offline<S, F, L>(mut fixture: F)
1546 where
1547 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1548 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1549 L: Elector<S>,
1550 {
1551 let n = 5;
1553 let quorum = quorum(n) as usize;
1554 let required_containers = View::new(100);
1555 let activity_timeout = ViewDelta::new(10);
1556 let skip_timeout = ViewDelta::new(5);
1557 let max_exceptions = 10;
1558 let namespace = b"consensus".to_vec();
1559 let executor = deterministic::Runner::timed(Duration::from_secs(300));
1560 executor.start(|mut context| async move {
1561 let (network, mut oracle) = Network::new(
1563 context.with_label("network"),
1564 Config {
1565 max_size: 1024 * 1024,
1566 disconnect_on_block: true,
1567 tracked_peer_sets: None,
1568 },
1569 );
1570
1571 network.start();
1573
1574 let Fixture {
1576 participants,
1577 schemes,
1578 ..
1579 } = fixture(&mut context, &namespace, n);
1580 let mut registrations = register_validators(&mut oracle, &participants).await;
1581
1582 let link = Link {
1584 latency: Duration::from_millis(10),
1585 jitter: Duration::from_millis(1),
1586 success_rate: 1.0,
1587 };
1588 link_validators(
1589 &mut oracle,
1590 &participants,
1591 Action::Link(link),
1592 Some(|_, i, j| ![i, j].contains(&0usize)),
1593 )
1594 .await;
1595
1596 let elector = L::default();
1598 let relay = Arc::new(mocks::relay::Relay::new());
1599 let mut reporters = Vec::new();
1600 let mut engine_handlers = Vec::new();
1601 for (idx_scheme, validator) in participants.iter().enumerate() {
1602 if idx_scheme == 0 {
1604 continue;
1605 }
1606
1607 let context = context.with_label(&format!("validator_{}", *validator));
1609
1610 let reporter_config = mocks::reporter::Config {
1612 participants: participants.clone().try_into().unwrap(),
1613 scheme: schemes[idx_scheme].clone(),
1614 elector: elector.clone(),
1615 };
1616 let reporter =
1617 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1618 reporters.push(reporter.clone());
1619 let application_cfg = mocks::application::Config {
1620 hasher: Sha256::default(),
1621 relay: relay.clone(),
1622 me: validator.clone(),
1623 propose_latency: (10.0, 5.0),
1624 verify_latency: (10.0, 5.0),
1625 certify_latency: (10.0, 5.0),
1626 should_certify: mocks::application::Certifier::Sometimes,
1627 };
1628 let (actor, application) = mocks::application::Application::new(
1629 context.with_label("application"),
1630 application_cfg,
1631 );
1632 actor.start();
1633 let blocker = oracle.control(validator.clone());
1634 let cfg = config::Config {
1635 scheme: schemes[idx_scheme].clone(),
1636 elector: elector.clone(),
1637 blocker,
1638 automaton: application.clone(),
1639 relay: application.clone(),
1640 reporter: reporter.clone(),
1641 strategy: Sequential,
1642 partition: validator.to_string(),
1643 mailbox_size: 1024,
1644 epoch: Epoch::new(333),
1645 leader_timeout: Duration::from_secs(1),
1646 certification_timeout: Duration::from_secs(2),
1647 timeout_retry: Duration::from_secs(10),
1648 fetch_timeout: Duration::from_secs(1),
1649 activity_timeout,
1650 skip_timeout,
1651 fetch_concurrent: 4,
1652 replay_buffer: NZUsize!(1024 * 1024),
1653 write_buffer: NZUsize!(1024 * 1024),
1654 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1655 };
1656 let engine = Engine::new(context.with_label("engine"), cfg);
1657
1658 let (pending, recovered, resolver) = registrations
1660 .remove(validator)
1661 .expect("validator should be registered");
1662 engine_handlers.push(engine.start(pending, recovered, resolver));
1663 }
1664
1665 let mut finalizers = Vec::new();
1667 for reporter in reporters.iter_mut() {
1668 let (mut latest, mut monitor) = reporter.subscribe().await;
1669 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1670 while latest < required_containers {
1671 latest = monitor.recv().await.expect("event missing");
1672 }
1673 }));
1674 }
1675 join_all(finalizers).await;
1676
1677 let exceptions = 0;
1679 let offline = &participants[0];
1680 for reporter in reporters.iter() {
1681 {
1683 let faults = reporter.faults.lock();
1684 assert!(faults.is_empty());
1685 }
1686
1687 {
1689 let invalid = reporter.invalid.lock();
1690 assert_eq!(*invalid, 0);
1691 }
1692
1693 let mut exceptions = 0;
1695 {
1696 let notarizes = reporter.notarizes.lock();
1697 for (view, payloads) in notarizes.iter() {
1698 for (_, participants) in payloads.iter() {
1699 if participants.contains(offline) {
1700 panic!("view: {view}");
1701 }
1702 }
1703 }
1704 }
1705 {
1706 let nullifies = reporter.nullifies.lock();
1707 for (view, participants) in nullifies.iter() {
1708 if participants.contains(offline) {
1709 panic!("view: {view}");
1710 }
1711 }
1712 }
1713 {
1714 let finalizes = reporter.finalizes.lock();
1715 for (view, payloads) in finalizes.iter() {
1716 for (_, finalizers) in payloads.iter() {
1717 if finalizers.contains(offline) {
1718 panic!("view: {view}");
1719 }
1720 }
1721 }
1722 }
1723
1724 let mut offline_views = Vec::new();
1726 {
1727 let leaders = reporter.leaders.lock();
1728 for (view, leader) in leaders.iter() {
1729 if leader == offline {
1730 offline_views.push(*view);
1731 }
1732 }
1733 }
1734 assert!(!offline_views.is_empty());
1735
1736 {
1738 let nullifies = reporter.nullifies.lock();
1739 for view in offline_views.iter() {
1740 let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1741 if nullifies < quorum {
1742 warn!("missing expected view nullifies: {}", view);
1743 exceptions += 1;
1744 }
1745 }
1746 }
1747 {
1748 let nullifications = reporter.nullifications.lock();
1749 for view in offline_views.iter() {
1750 if !nullifications.contains_key(view) {
1751 warn!("missing expected view nullifies: {}", view);
1752 exceptions += 1;
1753 }
1754 }
1755 }
1756
1757 assert!(exceptions <= max_exceptions);
1759 }
1760 assert!(exceptions <= max_exceptions);
1761
1762 let blocked = oracle.blocked().await.unwrap();
1764 assert!(blocked.is_empty());
1765
1766 let encoded = context.encode();
1768 let leader_label = format!("leader=\"{}\"", offline);
1769 assert!(
1770 count_nonzero_metric_lines(&encoded, &["_timeouts", &leader_label]) >= n - 1,
1771 "expected timeout metrics for offline leader"
1772 );
1773 assert_eq!(
1774 count_nonzero_metric_lines(&encoded, &["_nullifications", &leader_label]),
1775 n - 1,
1776 "expected all online nodes to record _nullifications for offline leader"
1777 );
1778 });
1779 }
1780
1781 #[test_group("slow")]
1782 #[test_traced]
1783 fn test_one_offline() {
1784 one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1785 one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1786 one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1787 one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1788 one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1789 one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1790 one_offline::<_, _, RoundRobin>(ed25519::fixture);
1791 one_offline::<_, _, RoundRobin>(secp256r1::fixture);
1792 }
1793
1794 fn slow_validator<S, F, L>(mut fixture: F)
1795 where
1796 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1797 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1798 L: Elector<S>,
1799 {
1800 let n = 5;
1802 let required_containers = View::new(50);
1803 let activity_timeout = ViewDelta::new(10);
1804 let skip_timeout = ViewDelta::new(5);
1805 let namespace = b"consensus".to_vec();
1806 let executor = deterministic::Runner::timed(Duration::from_secs(300));
1807 executor.start(|mut context| async move {
1808 let (network, mut oracle) = Network::new(
1810 context.with_label("network"),
1811 Config {
1812 max_size: 1024 * 1024,
1813 disconnect_on_block: true,
1814 tracked_peer_sets: None,
1815 },
1816 );
1817
1818 network.start();
1820
1821 let Fixture {
1823 participants,
1824 schemes,
1825 ..
1826 } = fixture(&mut context, &namespace, n);
1827 let mut registrations = register_validators(&mut oracle, &participants).await;
1828
1829 let link = Link {
1831 latency: Duration::from_millis(10),
1832 jitter: Duration::from_millis(1),
1833 success_rate: 1.0,
1834 };
1835 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1836
1837 let elector = L::default();
1839 let relay = Arc::new(mocks::relay::Relay::new());
1840 let mut reporters = Vec::new();
1841 let mut engine_handlers = Vec::new();
1842 for (idx_scheme, validator) in participants.iter().enumerate() {
1843 let context = context.with_label(&format!("validator_{}", *validator));
1845
1846 let reporter_config = mocks::reporter::Config {
1848 participants: participants.clone().try_into().unwrap(),
1849 scheme: schemes[idx_scheme].clone(),
1850 elector: elector.clone(),
1851 };
1852 let reporter =
1853 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1854 reporters.push(reporter.clone());
1855 let application_cfg = if idx_scheme == 0 {
1856 mocks::application::Config {
1857 hasher: Sha256::default(),
1858 relay: relay.clone(),
1859 me: validator.clone(),
1860 propose_latency: (10_000.0, 0.0),
1861 verify_latency: (10_000.0, 5.0),
1862 certify_latency: (10_000.0, 5.0),
1863 should_certify: mocks::application::Certifier::Sometimes,
1864 }
1865 } else {
1866 mocks::application::Config {
1867 hasher: Sha256::default(),
1868 relay: relay.clone(),
1869 me: validator.clone(),
1870 propose_latency: (10.0, 5.0),
1871 verify_latency: (10.0, 5.0),
1872 certify_latency: (10.0, 5.0),
1873 should_certify: mocks::application::Certifier::Sometimes,
1874 }
1875 };
1876 let (actor, application) = mocks::application::Application::new(
1877 context.with_label("application"),
1878 application_cfg,
1879 );
1880 actor.start();
1881 let blocker = oracle.control(validator.clone());
1882 let cfg = config::Config {
1883 scheme: schemes[idx_scheme].clone(),
1884 elector: elector.clone(),
1885 blocker,
1886 automaton: application.clone(),
1887 relay: application.clone(),
1888 reporter: reporter.clone(),
1889 strategy: Sequential,
1890 partition: validator.to_string(),
1891 mailbox_size: 1024,
1892 epoch: Epoch::new(333),
1893 leader_timeout: Duration::from_secs(1),
1894 certification_timeout: Duration::from_secs(2),
1895 timeout_retry: Duration::from_secs(10),
1896 fetch_timeout: Duration::from_secs(1),
1897 activity_timeout,
1898 skip_timeout,
1899 fetch_concurrent: 4,
1900 replay_buffer: NZUsize!(1024 * 1024),
1901 write_buffer: NZUsize!(1024 * 1024),
1902 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1903 };
1904 let engine = Engine::new(context.with_label("engine"), cfg);
1905
1906 let (pending, recovered, resolver) = registrations
1908 .remove(validator)
1909 .expect("validator should be registered");
1910 engine_handlers.push(engine.start(pending, recovered, resolver));
1911 }
1912
1913 let mut finalizers = Vec::new();
1915 for reporter in reporters.iter_mut() {
1916 let (mut latest, mut monitor) = reporter.subscribe().await;
1917 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1918 while latest < required_containers {
1919 latest = monitor.recv().await.expect("event missing");
1920 }
1921 }));
1922 }
1923 join_all(finalizers).await;
1924
1925 let slow = &participants[0];
1927 for reporter in reporters.iter() {
1928 {
1930 let faults = reporter.faults.lock();
1931 assert!(faults.is_empty());
1932 }
1933
1934 {
1936 let invalid = reporter.invalid.lock();
1937 assert_eq!(*invalid, 0);
1938 }
1939
1940 let mut observed = false;
1942 {
1943 let notarizes = reporter.notarizes.lock();
1944 for (_, payloads) in notarizes.iter() {
1945 for (_, participants) in payloads.iter() {
1946 if participants.contains(slow) {
1947 observed = true;
1948 break;
1949 }
1950 }
1951 }
1952 }
1953 {
1954 let finalizes = reporter.finalizes.lock();
1955 for (_, payloads) in finalizes.iter() {
1956 for (_, finalizers) in payloads.iter() {
1957 if finalizers.contains(slow) {
1958 observed = true;
1959 break;
1960 }
1961 }
1962 }
1963 }
1964 assert!(observed);
1965 }
1966
1967 let blocked = oracle.blocked().await.unwrap();
1969 assert!(blocked.is_empty());
1970 });
1971 }
1972
1973 #[test_group("slow")]
1974 #[test_traced]
1975 fn test_slow_validator() {
1976 slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1977 slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1978 slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1979 slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1980 slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1981 slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1982 slow_validator::<_, _, RoundRobin>(ed25519::fixture);
1983 slow_validator::<_, _, RoundRobin>(secp256r1::fixture);
1984 }
1985
1986 fn all_recovery<S, F, L>(mut fixture: F)
1987 where
1988 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1989 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1990 L: Elector<S>,
1991 {
1992 let n = 5;
1994 let required_containers = View::new(100);
1995 let activity_timeout = ViewDelta::new(10);
1996 let skip_timeout = ViewDelta::new(2);
1997 let namespace = b"consensus".to_vec();
1998 let executor = deterministic::Runner::timed(Duration::from_secs(1800));
1999 executor.start(|mut context| async move {
2000 let (network, mut oracle) = Network::new(
2002 context.with_label("network"),
2003 Config {
2004 max_size: 1024 * 1024,
2005 disconnect_on_block: false,
2006 tracked_peer_sets: None,
2007 },
2008 );
2009
2010 network.start();
2012
2013 let Fixture {
2015 participants,
2016 schemes,
2017 ..
2018 } = fixture(&mut context, &namespace, n);
2019 let mut registrations = register_validators(&mut oracle, &participants).await;
2020
2021 let link = Link {
2023 latency: Duration::from_secs(3),
2024 jitter: Duration::from_millis(0),
2025 success_rate: 1.0,
2026 };
2027 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2028
2029 let elector = L::default();
2031 let relay = Arc::new(mocks::relay::Relay::new());
2032 let mut reporters = Vec::new();
2033 let mut engine_handlers = Vec::new();
2034 for (idx, validator) in participants.iter().enumerate() {
2035 let context = context.with_label(&format!("validator_{}", *validator));
2037
2038 let reporter_config = mocks::reporter::Config {
2040 participants: participants.clone().try_into().unwrap(),
2041 scheme: schemes[idx].clone(),
2042 elector: elector.clone(),
2043 };
2044 let reporter =
2045 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2046 reporters.push(reporter.clone());
2047 let application_cfg = mocks::application::Config {
2048 hasher: Sha256::default(),
2049 relay: relay.clone(),
2050 me: validator.clone(),
2051 propose_latency: (10.0, 5.0),
2052 verify_latency: (10.0, 5.0),
2053 certify_latency: (10.0, 5.0),
2054 should_certify: mocks::application::Certifier::Sometimes,
2055 };
2056 let (actor, application) = mocks::application::Application::new(
2057 context.with_label("application"),
2058 application_cfg,
2059 );
2060 actor.start();
2061 let blocker = oracle.control(validator.clone());
2062 let cfg = config::Config {
2063 scheme: schemes[idx].clone(),
2064 elector: elector.clone(),
2065 blocker,
2066 automaton: application.clone(),
2067 relay: application.clone(),
2068 reporter: reporter.clone(),
2069 strategy: Sequential,
2070 partition: validator.to_string(),
2071 mailbox_size: 1024,
2072 epoch: Epoch::new(333),
2073 leader_timeout: Duration::from_secs(1),
2074 certification_timeout: Duration::from_secs(2),
2075 timeout_retry: Duration::from_secs(10),
2076 fetch_timeout: Duration::from_secs(1),
2077 activity_timeout,
2078 skip_timeout,
2079 fetch_concurrent: 4,
2080 replay_buffer: NZUsize!(1024 * 1024),
2081 write_buffer: NZUsize!(1024 * 1024),
2082 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2083 };
2084 let engine = Engine::new(context.with_label("engine"), cfg);
2085
2086 let (pending, recovered, resolver) = registrations
2088 .remove(validator)
2089 .expect("validator should be registered");
2090 engine_handlers.push(engine.start(pending, recovered, resolver));
2091 }
2092
2093 let mut finalizers = Vec::new();
2095 for reporter in reporters.iter_mut() {
2096 let (_, mut monitor) = reporter.subscribe().await;
2097 finalizers.push(
2098 context
2099 .with_label("finalizer")
2100 .spawn(move |context| async move {
2101 select! {
2102 _timeout = context.sleep(Duration::from_secs(60)) => {},
2103 _done = monitor.recv() => {
2104 panic!("engine should not notarize or finalize anything");
2105 },
2106 }
2107 }),
2108 );
2109 }
2110 join_all(finalizers).await;
2111
2112 link_validators(&mut oracle, &participants, Action::Unlink, None).await;
2114
2115 context.sleep(Duration::from_secs(60)).await;
2117
2118 let mut latest = View::zero();
2120 for reporter in reporters.iter() {
2121 let nullifies = reporter.nullifies.lock();
2122 let max = nullifies.keys().max().unwrap();
2123 if *max > latest {
2124 latest = *max;
2125 }
2126 }
2127
2128 let link = Link {
2130 latency: Duration::from_millis(10),
2131 jitter: Duration::from_millis(1),
2132 success_rate: 1.0,
2133 };
2134 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2135
2136 let mut finalizers = Vec::new();
2138 for reporter in reporters.iter_mut() {
2139 let (mut latest, mut monitor) = reporter.subscribe().await;
2140 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2141 while latest < required_containers {
2142 latest = monitor.recv().await.expect("event missing");
2143 }
2144 }));
2145 }
2146 join_all(finalizers).await;
2147
2148 for reporter in reporters.iter() {
2150 {
2152 let faults = reporter.faults.lock();
2153 assert!(faults.is_empty());
2154 }
2155
2156 {
2158 let invalid = reporter.invalid.lock();
2159 assert_eq!(*invalid, 0);
2160 }
2161
2162 {
2167 let mut found = 0;
2171 let notarizations = reporter.notarizations.lock();
2172 for view in View::range(latest, latest.saturating_add(activity_timeout)) {
2173 if notarizations.contains_key(&view) {
2174 found += 1;
2175 }
2176 }
2177 assert!(
2178 found >= activity_timeout.get().saturating_sub(2),
2179 "found: {found}"
2180 );
2181 }
2182 }
2183
2184 let blocked = oracle.blocked().await.unwrap();
2186 assert!(blocked.is_empty());
2187 });
2188 }
2189
2190 #[test_group("slow")]
2191 #[test_traced]
2192 fn test_all_recovery() {
2193 all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2194 all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2195 all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2196 all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2197 all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2198 all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2199 all_recovery::<_, _, RoundRobin>(ed25519::fixture);
2200 all_recovery::<_, _, RoundRobin>(secp256r1::fixture);
2201 }
2202
2203 fn partition<S, F, L>(mut fixture: F)
2204 where
2205 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2206 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2207 L: Elector<S>,
2208 {
2209 let n = 10;
2211 let required_containers = View::new(50);
2212 let activity_timeout = ViewDelta::new(10);
2213 let skip_timeout = ViewDelta::new(5);
2214 let namespace = b"consensus".to_vec();
2215 let executor = deterministic::Runner::timed(Duration::from_secs(900));
2216 executor.start(|mut context| async move {
2217 let (network, mut oracle) = Network::new(
2219 context.with_label("network"),
2220 Config {
2221 max_size: 1024 * 1024,
2222 disconnect_on_block: false,
2223 tracked_peer_sets: None,
2224 },
2225 );
2226
2227 network.start();
2229
2230 let Fixture {
2232 participants,
2233 schemes,
2234 ..
2235 } = fixture(&mut context, &namespace, n);
2236 let mut registrations = register_validators(&mut oracle, &participants).await;
2237
2238 let link = Link {
2240 latency: Duration::from_millis(10),
2241 jitter: Duration::from_millis(1),
2242 success_rate: 1.0,
2243 };
2244 link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
2245
2246 let elector = L::default();
2248 let relay = Arc::new(mocks::relay::Relay::new());
2249 let mut reporters = Vec::new();
2250 let mut engine_handlers = Vec::new();
2251 for (idx, validator) in participants.iter().enumerate() {
2252 let context = context.with_label(&format!("validator_{}", *validator));
2254
2255 let reporter_config = mocks::reporter::Config {
2257 participants: participants.clone().try_into().unwrap(),
2258 scheme: schemes[idx].clone(),
2259 elector: elector.clone(),
2260 };
2261 let reporter =
2262 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2263 reporters.push(reporter.clone());
2264 let application_cfg = mocks::application::Config {
2265 hasher: Sha256::default(),
2266 relay: relay.clone(),
2267 me: validator.clone(),
2268 propose_latency: (10.0, 5.0),
2269 verify_latency: (10.0, 5.0),
2270 certify_latency: (10.0, 5.0),
2271 should_certify: mocks::application::Certifier::Sometimes,
2272 };
2273 let (actor, application) = mocks::application::Application::new(
2274 context.with_label("application"),
2275 application_cfg,
2276 );
2277 actor.start();
2278 let blocker = oracle.control(validator.clone());
2279 let cfg = config::Config {
2280 scheme: schemes[idx].clone(),
2281 elector: elector.clone(),
2282 blocker,
2283 automaton: application.clone(),
2284 relay: application.clone(),
2285 reporter: reporter.clone(),
2286 strategy: Sequential,
2287 partition: validator.to_string(),
2288 mailbox_size: 1024,
2289 epoch: Epoch::new(333),
2290 leader_timeout: Duration::from_secs(1),
2291 certification_timeout: Duration::from_secs(2),
2292 timeout_retry: Duration::from_secs(10),
2293 fetch_timeout: Duration::from_secs(1),
2294 activity_timeout,
2295 skip_timeout,
2296 fetch_concurrent: 4,
2297 replay_buffer: NZUsize!(1024 * 1024),
2298 write_buffer: NZUsize!(1024 * 1024),
2299 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2300 };
2301 let engine = Engine::new(context.with_label("engine"), cfg);
2302
2303 let (pending, recovered, resolver) = registrations
2305 .remove(validator)
2306 .expect("validator should be registered");
2307 engine_handlers.push(engine.start(pending, recovered, resolver));
2308 }
2309
2310 let mut finalizers = Vec::new();
2312 for reporter in reporters.iter_mut() {
2313 let (mut latest, mut monitor) = reporter.subscribe().await;
2314 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2315 while latest < required_containers {
2316 latest = monitor.recv().await.expect("event missing");
2317 }
2318 }));
2319 }
2320 join_all(finalizers).await;
2321
2322 fn separated(n: usize, a: usize, b: usize) -> bool {
2324 let m = n / 2;
2325 (a < m && b >= m) || (a >= m && b < m)
2326 }
2327 link_validators(&mut oracle, &participants, Action::Unlink, Some(separated)).await;
2328
2329 context.sleep(Duration::from_secs(10)).await;
2331
2332 let mut finalizers = Vec::new();
2334 for reporter in reporters.iter_mut() {
2335 let (_, mut monitor) = reporter.subscribe().await;
2336 finalizers.push(
2337 context
2338 .with_label("finalizer")
2339 .spawn(move |context| async move {
2340 select! {
2341 _timeout = context.sleep(Duration::from_secs(60)) => {},
2342 _done = monitor.recv() => {
2343 panic!("engine should not notarize or finalize anything");
2344 },
2345 }
2346 }),
2347 );
2348 }
2349 join_all(finalizers).await;
2350
2351 link_validators(
2353 &mut oracle,
2354 &participants,
2355 Action::Link(link),
2356 Some(separated),
2357 )
2358 .await;
2359
2360 let mut finalizers = Vec::new();
2362 for reporter in reporters.iter_mut() {
2363 let (mut latest, mut monitor) = reporter.subscribe().await;
2364 let required = latest.saturating_add(ViewDelta::new(required_containers.get()));
2365 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2366 while latest < required {
2367 latest = monitor.recv().await.expect("event missing");
2368 }
2369 }));
2370 }
2371 join_all(finalizers).await;
2372
2373 for reporter in reporters.iter() {
2375 {
2377 let faults = reporter.faults.lock();
2378 assert!(faults.is_empty());
2379 }
2380
2381 {
2383 let invalid = reporter.invalid.lock();
2384 assert_eq!(*invalid, 0);
2385 }
2386 }
2387
2388 let blocked = oracle.blocked().await.unwrap();
2390 assert!(blocked.is_empty());
2391 });
2392 }
2393
2394 #[test_group("slow")]
2395 #[test_traced]
2396 fn test_partition() {
2397 partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2398 partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2399 partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2400 partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2401 partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2402 partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2403 partition::<_, _, RoundRobin>(ed25519::fixture);
2404 partition::<_, _, RoundRobin>(secp256r1::fixture);
2405 }
2406
2407 fn slow_and_lossy_links<S, F, L>(seed: u64, mut fixture: F) -> String
2408 where
2409 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2410 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2411 L: Elector<S>,
2412 {
2413 let n = 5;
2415 let required_containers = View::new(50);
2416 let activity_timeout = ViewDelta::new(10);
2417 let skip_timeout = ViewDelta::new(5);
2418 let namespace = b"consensus".to_vec();
2419 let cfg = deterministic::Config::new()
2420 .with_seed(seed)
2421 .with_timeout(Some(Duration::from_secs(5_000)));
2422 let executor = deterministic::Runner::new(cfg);
2423 executor.start(|mut context| async move {
2424 let (network, mut oracle) = Network::new(
2426 context.with_label("network"),
2427 Config {
2428 max_size: 1024 * 1024,
2429 disconnect_on_block: false,
2430 tracked_peer_sets: None,
2431 },
2432 );
2433
2434 network.start();
2436
2437 let Fixture {
2439 participants,
2440 schemes,
2441 ..
2442 } = fixture(&mut context, &namespace, n);
2443 let mut registrations = register_validators(&mut oracle, &participants).await;
2444
2445 let degraded_link = Link {
2447 latency: Duration::from_millis(200),
2448 jitter: Duration::from_millis(150),
2449 success_rate: 0.5,
2450 };
2451 link_validators(
2452 &mut oracle,
2453 &participants,
2454 Action::Link(degraded_link),
2455 None,
2456 )
2457 .await;
2458
2459 let elector = L::default();
2461 let relay = Arc::new(mocks::relay::Relay::new());
2462 let mut reporters = Vec::new();
2463 let mut engine_handlers = Vec::new();
2464 for (idx, validator) in participants.iter().enumerate() {
2465 let context = context.with_label(&format!("validator_{}", *validator));
2467
2468 let reporter_config = mocks::reporter::Config {
2470 participants: participants.clone().try_into().unwrap(),
2471 scheme: schemes[idx].clone(),
2472 elector: elector.clone(),
2473 };
2474 let reporter =
2475 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2476 reporters.push(reporter.clone());
2477 let application_cfg = mocks::application::Config {
2478 hasher: Sha256::default(),
2479 relay: relay.clone(),
2480 me: validator.clone(),
2481 propose_latency: (10.0, 5.0),
2482 verify_latency: (10.0, 5.0),
2483 certify_latency: (10.0, 5.0),
2484 should_certify: mocks::application::Certifier::Sometimes,
2485 };
2486 let (actor, application) = mocks::application::Application::new(
2487 context.with_label("application"),
2488 application_cfg,
2489 );
2490 actor.start();
2491 let blocker = oracle.control(validator.clone());
2492 let cfg = config::Config {
2493 scheme: schemes[idx].clone(),
2494 elector: elector.clone(),
2495 blocker,
2496 automaton: application.clone(),
2497 relay: application.clone(),
2498 reporter: reporter.clone(),
2499 strategy: Sequential,
2500 partition: validator.to_string(),
2501 mailbox_size: 1024,
2502 epoch: Epoch::new(333),
2503 leader_timeout: Duration::from_secs(1),
2504 certification_timeout: Duration::from_secs(2),
2505 timeout_retry: Duration::from_secs(10),
2506 fetch_timeout: Duration::from_secs(1),
2507 activity_timeout,
2508 skip_timeout,
2509 fetch_concurrent: 4,
2510 replay_buffer: NZUsize!(1024 * 1024),
2511 write_buffer: NZUsize!(1024 * 1024),
2512 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2513 };
2514 let engine = Engine::new(context.with_label("engine"), cfg);
2515
2516 let (pending, recovered, resolver) = registrations
2518 .remove(validator)
2519 .expect("validator should be registered");
2520 engine_handlers.push(engine.start(pending, recovered, resolver));
2521 }
2522
2523 let mut finalizers = Vec::new();
2525 for reporter in reporters.iter_mut() {
2526 let (mut latest, mut monitor) = reporter.subscribe().await;
2527 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2528 while latest < required_containers {
2529 latest = monitor.recv().await.expect("event missing");
2530 }
2531 }));
2532 }
2533 join_all(finalizers).await;
2534
2535 for reporter in reporters.iter() {
2537 {
2539 let faults = reporter.faults.lock();
2540 assert!(faults.is_empty());
2541 }
2542
2543 {
2545 let invalid = reporter.invalid.lock();
2546 assert_eq!(*invalid, 0);
2547 }
2548 }
2549
2550 let blocked = oracle.blocked().await.unwrap();
2552 assert!(blocked.is_empty());
2553
2554 context.auditor().state()
2555 })
2556 }
2557
2558 #[test_group("slow")]
2559 #[test_traced]
2560 fn test_slow_and_lossy_links() {
2561 slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinPk, _>);
2562 slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinSig, _>);
2563 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinPk, _>);
2564 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinSig, _>);
2565 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinPk, _>);
2566 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinSig, _>);
2567 slow_and_lossy_links::<_, _, RoundRobin>(0, ed25519::fixture);
2568 slow_and_lossy_links::<_, _, RoundRobin>(0, secp256r1::fixture);
2569 }
2570
2571 #[test_group("slow")]
2572 #[test_traced]
2573 fn test_determinism() {
2574 for seed in 1..6 {
2577 let ts_vrf_pk_state_1 = slow_and_lossy_links::<_, _, Random>(
2578 seed,
2579 bls12381_threshold_vrf::fixture::<MinPk, _>,
2580 );
2581 let ts_vrf_pk_state_2 = slow_and_lossy_links::<_, _, Random>(
2582 seed,
2583 bls12381_threshold_vrf::fixture::<MinPk, _>,
2584 );
2585 assert_eq!(ts_vrf_pk_state_1, ts_vrf_pk_state_2);
2586
2587 let ts_vrf_sig_state_1 = slow_and_lossy_links::<_, _, Random>(
2588 seed,
2589 bls12381_threshold_vrf::fixture::<MinSig, _>,
2590 );
2591 let ts_vrf_sig_state_2 = slow_and_lossy_links::<_, _, Random>(
2592 seed,
2593 bls12381_threshold_vrf::fixture::<MinSig, _>,
2594 );
2595 assert_eq!(ts_vrf_sig_state_1, ts_vrf_sig_state_2);
2596
2597 let ts_std_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2598 seed,
2599 bls12381_threshold_std::fixture::<MinPk, _>,
2600 );
2601 let ts_std_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2602 seed,
2603 bls12381_threshold_std::fixture::<MinPk, _>,
2604 );
2605 assert_eq!(ts_std_pk_state_1, ts_std_pk_state_2);
2606
2607 let ts_std_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2608 seed,
2609 bls12381_threshold_std::fixture::<MinSig, _>,
2610 );
2611 let ts_std_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2612 seed,
2613 bls12381_threshold_std::fixture::<MinSig, _>,
2614 );
2615 assert_eq!(ts_std_sig_state_1, ts_std_sig_state_2);
2616
2617 let ms_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2618 seed,
2619 bls12381_multisig::fixture::<MinPk, _>,
2620 );
2621 let ms_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2622 seed,
2623 bls12381_multisig::fixture::<MinPk, _>,
2624 );
2625 assert_eq!(ms_pk_state_1, ms_pk_state_2);
2626
2627 let ms_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2628 seed,
2629 bls12381_multisig::fixture::<MinSig, _>,
2630 );
2631 let ms_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2632 seed,
2633 bls12381_multisig::fixture::<MinSig, _>,
2634 );
2635 assert_eq!(ms_sig_state_1, ms_sig_state_2);
2636
2637 let ed_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2638 let ed_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2639 assert_eq!(ed_state_1, ed_state_2);
2640
2641 let secp_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2642 let secp_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2643 assert_eq!(secp_state_1, secp_state_2);
2644
2645 let states = [
2646 ("threshold-vrf-minpk", ts_vrf_pk_state_1),
2647 ("threshold-vrf-minsig", ts_vrf_sig_state_1),
2648 ("threshold-std-minpk", ts_std_pk_state_1),
2649 ("threshold-std-minsig", ts_std_sig_state_1),
2650 ("multisig-minpk", ms_pk_state_1),
2651 ("multisig-minsig", ms_sig_state_1),
2652 ("ed25519", ed_state_1),
2653 ("secp256r1", secp_state_1),
2654 ];
2655
2656 for pair in states.windows(2) {
2658 assert_ne!(
2659 pair[0].1, pair[1].1,
2660 "state {} equals state {}",
2661 pair[0].0, pair[1].0
2662 );
2663 }
2664 }
2665 }
2666
2667 fn conflicter<S, F, L>(seed: u64, mut fixture: F)
2668 where
2669 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2670 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2671 L: Elector<S>,
2672 {
2673 let n = 4;
2675 let required_containers = View::new(50);
2676 let activity_timeout = ViewDelta::new(10);
2677 let skip_timeout = ViewDelta::new(5);
2678 let namespace = b"consensus".to_vec();
2679 let cfg = deterministic::Config::new()
2680 .with_seed(seed)
2681 .with_timeout(Some(Duration::from_secs(30)));
2682 let executor = deterministic::Runner::new(cfg);
2683 executor.start(|mut context| async move {
2684 let (network, mut oracle) = Network::new(
2686 context.with_label("network"),
2687 Config {
2688 max_size: 1024 * 1024,
2689 disconnect_on_block: false,
2690 tracked_peer_sets: None,
2691 },
2692 );
2693
2694 network.start();
2696
2697 let Fixture {
2699 participants,
2700 schemes,
2701 ..
2702 } = fixture(&mut context, &namespace, n);
2703 let mut registrations = register_validators(&mut oracle, &participants).await;
2704
2705 let link = Link {
2707 latency: Duration::from_millis(10),
2708 jitter: Duration::from_millis(1),
2709 success_rate: 1.0,
2710 };
2711 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2712
2713 let elector = L::default();
2715 let relay = Arc::new(mocks::relay::Relay::new());
2716 let mut reporters = Vec::new();
2717 for (idx_scheme, validator) in participants.iter().enumerate() {
2718 let context = context.with_label(&format!("validator_{}", *validator));
2720
2721 let reporter_config = mocks::reporter::Config {
2723 participants: participants.clone().try_into().unwrap(),
2724 scheme: schemes[idx_scheme].clone(),
2725 elector: elector.clone(),
2726 };
2727 let reporter =
2728 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2729 let (pending, recovered, resolver) = registrations
2730 .remove(validator)
2731 .expect("validator should be registered");
2732 if idx_scheme == 0 {
2733 let cfg = mocks::conflicter::Config {
2734 scheme: schemes[idx_scheme].clone(),
2735 };
2736
2737 let engine: mocks::conflicter::Conflicter<_, _, Sha256> =
2738 mocks::conflicter::Conflicter::new(
2739 context.with_label("byzantine_engine"),
2740 cfg,
2741 );
2742 engine.start(pending);
2743 } else {
2744 reporters.push(reporter.clone());
2745 let application_cfg = mocks::application::Config {
2746 hasher: Sha256::default(),
2747 relay: relay.clone(),
2748 me: validator.clone(),
2749 propose_latency: (10.0, 5.0),
2750 verify_latency: (10.0, 5.0),
2751 certify_latency: (10.0, 5.0),
2752 should_certify: mocks::application::Certifier::Sometimes,
2753 };
2754 let (actor, application) = mocks::application::Application::new(
2755 context.with_label("application"),
2756 application_cfg,
2757 );
2758 actor.start();
2759 let blocker = oracle.control(validator.clone());
2760 let cfg = config::Config {
2761 scheme: schemes[idx_scheme].clone(),
2762 elector: elector.clone(),
2763 blocker,
2764 automaton: application.clone(),
2765 relay: application.clone(),
2766 reporter: reporter.clone(),
2767 strategy: Sequential,
2768 partition: validator.to_string(),
2769 mailbox_size: 1024,
2770 epoch: Epoch::new(333),
2771 leader_timeout: Duration::from_secs(1),
2772 certification_timeout: Duration::from_secs(2),
2773 timeout_retry: Duration::from_secs(10),
2774 fetch_timeout: Duration::from_secs(1),
2775 activity_timeout,
2776 skip_timeout,
2777 fetch_concurrent: 4,
2778 replay_buffer: NZUsize!(1024 * 1024),
2779 write_buffer: NZUsize!(1024 * 1024),
2780 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2781 };
2782 let engine = Engine::new(context.with_label("engine"), cfg);
2783 engine.start(pending, recovered, resolver);
2784 }
2785 }
2786
2787 let mut finalizers = Vec::new();
2789 for reporter in reporters.iter_mut() {
2790 let (mut latest, mut monitor) = reporter.subscribe().await;
2791 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2792 while latest < required_containers {
2793 latest = monitor.recv().await.expect("event missing");
2794 }
2795 }));
2796 }
2797 join_all(finalizers).await;
2798
2799 let byz = &participants[0];
2801 let mut count_conflicting = 0;
2802 for reporter in reporters.iter() {
2803 {
2805 let faults = reporter.faults.lock();
2806 assert_eq!(faults.len(), 1);
2807 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2808 for (_, faults) in faulter.iter() {
2809 for fault in faults.iter() {
2810 match fault {
2811 Activity::ConflictingNotarize(_) => {
2812 count_conflicting += 1;
2813 }
2814 Activity::ConflictingFinalize(_) => {
2815 count_conflicting += 1;
2816 }
2817 _ => panic!("unexpected fault: {fault:?}"),
2818 }
2819 }
2820 }
2821 }
2822
2823 {
2825 let invalid = reporter.invalid.lock();
2826 assert_eq!(*invalid, 0);
2827 }
2828 }
2829 assert!(count_conflicting > 0);
2830
2831 let blocked = oracle.blocked().await.unwrap();
2833 assert!(!blocked.is_empty());
2834 for (a, b) in blocked {
2835 assert_ne!(&a, byz);
2836 assert_eq!(&b, byz);
2837 }
2838 });
2839 }
2840
2841 #[test_group("slow")]
2842 #[test_traced]
2843 fn test_conflicter() {
2844 for seed in 0..5 {
2845 conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
2846 conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
2847 conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
2848 conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
2849 conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2850 conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2851 conflicter::<_, _, RoundRobin>(seed, ed25519::fixture);
2852 conflicter::<_, _, RoundRobin>(seed, secp256r1::fixture);
2853 }
2854 }
2855
2856 fn invalid<S, F, L>(seed: u64, mut fixture: F)
2857 where
2858 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2859 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2860 L: Elector<S>,
2861 {
2862 let n = 4;
2864 let required_containers = View::new(50);
2865 let activity_timeout = ViewDelta::new(10);
2866 let skip_timeout = ViewDelta::new(5);
2867 let namespace = b"consensus".to_vec();
2868 let cfg = deterministic::Config::new()
2869 .with_seed(seed)
2870 .with_timeout(Some(Duration::from_secs(30)));
2871 let executor = deterministic::Runner::new(cfg);
2872 executor.start(|mut context| async move {
2873 let (network, mut oracle) = Network::new(
2875 context.with_label("network"),
2876 Config {
2877 max_size: 1024 * 1024,
2878 disconnect_on_block: false,
2879 tracked_peer_sets: None,
2880 },
2881 );
2882
2883 network.start();
2885
2886 let Fixture {
2888 participants,
2889 schemes,
2890 ..
2891 } = fixture(&mut context, &namespace, n);
2892
2893 let Fixture {
2895 schemes: wrong_schemes,
2896 ..
2897 } = fixture(&mut context, b"wrong-namespace", n);
2898
2899 let mut registrations = register_validators(&mut oracle, &participants).await;
2900
2901 let link = Link {
2903 latency: Duration::from_millis(10),
2904 jitter: Duration::from_millis(1),
2905 success_rate: 1.0,
2906 };
2907 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2908
2909 let elector = L::default();
2911 let relay = Arc::new(mocks::relay::Relay::new());
2912 let mut reporters = Vec::new();
2913 for (idx_scheme, validator) in participants.iter().enumerate() {
2914 let context = context.with_label(&format!("validator_{}", *validator));
2916
2917 let scheme = if idx_scheme == 0 {
2920 wrong_schemes[idx_scheme].clone()
2921 } else {
2922 schemes[idx_scheme].clone()
2923 };
2924
2925 let reporter_config = mocks::reporter::Config {
2926 participants: participants.clone().try_into().unwrap(),
2927 scheme: schemes[idx_scheme].clone(),
2928 elector: elector.clone(),
2929 };
2930 let reporter =
2931 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2932 reporters.push(reporter.clone());
2933
2934 let application_cfg = mocks::application::Config {
2935 hasher: Sha256::default(),
2936 relay: relay.clone(),
2937 me: validator.clone(),
2938 propose_latency: (10.0, 5.0),
2939 verify_latency: (10.0, 5.0),
2940 certify_latency: (10.0, 5.0),
2941 should_certify: mocks::application::Certifier::Sometimes,
2942 };
2943 let (actor, application) = mocks::application::Application::new(
2944 context.with_label("application"),
2945 application_cfg,
2946 );
2947 actor.start();
2948 let blocker = oracle.control(validator.clone());
2949 let leader_timeout = if idx_scheme == 0 {
2950 Duration::from_millis(100)
2953 } else {
2954 Duration::from_secs(1)
2955 };
2956 let cfg = config::Config {
2957 scheme,
2958 elector: elector.clone(),
2959 blocker,
2960 automaton: application.clone(),
2961 relay: application.clone(),
2962 reporter: reporter.clone(),
2963 strategy: Sequential,
2964 partition: validator.clone().to_string(),
2965 mailbox_size: 1024,
2966 epoch: Epoch::new(333),
2967 leader_timeout,
2968 certification_timeout: Duration::from_secs(2),
2969 timeout_retry: Duration::from_secs(10),
2970 fetch_timeout: Duration::from_secs(1),
2971 activity_timeout,
2972 skip_timeout,
2973 fetch_concurrent: 4,
2974 replay_buffer: NZUsize!(1024 * 1024),
2975 write_buffer: NZUsize!(1024 * 1024),
2976 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2977 };
2978 let engine = Engine::new(context.with_label("engine"), cfg);
2979 let (pending, recovered, resolver) = registrations
2980 .remove(validator)
2981 .expect("validator should be registered");
2982 engine.start(pending, recovered, resolver);
2983 }
2984
2985 let mut finalizers = Vec::new();
2987 for reporter in reporters.iter_mut().skip(1) {
2988 let (mut latest, mut monitor) = reporter.subscribe().await;
2989 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2990 while latest < required_containers {
2991 latest = monitor.recv().await.expect("event missing");
2992 }
2993 }));
2994 }
2995 join_all(finalizers).await;
2996
2997 let mut invalid_count = 0;
2999 for reporter in reporters.iter().skip(1) {
3000 {
3002 let faults = reporter.faults.lock();
3003 assert!(faults.is_empty());
3004 }
3005
3006 {
3008 let invalid = reporter.invalid.lock();
3009 if *invalid > 0 {
3010 invalid_count += 1;
3011 }
3012 }
3013 }
3014
3015 assert_eq!(invalid_count, n - 1);
3017
3018 let blocked = oracle.blocked().await.unwrap();
3020 assert!(!blocked.is_empty());
3021 for (a, b) in blocked {
3022 if a != participants[0] {
3023 assert_eq!(b, participants[0]);
3024 }
3025 }
3026 });
3027 }
3028
3029 #[test_group("slow")]
3030 #[test_traced]
3031 fn test_invalid() {
3032 for seed in 0..5 {
3033 invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3034 invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3035 invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3036 invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3037 invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3038 invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3039 invalid::<_, _, RoundRobin>(seed, ed25519::fixture);
3040 invalid::<_, _, RoundRobin>(seed, secp256r1::fixture);
3041 }
3042 }
3043
3044 fn impersonator<S, F, L>(seed: u64, mut fixture: F)
3045 where
3046 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3047 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3048 L: Elector<S>,
3049 {
3050 let n = 4;
3052 let required_containers = View::new(50);
3053 let activity_timeout = ViewDelta::new(10);
3054 let skip_timeout = ViewDelta::new(5);
3055 let namespace = b"consensus".to_vec();
3056 let cfg = deterministic::Config::new()
3057 .with_seed(seed)
3058 .with_timeout(Some(Duration::from_secs(30)));
3059 let executor = deterministic::Runner::new(cfg);
3060 executor.start(|mut context| async move {
3061 let (network, mut oracle) = Network::new(
3063 context.with_label("network"),
3064 Config {
3065 max_size: 1024 * 1024,
3066 disconnect_on_block: false,
3067 tracked_peer_sets: None,
3068 },
3069 );
3070
3071 network.start();
3073
3074 let Fixture {
3076 participants,
3077 schemes,
3078 ..
3079 } = fixture(&mut context, &namespace, n);
3080 let mut registrations = register_validators(&mut oracle, &participants).await;
3081
3082 let link = Link {
3084 latency: Duration::from_millis(10),
3085 jitter: Duration::from_millis(1),
3086 success_rate: 1.0,
3087 };
3088 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3089
3090 let elector = L::default();
3092 let relay = Arc::new(mocks::relay::Relay::new());
3093 let mut reporters = Vec::new();
3094 for (idx_scheme, validator) in participants.iter().enumerate() {
3095 let context = context.with_label(&format!("validator_{}", *validator));
3097
3098 let reporter_config = mocks::reporter::Config {
3100 participants: participants.clone().try_into().unwrap(),
3101 scheme: schemes[idx_scheme].clone(),
3102 elector: elector.clone(),
3103 };
3104 let reporter =
3105 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3106 let (pending, recovered, resolver) = registrations
3107 .remove(validator)
3108 .expect("validator should be registered");
3109 if idx_scheme == 0 {
3110 let cfg = mocks::impersonator::Config {
3111 scheme: schemes[idx_scheme].clone(),
3112 };
3113
3114 let engine: mocks::impersonator::Impersonator<_, _, Sha256> =
3115 mocks::impersonator::Impersonator::new(
3116 context.with_label("byzantine_engine"),
3117 cfg,
3118 );
3119 engine.start(pending);
3120 } else {
3121 reporters.push(reporter.clone());
3122 let application_cfg = mocks::application::Config {
3123 hasher: Sha256::default(),
3124 relay: relay.clone(),
3125 me: validator.clone(),
3126 propose_latency: (10.0, 5.0),
3127 verify_latency: (10.0, 5.0),
3128 certify_latency: (10.0, 5.0),
3129 should_certify: mocks::application::Certifier::Sometimes,
3130 };
3131 let (actor, application) = mocks::application::Application::new(
3132 context.with_label("application"),
3133 application_cfg,
3134 );
3135 actor.start();
3136 let blocker = oracle.control(validator.clone());
3137 let cfg = config::Config {
3138 scheme: schemes[idx_scheme].clone(),
3139 elector: elector.clone(),
3140 blocker,
3141 automaton: application.clone(),
3142 relay: application.clone(),
3143 reporter: reporter.clone(),
3144 strategy: Sequential,
3145 partition: validator.clone().to_string(),
3146 mailbox_size: 1024,
3147 epoch: Epoch::new(333),
3148 leader_timeout: Duration::from_secs(1),
3149 certification_timeout: Duration::from_secs(2),
3150 timeout_retry: Duration::from_secs(10),
3151 fetch_timeout: Duration::from_secs(1),
3152 activity_timeout,
3153 skip_timeout,
3154 fetch_concurrent: 4,
3155 replay_buffer: NZUsize!(1024 * 1024),
3156 write_buffer: NZUsize!(1024 * 1024),
3157 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3158 };
3159 let engine = Engine::new(context.with_label("engine"), cfg);
3160 engine.start(pending, recovered, resolver);
3161 }
3162 }
3163
3164 let mut finalizers = Vec::new();
3166 for reporter in reporters.iter_mut() {
3167 let (mut latest, mut monitor) = reporter.subscribe().await;
3168 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3169 while latest < required_containers {
3170 latest = monitor.recv().await.expect("event missing");
3171 }
3172 }));
3173 }
3174 join_all(finalizers).await;
3175
3176 let byz = &participants[0];
3178 for reporter in reporters.iter() {
3179 {
3181 let faults = reporter.faults.lock();
3182 assert!(faults.is_empty());
3183 }
3184
3185 {
3187 let invalid = reporter.invalid.lock();
3188 assert_eq!(*invalid, 0);
3189 }
3190 }
3191
3192 let blocked = oracle.blocked().await.unwrap();
3194 assert!(!blocked.is_empty());
3195 for (a, b) in blocked {
3196 assert_ne!(&a, byz);
3197 assert_eq!(&b, byz);
3198 }
3199 });
3200 }
3201
3202 #[test_group("slow")]
3203 #[test_traced]
3204 fn test_impersonator() {
3205 for seed in 0..5 {
3206 impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3207 impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3208 impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3209 impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3210 impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3211 impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3212 impersonator::<_, _, RoundRobin>(seed, ed25519::fixture);
3213 impersonator::<_, _, RoundRobin>(seed, secp256r1::fixture);
3214 }
3215 }
3216
3217 fn equivocator<S, F, L>(seed: u64, mut fixture: F) -> bool
3218 where
3219 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3220 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3221 L: Elector<S>,
3222 {
3223 let n = 7;
3225 let required_containers = View::new(50);
3226 let activity_timeout = ViewDelta::new(10);
3227 let skip_timeout = ViewDelta::new(5);
3228 let namespace = b"consensus".to_vec();
3229 let cfg = deterministic::Config::new()
3230 .with_seed(seed)
3231 .with_timeout(Some(Duration::from_secs(60)));
3232 let executor = deterministic::Runner::new(cfg);
3233 executor.start(|mut context| async move {
3234 let (network, mut oracle) = Network::new(
3236 context.with_label("network"),
3237 Config {
3238 max_size: 1024 * 1024,
3239 disconnect_on_block: false,
3240 tracked_peer_sets: None,
3241 },
3242 );
3243
3244 network.start();
3246
3247 let Fixture {
3249 participants,
3250 schemes,
3251 ..
3252 } = fixture(&mut context, &namespace, n);
3253 let mut registrations = register_validators(&mut oracle, &participants).await;
3254
3255 let link = Link {
3257 latency: Duration::from_millis(10),
3258 jitter: Duration::from_millis(1),
3259 success_rate: 1.0,
3260 };
3261 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3262
3263 let elector = L::default();
3265 let mut engines = Vec::new();
3266 let relay = Arc::new(mocks::relay::Relay::new());
3267 let mut reporters = Vec::new();
3268 for (idx_scheme, validator) in participants.iter().enumerate() {
3269 let context = context.with_label(&format!("validator_{}", *validator));
3271
3272 let reporter_config = mocks::reporter::Config {
3274 participants: participants.clone().try_into().unwrap(),
3275 scheme: schemes[idx_scheme].clone(),
3276 elector: elector.clone(),
3277 };
3278 let reporter =
3279 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3280 reporters.push(reporter.clone());
3281 let (pending, recovered, resolver) = registrations
3282 .remove(validator)
3283 .expect("validator should be registered");
3284 if idx_scheme == 0 {
3285 let cfg = mocks::equivocator::Config {
3286 scheme: schemes[idx_scheme].clone(),
3287 epoch: Epoch::new(333),
3288 relay: relay.clone(),
3289 hasher: Sha256::default(),
3290 elector: elector.clone(),
3291 };
3292
3293 let engine = mocks::equivocator::Equivocator::new(
3294 context.with_label("byzantine_engine"),
3295 cfg,
3296 );
3297 engines.push(engine.start(pending, recovered));
3298 } else {
3299 let application_cfg = mocks::application::Config {
3300 hasher: Sha256::default(),
3301 relay: relay.clone(),
3302 me: validator.clone(),
3303 propose_latency: (10.0, 5.0),
3304 verify_latency: (10.0, 5.0),
3305 certify_latency: (10.0, 5.0),
3306 should_certify: mocks::application::Certifier::Sometimes,
3307 };
3308 let (actor, application) = mocks::application::Application::new(
3309 context.with_label("application"),
3310 application_cfg,
3311 );
3312 actor.start();
3313 let blocker = oracle.control(validator.clone());
3314 let cfg = config::Config {
3315 scheme: schemes[idx_scheme].clone(),
3316 elector: elector.clone(),
3317 blocker,
3318 automaton: application.clone(),
3319 relay: application.clone(),
3320 reporter: reporter.clone(),
3321 strategy: Sequential,
3322 partition: validator.to_string(),
3323 mailbox_size: 1024,
3324 epoch: Epoch::new(333),
3325 leader_timeout: Duration::from_secs(1),
3326 certification_timeout: Duration::from_secs(2),
3327 timeout_retry: Duration::from_secs(10),
3328 fetch_timeout: Duration::from_secs(1),
3329 activity_timeout,
3330 skip_timeout,
3331 fetch_concurrent: 4,
3332 replay_buffer: NZUsize!(1024 * 1024),
3333 write_buffer: NZUsize!(1024 * 1024),
3334 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3335 };
3336 let engine = Engine::new(context.with_label("engine"), cfg);
3337 engines.push(engine.start(pending, recovered, resolver));
3338 }
3339 }
3340
3341 let mut finalizers = Vec::new();
3343 for reporter in reporters.iter_mut().skip(1) {
3344 let (mut latest, mut monitor) = reporter.subscribe().await;
3345 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3346 while latest < required_containers {
3347 latest = monitor.recv().await.expect("event missing");
3348 }
3349 }));
3350 }
3351 join_all(finalizers).await;
3352
3353 let idx = context.gen_range(1..engines.len()); let validator = &participants[idx];
3356 let handle = engines.remove(idx);
3357 handle.abort();
3358 let _ = handle.await;
3359 reporters.remove(idx);
3360 info!(idx, ?validator, "aborted validator");
3361
3362 let mut finalizers = Vec::new();
3364 for reporter in reporters.iter_mut().skip(1) {
3365 let (mut latest, mut monitor) = reporter.subscribe().await;
3366 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3367 while latest < View::new(required_containers.get() * 2) {
3368 latest = monitor.recv().await.expect("event missing");
3369 }
3370 }));
3371 }
3372 join_all(finalizers).await;
3373
3374 info!(idx, ?validator, "restarting validator");
3376 let context = context.with_label(&format!("validator_{}_restarted", *validator));
3377
3378 let reporter_config = mocks::reporter::Config {
3380 participants: participants.clone().try_into().unwrap(),
3381 scheme: schemes[idx].clone(),
3382 elector: elector.clone(),
3383 };
3384 let reporter =
3385 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3386 let (pending, recovered, resolver) =
3387 register_validator(&mut oracle, validator.clone()).await;
3388 reporters.push(reporter.clone());
3389 let application_cfg = mocks::application::Config {
3390 hasher: Sha256::default(),
3391 relay: relay.clone(),
3392 me: validator.clone(),
3393 propose_latency: (10.0, 5.0),
3394 verify_latency: (10.0, 5.0),
3395 certify_latency: (10.0, 5.0),
3396 should_certify: mocks::application::Certifier::Sometimes,
3397 };
3398 let (actor, application) = mocks::application::Application::new(
3399 context.with_label("application"),
3400 application_cfg,
3401 );
3402 actor.start();
3403 let blocker = oracle.control(validator.clone());
3404 let cfg = config::Config {
3405 scheme: schemes[idx].clone(),
3406 elector: elector.clone(),
3407 blocker,
3408 automaton: application.clone(),
3409 relay: application.clone(),
3410 reporter: reporter.clone(),
3411 strategy: Sequential,
3412 partition: validator.to_string(),
3413 mailbox_size: 1024,
3414 epoch: Epoch::new(333),
3415 leader_timeout: Duration::from_secs(1),
3416 certification_timeout: Duration::from_secs(2),
3417 timeout_retry: Duration::from_secs(10),
3418 fetch_timeout: Duration::from_secs(1),
3419 activity_timeout,
3420 skip_timeout,
3421 fetch_concurrent: 4,
3422 replay_buffer: NZUsize!(1024 * 1024),
3423 write_buffer: NZUsize!(1024 * 1024),
3424 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3425 };
3426 let engine = Engine::new(context.with_label("engine"), cfg);
3427 engine.start(pending, recovered, resolver);
3428
3429 let mut finalizers = Vec::new();
3431 for reporter in reporters.iter_mut().skip(1) {
3432 let (mut latest, mut monitor) = reporter.subscribe().await;
3433 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3434 while latest < View::new(required_containers.get() * 3) {
3435 latest = monitor.recv().await.expect("event missing");
3436 }
3437 }));
3438 }
3439 join_all(finalizers).await;
3440
3441 let byz = &participants[0];
3445 let blocked = oracle.blocked().await.unwrap();
3446 for (a, b) in &blocked {
3447 assert_ne!(a, byz);
3448 assert_eq!(b, byz);
3449 }
3450 !blocked.is_empty()
3451 })
3452 }
3453
3454 #[test_group("slow")]
3455 #[test_traced]
3456 fn test_equivocator_bls12381_threshold_vrf_min_pk() {
3457 let detected = (0..5).any(|seed| {
3458 equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>)
3459 });
3460 assert!(
3461 detected,
3462 "expected at least one seed to detect equivocation"
3463 );
3464 }
3465
3466 #[test_group("slow")]
3467 #[test_traced]
3468 fn test_equivocator_bls12381_threshold_vrf_min_sig() {
3469 let detected = (0..5).any(|seed| {
3470 equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>)
3471 });
3472 assert!(
3473 detected,
3474 "expected at least one seed to detect equivocation"
3475 );
3476 }
3477
3478 #[test_group("slow")]
3479 #[test_traced]
3480 fn test_equivocator_bls12381_threshold_std_min_pk() {
3481 let detected = (0..5).any(|seed| {
3482 equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>)
3483 });
3484 assert!(
3485 detected,
3486 "expected at least one seed to detect equivocation"
3487 );
3488 }
3489
3490 #[test_group("slow")]
3491 #[test_traced]
3492 fn test_equivocator_bls12381_threshold_std_min_sig() {
3493 let detected = (0..5).any(|seed| {
3494 equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>)
3495 });
3496 assert!(
3497 detected,
3498 "expected at least one seed to detect equivocation"
3499 );
3500 }
3501
3502 #[test_group("slow")]
3503 #[test_traced]
3504 fn test_equivocator_bls12381_multisig_min_pk() {
3505 let detected = (0..5).any(|seed| {
3506 equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>)
3507 });
3508 assert!(
3509 detected,
3510 "expected at least one seed to detect equivocation"
3511 );
3512 }
3513
3514 #[test_group("slow")]
3515 #[test_traced]
3516 fn test_equivocator_bls12381_multisig_min_sig() {
3517 let detected = (0..5).any(|seed| {
3518 equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>)
3519 });
3520 assert!(
3521 detected,
3522 "expected at least one seed to detect equivocation"
3523 );
3524 }
3525
3526 #[test_group("slow")]
3527 #[test_traced]
3528 fn test_equivocator_ed25519() {
3529 let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, ed25519::fixture));
3530 assert!(
3531 detected,
3532 "expected at least one seed to detect equivocation"
3533 );
3534 }
3535
3536 #[test_group("slow")]
3537 #[test_traced]
3538 fn test_equivocator_secp256r1() {
3539 let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, secp256r1::fixture));
3540 assert!(
3541 detected,
3542 "expected at least one seed to detect equivocation"
3543 );
3544 }
3545
3546 fn reconfigurer<S, F, L>(seed: u64, mut fixture: F)
3547 where
3548 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3549 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3550 L: Elector<S>,
3551 {
3552 let n = 4;
3554 let required_containers = View::new(50);
3555 let activity_timeout = ViewDelta::new(10);
3556 let skip_timeout = ViewDelta::new(5);
3557 let namespace = b"consensus".to_vec();
3558 let cfg = deterministic::Config::new()
3559 .with_seed(seed)
3560 .with_timeout(Some(Duration::from_secs(30)));
3561 let executor = deterministic::Runner::new(cfg);
3562 executor.start(|mut context| async move {
3563 let (network, mut oracle) = Network::new(
3565 context.with_label("network"),
3566 Config {
3567 max_size: 1024 * 1024,
3568 disconnect_on_block: false,
3569 tracked_peer_sets: None,
3570 },
3571 );
3572
3573 network.start();
3575
3576 let Fixture {
3578 participants,
3579 schemes,
3580 ..
3581 } = fixture(&mut context, &namespace, n);
3582 let mut registrations = register_validators(&mut oracle, &participants).await;
3583
3584 let link = Link {
3586 latency: Duration::from_millis(10),
3587 jitter: Duration::from_millis(1),
3588 success_rate: 1.0,
3589 };
3590 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3591
3592 let elector = L::default();
3594 let relay = Arc::new(mocks::relay::Relay::new());
3595 let mut reporters = Vec::new();
3596 for (idx_scheme, validator) in participants.iter().enumerate() {
3597 let context = context.with_label(&format!("validator_{}", *validator));
3599
3600 let reporter_config = mocks::reporter::Config {
3602 participants: participants.clone().try_into().unwrap(),
3603 scheme: schemes[idx_scheme].clone(),
3604 elector: elector.clone(),
3605 };
3606 let reporter =
3607 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3608 let (pending, recovered, resolver) = registrations
3609 .remove(validator)
3610 .expect("validator should be registered");
3611 if idx_scheme == 0 {
3612 let cfg = mocks::reconfigurer::Config {
3613 scheme: schemes[idx_scheme].clone(),
3614 };
3615 let engine: mocks::reconfigurer::Reconfigurer<_, _, Sha256> =
3616 mocks::reconfigurer::Reconfigurer::new(
3617 context.with_label("byzantine_engine"),
3618 cfg,
3619 );
3620 engine.start(pending);
3621 } else {
3622 reporters.push(reporter.clone());
3623 let application_cfg = mocks::application::Config {
3624 hasher: Sha256::default(),
3625 relay: relay.clone(),
3626 me: validator.clone(),
3627 propose_latency: (10.0, 5.0),
3628 verify_latency: (10.0, 5.0),
3629 certify_latency: (10.0, 5.0),
3630 should_certify: mocks::application::Certifier::Sometimes,
3631 };
3632 let (actor, application) = mocks::application::Application::new(
3633 context.with_label("application"),
3634 application_cfg,
3635 );
3636 actor.start();
3637 let blocker = oracle.control(validator.clone());
3638 let cfg = config::Config {
3639 scheme: schemes[idx_scheme].clone(),
3640 elector: elector.clone(),
3641 blocker,
3642 automaton: application.clone(),
3643 relay: application.clone(),
3644 reporter: reporter.clone(),
3645 strategy: Sequential,
3646 partition: validator.to_string(),
3647 mailbox_size: 1024,
3648 epoch: Epoch::new(333),
3649 leader_timeout: Duration::from_secs(1),
3650 certification_timeout: Duration::from_secs(2),
3651 timeout_retry: Duration::from_secs(10),
3652 fetch_timeout: Duration::from_secs(1),
3653 activity_timeout,
3654 skip_timeout,
3655 fetch_concurrent: 4,
3656 replay_buffer: NZUsize!(1024 * 1024),
3657 write_buffer: NZUsize!(1024 * 1024),
3658 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3659 };
3660 let engine = Engine::new(context.with_label("engine"), cfg);
3661 engine.start(pending, recovered, resolver);
3662 }
3663 }
3664
3665 let mut finalizers = Vec::new();
3667 for reporter in reporters.iter_mut() {
3668 let (mut latest, mut monitor) = reporter.subscribe().await;
3669 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3670 while latest < required_containers {
3671 latest = monitor.recv().await.expect("event missing");
3672 }
3673 }));
3674 }
3675 join_all(finalizers).await;
3676
3677 let byz = &participants[0];
3679 for reporter in reporters.iter() {
3680 {
3682 let faults = reporter.faults.lock();
3683 assert!(faults.is_empty());
3684 }
3685
3686 {
3688 let invalid = reporter.invalid.lock();
3689 assert_eq!(*invalid, 0);
3690 }
3691 }
3692
3693 let blocked = oracle.blocked().await.unwrap();
3695 assert!(!blocked.is_empty());
3696 for (a, b) in blocked {
3697 assert_ne!(&a, byz);
3698 assert_eq!(&b, byz);
3699 }
3700 });
3701 }
3702
3703 #[test_group("slow")]
3704 #[test_traced]
3705 fn test_reconfigurer() {
3706 for seed in 0..5 {
3707 reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3708 reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3709 reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3710 reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3711 reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3712 reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3713 reconfigurer::<_, _, RoundRobin>(seed, ed25519::fixture);
3714 reconfigurer::<_, _, RoundRobin>(seed, secp256r1::fixture);
3715 }
3716 }
3717
3718 fn nuller<S, F, L>(seed: u64, mut fixture: F)
3719 where
3720 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3721 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3722 L: Elector<S>,
3723 {
3724 let n = 4;
3726 let required_containers = View::new(50);
3727 let activity_timeout = ViewDelta::new(10);
3728 let skip_timeout = ViewDelta::new(5);
3729 let namespace = b"consensus".to_vec();
3730 let cfg = deterministic::Config::new()
3731 .with_seed(seed)
3732 .with_timeout(Some(Duration::from_secs(30)));
3733 let executor = deterministic::Runner::new(cfg);
3734 executor.start(|mut context| async move {
3735 let (network, mut oracle) = Network::new(
3737 context.with_label("network"),
3738 Config {
3739 max_size: 1024 * 1024,
3740 disconnect_on_block: false,
3741 tracked_peer_sets: None,
3742 },
3743 );
3744
3745 network.start();
3747
3748 let Fixture {
3750 participants,
3751 schemes,
3752 ..
3753 } = fixture(&mut context, &namespace, n);
3754 let mut registrations = register_validators(&mut oracle, &participants).await;
3755
3756 let link = Link {
3758 latency: Duration::from_millis(10),
3759 jitter: Duration::from_millis(1),
3760 success_rate: 1.0,
3761 };
3762 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3763
3764 let elector = L::default();
3766 let relay = Arc::new(mocks::relay::Relay::new());
3767 let mut reporters = Vec::new();
3768 for (idx_scheme, validator) in participants.iter().enumerate() {
3769 let context = context.with_label(&format!("validator_{}", *validator));
3771
3772 let reporter_config = mocks::reporter::Config {
3774 participants: participants.clone().try_into().unwrap(),
3775 scheme: schemes[idx_scheme].clone(),
3776 elector: elector.clone(),
3777 };
3778 let reporter =
3779 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3780 let (pending, recovered, resolver) = registrations
3781 .remove(validator)
3782 .expect("validator should be registered");
3783 if idx_scheme == 0 {
3784 let cfg = mocks::nuller::Config {
3785 scheme: schemes[idx_scheme].clone(),
3786 };
3787 let engine: mocks::nuller::Nuller<_, _, Sha256> =
3788 mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
3789 engine.start(pending);
3790 } else {
3791 reporters.push(reporter.clone());
3792 let application_cfg = mocks::application::Config {
3793 hasher: Sha256::default(),
3794 relay: relay.clone(),
3795 me: validator.clone(),
3796 propose_latency: (10.0, 5.0),
3797 verify_latency: (10.0, 5.0),
3798 certify_latency: (10.0, 5.0),
3799 should_certify: mocks::application::Certifier::Sometimes,
3800 };
3801 let (actor, application) = mocks::application::Application::new(
3802 context.with_label("application"),
3803 application_cfg,
3804 );
3805 actor.start();
3806 let blocker = oracle.control(validator.clone());
3807 let cfg = config::Config {
3808 scheme: schemes[idx_scheme].clone(),
3809 elector: elector.clone(),
3810 blocker,
3811 automaton: application.clone(),
3812 relay: application.clone(),
3813 reporter: reporter.clone(),
3814 strategy: Sequential,
3815 partition: validator.clone().to_string(),
3816 mailbox_size: 1024,
3817 epoch: Epoch::new(333),
3818 leader_timeout: Duration::from_secs(1),
3819 certification_timeout: Duration::from_secs(2),
3820 timeout_retry: Duration::from_secs(10),
3821 fetch_timeout: Duration::from_secs(1),
3822 activity_timeout,
3823 skip_timeout,
3824 fetch_concurrent: 4,
3825 replay_buffer: NZUsize!(1024 * 1024),
3826 write_buffer: NZUsize!(1024 * 1024),
3827 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3828 };
3829 let engine = Engine::new(context.with_label("engine"), cfg);
3830 engine.start(pending, recovered, resolver);
3831 }
3832 }
3833
3834 let mut finalizers = Vec::new();
3836 for reporter in reporters.iter_mut() {
3837 let (mut latest, mut monitor) = reporter.subscribe().await;
3838 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3839 while latest < required_containers {
3840 latest = monitor.recv().await.expect("event missing");
3841 }
3842 }));
3843 }
3844 join_all(finalizers).await;
3845
3846 let byz = &participants[0];
3848 let mut count_nullify_and_finalize = 0;
3849 for reporter in reporters.iter() {
3850 {
3852 let faults = reporter.faults.lock();
3853 assert_eq!(faults.len(), 1);
3854 let faulter = faults.get(byz).expect("byzantine party is not faulter");
3855 for (_, faults) in faulter.iter() {
3856 for fault in faults.iter() {
3857 match fault {
3858 Activity::NullifyFinalize(_) => {
3859 count_nullify_and_finalize += 1;
3860 }
3861 _ => panic!("unexpected fault: {fault:?}"),
3862 }
3863 }
3864 }
3865 }
3866
3867 {
3869 let invalid = reporter.invalid.lock();
3870 assert_eq!(*invalid, 0);
3871 }
3872 }
3873 assert!(count_nullify_and_finalize > 0);
3874
3875 let blocked = oracle.blocked().await.unwrap();
3877 assert!(!blocked.is_empty());
3878 for (a, b) in blocked {
3879 assert_ne!(&a, byz);
3880 assert_eq!(&b, byz);
3881 }
3882 });
3883 }
3884
3885 #[test_group("slow")]
3886 #[test_traced]
3887 fn test_nuller() {
3888 for seed in 0..5 {
3889 nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3890 nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3891 nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3892 nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3893 nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3894 nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3895 nuller::<_, _, RoundRobin>(seed, ed25519::fixture);
3896 nuller::<_, _, RoundRobin>(seed, secp256r1::fixture);
3897 }
3898 }
3899
3900 fn outdated<S, F, L>(seed: u64, mut fixture: F)
3901 where
3902 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3903 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3904 L: Elector<S>,
3905 {
3906 let n = 4;
3908 let required_containers = View::new(100);
3909 let activity_timeout = ViewDelta::new(10);
3910 let skip_timeout = ViewDelta::new(5);
3911 let namespace = b"consensus".to_vec();
3912 let cfg = deterministic::Config::new()
3913 .with_seed(seed)
3914 .with_timeout(Some(Duration::from_secs(30)));
3915 let executor = deterministic::Runner::new(cfg);
3916 executor.start(|mut context| async move {
3917 let (network, mut oracle) = Network::new(
3919 context.with_label("network"),
3920 Config {
3921 max_size: 1024 * 1024,
3922 disconnect_on_block: false,
3923 tracked_peer_sets: None,
3924 },
3925 );
3926
3927 network.start();
3929
3930 let Fixture {
3932 participants,
3933 schemes,
3934 ..
3935 } = fixture(&mut context, &namespace, n);
3936 let mut registrations = register_validators(&mut oracle, &participants).await;
3937
3938 let link = Link {
3940 latency: Duration::from_millis(10),
3941 jitter: Duration::from_millis(1),
3942 success_rate: 1.0,
3943 };
3944 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3945
3946 let elector = L::default();
3948 let relay = Arc::new(mocks::relay::Relay::new());
3949 let mut reporters = Vec::new();
3950 for (idx_scheme, validator) in participants.iter().enumerate() {
3951 let context = context.with_label(&format!("validator_{}", *validator));
3953
3954 let reporter_config = mocks::reporter::Config {
3956 participants: participants.clone().try_into().unwrap(),
3957 scheme: schemes[idx_scheme].clone(),
3958 elector: elector.clone(),
3959 };
3960 let reporter =
3961 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3962 let (pending, recovered, resolver) = registrations
3963 .remove(validator)
3964 .expect("validator should be registered");
3965 if idx_scheme == 0 {
3966 let cfg = mocks::outdated::Config {
3967 scheme: schemes[idx_scheme].clone(),
3968 view_delta: ViewDelta::new(activity_timeout.get().saturating_mul(4)),
3969 };
3970 let engine: mocks::outdated::Outdated<_, _, Sha256> =
3971 mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
3972 engine.start(pending);
3973 } else {
3974 reporters.push(reporter.clone());
3975 let application_cfg = mocks::application::Config {
3976 hasher: Sha256::default(),
3977 relay: relay.clone(),
3978 me: validator.clone(),
3979 propose_latency: (10.0, 5.0),
3980 verify_latency: (10.0, 5.0),
3981 certify_latency: (10.0, 5.0),
3982 should_certify: mocks::application::Certifier::Sometimes,
3983 };
3984 let (actor, application) = mocks::application::Application::new(
3985 context.with_label("application"),
3986 application_cfg,
3987 );
3988 actor.start();
3989 let blocker = oracle.control(validator.clone());
3990 let cfg = config::Config {
3991 scheme: schemes[idx_scheme].clone(),
3992 elector: elector.clone(),
3993 blocker,
3994 automaton: application.clone(),
3995 relay: application.clone(),
3996 reporter: reporter.clone(),
3997 strategy: Sequential,
3998 partition: validator.clone().to_string(),
3999 mailbox_size: 1024,
4000 epoch: Epoch::new(333),
4001 leader_timeout: Duration::from_secs(1),
4002 certification_timeout: Duration::from_secs(2),
4003 timeout_retry: Duration::from_secs(10),
4004 fetch_timeout: Duration::from_secs(1),
4005 activity_timeout,
4006 skip_timeout,
4007 fetch_concurrent: 4,
4008 replay_buffer: NZUsize!(1024 * 1024),
4009 write_buffer: NZUsize!(1024 * 1024),
4010 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4011 };
4012 let engine = Engine::new(context.with_label("engine"), cfg);
4013 engine.start(pending, recovered, resolver);
4014 }
4015 }
4016
4017 let mut finalizers = Vec::new();
4019 for reporter in reporters.iter_mut() {
4020 let (mut latest, mut monitor) = reporter.subscribe().await;
4021 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4022 while latest < required_containers {
4023 latest = monitor.recv().await.expect("event missing");
4024 }
4025 }));
4026 }
4027 join_all(finalizers).await;
4028
4029 for reporter in reporters.iter() {
4031 {
4033 let faults = reporter.faults.lock();
4034 assert!(faults.is_empty());
4035 }
4036
4037 {
4039 let invalid = reporter.invalid.lock();
4040 assert_eq!(*invalid, 0);
4041 }
4042 }
4043
4044 let blocked = oracle.blocked().await.unwrap();
4046 assert!(blocked.is_empty());
4047 });
4048 }
4049
4050 #[test_group("slow")]
4051 #[test_traced]
4052 fn test_outdated() {
4053 for seed in 0..5 {
4054 outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
4055 outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
4056 outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
4057 outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
4058 outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
4059 outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
4060 outdated::<_, _, RoundRobin>(seed, ed25519::fixture);
4061 outdated::<_, _, RoundRobin>(seed, secp256r1::fixture);
4062 }
4063 }
4064
4065 fn run_1k<S, F, L>(mut fixture: F)
4066 where
4067 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4068 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4069 L: Elector<S>,
4070 {
4071 let n = 10;
4073 let required_containers = View::new(1_000);
4074 let activity_timeout = ViewDelta::new(10);
4075 let skip_timeout = ViewDelta::new(5);
4076 let namespace = b"consensus".to_vec();
4077 let cfg = deterministic::Config::new();
4078 let executor = deterministic::Runner::new(cfg);
4079 executor.start(|mut context| async move {
4080 let (network, mut oracle) = Network::new(
4082 context.with_label("network"),
4083 Config {
4084 max_size: 1024 * 1024,
4085 disconnect_on_block: false,
4086 tracked_peer_sets: None,
4087 },
4088 );
4089
4090 network.start();
4092
4093 let Fixture {
4095 participants,
4096 schemes,
4097 ..
4098 } = fixture(&mut context, &namespace, n);
4099 let mut registrations = register_validators(&mut oracle, &participants).await;
4100
4101 let link = Link {
4103 latency: Duration::from_millis(80),
4104 jitter: Duration::from_millis(10),
4105 success_rate: 0.98,
4106 };
4107 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4108
4109 let elector = L::default();
4111 let relay = Arc::new(mocks::relay::Relay::new());
4112 let mut reporters = Vec::new();
4113 let mut engine_handlers = Vec::new();
4114 for (idx, validator) in participants.iter().enumerate() {
4115 let context = context.with_label(&format!("validator_{}", *validator));
4117
4118 let reporter_config = mocks::reporter::Config {
4120 participants: participants.clone().try_into().unwrap(),
4121 scheme: schemes[idx].clone(),
4122 elector: elector.clone(),
4123 };
4124 let reporter =
4125 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4126 reporters.push(reporter.clone());
4127 let application_cfg = mocks::application::Config {
4128 hasher: Sha256::default(),
4129 relay: relay.clone(),
4130 me: validator.clone(),
4131 propose_latency: (100.0, 50.0),
4132 verify_latency: (50.0, 40.0),
4133 certify_latency: (50.0, 40.0),
4134 should_certify: mocks::application::Certifier::Sometimes,
4135 };
4136 let (actor, application) = mocks::application::Application::new(
4137 context.with_label("application"),
4138 application_cfg,
4139 );
4140 actor.start();
4141 let blocker = oracle.control(validator.clone());
4142 let cfg = config::Config {
4143 scheme: schemes[idx].clone(),
4144 elector: elector.clone(),
4145 blocker,
4146 automaton: application.clone(),
4147 relay: application.clone(),
4148 reporter: reporter.clone(),
4149 strategy: Sequential,
4150 partition: validator.to_string(),
4151 mailbox_size: 1024,
4152 epoch: Epoch::new(333),
4153 leader_timeout: Duration::from_secs(1),
4154 certification_timeout: Duration::from_secs(2),
4155 timeout_retry: Duration::from_secs(10),
4156 fetch_timeout: Duration::from_secs(1),
4157 activity_timeout,
4158 skip_timeout,
4159 fetch_concurrent: 4,
4160 replay_buffer: NZUsize!(1024 * 1024),
4161 write_buffer: NZUsize!(1024 * 1024),
4162 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4163 };
4164 let engine = Engine::new(context.with_label("engine"), cfg);
4165
4166 let (pending, recovered, resolver) = registrations
4168 .remove(validator)
4169 .expect("validator should be registered");
4170 engine_handlers.push(engine.start(pending, recovered, resolver));
4171 }
4172
4173 let mut finalizers = Vec::new();
4175 for reporter in reporters.iter_mut() {
4176 let (mut latest, mut monitor) = reporter.subscribe().await;
4177 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4178 while latest < required_containers {
4179 latest = monitor.recv().await.expect("event missing");
4180 }
4181 }));
4182 }
4183 join_all(finalizers).await;
4184
4185 for reporter in reporters.iter() {
4187 {
4189 let faults = reporter.faults.lock();
4190 assert!(faults.is_empty());
4191 }
4192
4193 {
4195 let invalid = reporter.invalid.lock();
4196 assert_eq!(*invalid, 0);
4197 }
4198 }
4199
4200 let blocked = oracle.blocked().await.unwrap();
4202 assert!(blocked.is_empty());
4203 })
4204 }
4205
4206 #[test_group("slow")]
4207 #[test_traced]
4208 fn test_1k_bls12381_threshold_vrf_min_pk() {
4209 run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
4210 }
4211
4212 #[test_group("slow")]
4213 #[test_traced]
4214 fn test_1k_bls12381_threshold_vrf_min_sig() {
4215 run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
4216 }
4217
4218 #[test_group("slow")]
4219 #[test_traced]
4220 fn test_1k_bls12381_threshold_std_min_pk() {
4221 run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
4222 }
4223
4224 #[test_group("slow")]
4225 #[test_traced]
4226 fn test_1k_bls12381_threshold_std_min_sig() {
4227 run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
4228 }
4229
4230 #[test_group("slow")]
4231 #[test_traced]
4232 fn test_1k_bls12381_multisig_min_pk() {
4233 run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4234 }
4235
4236 #[test_group("slow")]
4237 #[test_traced]
4238 fn test_1k_bls12381_multisig_min_sig() {
4239 run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
4240 }
4241
4242 #[test_group("slow")]
4243 #[test_traced]
4244 fn test_1k_ed25519() {
4245 run_1k::<_, _, RoundRobin>(ed25519::fixture);
4246 }
4247
4248 #[test_group("slow")]
4249 #[test_traced]
4250 fn test_1k_secp256r1() {
4251 run_1k::<_, _, RoundRobin>(secp256r1::fixture);
4252 }
4253
4254 fn engine_shutdown<S, F, L>(seed: u64, mut fixture: F, graceful: bool)
4255 where
4256 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4257 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4258 L: Elector<S>,
4259 {
4260 let n = 1;
4261 let namespace = b"consensus".to_vec();
4262 let cfg = deterministic::Config::default()
4263 .with_seed(seed)
4264 .with_timeout(Some(Duration::from_secs(10)));
4265 let executor = deterministic::Runner::new(cfg);
4266 executor.start(|mut context| async move {
4267 let (network, mut oracle) = Network::new(
4269 context.with_label("network"),
4270 Config {
4271 max_size: 1024 * 1024,
4272 disconnect_on_block: true,
4273 tracked_peer_sets: None,
4274 },
4275 );
4276
4277 network.start();
4279
4280 let Fixture {
4282 participants,
4283 schemes,
4284 ..
4285 } = fixture(&mut context, &namespace, n);
4286 let mut registrations = register_validators(&mut oracle, &participants).await;
4287
4288 let link = Link {
4290 latency: Duration::from_millis(1),
4291 jitter: Duration::from_millis(0),
4292 success_rate: 1.0,
4293 };
4294 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4295
4296 let elector = L::default();
4298 let reporter_config = mocks::reporter::Config {
4299 participants: participants.clone().try_into().unwrap(),
4300 scheme: schemes[0].clone(),
4301 elector: elector.clone(),
4302 };
4303 let reporter =
4304 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4305 let relay = Arc::new(mocks::relay::Relay::new());
4306 let application_cfg = mocks::application::Config {
4307 hasher: Sha256::default(),
4308 relay: relay.clone(),
4309 me: participants[0].clone(),
4310 propose_latency: (1.0, 0.0),
4311 verify_latency: (1.0, 0.0),
4312 certify_latency: (1.0, 0.0),
4313 should_certify: mocks::application::Certifier::Sometimes,
4314 };
4315 let (actor, application) = mocks::application::Application::new(
4316 context.with_label("application"),
4317 application_cfg,
4318 );
4319 actor.start();
4320 let blocker = oracle.control(participants[0].clone());
4321 let cfg = config::Config {
4322 scheme: schemes[0].clone(),
4323 elector: elector.clone(),
4324 blocker,
4325 automaton: application.clone(),
4326 relay: application.clone(),
4327 reporter: reporter.clone(),
4328 strategy: Sequential,
4329 partition: participants[0].clone().to_string(),
4330 mailbox_size: 64,
4331 epoch: Epoch::new(333),
4332 leader_timeout: Duration::from_millis(50),
4333 certification_timeout: Duration::from_millis(100),
4334 timeout_retry: Duration::from_millis(250),
4335 fetch_timeout: Duration::from_millis(50),
4336 activity_timeout: ViewDelta::new(4),
4337 skip_timeout: ViewDelta::new(2),
4338 fetch_concurrent: 4,
4339 replay_buffer: NZUsize!(1024 * 16),
4340 write_buffer: NZUsize!(1024 * 16),
4341 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4342 };
4343 let engine = Engine::new(context.with_label("engine"), cfg);
4344
4345 let (pending, recovered, resolver) = registrations
4347 .remove(&participants[0])
4348 .expect("validator should be registered");
4349 let handle = engine.start(pending, recovered, resolver);
4350
4351 context.sleep(Duration::from_millis(1000)).await;
4353
4354 let running_before = count_running_tasks(&context, "engine");
4356 assert!(
4357 running_before > 0,
4358 "at least one engine task should be running"
4359 );
4360
4361 context.sleep(Duration::from_millis(1500)).await;
4363 assert!(
4364 count_running_tasks(&context, "engine") > 0,
4365 "engine tasks should still be running"
4366 );
4367
4368 let running_after = if graceful {
4370 let metrics_context = context.clone();
4371 let result = context.stop(0, Some(Duration::from_secs(5))).await;
4372 assert!(
4373 result.is_ok(),
4374 "graceful shutdown should complete: {result:?}"
4375 );
4376 count_running_tasks(&metrics_context, "engine")
4377 } else {
4378 handle.abort();
4379 let _ = handle.await; context.sleep(Duration::from_millis(1000)).await;
4383 count_running_tasks(&context, "engine")
4384 };
4385 assert_eq!(
4386 running_after, 0,
4387 "all engine tasks should be stopped, but {running_after} still running"
4388 );
4389 });
4390 }
4391
4392 #[test_group("slow")]
4393 #[test_traced]
4394 fn test_children_shutdown_on_engine_abort() {
4395 for seed in 0..10 {
4396 engine_shutdown::<_, _, Random>(
4397 seed,
4398 bls12381_threshold_vrf::fixture::<MinPk, _>,
4399 false,
4400 );
4401 engine_shutdown::<_, _, Random>(
4402 seed,
4403 bls12381_threshold_vrf::fixture::<MinSig, _>,
4404 false,
4405 );
4406 engine_shutdown::<_, _, RoundRobin>(
4407 seed,
4408 bls12381_threshold_std::fixture::<MinPk, _>,
4409 false,
4410 );
4411 engine_shutdown::<_, _, RoundRobin>(
4412 seed,
4413 bls12381_threshold_std::fixture::<MinSig, _>,
4414 false,
4415 );
4416 engine_shutdown::<_, _, RoundRobin>(
4417 seed,
4418 bls12381_multisig::fixture::<MinPk, _>,
4419 false,
4420 );
4421 engine_shutdown::<_, _, RoundRobin>(
4422 seed,
4423 bls12381_multisig::fixture::<MinSig, _>,
4424 false,
4425 );
4426 engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, false);
4427 engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, false);
4428 }
4429 }
4430
4431 #[test_group("slow")]
4432 #[test_traced]
4433 fn test_graceful_shutdown() {
4434 for seed in 0..10 {
4435 engine_shutdown::<_, _, Random>(
4436 seed,
4437 bls12381_threshold_vrf::fixture::<MinPk, _>,
4438 true,
4439 );
4440 engine_shutdown::<_, _, Random>(
4441 seed,
4442 bls12381_threshold_vrf::fixture::<MinSig, _>,
4443 true,
4444 );
4445 engine_shutdown::<_, _, RoundRobin>(
4446 seed,
4447 bls12381_threshold_std::fixture::<MinPk, _>,
4448 true,
4449 );
4450 engine_shutdown::<_, _, RoundRobin>(
4451 seed,
4452 bls12381_threshold_std::fixture::<MinSig, _>,
4453 true,
4454 );
4455 engine_shutdown::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>, true);
4456 engine_shutdown::<_, _, RoundRobin>(
4457 seed,
4458 bls12381_multisig::fixture::<MinSig, _>,
4459 true,
4460 );
4461 engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, true);
4462 engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, true);
4463 }
4464 }
4465
4466 fn attributable_reporter_filtering<S, F, L>(mut fixture: F)
4467 where
4468 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4469 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4470 L: Elector<S>,
4471 {
4472 let n = 3;
4473 let required_containers = View::new(10);
4474 let activity_timeout = ViewDelta::new(10);
4475 let skip_timeout = ViewDelta::new(5);
4476 let namespace = b"consensus".to_vec();
4477 let executor = deterministic::Runner::timed(Duration::from_secs(30));
4478 executor.start(|mut context| async move {
4479 let (network, mut oracle) = Network::new(
4481 context.with_label("network"),
4482 Config {
4483 max_size: 1024 * 1024,
4484 disconnect_on_block: false,
4485 tracked_peer_sets: None,
4486 },
4487 );
4488 network.start();
4489
4490 let Fixture {
4492 participants,
4493 schemes,
4494 ..
4495 } = fixture(&mut context, &namespace, n);
4496 let mut registrations = register_validators(&mut oracle, &participants).await;
4497
4498 let link = Link {
4500 latency: Duration::from_millis(10),
4501 jitter: Duration::from_millis(1),
4502 success_rate: 1.0,
4503 };
4504 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4505
4506 let elector = L::default();
4508 let relay = Arc::new(mocks::relay::Relay::new());
4509 let mut reporters = Vec::new();
4510 for (idx, validator) in participants.iter().enumerate() {
4511 let context = context.with_label(&format!("validator_{}", *validator));
4512
4513 let reporter_config = mocks::reporter::Config {
4514 participants: participants.clone().try_into().unwrap(),
4515 scheme: schemes[idx].clone(),
4516 elector: elector.clone(),
4517 };
4518 let mock_reporter = mocks::reporter::Reporter::new(
4519 context.with_label("mock_reporter"),
4520 reporter_config,
4521 );
4522
4523 let attributable_reporter = scheme::reporter::AttributableReporter::new(
4525 context.with_label("rng"),
4526 schemes[idx].clone(),
4527 mock_reporter.clone(),
4528 Sequential,
4529 true, );
4531 reporters.push(mock_reporter.clone());
4532
4533 let application_cfg = mocks::application::Config {
4534 hasher: Sha256::default(),
4535 relay: relay.clone(),
4536 me: validator.clone(),
4537 propose_latency: (10.0, 5.0),
4538 verify_latency: (10.0, 5.0),
4539 certify_latency: (10.0, 5.0),
4540 should_certify: mocks::application::Certifier::Sometimes,
4541 };
4542 let (actor, application) = mocks::application::Application::new(
4543 context.with_label("application"),
4544 application_cfg,
4545 );
4546 actor.start();
4547 let blocker = oracle.control(validator.clone());
4548 let cfg = config::Config {
4549 scheme: schemes[idx].clone(),
4550 elector: elector.clone(),
4551 blocker,
4552 automaton: application.clone(),
4553 relay: application.clone(),
4554 reporter: attributable_reporter,
4555 strategy: Sequential,
4556 partition: validator.to_string(),
4557 mailbox_size: 1024,
4558 epoch: Epoch::new(333),
4559 leader_timeout: Duration::from_secs(1),
4560 certification_timeout: Duration::from_secs(2),
4561 timeout_retry: Duration::from_secs(10),
4562 fetch_timeout: Duration::from_secs(1),
4563 activity_timeout,
4564 skip_timeout,
4565 fetch_concurrent: 4,
4566 replay_buffer: NZUsize!(1024 * 1024),
4567 write_buffer: NZUsize!(1024 * 1024),
4568 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4569 };
4570 let engine = Engine::new(context.with_label("engine"), cfg);
4571
4572 let (pending, recovered, resolver) = registrations
4574 .remove(validator)
4575 .expect("validator should be registered");
4576 engine.start(pending, recovered, resolver);
4577 }
4578
4579 let mut finalizers = Vec::new();
4581 for reporter in reporters.iter_mut() {
4582 let (mut latest, mut monitor) = reporter.subscribe().await;
4583 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4584 while latest < required_containers {
4585 latest = monitor.recv().await.expect("event missing");
4586 }
4587 }));
4588 }
4589 join_all(finalizers).await;
4590
4591 for reporter in reporters.iter() {
4593 {
4595 let faults = reporter.faults.lock();
4596 assert!(faults.is_empty(), "No faults should be reported");
4597 }
4598
4599 {
4601 let invalid = reporter.invalid.lock();
4602 assert_eq!(*invalid, 0, "No invalid signatures");
4603 }
4604
4605 {
4607 let notarizations = reporter.notarizations.lock();
4608 let finalizations = reporter.finalizations.lock();
4609 assert!(
4610 !notarizations.is_empty() || !finalizations.is_empty(),
4611 "Certificates should be reported"
4612 );
4613 }
4614
4615 let notarizes = reporter.notarizes.lock();
4617 let last_view = notarizes.keys().max().cloned().unwrap_or_default();
4618 for (view, payloads) in notarizes.iter() {
4619 if *view == last_view {
4620 continue; }
4622
4623 let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4624
4625 if S::is_attributable() {
4627 assert!(signers > 1, "view {view}: {signers}");
4628 } else {
4629 assert_eq!(signers, 0);
4631 }
4632 }
4633
4634 let finalizes = reporter.finalizes.lock();
4636 for (_, payloads) in finalizes.iter() {
4637 let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4638
4639 if S::is_attributable() {
4641 assert!(signers > 1);
4642 } else {
4643 assert_eq!(signers, 0);
4645 }
4646 }
4647 }
4648
4649 let blocked = oracle.blocked().await.unwrap();
4651 assert!(blocked.is_empty());
4652 });
4653 }
4654
4655 #[test_traced]
4656 fn test_attributable_reporter_filtering() {
4657 attributable_reporter_filtering::<_, _, Random>(
4658 bls12381_threshold_vrf::fixture::<MinPk, _>,
4659 );
4660 attributable_reporter_filtering::<_, _, Random>(
4661 bls12381_threshold_vrf::fixture::<MinSig, _>,
4662 );
4663 attributable_reporter_filtering::<_, _, RoundRobin>(
4664 bls12381_threshold_std::fixture::<MinPk, _>,
4665 );
4666 attributable_reporter_filtering::<_, _, RoundRobin>(
4667 bls12381_threshold_std::fixture::<MinSig, _>,
4668 );
4669 attributable_reporter_filtering::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4670 attributable_reporter_filtering::<_, _, RoundRobin>(
4671 bls12381_multisig::fixture::<MinSig, _>,
4672 );
4673 attributable_reporter_filtering::<_, _, RoundRobin>(ed25519::fixture);
4674 attributable_reporter_filtering::<_, _, RoundRobin>(secp256r1::fixture);
4675 }
4676
4677 fn split_views_no_lockup<S, F, L>(mut fixture: F)
4678 where
4679 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4680 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4681 L: Elector<S>,
4682 {
4683 enum ParticipantType {
4696 Group1, Group2, Ignorant, Byzantine, }
4701 let get_type = |idx: usize| -> ParticipantType {
4702 match idx {
4703 0..3 => ParticipantType::Group1,
4704 3..6 => ParticipantType::Group2,
4705 6 => ParticipantType::Ignorant,
4706 7..10 => ParticipantType::Byzantine,
4707 _ => unreachable!(),
4708 }
4709 };
4710
4711 let n = 10;
4713 let quorum = quorum(n) as usize;
4714 assert_eq!(quorum, 7);
4715 let activity_timeout = ViewDelta::new(10);
4716 let skip_timeout = ViewDelta::new(5);
4717 let namespace = b"consensus".to_vec();
4718 let executor = deterministic::Runner::timed(Duration::from_secs(300));
4719 executor.start(|mut context| async move {
4720 let (network, mut oracle) = Network::new(
4722 context.with_label("network"),
4723 Config {
4724 max_size: 1024 * 1024,
4725 disconnect_on_block: false,
4726 tracked_peer_sets: None,
4727 },
4728 );
4729 network.start();
4730
4731 let Fixture {
4733 participants,
4734 schemes,
4735 ..
4736 } = fixture(&mut context, &namespace, n);
4737 let mut registrations = register_validators(&mut oracle, &participants).await;
4738
4739 let build_finalization = |proposal: &Proposal<D>| -> TFinalization<_, D> {
4743 let votes: Vec<_> = (0..=quorum)
4744 .map(|i| TFinalize::sign(&schemes[i], proposal.clone()).unwrap())
4745 .collect();
4746 TFinalization::from_finalizes(&schemes[0], &votes, &Sequential)
4747 .expect("finalization quorum")
4748 };
4749 let build_notarization = |proposal: &Proposal<D>| -> TNotarization<_, D> {
4751 let votes: Vec<_> = (0..=quorum)
4752 .map(|i| TNotarize::sign(&schemes[i], proposal.clone()).unwrap())
4753 .collect();
4754 TNotarization::from_notarizes(&schemes[0], &votes, &Sequential)
4755 .expect("notarization quorum")
4756 };
4757 let build_nullification = |round: Round| -> TNullification<_> {
4758 let votes: Vec<_> = (0..=quorum)
4759 .map(|i| TNullify::sign::<D>(&schemes[i], round).unwrap())
4760 .collect();
4761 TNullification::from_nullifies(&schemes[0], &votes, &Sequential)
4762 .expect("nullification quorum")
4763 };
4764 let f_view = 1;
4766 let round_f = Round::new(Epoch::new(333), View::new(f_view));
4767 let payload_b0 = Sha256::hash(b"B_F");
4768 let proposal_b0 = Proposal::new(round_f, View::new(f_view - 1), payload_b0);
4769 let payload_b1a = Sha256::hash(b"B_G1");
4770 let proposal_b1a = Proposal::new(
4771 Round::new(Epoch::new(333), View::new(f_view + 1)),
4772 View::new(f_view),
4773 payload_b1a,
4774 );
4775 let payload_b1b = Sha256::hash(b"B_G2");
4776 let proposal_b1b = Proposal::new(
4777 Round::new(Epoch::new(333), View::new(f_view + 2)),
4778 View::new(f_view),
4779 payload_b1b,
4780 );
4781
4782 let b0_notarization = build_notarization(&proposal_b0);
4784 let b0_finalization = build_finalization(&proposal_b0);
4785 let b1a_notarization = build_notarization(&proposal_b1a);
4787 let b1b_notarization = build_notarization(&proposal_b1b);
4788 let null_a = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 1)));
4790 let null_b = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 2)));
4791
4792 let injector_pk = PrivateKey::from_seed(1_000_000).public_key();
4794 let (mut injector_sender, _inj_certificate_receiver) = oracle
4795 .control(injector_pk.clone())
4796 .register(1, TEST_QUOTA)
4797 .await
4798 .unwrap();
4799
4800 let link = Link {
4802 latency: Duration::from_millis(10),
4803 jitter: Duration::from_millis(0),
4804 success_rate: 1.0,
4805 };
4806 for p in participants.iter() {
4807 oracle
4808 .add_link(injector_pk.clone(), p.clone(), link.clone())
4809 .await
4810 .unwrap();
4811 }
4812
4813 let msg = Certificate::<_, D>::Notarization(b0_notarization).encode();
4817 injector_sender
4818 .send(Recipients::All, msg, true)
4819 .await
4820 .unwrap();
4821 let msg = Certificate::<_, D>::Finalization(b0_finalization).encode();
4822 injector_sender
4823 .send(Recipients::All, msg, true)
4824 .await
4825 .unwrap();
4826 let notarization_msg = Certificate::<_, D>::Notarization(b1a_notarization);
4828 let nullification_msg = Certificate::<_, D>::Nullification(null_a.clone());
4829 for (i, participant) in participants.iter().enumerate() {
4830 let recipient = Recipients::One(participant.clone());
4831 let msg = match get_type(i) {
4832 ParticipantType::Group1 => notarization_msg.encode(),
4833 _ => nullification_msg.encode(),
4834 };
4835 injector_sender.send(recipient, msg, true).await.unwrap();
4836 }
4837 let notarization_msg = Certificate::<_, D>::Notarization(b1b_notarization);
4839 let nullification_msg = Certificate::<_, D>::Nullification(null_b.clone());
4840 for (i, participant) in participants.iter().enumerate() {
4841 let recipient = Recipients::One(participant.clone());
4842 let msg = match get_type(i) {
4843 ParticipantType::Group2 => notarization_msg.encode(),
4844 _ => nullification_msg.encode(),
4845 };
4846 injector_sender.send(recipient, msg, true).await.unwrap();
4847 }
4848
4849 let elector = L::default();
4855 let relay = Arc::new(mocks::relay::Relay::new());
4856 let mut honest_reporters = Vec::new();
4857 for (idx, validator) in participants.iter().enumerate() {
4858 let (pending, recovered, resolver) = registrations
4859 .remove(validator)
4860 .expect("validator should be registered");
4861 let participant_type = get_type(idx);
4862 if matches!(participant_type, ParticipantType::Byzantine) {
4863 let cfg = mocks::nullify_only::Config {
4865 scheme: schemes[idx].clone(),
4866 };
4867 let engine: mocks::nullify_only::NullifyOnly<_, _, Sha256> =
4868 mocks::nullify_only::NullifyOnly::new(
4869 context.with_label(&format!("byzantine_{}", *validator)),
4870 cfg,
4871 );
4872 engine.start(pending);
4873 drop(recovered);
4875 drop(resolver);
4876 } else {
4877 let reporter_config = mocks::reporter::Config {
4879 participants: participants.clone().try_into().unwrap(),
4880 scheme: schemes[idx].clone(),
4881 elector: elector.clone(),
4882 };
4883 let reporter = mocks::reporter::Reporter::new(
4884 context.with_label(&format!("reporter_{}", *validator)),
4885 reporter_config,
4886 );
4887 honest_reporters.push(reporter.clone());
4888
4889 let application_cfg = mocks::application::Config {
4890 hasher: Sha256::default(),
4891 relay: relay.clone(),
4892 me: validator.clone(),
4893 propose_latency: (250.0, 50.0), verify_latency: (10.0, 5.0),
4895 certify_latency: (10.0, 5.0),
4896 should_certify: mocks::application::Certifier::Sometimes,
4897 };
4898 let (actor, application) = mocks::application::Application::new(
4899 context.with_label(&format!("application_{}", *validator)),
4900 application_cfg,
4901 );
4902 actor.start();
4903 let blocker = oracle.control(validator.clone());
4904 let cfg = config::Config {
4905 scheme: schemes[idx].clone(),
4906 elector: elector.clone(),
4907 blocker,
4908 automaton: application.clone(),
4909 relay: application.clone(),
4910 reporter: reporter.clone(),
4911 strategy: Sequential,
4912 partition: validator.to_string(),
4913 mailbox_size: 1024,
4914 epoch: Epoch::new(333),
4915 leader_timeout: Duration::from_secs(10),
4916 certification_timeout: Duration::from_secs(10),
4917 timeout_retry: Duration::from_secs(10),
4918 fetch_timeout: Duration::from_secs(1),
4919 activity_timeout,
4920 skip_timeout,
4921 fetch_concurrent: 4,
4922 replay_buffer: NZUsize!(1024 * 1024),
4923 write_buffer: NZUsize!(1024 * 1024),
4924 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4925 };
4926 let engine =
4927 Engine::new(context.with_label(&format!("engine_{}", *validator)), cfg);
4928 engine.start(pending, recovered, resolver);
4929 }
4930 }
4931
4932 context.sleep(Duration::from_secs(2)).await;
4934
4935 let view = View::new(f_view);
4940 for reporter in honest_reporters.iter() {
4941 let finalizations = reporter.finalizations.lock();
4942 assert!(finalizations.contains_key(&view));
4943 }
4944
4945 let view = View::new(f_view + 1);
4949 for (i, reporter) in honest_reporters.iter().enumerate() {
4950 let finalizations = reporter.finalizations.lock();
4951 assert!(!finalizations.contains_key(&view));
4952 let nullifications = reporter.nullifications.lock();
4953 let notarizations = reporter.notarizations.lock();
4954 match get_type(i) {
4955 ParticipantType::Group1 => {
4956 assert!(notarizations.contains_key(&view));
4957 assert!(!nullifications.contains_key(&view));
4958 }
4959 _ => {
4960 assert!(nullifications.contains_key(&view));
4961 assert!(!notarizations.contains_key(&view));
4962 }
4963 }
4964 }
4965
4966 let view = View::new(f_view + 2);
4970 for (i, reporter) in honest_reporters.iter().enumerate() {
4971 let finalizations = reporter.finalizations.lock();
4972 assert!(!finalizations.contains_key(&view));
4973 let nullifications = reporter.nullifications.lock();
4974 let notarizations = reporter.notarizations.lock();
4975 match get_type(i) {
4976 ParticipantType::Group2 => {
4977 assert!(notarizations.contains_key(&view));
4978 assert!(!nullifications.contains_key(&view));
4979 }
4980 _ => {
4981 assert!(nullifications.contains_key(&view));
4982 assert!(!notarizations.contains_key(&view));
4983 }
4984 }
4985 }
4986
4987 let next_view = View::new(f_view + 3);
4989 for (i, reporter) in honest_reporters.iter().enumerate() {
4990 let nullifies = reporter.nullifies.lock();
4991 assert!(!nullifies.contains_key(&next_view), "reporter {i}");
4992 }
4993
4994 link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
4998
4999 {
5001 let target = View::new(f_view + 3);
5002 let mut finalizers = Vec::new();
5003 for reporter in honest_reporters.iter_mut() {
5004 let (mut latest, mut monitor) = reporter.subscribe().await;
5005 finalizers.push(context.with_label("resume_finalizer").spawn(
5006 move |_| async move {
5007 while latest < target {
5008 latest = monitor.recv().await.expect("event missing");
5009 }
5010 },
5011 ));
5012 }
5013 join_all(finalizers).await;
5014 }
5015
5016 for reporter in honest_reporters.iter() {
5018 {
5019 let faults = reporter.faults.lock();
5020 assert!(faults.is_empty());
5021 }
5022 {
5023 let invalid = reporter.invalid.lock();
5024 assert_eq!(*invalid, 0);
5025 }
5026 }
5027 let blocked = oracle.blocked().await.unwrap();
5028 assert!(blocked.is_empty(), "blocked peers: {blocked:?}");
5029 });
5030 }
5031
5032 #[test_group("slow")]
5033 #[test_traced]
5034 fn test_split_views_no_lockup() {
5035 split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
5036 split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
5037 split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
5038 split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
5039 split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
5040 split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
5041 split_views_no_lockup::<_, _, RoundRobin>(ed25519::fixture);
5042 split_views_no_lockup::<_, _, RoundRobin>(secp256r1::fixture);
5043 }
5044
5045 fn tle<V, L>()
5046 where
5047 V: Variant,
5048 L: Elector<bls12381_threshold_vrf::Scheme<PublicKey, V>>,
5049 {
5050 let n = 4;
5052 let namespace = b"consensus".to_vec();
5053 let activity_timeout = ViewDelta::new(100);
5054 let skip_timeout = ViewDelta::new(50);
5055 let executor = deterministic::Runner::timed(Duration::from_secs(30));
5056 executor.start(|mut context| async move {
5057 let (network, mut oracle) = Network::new(
5059 context.with_label("network"),
5060 Config {
5061 max_size: 1024 * 1024,
5062 disconnect_on_block: false,
5063 tracked_peer_sets: None,
5064 },
5065 );
5066
5067 network.start();
5069
5070 let Fixture {
5072 participants,
5073 schemes,
5074 ..
5075 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, &namespace, n);
5076 let mut registrations = register_validators(&mut oracle, &participants).await;
5077
5078 let link = Link {
5080 latency: Duration::from_millis(10),
5081 jitter: Duration::from_millis(5),
5082 success_rate: 1.0,
5083 };
5084 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5085
5086 let elector = L::default();
5088 let relay = Arc::new(mocks::relay::Relay::new());
5089 let mut reporters = Vec::new();
5090 let mut engine_handlers = Vec::new();
5091 let monitor_reporter = Arc::new(Mutex::new(None));
5092 for (idx, validator) in participants.iter().enumerate() {
5093 let context = context.with_label(&format!("validator_{}", *validator));
5095
5096 let reporter_config = mocks::reporter::Config {
5098 participants: participants.clone().try_into().unwrap(),
5099 scheme: schemes[idx].clone(),
5100 elector: elector.clone(),
5101 };
5102 let reporter =
5103 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5104 reporters.push(reporter.clone());
5105 if idx == 0 {
5106 *monitor_reporter.lock() = Some(reporter.clone());
5107 }
5108
5109 let application_cfg = mocks::application::Config {
5111 hasher: Sha256::default(),
5112 relay: relay.clone(),
5113 me: validator.clone(),
5114 propose_latency: (10.0, 5.0),
5115 verify_latency: (10.0, 5.0),
5116 certify_latency: (10.0, 5.0),
5117 should_certify: mocks::application::Certifier::Sometimes,
5118 };
5119 let (actor, application) = mocks::application::Application::new(
5120 context.with_label("application"),
5121 application_cfg,
5122 );
5123 actor.start();
5124 let blocker = oracle.control(validator.clone());
5125 let cfg = config::Config {
5126 scheme: schemes[idx].clone(),
5127 elector: elector.clone(),
5128 blocker,
5129 automaton: application.clone(),
5130 relay: application.clone(),
5131 reporter: reporter.clone(),
5132 strategy: Sequential,
5133 partition: validator.to_string(),
5134 mailbox_size: 1024,
5135 epoch: Epoch::new(333),
5136 leader_timeout: Duration::from_millis(100),
5137 certification_timeout: Duration::from_millis(200),
5138 timeout_retry: Duration::from_millis(500),
5139 fetch_timeout: Duration::from_millis(100),
5140 activity_timeout,
5141 skip_timeout,
5142 fetch_concurrent: 4,
5143 replay_buffer: NZUsize!(1024 * 1024),
5144 write_buffer: NZUsize!(1024 * 1024),
5145 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5146 };
5147 let engine = Engine::new(context.with_label("engine"), cfg);
5148
5149 let (pending, recovered, resolver) = registrations
5151 .remove(validator)
5152 .expect("validator should be registered");
5153 engine_handlers.push(engine.start(pending, recovered, resolver));
5154 }
5155
5156 let target = Round::new(Epoch::new(333), View::new(10)); let message = b"Secret message for future view10"; let ciphertext = schemes[0].encrypt(&mut context, target, *message);
5162
5163 let reporter = monitor_reporter.lock().clone().unwrap();
5165 loop {
5166 context.sleep(Duration::from_millis(100)).await;
5168 let notarizations = reporter.notarizations.lock();
5169 let Some(notarization) = notarizations.get(&target.view()) else {
5170 continue;
5171 };
5172
5173 let seed = notarization.seed();
5175 let decrypted = seed
5176 .decrypt(&ciphertext)
5177 .expect("Decryption should succeed with valid seed signature");
5178 assert_eq!(
5179 message,
5180 decrypted.as_ref(),
5181 "Decrypted message should match original message"
5182 );
5183 break;
5184 }
5185 });
5186 }
5187
5188 #[test_traced]
5189 fn test_tle() {
5190 tle::<MinPk, Random>();
5191 tle::<MinSig, Random>();
5192 }
5193
5194 fn hailstorm<S, F, L>(
5195 seed: u64,
5196 shutdowns: usize,
5197 interval: ViewDelta,
5198 mut fixture: F,
5199 ) -> String
5200 where
5201 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5202 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5203 L: Elector<S>,
5204 {
5205 let n = 5;
5207 let activity_timeout = ViewDelta::new(10);
5208 let skip_timeout = ViewDelta::new(5);
5209 let namespace = b"consensus".to_vec();
5210 let cfg = deterministic::Config::new().with_seed(seed);
5211 let executor = deterministic::Runner::new(cfg);
5212 executor.start(|mut context| async move {
5213 let (network, mut oracle) = Network::new(
5215 context.with_label("network"),
5216 Config {
5217 max_size: 1024 * 1024,
5218 disconnect_on_block: true,
5219 tracked_peer_sets: None,
5220 },
5221 );
5222
5223 network.start();
5225
5226 let Fixture {
5228 participants,
5229 schemes,
5230 ..
5231 } = fixture(&mut context, &namespace, n);
5232 let mut registrations = register_validators(&mut oracle, &participants).await;
5233
5234 let link = Link {
5236 latency: Duration::from_millis(10),
5237 jitter: Duration::from_millis(1),
5238 success_rate: 1.0,
5239 };
5240 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5241
5242 let elector = L::default();
5244 let relay = Arc::new(mocks::relay::Relay::new());
5245 let mut reporters = BTreeMap::new();
5246 let mut engine_handlers = BTreeMap::new();
5247 for (idx, validator) in participants.iter().enumerate() {
5248 let context = context.with_label(&format!("validator_{}", *validator));
5250
5251 let reporter_config = mocks::reporter::Config {
5253 participants: participants.clone().try_into().unwrap(),
5254 scheme: schemes[idx].clone(),
5255 elector: elector.clone(),
5256 };
5257 let reporter =
5258 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5259 reporters.insert(idx, reporter.clone());
5260 let application_cfg = mocks::application::Config {
5261 hasher: Sha256::default(),
5262 relay: relay.clone(),
5263 me: validator.clone(),
5264 propose_latency: (10.0, 5.0),
5265 verify_latency: (10.0, 5.0),
5266 certify_latency: (10.0, 5.0),
5267 should_certify: mocks::application::Certifier::Sometimes,
5268 };
5269 let (actor, application) = mocks::application::Application::new(
5270 context.with_label("application"),
5271 application_cfg,
5272 );
5273 actor.start();
5274 let blocker = oracle.control(validator.clone());
5275 let cfg = config::Config {
5276 scheme: schemes[idx].clone(),
5277 elector: elector.clone(),
5278 blocker,
5279 automaton: application.clone(),
5280 relay: application.clone(),
5281 reporter: reporter.clone(),
5282 strategy: Sequential,
5283 partition: validator.to_string(),
5284 mailbox_size: 1024,
5285 epoch: Epoch::new(333),
5286 leader_timeout: Duration::from_secs(1),
5287 certification_timeout: Duration::from_secs(2),
5288 timeout_retry: Duration::from_secs(10),
5289 fetch_timeout: Duration::from_secs(1),
5290 activity_timeout,
5291 skip_timeout,
5292 fetch_concurrent: 4,
5293 replay_buffer: NZUsize!(1024 * 1024),
5294 write_buffer: NZUsize!(1024 * 1024),
5295 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5296 };
5297 let engine = Engine::new(context.with_label("engine"), cfg);
5298
5299 let (pending, recovered, resolver) = registrations
5301 .remove(validator)
5302 .expect("validator should be registered");
5303 engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5304 }
5305
5306 let mut target = View::zero();
5308 for i in 0..shutdowns {
5309 target = target.saturating_add(interval);
5311
5312 let mut finalizers = Vec::new();
5314 for (_, reporter) in reporters.iter_mut() {
5315 let (mut latest, mut monitor) = reporter.subscribe().await;
5316 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5317 while latest < target {
5318 latest = monitor.recv().await.expect("event missing");
5319 }
5320 }));
5321 }
5322 join_all(finalizers).await;
5323 target = target.saturating_add(interval);
5324
5325 let idx = context.gen_range(0..engine_handlers.len());
5327 let validator = &participants[idx];
5328 let handle = engine_handlers.remove(&idx).unwrap();
5329 handle.abort();
5330 let _ = handle.await;
5331 let selected_reporter = reporters.remove(&idx).unwrap();
5332 info!(idx, ?validator, "shutdown validator");
5333
5334 let mut finalizers = Vec::new();
5336 for (_, reporter) in reporters.iter_mut() {
5337 let (mut latest, mut monitor) = reporter.subscribe().await;
5338 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5339 while latest < target {
5340 latest = monitor.recv().await.expect("event missing");
5341 }
5342 }));
5343 }
5344 join_all(finalizers).await;
5345 target = target.saturating_add(interval);
5346
5347 info!(idx, ?validator, "restarting validator");
5349 let context =
5350 context.with_label(&format!("validator_{}_restarted_{}", *validator, i));
5351
5352 let (pending, recovered, resolver) =
5354 register_validator(&mut oracle, validator.clone()).await;
5355 let application_cfg = mocks::application::Config {
5356 hasher: Sha256::default(),
5357 relay: relay.clone(),
5358 me: validator.clone(),
5359 propose_latency: (10.0, 5.0),
5360 verify_latency: (10.0, 5.0),
5361 certify_latency: (10.0, 5.0),
5362 should_certify: mocks::application::Certifier::Sometimes,
5363 };
5364 let (actor, application) = mocks::application::Application::new(
5365 context.with_label("application"),
5366 application_cfg,
5367 );
5368 actor.start();
5369 reporters.insert(idx, selected_reporter.clone());
5370 let blocker = oracle.control(validator.clone());
5371 let cfg = config::Config {
5372 scheme: schemes[idx].clone(),
5373 elector: elector.clone(),
5374 blocker,
5375 automaton: application.clone(),
5376 relay: application.clone(),
5377 reporter: selected_reporter,
5378 strategy: Sequential,
5379 partition: validator.to_string(),
5380 mailbox_size: 1024,
5381 epoch: Epoch::new(333),
5382 leader_timeout: Duration::from_secs(1),
5383 certification_timeout: Duration::from_secs(2),
5384 timeout_retry: Duration::from_secs(10),
5385 fetch_timeout: Duration::from_secs(1),
5386 activity_timeout,
5387 skip_timeout,
5388 fetch_concurrent: 4,
5389 replay_buffer: NZUsize!(1024 * 1024),
5390 write_buffer: NZUsize!(1024 * 1024),
5391 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5392 };
5393 let engine = Engine::new(context.with_label("engine"), cfg);
5394 engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5395
5396 let mut finalizers = Vec::new();
5398 for (_, reporter) in reporters.iter_mut() {
5399 let (mut latest, mut monitor) = reporter.subscribe().await;
5400 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5401 while latest < target {
5402 latest = monitor.recv().await.expect("event missing");
5403 }
5404 }));
5405 }
5406 join_all(finalizers).await;
5407 info!(idx, ?validator, "validator recovered");
5408 }
5409
5410 let latest_complete = target.saturating_sub(activity_timeout);
5412 for (_, reporter) in reporters.iter() {
5413 {
5415 let faults = reporter.faults.lock();
5416 assert!(faults.is_empty());
5417 }
5418
5419 {
5421 let invalid = reporter.invalid.lock();
5422 assert_eq!(*invalid, 0);
5423 }
5424
5425 let mut notarized = HashMap::new();
5427 let mut finalized = HashMap::new();
5428 {
5429 let notarizes = reporter.notarizes.lock();
5430 for view in View::range(View::new(1), latest_complete) {
5431 let Some(payloads) = notarizes.get(&view) else {
5433 continue;
5434 };
5435 if payloads.len() > 1 {
5436 panic!("view: {view}");
5437 }
5438 let (digest, _) = payloads.iter().next().unwrap();
5439 notarized.insert(view, *digest);
5440 }
5441 }
5442 {
5443 let notarizations = reporter.notarizations.lock();
5444 for view in View::range(View::new(1), latest_complete) {
5445 let Some(notarization) = notarizations.get(&view) else {
5447 continue;
5448 };
5449 let Some(digest) = notarized.get(&view) else {
5450 continue;
5451 };
5452 assert_eq!(¬arization.proposal.payload, digest);
5453 }
5454 }
5455 {
5456 let finalizes = reporter.finalizes.lock();
5457 for view in View::range(View::new(1), latest_complete) {
5458 let Some(payloads) = finalizes.get(&view) else {
5460 continue;
5461 };
5462 if payloads.len() > 1 {
5463 panic!("view: {view}");
5464 }
5465 let (digest, _) = payloads.iter().next().unwrap();
5466 finalized.insert(view, *digest);
5467
5468 if view > latest_complete {
5470 continue;
5471 }
5472
5473 let nullifies = reporter.nullifies.lock();
5475 let Some(nullifies) = nullifies.get(&view) else {
5476 continue;
5477 };
5478 for (_, finalizers) in payloads.iter() {
5479 for finalizer in finalizers.iter() {
5480 if nullifies.contains(finalizer) {
5481 panic!("should not nullify and finalize at same view");
5482 }
5483 }
5484 }
5485 }
5486 }
5487 {
5488 let finalizations = reporter.finalizations.lock();
5489 for view in View::range(View::new(1), latest_complete) {
5490 let Some(finalization) = finalizations.get(&view) else {
5492 continue;
5493 };
5494 let Some(digest) = finalized.get(&view) else {
5495 continue;
5496 };
5497 assert_eq!(&finalization.proposal.payload, digest);
5498 }
5499 }
5500 }
5501
5502 let blocked = oracle.blocked().await.unwrap();
5504 assert!(blocked.is_empty());
5505
5506 context.auditor().state()
5508 })
5509 }
5510
5511 #[test_group("slow")]
5512 #[test_traced]
5513 fn test_hailstorm_bls12381_threshold_vrf_min_pk() {
5514 assert_eq!(
5515 hailstorm::<_, _, Random>(
5516 0,
5517 10,
5518 ViewDelta::new(15),
5519 bls12381_threshold_vrf::fixture::<MinPk, _>
5520 ),
5521 hailstorm::<_, _, Random>(
5522 0,
5523 10,
5524 ViewDelta::new(15),
5525 bls12381_threshold_vrf::fixture::<MinPk, _>
5526 ),
5527 );
5528 }
5529
5530 #[test_group("slow")]
5531 #[test_traced]
5532 fn test_hailstorm_bls12381_threshold_vrf_min_sig() {
5533 assert_eq!(
5534 hailstorm::<_, _, Random>(
5535 0,
5536 10,
5537 ViewDelta::new(15),
5538 bls12381_threshold_vrf::fixture::<MinSig, _>
5539 ),
5540 hailstorm::<_, _, Random>(
5541 0,
5542 10,
5543 ViewDelta::new(15),
5544 bls12381_threshold_vrf::fixture::<MinSig, _>
5545 ),
5546 );
5547 }
5548
5549 #[test_group("slow")]
5550 #[test_traced]
5551 fn test_hailstorm_bls12381_threshold_std_min_pk() {
5552 assert_eq!(
5553 hailstorm::<_, _, RoundRobin>(
5554 0,
5555 10,
5556 ViewDelta::new(15),
5557 bls12381_threshold_std::fixture::<MinPk, _>
5558 ),
5559 hailstorm::<_, _, RoundRobin>(
5560 0,
5561 10,
5562 ViewDelta::new(15),
5563 bls12381_threshold_std::fixture::<MinPk, _>
5564 ),
5565 );
5566 }
5567
5568 #[test_group("slow")]
5569 #[test_traced]
5570 fn test_hailstorm_bls12381_threshold_std_min_sig() {
5571 assert_eq!(
5572 hailstorm::<_, _, RoundRobin>(
5573 0,
5574 10,
5575 ViewDelta::new(15),
5576 bls12381_threshold_std::fixture::<MinSig, _>
5577 ),
5578 hailstorm::<_, _, RoundRobin>(
5579 0,
5580 10,
5581 ViewDelta::new(15),
5582 bls12381_threshold_std::fixture::<MinSig, _>
5583 ),
5584 );
5585 }
5586
5587 #[test_group("slow")]
5588 #[test_traced]
5589 fn test_hailstorm_bls12381_multisig_min_pk() {
5590 assert_eq!(
5591 hailstorm::<_, _, RoundRobin>(
5592 0,
5593 10,
5594 ViewDelta::new(15),
5595 bls12381_multisig::fixture::<MinPk, _>
5596 ),
5597 hailstorm::<_, _, RoundRobin>(
5598 0,
5599 10,
5600 ViewDelta::new(15),
5601 bls12381_multisig::fixture::<MinPk, _>
5602 ),
5603 );
5604 }
5605
5606 #[test_group("slow")]
5607 #[test_traced]
5608 fn test_hailstorm_bls12381_multisig_min_sig() {
5609 assert_eq!(
5610 hailstorm::<_, _, RoundRobin>(
5611 0,
5612 10,
5613 ViewDelta::new(15),
5614 bls12381_multisig::fixture::<MinSig, _>
5615 ),
5616 hailstorm::<_, _, RoundRobin>(
5617 0,
5618 10,
5619 ViewDelta::new(15),
5620 bls12381_multisig::fixture::<MinSig, _>
5621 ),
5622 );
5623 }
5624
5625 #[test_group("slow")]
5626 #[test_traced]
5627 fn test_hailstorm_ed25519() {
5628 assert_eq!(
5629 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture),
5630 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture)
5631 );
5632 }
5633
5634 #[test_group("slow")]
5635 #[test_traced]
5636 fn test_hailstorm_secp256r1() {
5637 assert_eq!(
5638 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture),
5639 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture)
5640 );
5641 }
5642
5643 fn twins<S, F, L>(seed: u64, n: u32, strategy: Strategy, link: Link, mut fixture: F)
5645 where
5646 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5647 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5648 L: Elector<S>,
5649 {
5650 let faults = N3f1::max_faults(n);
5651 let required_containers = View::new(100);
5652 let activity_timeout = ViewDelta::new(10);
5653 let skip_timeout = ViewDelta::new(5);
5654 let namespace = b"consensus".to_vec();
5655 let cfg = deterministic::Config::new()
5656 .with_seed(seed)
5657 .with_timeout(Some(Duration::from_secs(900)));
5658 let executor = deterministic::Runner::new(cfg);
5659 executor.start(|mut context| async move {
5660 let (network, mut oracle) = Network::new(
5661 context.with_label("network"),
5662 Config {
5663 max_size: 1024 * 1024,
5664 disconnect_on_block: false,
5665 tracked_peer_sets: None,
5666 },
5667 );
5668 network.start();
5669
5670 let Fixture {
5671 participants,
5672 schemes,
5673 ..
5674 } = fixture(&mut context, &namespace, n);
5675 let participants: Arc<[_]> = participants.into();
5676 let mut registrations = register_validators(&mut oracle, &participants).await;
5677 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5678
5679 let elector = L::default();
5682 let relay = Arc::new(mocks::relay::Relay::new());
5683 let mut reporters = Vec::new();
5684 let mut engine_handlers = Vec::new();
5685
5686 for (idx, validator) in participants.iter().enumerate().take(faults as usize) {
5688 let (
5689 (vote_sender, vote_receiver),
5690 (certificate_sender, certificate_receiver),
5691 (resolver_sender, resolver_receiver),
5692 ) = registrations
5693 .remove(validator)
5694 .expect("validator should be registered");
5695
5696 let make_vote_forwarder = || {
5698 let participants = participants.clone();
5699 move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
5700 let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5701 let (primary, secondary) =
5702 strategy.partitions(msg.view(), participants.as_ref());
5703 match origin {
5704 SplitOrigin::Primary => Some(Recipients::Some(primary)),
5705 SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5706 }
5707 }
5708 };
5709 let make_certificate_forwarder = || {
5711 let codec = schemes[idx].certificate_codec_config();
5712 let participants = participants.clone();
5713 move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
5714 let msg: Certificate<S, D> =
5715 Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5716 let (primary, secondary) =
5717 strategy.partitions(msg.view(), participants.as_ref());
5718 match origin {
5719 SplitOrigin::Primary => Some(Recipients::Some(primary)),
5720 SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5721 }
5722 }
5723 };
5724 let make_drop_forwarder =
5725 || move |_: SplitOrigin, _: &Recipients<_>, _: &IoBuf| None;
5726
5727 let make_vote_router = || {
5729 let participants = participants.clone();
5730 move |(sender, message): &(_, IoBuf)| {
5731 let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5732 strategy.route(msg.view(), sender, participants.as_ref())
5733 }
5734 };
5735 let make_certificate_router = || {
5737 let codec = schemes[idx].certificate_codec_config();
5738 let participants = participants.clone();
5739 move |(sender, message): &(_, IoBuf)| {
5740 let msg: Certificate<S, D> =
5741 Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5742 strategy.route(msg.view(), sender, participants.as_ref())
5743 }
5744 };
5745 let make_drop_router = || move |(_, _): &(_, _)| SplitTarget::None;
5746
5747 let (vote_sender_primary, vote_sender_secondary) =
5749 vote_sender.split_with(make_vote_forwarder());
5750 let (vote_receiver_primary, vote_receiver_secondary) = vote_receiver.split_with(
5751 context.with_label(&format!("pending_split_{idx}")),
5752 make_vote_router(),
5753 );
5754 let (certificate_sender_primary, certificate_sender_secondary) =
5755 certificate_sender.split_with(make_certificate_forwarder());
5756 let (certificate_receiver_primary, certificate_receiver_secondary) =
5757 certificate_receiver.split_with(
5758 context.with_label(&format!("recovered_split_{idx}")),
5759 make_certificate_router(),
5760 );
5761
5762 let (resolver_sender_primary, resolver_sender_secondary) =
5764 resolver_sender.split_with(make_drop_forwarder());
5765 let (resolver_receiver_primary, resolver_receiver_secondary) = resolver_receiver
5766 .split_with(
5767 context.with_label(&format!("resolver_split_{idx}")),
5768 make_drop_router(),
5769 );
5770
5771 for (twin_label, pending, recovered, resolver) in [
5772 (
5773 "primary",
5774 (vote_sender_primary, vote_receiver_primary),
5775 (certificate_sender_primary, certificate_receiver_primary),
5776 (resolver_sender_primary, resolver_receiver_primary),
5777 ),
5778 (
5779 "secondary",
5780 (vote_sender_secondary, vote_receiver_secondary),
5781 (certificate_sender_secondary, certificate_receiver_secondary),
5782 (resolver_sender_secondary, resolver_receiver_secondary),
5783 ),
5784 ] {
5785 let label = format!("twin_{idx}_{twin_label}");
5786 let context = context.with_label(&label);
5787
5788 let reporter_config = mocks::reporter::Config {
5789 participants: participants.as_ref().try_into().unwrap(),
5790 scheme: schemes[idx].clone(),
5791 elector: elector.clone(),
5792 };
5793 let reporter = mocks::reporter::Reporter::new(
5794 context.with_label("reporter"),
5795 reporter_config,
5796 );
5797 reporters.push(reporter.clone());
5798
5799 let application_cfg = mocks::application::Config {
5800 hasher: Sha256::default(),
5801 relay: relay.clone(),
5802 me: validator.clone(),
5803 propose_latency: (10.0, 5.0),
5804 verify_latency: (10.0, 5.0),
5805 certify_latency: (10.0, 5.0),
5806 should_certify: mocks::application::Certifier::Sometimes,
5807 };
5808 let (actor, application) = mocks::application::Application::new(
5809 context.with_label("application"),
5810 application_cfg,
5811 );
5812 actor.start();
5813
5814 let blocker = oracle.control(validator.clone());
5815 let cfg = config::Config {
5816 scheme: schemes[idx].clone(),
5817 elector: elector.clone(),
5818 blocker,
5819 automaton: application.clone(),
5820 relay: application.clone(),
5821 reporter: reporter.clone(),
5822 strategy: Sequential,
5823 partition: label,
5824 mailbox_size: 1024,
5825 epoch: Epoch::new(333),
5826 leader_timeout: Duration::from_secs(1),
5827 certification_timeout: Duration::from_secs(2),
5828 timeout_retry: Duration::from_secs(10),
5829 fetch_timeout: Duration::from_secs(1),
5830 activity_timeout,
5831 skip_timeout,
5832 fetch_concurrent: 4,
5833 replay_buffer: NZUsize!(1024 * 1024),
5834 write_buffer: NZUsize!(1024 * 1024),
5835 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5836 };
5837 let engine = Engine::new(context.with_label("engine"), cfg);
5838 engine_handlers.push(engine.start(pending, recovered, resolver));
5839 }
5840 }
5841
5842 for (idx, validator) in participants.iter().enumerate().skip(faults as usize) {
5844 let label = format!("honest_{idx}");
5845 let context = context.with_label(&label);
5846
5847 let reporter_config = mocks::reporter::Config {
5848 participants: participants.as_ref().try_into().unwrap(),
5849 scheme: schemes[idx].clone(),
5850 elector: elector.clone(),
5851 };
5852 let reporter =
5853 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5854 reporters.push(reporter.clone());
5855
5856 let application_cfg = mocks::application::Config {
5857 hasher: Sha256::default(),
5858 relay: relay.clone(),
5859 me: validator.clone(),
5860 propose_latency: (10.0, 5.0),
5861 verify_latency: (10.0, 5.0),
5862 certify_latency: (10.0, 5.0),
5863 should_certify: mocks::application::Certifier::Sometimes,
5864 };
5865 let (actor, application) = mocks::application::Application::new(
5866 context.with_label("application"),
5867 application_cfg,
5868 );
5869 actor.start();
5870
5871 let blocker = oracle.control(validator.clone());
5872 let cfg = config::Config {
5873 scheme: schemes[idx].clone(),
5874 elector: elector.clone(),
5875 blocker,
5876 automaton: application.clone(),
5877 relay: application.clone(),
5878 reporter: reporter.clone(),
5879 strategy: Sequential,
5880 partition: label,
5881 mailbox_size: 1024,
5882 epoch: Epoch::new(333),
5883 leader_timeout: Duration::from_secs(1),
5884 certification_timeout: Duration::from_secs(2),
5885 timeout_retry: Duration::from_secs(10),
5886 fetch_timeout: Duration::from_secs(1),
5887 activity_timeout,
5888 skip_timeout,
5889 fetch_concurrent: 4,
5890 replay_buffer: NZUsize!(1024 * 1024),
5891 write_buffer: NZUsize!(1024 * 1024),
5892 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5893 };
5894 let engine = Engine::new(context.with_label("engine"), cfg);
5895
5896 let (pending, recovered, resolver) = registrations
5897 .remove(validator)
5898 .expect("validator should be registered");
5899 engine_handlers.push(engine.start(pending, recovered, resolver));
5900 }
5901
5902 let mut finalizers = Vec::new();
5904 for reporter in reporters.iter_mut() {
5905 let (mut latest, mut monitor) = reporter.subscribe().await;
5906 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5907 while latest < required_containers {
5908 latest = monitor.recv().await.expect("event missing");
5909 }
5910 }));
5911 }
5912 join_all(finalizers).await;
5913
5914 let honest_start = faults as usize * 2; let mut finalized_at_view: BTreeMap<View, D> = BTreeMap::new();
5917 for reporter in reporters.iter().skip(honest_start) {
5918 let finalizations = reporter.finalizations.lock();
5919 for (view, finalization) in finalizations.iter() {
5920 let digest = finalization.proposal.payload;
5921 if let Some(existing) = finalized_at_view.get(view) {
5922 assert_eq!(
5923 existing, &digest,
5924 "safety violation: conflicting finalizations at view {view}"
5925 );
5926 } else {
5927 finalized_at_view.insert(*view, digest);
5928 }
5929 }
5930 }
5931
5932 for reporter in reporters.iter().skip(honest_start) {
5934 let invalid = reporter.invalid.lock();
5935 assert_eq!(*invalid, 0, "invalid signatures detected");
5936 }
5937
5938 let twin_identities: Vec<_> = participants.iter().take(faults as usize).collect();
5940 for reporter in reporters.iter().skip(honest_start) {
5941 let faults = reporter.faults.lock();
5942 for (faulter, _) in faults.iter() {
5943 assert!(
5944 twin_identities.contains(&faulter),
5945 "fault from non-twin participant"
5946 );
5947 }
5948 }
5949
5950 let blocked = oracle.blocked().await.unwrap();
5952 for (_, faulter) in blocked.iter() {
5953 assert!(
5954 twin_identities.contains(&faulter),
5955 "blocked connection from non-twin participant"
5956 );
5957 }
5958 });
5959 }
5960
5961 fn test_twins<S, F, L>(mut fixture: F)
5962 where
5963 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5964 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5965 L: Elector<S>,
5966 {
5967 for strategy in [
5968 Strategy::View,
5969 Strategy::Fixed(3),
5970 Strategy::Isolate(4),
5971 Strategy::Broadcast,
5972 Strategy::Shuffle,
5973 ] {
5974 for link in [
5975 Link {
5976 latency: Duration::from_millis(10),
5977 jitter: Duration::from_millis(1),
5978 success_rate: 1.0,
5979 },
5980 Link {
5981 latency: Duration::from_millis(200),
5982 jitter: Duration::from_millis(150),
5983 success_rate: 0.75,
5984 },
5985 ] {
5986 twins::<S, _, L>(0, 5, strategy, link, |namespace, context, n| {
5987 fixture(namespace, context, n)
5988 });
5989 }
5990 }
5991 }
5992
5993 #[test_group("slow")]
5994 #[test_traced]
5995 fn test_twins_multisig_min_pk() {
5996 test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
5997 }
5998
5999 #[test_group("slow")]
6000 #[test_traced]
6001 fn test_twins_multisig_min_sig() {
6002 test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
6003 }
6004
6005 #[test_group("slow")]
6006 #[test_traced]
6007 fn test_twins_threshold_vrf_min_pk() {
6008 test_twins::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
6009 }
6010
6011 #[test_group("slow")]
6012 #[test_traced]
6013 fn test_twins_threshold_vrf_min_sig() {
6014 test_twins::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
6015 }
6016
6017 #[test_group("slow")]
6018 #[test_traced]
6019 fn test_twins_threshold_std_min_pk() {
6020 test_twins::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
6021 }
6022
6023 #[test_group("slow")]
6024 #[test_traced]
6025 fn test_twins_threshold_std_min_sig() {
6026 test_twins::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
6027 }
6028
6029 #[test_group("slow")]
6030 #[test_traced]
6031 fn test_twins_ed25519() {
6032 test_twins::<_, _, RoundRobin>(ed25519::fixture);
6033 }
6034
6035 #[test_group("slow")]
6036 #[test_traced]
6037 fn test_twins_secp256r1() {
6038 test_twins::<_, _, RoundRobin>(secp256r1::fixture);
6039 }
6040
6041 #[test_group("slow")]
6042 #[test_traced]
6043 fn test_twins_large_view() {
6044 twins::<_, _, Random>(
6045 0,
6046 10,
6047 Strategy::View,
6048 Link {
6049 latency: Duration::from_millis(200),
6050 jitter: Duration::from_millis(150),
6051 success_rate: 0.75,
6052 },
6053 bls12381_threshold_vrf::fixture::<MinPk, _>,
6054 );
6055 }
6056
6057 #[test_group("slow")]
6058 #[test_traced]
6059 fn test_twins_large_shuffle() {
6060 twins::<_, _, Random>(
6061 0,
6062 10,
6063 Strategy::Shuffle,
6064 Link {
6065 latency: Duration::from_millis(200),
6066 jitter: Duration::from_millis(150),
6067 success_rate: 0.75,
6068 },
6069 bls12381_threshold_vrf::fixture::<MinPk, _>,
6070 );
6071 }
6072}