1use crate::types::Round;
318use commonware_cryptography::PublicKey;
319
320pub mod elector;
321pub mod scheme;
322pub mod types;
323
324cfg_if::cfg_if! {
325 if #[cfg(not(target_arch = "wasm32"))] {
326 mod actors;
327 pub mod config;
328 pub use config::{Config, ForwardingPolicy};
329 mod engine;
330 pub use engine::Engine;
331 mod metrics;
332 }
333}
334
335#[cfg(any(test, feature = "mocks"))]
336pub mod mocks;
337
338#[cfg(not(target_arch = "wasm32"))]
339use crate::types::{View, ViewDelta};
340
341#[cfg(not(target_arch = "wasm32"))]
343pub(crate) const fn min_active(activity_timeout: ViewDelta, last_finalized: View) -> View {
344 last_finalized.saturating_sub(activity_timeout)
345}
346
347#[cfg(not(target_arch = "wasm32"))]
351pub(crate) fn interesting(
352 activity_timeout: ViewDelta,
353 last_finalized: View,
354 current: View,
355 pending: View,
356 allow_future: bool,
357) -> bool {
358 if pending.is_zero() {
360 return false;
361 }
362 if pending < min_active(activity_timeout, last_finalized) {
363 return false;
364 }
365 if !allow_future && pending > current.next() {
366 return false;
367 }
368 true
369}
370
371pub enum Plan<P: PublicKey> {
373 Propose,
375 Forward {
377 round: Round,
379 peers: Vec<P>,
381 },
382}
383
384#[cfg(test)]
386pub(crate) fn quorum(n: u32) -> u32 {
387 use commonware_utils::{Faults, N3f1};
388
389 N3f1::quorum(n)
390}
391
392#[cfg(test)]
393mod tests {
394 use super::*;
395 use crate::{
396 simplex::{
397 elector::{Config as Elector, Random, RoundRobin},
398 mocks::{
399 scheme as scheme_mocks,
400 twins::{self, Elector as TwinsElector},
401 wrapped,
402 },
403 scheme::{
404 bls12381_multisig,
405 bls12381_threshold::{
406 standard as bls12381_threshold_std,
407 vrf::{self as bls12381_threshold_vrf, Seedable},
408 },
409 ed25519, secp256r1, Scheme,
410 },
411 types::{
412 Certificate, Finalization as TFinalization, Finalize as TFinalize,
413 Notarization as TNotarization, Notarize as TNotarize,
414 Nullification as TNullification, Nullify as TNullify, Proposal, Vote,
415 },
416 },
417 types::{Epoch, Round},
418 Monitor, Viewable,
419 };
420 use commonware_codec::{Decode, DecodeExt, Encode};
421 use commonware_cryptography::{
422 bls12381::primitives::variant::{MinPk, MinSig, Variant},
423 certificate::mocks::Fixture,
424 ed25519::{PrivateKey, PublicKey},
425 sha256::{Digest as Sha256Digest, Digest as D},
426 Hasher as _, Sha256, Signer as _,
427 };
428 use commonware_macros::{select, test_group, test_traced};
429 use commonware_p2p::{
430 simulated::{Config, Link, Network, Oracle, Receiver, Sender, SplitOrigin},
431 utils::mocks::inert_channel,
432 Manager as _, Recipients, Sender as _, TrackedPeers,
433 };
434 use commonware_parallel::Sequential;
435 use commonware_runtime::{
436 buffer::paged::CacheRef, count_running_tasks, deterministic, Clock, IoBuf, Metrics, Quota,
437 Runner, Spawner,
438 };
439 use commonware_utils::{ordered::Set, sync::Mutex, test_rng, Faults, N3f1, NZUsize, NZU16};
440 use engine::Engine;
441 use futures::future::join_all;
442 use rand::{rngs::StdRng, Rng as _, SeedableRng};
443 use std::{
444 collections::{BTreeMap, HashMap, HashSet},
445 num::{NonZeroU16, NonZeroU32, NonZeroUsize},
446 sync::Arc,
447 time::Duration,
448 };
449 use tracing::{debug, info, warn};
450 use types::Activity;
451
452 const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
453 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
454 const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
455
456 #[test]
457 fn test_interesting() {
458 let activity_timeout = ViewDelta::new(10);
459
460 assert!(!interesting(
462 activity_timeout,
463 View::zero(),
464 View::zero(),
465 View::zero(),
466 false
467 ));
468 assert!(!interesting(
469 activity_timeout,
470 View::zero(),
471 View::new(1),
472 View::zero(),
473 true
474 ));
475
476 assert!(!interesting(
478 activity_timeout,
479 View::new(20),
480 View::new(25),
481 View::new(5), false
483 ));
484
485 assert!(interesting(
487 activity_timeout,
488 View::new(20),
489 View::new(25),
490 View::new(10), false
492 ));
493
494 assert!(!interesting(
496 activity_timeout,
497 View::new(20),
498 View::new(25),
499 View::new(27),
500 false
501 ));
502
503 assert!(interesting(
505 activity_timeout,
506 View::new(20),
507 View::new(25),
508 View::new(27),
509 true
510 ));
511
512 assert!(interesting(
514 activity_timeout,
515 View::new(20),
516 View::new(25),
517 View::new(26),
518 false
519 ));
520
521 assert!(interesting(
523 activity_timeout,
524 View::new(20),
525 View::new(25),
526 View::new(22),
527 false
528 ));
529
530 assert!(interesting(
533 activity_timeout,
534 View::zero(),
535 View::new(5),
536 View::new(1),
537 false
538 ));
539 }
540
541 async fn register_validator(
543 oracle: &mut Oracle<PublicKey, deterministic::Context>,
544 validator: PublicKey,
545 ) -> (
546 (
547 Sender<PublicKey, deterministic::Context>,
548 Receiver<PublicKey>,
549 ),
550 (
551 Sender<PublicKey, deterministic::Context>,
552 Receiver<PublicKey>,
553 ),
554 (
555 Sender<PublicKey, deterministic::Context>,
556 Receiver<PublicKey>,
557 ),
558 ) {
559 let control = oracle.control(validator.clone());
560 let (vote_sender, vote_receiver) = control.register(0, TEST_QUOTA).await.unwrap();
561 let (certificate_sender, certificate_receiver) =
562 control.register(1, TEST_QUOTA).await.unwrap();
563 let (resolver_sender, resolver_receiver) = control.register(2, TEST_QUOTA).await.unwrap();
564 (
565 (vote_sender, vote_receiver),
566 (certificate_sender, certificate_receiver),
567 (resolver_sender, resolver_receiver),
568 )
569 }
570
571 async fn register_validators(
573 oracle: &mut Oracle<PublicKey, deterministic::Context>,
574 validators: &[PublicKey],
575 ) -> HashMap<
576 PublicKey,
577 (
578 (
579 Sender<PublicKey, deterministic::Context>,
580 Receiver<PublicKey>,
581 ),
582 (
583 Sender<PublicKey, deterministic::Context>,
584 Receiver<PublicKey>,
585 ),
586 (
587 Sender<PublicKey, deterministic::Context>,
588 Receiver<PublicKey>,
589 ),
590 ),
591 > {
592 let mut registrations = HashMap::new();
593 for validator in validators.iter() {
594 let registration = register_validator(oracle, validator.clone()).await;
595 registrations.insert(validator.clone(), registration);
596 }
597 registrations
598 }
599
600 async fn start_test_network_with_peers<I>(
601 context: deterministic::Context,
602 peers: I,
603 disconnect_on_block: bool,
604 ) -> Oracle<PublicKey, deterministic::Context>
605 where
606 I: IntoIterator<Item = PublicKey>,
607 {
608 let (network, oracle) = Network::new_with_peers(
609 context.with_label("network"),
610 Config {
611 max_size: 1024 * 1024,
612 disconnect_on_block,
613 tracked_peer_sets: NZUsize!(1),
614 },
615 peers,
616 )
617 .await;
618 network.start();
619 oracle
620 }
621
622 async fn start_test_network_with_split_peers<I, J>(
623 context: deterministic::Context,
624 primary: I,
625 secondary: J,
626 disconnect_on_block: bool,
627 ) -> Oracle<PublicKey, deterministic::Context>
628 where
629 I: IntoIterator<Item = PublicKey>,
630 J: IntoIterator<Item = PublicKey>,
631 {
632 let (network, oracle) = Network::new_with_split_peers(
633 context.with_label("network"),
634 Config {
635 max_size: 1024 * 1024,
636 disconnect_on_block,
637 tracked_peer_sets: NZUsize!(1),
638 },
639 primary,
640 secondary,
641 )
642 .await;
643 network.start();
644 oracle
645 }
646
647 enum Action {
649 Link(Link),
650 Update(Link), Unlink,
652 }
653
654 async fn link_validators(
660 oracle: &mut Oracle<PublicKey, deterministic::Context>,
661 validators: &[PublicKey],
662 action: Action,
663 restrict_to: Option<fn(usize, usize, usize) -> bool>,
664 ) {
665 for (i1, v1) in validators.iter().enumerate() {
666 for (i2, v2) in validators.iter().enumerate() {
667 if v2 == v1 {
669 continue;
670 }
671
672 if let Some(f) = restrict_to {
674 if !f(validators.len(), i1, i2) {
675 continue;
676 }
677 }
678
679 match action {
681 Action::Update(_) | Action::Unlink => {
682 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
683 }
684 _ => {}
685 }
686
687 match action {
689 Action::Link(ref link) | Action::Update(ref link) => {
690 oracle
691 .add_link(v1.clone(), v2.clone(), link.clone())
692 .await
693 .unwrap();
694 }
695 _ => {}
696 }
697 }
698 }
699 }
700
701 fn count_nonzero_metric_lines(encoded: &str, patterns: &[&str]) -> u32 {
703 encoded
704 .lines()
705 .filter(|line| patterns.iter().all(|p| line.contains(p)))
706 .filter(|line| {
707 line.split_whitespace()
708 .last()
709 .and_then(|s| s.parse::<u64>().ok())
710 .is_some_and(|n| n > 0)
711 })
712 .count() as u32
713 }
714
715 fn all_online<S, F, L>(mut fixture: F)
716 where
717 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
718 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
719 L: Elector<S>,
720 {
721 let n = 5;
723 let quorum = quorum(n) as usize;
724 let required_containers = View::new(100);
725 let activity_timeout = ViewDelta::new(10);
726 let skip_timeout = ViewDelta::new(5);
727 let namespace = b"consensus".to_vec();
728 let executor = deterministic::Runner::timed(Duration::from_secs(300));
729 executor.start(|mut context| async move {
730 let Fixture {
732 participants,
733 schemes,
734 ..
735 } = fixture(&mut context, &namespace, n);
736 let mut oracle =
737 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
738 let mut registrations = register_validators(&mut oracle, &participants).await;
739
740 let link = Link {
742 latency: Duration::from_millis(10),
743 jitter: Duration::from_millis(1),
744 success_rate: 1.0,
745 };
746 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
747
748 let elector = L::default();
750 let relay = Arc::new(mocks::relay::Relay::new());
751 let mut reporters = Vec::new();
752 let mut engine_handlers = Vec::new();
753 for (idx, validator) in participants.iter().enumerate() {
754 let context = context.with_label(&format!("validator_{}", *validator));
756
757 let reporter_config = mocks::reporter::Config {
759 participants: participants.clone().try_into().unwrap(),
760 scheme: schemes[idx].clone(),
761 elector: elector.clone(),
762 };
763 let reporter =
764 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
765 reporters.push(reporter.clone());
766 let application_cfg = mocks::application::Config {
767 hasher: Sha256::default(),
768 relay: relay.clone(),
769 me: validator.clone(),
770 propose_latency: (10.0, 5.0),
771 verify_latency: (10.0, 5.0),
772 certify_latency: (10.0, 5.0),
773 should_certify: mocks::application::Certifier::Sometimes,
774 };
775 let (actor, application) = mocks::application::Application::new(
776 context.with_label("application"),
777 application_cfg,
778 );
779 actor.start();
780 let blocker = oracle.control(validator.clone());
781 let cfg = config::Config {
782 scheme: schemes[idx].clone(),
783 elector: elector.clone(),
784 blocker,
785 automaton: application.clone(),
786 relay: application.clone(),
787 reporter: reporter.clone(),
788 strategy: Sequential,
789 partition: validator.to_string(),
790 mailbox_size: 1024,
791 epoch: Epoch::new(333),
792 leader_timeout: Duration::from_secs(1),
793 certification_timeout: Duration::from_secs(2),
794 timeout_retry: Duration::from_secs(10),
795 fetch_timeout: Duration::from_secs(1),
796 activity_timeout,
797 skip_timeout,
798 fetch_concurrent: 4,
799 replay_buffer: NZUsize!(1024 * 1024),
800 write_buffer: NZUsize!(1024 * 1024),
801 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
802 forwarding: ForwardingPolicy::Disabled,
803 };
804 let engine = Engine::new(context.with_label("engine"), cfg);
805
806 let (pending, recovered, resolver) = registrations
808 .remove(validator)
809 .expect("validator should be registered");
810 engine_handlers.push(engine.start(pending, recovered, resolver));
811 }
812
813 let mut finalizers = Vec::new();
815 for reporter in reporters.iter_mut() {
816 let (mut latest, mut monitor) = reporter.subscribe().await;
817 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
818 while latest < required_containers {
819 latest = monitor.recv().await.expect("event missing");
820 }
821 }));
822 }
823 join_all(finalizers).await;
824
825 let latest_complete = required_containers.saturating_sub(activity_timeout);
827 for reporter in reporters.iter() {
828 reporter.assert_no_faults();
830
831 reporter.assert_no_invalid();
833
834 {
836 let certified = reporter.certified.lock();
837 for view in View::range(View::new(1), latest_complete) {
838 if !certified.contains(&view) {
840 panic!("view: {view}");
841 }
842 }
843 }
844
845 let mut notarized = HashMap::new();
847 let mut finalized = HashMap::new();
848 {
849 let notarizes = reporter.notarizes.lock();
850 for view in View::range(View::new(1), latest_complete) {
851 let Some(payloads) = notarizes.get(&view) else {
853 continue;
854 };
855 if payloads.len() > 1 {
856 panic!("view: {view}");
857 }
858 let (digest, notarizers) = payloads.iter().next().unwrap();
859 notarized.insert(view, *digest);
860
861 if notarizers.len() < quorum {
862 panic!("view: {view}");
865 }
866 }
867 }
868 {
869 let notarizations = reporter.notarizations.lock();
870 for view in View::range(View::new(1), latest_complete) {
871 let Some(notarization) = notarizations.get(&view) else {
873 continue;
874 };
875 let Some(digest) = notarized.get(&view) else {
876 continue;
877 };
878 assert_eq!(¬arization.proposal.payload, digest);
879 }
880 }
881 {
882 let finalizes = reporter.finalizes.lock();
883 for view in View::range(View::new(1), latest_complete) {
884 let Some(payloads) = finalizes.get(&view) else {
886 continue;
887 };
888 if payloads.len() > 1 {
889 panic!("view: {view}");
890 }
891 let (digest, finalizers) = payloads.iter().next().unwrap();
892 finalized.insert(view, *digest);
893
894 if view > latest_complete {
896 continue;
897 }
898
899 if finalizers.len() < quorum {
901 panic!("view: {view}");
904 }
905
906 let nullifies = reporter.nullifies.lock();
908 let Some(nullifies) = nullifies.get(&view) else {
909 continue;
910 };
911 for (_, finalizers) in payloads.iter() {
912 for finalizer in finalizers.iter() {
913 if nullifies.contains(finalizer) {
914 panic!("should not nullify and finalize at same view");
915 }
916 }
917 }
918 }
919 }
920 {
921 let finalizations = reporter.finalizations.lock();
922 for view in View::range(View::new(1), latest_complete) {
923 let Some(finalization) = finalizations.get(&view) else {
925 continue;
926 };
927 let Some(digest) = finalized.get(&view) else {
928 continue;
929 };
930 assert_eq!(&finalization.proposal.payload, digest);
931 }
932 }
933 }
934
935 let blocked = oracle.blocked().await.unwrap();
937 assert!(blocked.is_empty());
938 });
939 }
940
941 #[test_group("slow")]
942 #[test_traced]
943 fn test_all_online() {
944 all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
945 all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
946 all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
947 all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
948 all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
949 all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
950 all_online::<_, _, RoundRobin>(ed25519::fixture);
951 all_online::<_, _, RoundRobin>(secp256r1::fixture);
952 }
953
954 fn observer<S, F, L>(mut fixture: F)
955 where
956 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
957 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
958 L: Elector<S>,
959 {
960 let n_active = 5;
962 let required_containers = View::new(100);
963 let activity_timeout = ViewDelta::new(10);
964 let skip_timeout = ViewDelta::new(5);
965 let namespace = b"consensus".to_vec();
966 let executor = deterministic::Runner::timed(Duration::from_secs(300));
967 executor.start(|mut context| async move {
968 let Fixture {
970 participants,
971 schemes,
972 verifier,
973 ..
974 } = fixture(&mut context, &namespace, n_active);
975
976 let private_key_observer = PrivateKey::from_seed(n_active as u64);
978 let public_key_observer = private_key_observer.public_key();
979
980 let mut oracle = start_test_network_with_split_peers(
981 context.clone(),
982 participants.clone(),
983 [public_key_observer.clone()],
984 true,
985 )
986 .await;
987
988 let mut all_validators = participants.clone();
990 all_validators.push(public_key_observer.clone());
991 all_validators.sort();
992 let mut registrations = register_validators(&mut oracle, &all_validators).await;
993
994 let link = Link {
996 latency: Duration::from_millis(10),
997 jitter: Duration::from_millis(1),
998 success_rate: 1.0,
999 };
1000 link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
1001
1002 let elector = L::default();
1004 let relay = Arc::new(mocks::relay::Relay::new());
1005 let mut reporters = Vec::new();
1006
1007 for (idx, validator) in participants.iter().enumerate() {
1008 let is_observer = *validator == public_key_observer;
1009
1010 let context = context.with_label(&format!("validator_{}", *validator));
1012
1013 let signing = if is_observer {
1015 verifier.clone()
1016 } else {
1017 schemes[idx].clone()
1018 };
1019 let reporter_config = mocks::reporter::Config {
1020 participants: participants.clone().try_into().unwrap(),
1021 scheme: signing.clone(),
1022 elector: elector.clone(),
1023 };
1024 let reporter =
1025 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1026 reporters.push(reporter.clone());
1027 let application_cfg = mocks::application::Config {
1028 hasher: Sha256::default(),
1029 relay: relay.clone(),
1030 me: validator.clone(),
1031 propose_latency: (10.0, 5.0),
1032 verify_latency: (10.0, 5.0),
1033 certify_latency: (10.0, 5.0),
1034 should_certify: mocks::application::Certifier::Sometimes,
1035 };
1036 let (actor, application) = mocks::application::Application::new(
1037 context.with_label("application"),
1038 application_cfg,
1039 );
1040 actor.start();
1041 let blocker = oracle.control(validator.clone());
1042 let cfg = config::Config {
1043 scheme: signing.clone(),
1044 elector: elector.clone(),
1045 blocker,
1046 automaton: application.clone(),
1047 relay: application.clone(),
1048 reporter: reporter.clone(),
1049 strategy: Sequential,
1050 partition: validator.to_string(),
1051 mailbox_size: 1024,
1052 epoch: Epoch::new(333),
1053 leader_timeout: Duration::from_secs(1),
1054 certification_timeout: Duration::from_secs(2),
1055 timeout_retry: Duration::from_secs(10),
1056 fetch_timeout: Duration::from_secs(1),
1057 activity_timeout,
1058 skip_timeout,
1059 fetch_concurrent: 4,
1060 replay_buffer: NZUsize!(1024 * 1024),
1061 write_buffer: NZUsize!(1024 * 1024),
1062 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1063 forwarding: ForwardingPolicy::Disabled,
1064 };
1065 let engine = Engine::new(context.with_label("engine"), cfg);
1066
1067 let (pending, recovered, resolver) = registrations
1069 .remove(validator)
1070 .expect("validator should be registered");
1071 engine.start(pending, recovered, resolver);
1072 }
1073
1074 let mut finalizers = Vec::new();
1076 for reporter in reporters.iter_mut() {
1077 let (mut latest, mut monitor) = reporter.subscribe().await;
1078 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1079 while latest < required_containers {
1080 latest = monitor.recv().await.expect("event missing");
1081 }
1082 }));
1083 }
1084 join_all(finalizers).await;
1085
1086 for reporter in reporters.iter() {
1089 reporter.assert_no_faults();
1091 reporter.assert_no_invalid();
1092
1093 let blocked = oracle.blocked().await.unwrap();
1095 assert!(blocked.is_empty());
1096 }
1097 });
1098 }
1099
1100 #[test_group("slow")]
1101 #[test_traced]
1102 fn test_observer() {
1103 observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1104 observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1105 observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1106 observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1107 observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1108 observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1109 observer::<_, _, RoundRobin>(ed25519::fixture);
1110 observer::<_, _, RoundRobin>(secp256r1::fixture);
1111 }
1112
1113 fn unclean_shutdown<S, F, L>(mut fixture: F)
1114 where
1115 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1116 F: FnMut(&mut StdRng, &[u8], u32) -> Fixture<S>,
1117 L: Elector<S>,
1118 {
1119 let n = 5;
1121 let required_containers = View::new(100);
1122 let activity_timeout = ViewDelta::new(10);
1123 let skip_timeout = ViewDelta::new(5);
1124 let namespace = b"consensus".to_vec();
1125
1126 let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
1128 let supervised = Arc::new(Mutex::new(Vec::new()));
1129 let mut prev_checkpoint = None;
1130
1131 let mut rng = test_rng();
1133 let Fixture {
1134 participants,
1135 schemes,
1136 ..
1137 } = fixture(&mut rng, &namespace, n);
1138
1139 let relay = Arc::new(mocks::relay::Relay::<Sha256Digest, S::PublicKey>::new());
1141
1142 loop {
1143 let rng = rng.clone();
1144 let participants = participants.clone();
1145 let schemes = schemes.clone();
1146 let shutdowns = shutdowns.clone();
1147 let supervised = supervised.clone();
1148 let relay = relay.clone();
1149 relay.deregister_all(); let f = |mut context: deterministic::Context| async move {
1152 let mut oracle =
1154 start_test_network_with_peers(context.clone(), participants.clone(), true)
1155 .await;
1156 let mut registrations = register_validators(&mut oracle, &participants).await;
1157
1158 let link = Link {
1160 latency: Duration::from_millis(50),
1161 jitter: Duration::from_millis(50),
1162 success_rate: 1.0,
1163 };
1164 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1165
1166 let elector = L::default();
1168 let relay = Arc::new(mocks::relay::Relay::new());
1169 let mut reporters = HashMap::new();
1170 let mut engine_handlers = Vec::new();
1171 for (idx, validator) in participants.iter().enumerate() {
1172 let context = context.with_label(&format!("validator_{}", *validator));
1174
1175 let reporter_config = mocks::reporter::Config {
1177 participants: participants.clone().try_into().unwrap(),
1178 scheme: schemes[idx].clone(),
1179 elector: elector.clone(),
1180 };
1181 let reporter = mocks::reporter::Reporter::new(rng.clone(), reporter_config);
1182 reporters.insert(validator.clone(), reporter.clone());
1183 let application_cfg = mocks::application::Config {
1184 hasher: Sha256::default(),
1185 relay: relay.clone(),
1186 me: validator.clone(),
1187 propose_latency: (10.0, 5.0),
1188 verify_latency: (10.0, 5.0),
1189 certify_latency: (10.0, 5.0),
1190 should_certify: mocks::application::Certifier::Sometimes,
1191 };
1192 let (actor, application) = mocks::application::Application::new(
1193 context.with_label("application"),
1194 application_cfg,
1195 );
1196 actor.start();
1197 let blocker = oracle.control(validator.clone());
1198 let cfg = config::Config {
1199 scheme: schemes[idx].clone(),
1200 elector: elector.clone(),
1201 blocker,
1202 automaton: application.clone(),
1203 relay: application.clone(),
1204 reporter: reporter.clone(),
1205 strategy: Sequential,
1206 partition: validator.to_string(),
1207 mailbox_size: 1024,
1208 epoch: Epoch::new(333),
1209 leader_timeout: Duration::from_secs(1),
1210 certification_timeout: Duration::from_secs(2),
1211 timeout_retry: Duration::from_secs(10),
1212 fetch_timeout: Duration::from_secs(1),
1213 activity_timeout,
1214 skip_timeout,
1215 fetch_concurrent: 4,
1216 replay_buffer: NZUsize!(1024 * 1024),
1217 write_buffer: NZUsize!(1024 * 1024),
1218 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1219 forwarding: ForwardingPolicy::Disabled,
1220 };
1221 let engine = Engine::new(context.with_label("engine"), cfg);
1222
1223 let (pending, recovered, resolver) = registrations
1225 .remove(validator)
1226 .expect("validator should be registered");
1227 engine_handlers.push(engine.start(pending, recovered, resolver));
1228 }
1229
1230 let mut finalizers = Vec::new();
1232 for (_, reporter) in reporters.iter_mut() {
1233 let (mut latest, mut monitor) = reporter.subscribe().await;
1234 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1235 while latest < required_containers {
1236 latest = monitor.recv().await.expect("event missing");
1237 }
1238 }));
1239 }
1240
1241 let wait =
1243 context.gen_range(Duration::from_millis(100)..Duration::from_millis(2_000));
1244 let result = select! {
1245 _ = context.sleep(wait) => {
1246 {
1248 let mut shutdowns = shutdowns.lock();
1249 debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
1250 *shutdowns += 1;
1251 }
1252 supervised.lock().push(reporters);
1253 false
1254 },
1255 _ = join_all(finalizers) => {
1256 let supervised = supervised.lock();
1258 for reporters in supervised.iter() {
1259 for (_, reporter) in reporters.iter() {
1260 reporter.assert_no_faults();
1261 }
1262 }
1263 true
1264 },
1265 };
1266
1267 let blocked = oracle.blocked().await.unwrap();
1269 assert!(blocked.is_empty());
1270
1271 result
1272 };
1273
1274 let (complete, checkpoint) = prev_checkpoint
1275 .map_or_else(
1276 || deterministic::Runner::timed(Duration::from_secs(180)),
1277 deterministic::Runner::from,
1278 )
1279 .start_and_recover(f);
1280
1281 if complete {
1283 break;
1284 }
1285
1286 prev_checkpoint = Some(checkpoint);
1287 }
1288 }
1289
1290 #[test_group("slow")]
1291 #[test_traced]
1292 fn test_unclean_shutdown() {
1293 unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1294 unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1295 unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1296 unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1297 unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1298 unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1299 unclean_shutdown::<_, _, RoundRobin>(ed25519::fixture);
1300 unclean_shutdown::<_, _, RoundRobin>(secp256r1::fixture);
1301 }
1302
1303 fn backfill<S, F, L>(mut fixture: F)
1304 where
1305 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1306 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1307 L: Elector<S>,
1308 {
1309 let n = 4;
1311 let required_containers = View::new(100);
1312 let activity_timeout = ViewDelta::new(10);
1313 let skip_timeout = ViewDelta::new(5);
1314 let namespace = b"consensus".to_vec();
1315 let executor = deterministic::Runner::timed(Duration::from_secs(240));
1316 executor.start(|mut context| async move {
1317 let Fixture {
1319 participants,
1320 schemes,
1321 ..
1322 } = fixture(&mut context, &namespace, n);
1323 let mut oracle =
1324 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
1325 let mut registrations = register_validators(&mut oracle, &participants).await;
1326
1327 let link = Link {
1329 latency: Duration::from_millis(10),
1330 jitter: Duration::from_millis(1),
1331 success_rate: 1.0,
1332 };
1333 link_validators(
1334 &mut oracle,
1335 &participants,
1336 Action::Link(link),
1337 Some(|_, i, j| ![i, j].contains(&0usize)),
1338 )
1339 .await;
1340
1341 let elector = L::default();
1343 let relay = Arc::new(mocks::relay::Relay::new());
1344 let mut reporters = Vec::new();
1345 let mut engine_handlers = Vec::new();
1346 for (idx_scheme, validator) in participants.iter().enumerate() {
1347 if idx_scheme == 0 {
1349 continue;
1350 }
1351
1352 let context = context.with_label(&format!("validator_{}", *validator));
1354
1355 let reporter_config = mocks::reporter::Config {
1357 participants: participants.clone().try_into().unwrap(),
1358 scheme: schemes[idx_scheme].clone(),
1359 elector: elector.clone(),
1360 };
1361 let reporter =
1362 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1363 reporters.push(reporter.clone());
1364 let application_cfg = mocks::application::Config {
1365 hasher: Sha256::default(),
1366 relay: relay.clone(),
1367 me: validator.clone(),
1368 propose_latency: (10.0, 5.0),
1369 verify_latency: (10.0, 5.0),
1370 certify_latency: (10.0, 5.0),
1371 should_certify: mocks::application::Certifier::Sometimes,
1372 };
1373 let (actor, application) = mocks::application::Application::new(
1374 context.with_label("application"),
1375 application_cfg,
1376 );
1377 actor.start();
1378 let blocker = oracle.control(validator.clone());
1379 let cfg = config::Config {
1380 scheme: schemes[idx_scheme].clone(),
1381 elector: elector.clone(),
1382 blocker,
1383 automaton: application.clone(),
1384 relay: application.clone(),
1385 reporter: reporter.clone(),
1386 strategy: Sequential,
1387 partition: validator.to_string(),
1388 mailbox_size: 1024,
1389 epoch: Epoch::new(333),
1390 leader_timeout: Duration::from_secs(1),
1391 certification_timeout: Duration::from_secs(2),
1392 timeout_retry: Duration::from_secs(10),
1393 fetch_timeout: Duration::from_secs(1),
1394 activity_timeout,
1395 skip_timeout,
1396 fetch_concurrent: 4,
1397 replay_buffer: NZUsize!(1024 * 1024),
1398 write_buffer: NZUsize!(1024 * 1024),
1399 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1400 forwarding: ForwardingPolicy::Disabled,
1401 };
1402 let engine = Engine::new(context.with_label("engine"), cfg);
1403
1404 let (pending, recovered, resolver) = registrations
1406 .remove(validator)
1407 .expect("validator should be registered");
1408 engine_handlers.push(engine.start(pending, recovered, resolver));
1409 }
1410
1411 let mut finalizers = Vec::new();
1413 for reporter in reporters.iter_mut() {
1414 let (mut latest, mut monitor) = reporter.subscribe().await;
1415 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1416 while latest < required_containers {
1417 latest = monitor.recv().await.expect("event missing");
1418 }
1419 }));
1420 }
1421 join_all(finalizers).await;
1422
1423 let link = Link {
1425 latency: Duration::from_secs(3),
1426 jitter: Duration::from_millis(0),
1427 success_rate: 1.0,
1428 };
1429 link_validators(
1430 &mut oracle,
1431 &participants,
1432 Action::Update(link.clone()),
1433 Some(|_, i, j| ![i, j].contains(&0usize)),
1434 )
1435 .await;
1436
1437 context.sleep(Duration::from_secs(60)).await;
1439
1440 link_validators(
1442 &mut oracle,
1443 &participants,
1444 Action::Unlink,
1445 Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
1446 )
1447 .await;
1448
1449 let me = participants[0].clone();
1451 let context = context.with_label(&format!("validator_{me}"));
1452
1453 link_validators(
1455 &mut oracle,
1456 &participants,
1457 Action::Link(link),
1458 Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
1459 )
1460 .await;
1461
1462 let link = Link {
1464 latency: Duration::from_millis(10),
1465 jitter: Duration::from_millis(3),
1466 success_rate: 1.0,
1467 };
1468 link_validators(
1469 &mut oracle,
1470 &participants,
1471 Action::Update(link),
1472 Some(|_, i, j| ![i, j].contains(&1usize)),
1473 )
1474 .await;
1475
1476 let reporter_config = mocks::reporter::Config {
1478 participants: participants.clone().try_into().unwrap(),
1479 scheme: schemes[0].clone(),
1480 elector: elector.clone(),
1481 };
1482 let mut reporter =
1483 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1484 reporters.push(reporter.clone());
1485 let application_cfg = mocks::application::Config {
1486 hasher: Sha256::default(),
1487 relay: relay.clone(),
1488 me: me.clone(),
1489 propose_latency: (10.0, 5.0),
1490 verify_latency: (10.0, 5.0),
1491 certify_latency: (10.0, 5.0),
1492 should_certify: mocks::application::Certifier::Sometimes,
1493 };
1494 let (actor, application) = mocks::application::Application::new(
1495 context.with_label("application"),
1496 application_cfg,
1497 );
1498 actor.start();
1499 let blocker = oracle.control(me.clone());
1500 let cfg = config::Config {
1501 scheme: schemes[0].clone(),
1502 elector: elector.clone(),
1503 blocker,
1504 automaton: application.clone(),
1505 relay: application.clone(),
1506 reporter: reporter.clone(),
1507 strategy: Sequential,
1508 partition: me.to_string(),
1509 mailbox_size: 1024,
1510 epoch: Epoch::new(333),
1511 leader_timeout: Duration::from_secs(1),
1512 certification_timeout: Duration::from_secs(2),
1513 timeout_retry: Duration::from_secs(10),
1514 fetch_timeout: Duration::from_secs(1),
1515 activity_timeout,
1516 skip_timeout,
1517 fetch_concurrent: 4,
1518 replay_buffer: NZUsize!(1024 * 1024),
1519 write_buffer: NZUsize!(1024 * 1024),
1520 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1521 forwarding: ForwardingPolicy::Disabled,
1522 };
1523 let engine = Engine::new(context.with_label("engine"), cfg);
1524
1525 let (pending, recovered, resolver) = registrations
1527 .remove(&me)
1528 .expect("validator should be registered");
1529 engine_handlers.push(engine.start(pending, recovered, resolver));
1530
1531 let (mut latest, mut monitor) = reporter.subscribe().await;
1533 while latest < required_containers {
1534 latest = monitor.recv().await.expect("event missing");
1535 }
1536
1537 let blocked = oracle.blocked().await.unwrap();
1539 assert!(blocked.is_empty());
1540 });
1541 }
1542
1543 #[test_group("slow")]
1544 #[test_traced]
1545 fn test_backfill() {
1546 backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1547 backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1548 backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1549 backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1550 backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1551 backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1552 backfill::<_, _, RoundRobin>(ed25519::fixture);
1553 backfill::<_, _, RoundRobin>(secp256r1::fixture);
1554 }
1555
1556 fn one_offline<S, F, L>(mut fixture: F)
1557 where
1558 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1559 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1560 L: Elector<S>,
1561 {
1562 let n = 5;
1564 let quorum = quorum(n) as usize;
1565 let required_containers = View::new(100);
1566 let activity_timeout = ViewDelta::new(10);
1567 let skip_timeout = ViewDelta::new(5);
1568 let max_exceptions = 10;
1569 let namespace = b"consensus".to_vec();
1570 let executor = deterministic::Runner::timed(Duration::from_secs(300));
1571 executor.start(|mut context| async move {
1572 let Fixture {
1574 participants,
1575 schemes,
1576 ..
1577 } = fixture(&mut context, &namespace, n);
1578 let mut oracle =
1579 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
1580 let mut registrations = register_validators(&mut oracle, &participants).await;
1581
1582 let link = Link {
1584 latency: Duration::from_millis(10),
1585 jitter: Duration::from_millis(1),
1586 success_rate: 1.0,
1587 };
1588 link_validators(
1589 &mut oracle,
1590 &participants,
1591 Action::Link(link),
1592 Some(|_, i, j| ![i, j].contains(&0usize)),
1593 )
1594 .await;
1595
1596 let elector = L::default();
1598 let relay = Arc::new(mocks::relay::Relay::new());
1599 let mut reporters = Vec::new();
1600 let mut engine_handlers = Vec::new();
1601 for (idx_scheme, validator) in participants.iter().enumerate() {
1602 if idx_scheme == 0 {
1604 continue;
1605 }
1606
1607 let context = context.with_label(&format!("validator_{}", *validator));
1609
1610 let reporter_config = mocks::reporter::Config {
1612 participants: participants.clone().try_into().unwrap(),
1613 scheme: schemes[idx_scheme].clone(),
1614 elector: elector.clone(),
1615 };
1616 let reporter =
1617 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1618 reporters.push(reporter.clone());
1619 let application_cfg = mocks::application::Config {
1620 hasher: Sha256::default(),
1621 relay: relay.clone(),
1622 me: validator.clone(),
1623 propose_latency: (10.0, 5.0),
1624 verify_latency: (10.0, 5.0),
1625 certify_latency: (10.0, 5.0),
1626 should_certify: mocks::application::Certifier::Sometimes,
1627 };
1628 let (actor, application) = mocks::application::Application::new(
1629 context.with_label("application"),
1630 application_cfg,
1631 );
1632 actor.start();
1633 let blocker = oracle.control(validator.clone());
1634 let cfg = config::Config {
1635 scheme: schemes[idx_scheme].clone(),
1636 elector: elector.clone(),
1637 blocker,
1638 automaton: application.clone(),
1639 relay: application.clone(),
1640 reporter: reporter.clone(),
1641 strategy: Sequential,
1642 partition: validator.to_string(),
1643 mailbox_size: 1024,
1644 epoch: Epoch::new(333),
1645 leader_timeout: Duration::from_secs(1),
1646 certification_timeout: Duration::from_secs(2),
1647 timeout_retry: Duration::from_secs(10),
1648 fetch_timeout: Duration::from_secs(1),
1649 activity_timeout,
1650 skip_timeout,
1651 fetch_concurrent: 4,
1652 replay_buffer: NZUsize!(1024 * 1024),
1653 write_buffer: NZUsize!(1024 * 1024),
1654 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1655 forwarding: ForwardingPolicy::Disabled,
1656 };
1657 let engine = Engine::new(context.with_label("engine"), cfg);
1658
1659 let (pending, recovered, resolver) = registrations
1661 .remove(validator)
1662 .expect("validator should be registered");
1663 engine_handlers.push(engine.start(pending, recovered, resolver));
1664 }
1665
1666 let mut finalizers = Vec::new();
1668 for reporter in reporters.iter_mut() {
1669 let (mut latest, mut monitor) = reporter.subscribe().await;
1670 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1671 while latest < required_containers {
1672 latest = monitor.recv().await.expect("event missing");
1673 }
1674 }));
1675 }
1676 join_all(finalizers).await;
1677
1678 let exceptions = 0;
1680 let offline = &participants[0];
1681 for reporter in reporters.iter() {
1682 reporter.assert_no_faults();
1684
1685 reporter.assert_no_invalid();
1687
1688 let mut exceptions = 0;
1690 {
1691 let notarizes = reporter.notarizes.lock();
1692 for (view, payloads) in notarizes.iter() {
1693 for (_, participants) in payloads.iter() {
1694 if participants.contains(offline) {
1695 panic!("view: {view}");
1696 }
1697 }
1698 }
1699 }
1700 {
1701 let nullifies = reporter.nullifies.lock();
1702 for (view, participants) in nullifies.iter() {
1703 if participants.contains(offline) {
1704 panic!("view: {view}");
1705 }
1706 }
1707 }
1708 {
1709 let finalizes = reporter.finalizes.lock();
1710 for (view, payloads) in finalizes.iter() {
1711 for (_, finalizers) in payloads.iter() {
1712 if finalizers.contains(offline) {
1713 panic!("view: {view}");
1714 }
1715 }
1716 }
1717 }
1718
1719 let mut offline_views = Vec::new();
1721 {
1722 let leaders = reporter.leaders.lock();
1723 for (view, leader) in leaders.iter() {
1724 if leader == offline {
1725 offline_views.push(*view);
1726 }
1727 }
1728 }
1729 assert!(!offline_views.is_empty());
1730
1731 {
1733 let nullifies = reporter.nullifies.lock();
1734 for view in offline_views.iter() {
1735 let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1736 if nullifies < quorum {
1737 warn!("missing expected view nullifies: {}", view);
1738 exceptions += 1;
1739 }
1740 }
1741 }
1742 {
1743 let nullifications = reporter.nullifications.lock();
1744 for view in offline_views.iter() {
1745 if !nullifications.contains_key(view) {
1746 warn!("missing expected view nullifies: {}", view);
1747 exceptions += 1;
1748 }
1749 }
1750 }
1751
1752 assert!(exceptions <= max_exceptions);
1754 }
1755 assert!(exceptions <= max_exceptions);
1756
1757 let blocked = oracle.blocked().await.unwrap();
1759 assert!(blocked.is_empty());
1760
1761 let encoded = context.encode();
1763 let leader_label = format!("leader=\"{}\"", offline);
1764 assert!(
1765 count_nonzero_metric_lines(&encoded, &["_timeouts", &leader_label]) >= n - 1,
1766 "expected timeout metrics for offline leader"
1767 );
1768 assert_eq!(
1769 count_nonzero_metric_lines(&encoded, &["_nullifications", &leader_label]),
1770 n - 1,
1771 "expected all online nodes to record _nullifications for offline leader"
1772 );
1773 });
1774 }
1775
1776 #[test_group("slow")]
1777 #[test_traced]
1778 fn test_one_offline() {
1779 one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1780 one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1781 one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1782 one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1783 one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1784 one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1785 one_offline::<_, _, RoundRobin>(ed25519::fixture);
1786 one_offline::<_, _, RoundRobin>(secp256r1::fixture);
1787 }
1788
1789 fn slow_validator<S, F, L>(mut fixture: F)
1790 where
1791 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1792 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1793 L: Elector<S>,
1794 {
1795 let n = 5;
1797 let required_containers = View::new(50);
1798 let activity_timeout = ViewDelta::new(10);
1799 let skip_timeout = ViewDelta::new(5);
1800 let namespace = b"consensus".to_vec();
1801 let executor = deterministic::Runner::timed(Duration::from_secs(300));
1802 executor.start(|mut context| async move {
1803 let Fixture {
1805 participants,
1806 schemes,
1807 ..
1808 } = fixture(&mut context, &namespace, n);
1809 let mut oracle =
1810 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
1811 let mut registrations = register_validators(&mut oracle, &participants).await;
1812
1813 let link = Link {
1815 latency: Duration::from_millis(10),
1816 jitter: Duration::from_millis(1),
1817 success_rate: 1.0,
1818 };
1819 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1820
1821 let elector = L::default();
1823 let relay = Arc::new(mocks::relay::Relay::new());
1824 let mut reporters = Vec::new();
1825 let mut engine_handlers = Vec::new();
1826 for (idx_scheme, validator) in participants.iter().enumerate() {
1827 let context = context.with_label(&format!("validator_{}", *validator));
1829
1830 let reporter_config = mocks::reporter::Config {
1832 participants: participants.clone().try_into().unwrap(),
1833 scheme: schemes[idx_scheme].clone(),
1834 elector: elector.clone(),
1835 };
1836 let reporter =
1837 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1838 reporters.push(reporter.clone());
1839 let application_cfg = if idx_scheme == 0 {
1840 mocks::application::Config {
1841 hasher: Sha256::default(),
1842 relay: relay.clone(),
1843 me: validator.clone(),
1844 propose_latency: (10_000.0, 0.0),
1845 verify_latency: (10_000.0, 5.0),
1846 certify_latency: (10_000.0, 5.0),
1847 should_certify: mocks::application::Certifier::Sometimes,
1848 }
1849 } else {
1850 mocks::application::Config {
1851 hasher: Sha256::default(),
1852 relay: relay.clone(),
1853 me: validator.clone(),
1854 propose_latency: (10.0, 5.0),
1855 verify_latency: (10.0, 5.0),
1856 certify_latency: (10.0, 5.0),
1857 should_certify: mocks::application::Certifier::Sometimes,
1858 }
1859 };
1860 let (actor, application) = mocks::application::Application::new(
1861 context.with_label("application"),
1862 application_cfg,
1863 );
1864 actor.start();
1865 let blocker = oracle.control(validator.clone());
1866 let cfg = config::Config {
1867 scheme: schemes[idx_scheme].clone(),
1868 elector: elector.clone(),
1869 blocker,
1870 automaton: application.clone(),
1871 relay: application.clone(),
1872 reporter: reporter.clone(),
1873 strategy: Sequential,
1874 partition: validator.to_string(),
1875 mailbox_size: 1024,
1876 epoch: Epoch::new(333),
1877 leader_timeout: Duration::from_secs(1),
1878 certification_timeout: Duration::from_secs(2),
1879 timeout_retry: Duration::from_secs(10),
1880 fetch_timeout: Duration::from_secs(1),
1881 activity_timeout,
1882 skip_timeout,
1883 fetch_concurrent: 4,
1884 replay_buffer: NZUsize!(1024 * 1024),
1885 write_buffer: NZUsize!(1024 * 1024),
1886 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1887 forwarding: ForwardingPolicy::Disabled,
1888 };
1889 let engine = Engine::new(context.with_label("engine"), cfg);
1890
1891 let (pending, recovered, resolver) = registrations
1893 .remove(validator)
1894 .expect("validator should be registered");
1895 engine_handlers.push(engine.start(pending, recovered, resolver));
1896 }
1897
1898 let mut finalizers = Vec::new();
1900 for reporter in reporters.iter_mut() {
1901 let (mut latest, mut monitor) = reporter.subscribe().await;
1902 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1903 while latest < required_containers {
1904 latest = monitor.recv().await.expect("event missing");
1905 }
1906 }));
1907 }
1908 join_all(finalizers).await;
1909
1910 let slow = &participants[0];
1912 for reporter in reporters.iter() {
1913 reporter.assert_no_faults();
1915
1916 reporter.assert_no_invalid();
1918
1919 {
1922 let notarizes = reporter.notarizes.lock();
1923 assert!(notarizes.values().all(|payloads| {
1924 payloads
1925 .values()
1926 .all(|participants| !participants.contains(slow))
1927 }));
1928 }
1929 {
1930 let finalizes = reporter.finalizes.lock();
1931 assert!(finalizes.values().all(|payloads| {
1932 payloads
1933 .values()
1934 .all(|participants| !participants.contains(slow))
1935 }));
1936 }
1937
1938 {
1940 let finalizations = reporter.finalizations.lock();
1941 assert!(finalizations
1942 .keys()
1943 .any(|view| *view >= required_containers));
1944 }
1945 }
1946
1947 let blocked = oracle.blocked().await.unwrap();
1949 assert!(blocked.is_empty());
1950 });
1951 }
1952
1953 #[test_group("slow")]
1954 #[test_traced]
1955 fn test_slow_validator() {
1956 slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1957 slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1958 slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1959 slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1960 slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1961 slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1962 slow_validator::<_, _, RoundRobin>(ed25519::fixture);
1963 slow_validator::<_, _, RoundRobin>(secp256r1::fixture);
1964 }
1965
1966 fn all_recovery<S, F, L>(mut fixture: F)
1967 where
1968 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1969 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1970 L: Elector<S>,
1971 {
1972 let n = 5;
1974 let required_containers = View::new(100);
1975 let activity_timeout = ViewDelta::new(10);
1976 let skip_timeout = ViewDelta::new(2);
1977 let namespace = b"consensus".to_vec();
1978 let executor = deterministic::Runner::timed(Duration::from_secs(1800));
1979 executor.start(|mut context| async move {
1980 let Fixture {
1982 participants,
1983 schemes,
1984 ..
1985 } = fixture(&mut context, &namespace, n);
1986 let mut oracle =
1987 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
1988 let mut registrations = register_validators(&mut oracle, &participants).await;
1989
1990 let link = Link {
1992 latency: Duration::from_secs(3),
1993 jitter: Duration::from_millis(0),
1994 success_rate: 1.0,
1995 };
1996 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1997
1998 let elector = L::default();
2000 let relay = Arc::new(mocks::relay::Relay::new());
2001 let mut reporters = Vec::new();
2002 let mut engine_handlers = Vec::new();
2003 for (idx, validator) in participants.iter().enumerate() {
2004 let context = context.with_label(&format!("validator_{}", *validator));
2006
2007 let reporter_config = mocks::reporter::Config {
2009 participants: participants.clone().try_into().unwrap(),
2010 scheme: schemes[idx].clone(),
2011 elector: elector.clone(),
2012 };
2013 let reporter =
2014 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2015 reporters.push(reporter.clone());
2016 let application_cfg = mocks::application::Config {
2017 hasher: Sha256::default(),
2018 relay: relay.clone(),
2019 me: validator.clone(),
2020 propose_latency: (10.0, 5.0),
2021 verify_latency: (10.0, 5.0),
2022 certify_latency: (10.0, 5.0),
2023 should_certify: mocks::application::Certifier::Sometimes,
2024 };
2025 let (actor, application) = mocks::application::Application::new(
2026 context.with_label("application"),
2027 application_cfg,
2028 );
2029 actor.start();
2030 let blocker = oracle.control(validator.clone());
2031 let cfg = config::Config {
2032 scheme: schemes[idx].clone(),
2033 elector: elector.clone(),
2034 blocker,
2035 automaton: application.clone(),
2036 relay: application.clone(),
2037 reporter: reporter.clone(),
2038 strategy: Sequential,
2039 partition: validator.to_string(),
2040 mailbox_size: 1024,
2041 epoch: Epoch::new(333),
2042 leader_timeout: Duration::from_secs(1),
2043 certification_timeout: Duration::from_secs(2),
2044 timeout_retry: Duration::from_secs(10),
2045 fetch_timeout: Duration::from_secs(1),
2046 activity_timeout,
2047 skip_timeout,
2048 fetch_concurrent: 4,
2049 replay_buffer: NZUsize!(1024 * 1024),
2050 write_buffer: NZUsize!(1024 * 1024),
2051 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2052 forwarding: ForwardingPolicy::Disabled,
2053 };
2054 let engine = Engine::new(context.with_label("engine"), cfg);
2055
2056 let (pending, recovered, resolver) = registrations
2058 .remove(validator)
2059 .expect("validator should be registered");
2060 engine_handlers.push(engine.start(pending, recovered, resolver));
2061 }
2062
2063 let mut finalizers = Vec::new();
2065 for reporter in reporters.iter_mut() {
2066 let (_, mut monitor) = reporter.subscribe().await;
2067 finalizers.push(
2068 context
2069 .with_label("finalizer")
2070 .spawn(move |context| async move {
2071 select! {
2072 _timeout = context.sleep(Duration::from_secs(60)) => {},
2073 _done = monitor.recv() => {
2074 panic!("engine should not notarize or finalize anything");
2075 },
2076 }
2077 }),
2078 );
2079 }
2080 join_all(finalizers).await;
2081
2082 link_validators(&mut oracle, &participants, Action::Unlink, None).await;
2084
2085 context.sleep(Duration::from_secs(60)).await;
2087
2088 let mut latest = View::zero();
2090 for reporter in reporters.iter() {
2091 let nullifies = reporter.nullifies.lock();
2092 let max = nullifies.keys().max().unwrap();
2093 if *max > latest {
2094 latest = *max;
2095 }
2096 }
2097
2098 let link = Link {
2100 latency: Duration::from_millis(10),
2101 jitter: Duration::from_millis(1),
2102 success_rate: 1.0,
2103 };
2104 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2105
2106 let mut finalizers = Vec::new();
2108 for reporter in reporters.iter_mut() {
2109 let (mut latest, mut monitor) = reporter.subscribe().await;
2110 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2111 while latest < required_containers {
2112 latest = monitor.recv().await.expect("event missing");
2113 }
2114 }));
2115 }
2116 join_all(finalizers).await;
2117
2118 for reporter in reporters.iter() {
2120 reporter.assert_no_faults();
2122
2123 reporter.assert_no_invalid();
2125
2126 {
2131 let mut found = 0;
2135 let notarizations = reporter.notarizations.lock();
2136 for view in View::range(latest, latest.saturating_add(activity_timeout)) {
2137 if notarizations.contains_key(&view) {
2138 found += 1;
2139 }
2140 }
2141 assert!(
2142 found >= activity_timeout.get().saturating_sub(2),
2143 "found: {found}"
2144 );
2145 }
2146 }
2147
2148 let blocked = oracle.blocked().await.unwrap();
2150 assert!(blocked.is_empty());
2151 });
2152 }
2153
2154 #[test_group("slow")]
2155 #[test_traced]
2156 fn test_all_recovery() {
2157 all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2158 all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2159 all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2160 all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2161 all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2162 all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2163 all_recovery::<_, _, RoundRobin>(ed25519::fixture);
2164 all_recovery::<_, _, RoundRobin>(secp256r1::fixture);
2165 }
2166
2167 fn partition<S, F, L>(mut fixture: F)
2168 where
2169 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2170 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2171 L: Elector<S>,
2172 {
2173 let n = 10;
2175 let required_containers = View::new(50);
2176 let activity_timeout = ViewDelta::new(10);
2177 let skip_timeout = ViewDelta::new(5);
2178 let namespace = b"consensus".to_vec();
2179 let executor = deterministic::Runner::timed(Duration::from_secs(900));
2180 executor.start(|mut context| async move {
2181 let Fixture {
2183 participants,
2184 schemes,
2185 ..
2186 } = fixture(&mut context, &namespace, n);
2187 let mut oracle =
2188 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
2189 let mut registrations = register_validators(&mut oracle, &participants).await;
2190
2191 let link = Link {
2193 latency: Duration::from_millis(10),
2194 jitter: Duration::from_millis(1),
2195 success_rate: 1.0,
2196 };
2197 link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
2198
2199 let elector = L::default();
2201 let relay = Arc::new(mocks::relay::Relay::new());
2202 let mut reporters = Vec::new();
2203 let mut engine_handlers = Vec::new();
2204 for (idx, validator) in participants.iter().enumerate() {
2205 let context = context.with_label(&format!("validator_{}", *validator));
2207
2208 let reporter_config = mocks::reporter::Config {
2210 participants: participants.clone().try_into().unwrap(),
2211 scheme: schemes[idx].clone(),
2212 elector: elector.clone(),
2213 };
2214 let reporter =
2215 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2216 reporters.push(reporter.clone());
2217 let application_cfg = mocks::application::Config {
2218 hasher: Sha256::default(),
2219 relay: relay.clone(),
2220 me: validator.clone(),
2221 propose_latency: (10.0, 5.0),
2222 verify_latency: (10.0, 5.0),
2223 certify_latency: (10.0, 5.0),
2224 should_certify: mocks::application::Certifier::Sometimes,
2225 };
2226 let (actor, application) = mocks::application::Application::new(
2227 context.with_label("application"),
2228 application_cfg,
2229 );
2230 actor.start();
2231 let blocker = oracle.control(validator.clone());
2232 let cfg = config::Config {
2233 scheme: schemes[idx].clone(),
2234 elector: elector.clone(),
2235 blocker,
2236 automaton: application.clone(),
2237 relay: application.clone(),
2238 reporter: reporter.clone(),
2239 strategy: Sequential,
2240 partition: validator.to_string(),
2241 mailbox_size: 1024,
2242 epoch: Epoch::new(333),
2243 leader_timeout: Duration::from_secs(1),
2244 certification_timeout: Duration::from_secs(2),
2245 timeout_retry: Duration::from_secs(10),
2246 fetch_timeout: Duration::from_secs(1),
2247 activity_timeout,
2248 skip_timeout,
2249 fetch_concurrent: 4,
2250 replay_buffer: NZUsize!(1024 * 1024),
2251 write_buffer: NZUsize!(1024 * 1024),
2252 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2253 forwarding: ForwardingPolicy::Disabled,
2254 };
2255 let engine = Engine::new(context.with_label("engine"), cfg);
2256
2257 let (pending, recovered, resolver) = registrations
2259 .remove(validator)
2260 .expect("validator should be registered");
2261 engine_handlers.push(engine.start(pending, recovered, resolver));
2262 }
2263
2264 let mut finalizers = Vec::new();
2266 for reporter in reporters.iter_mut() {
2267 let (mut latest, mut monitor) = reporter.subscribe().await;
2268 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2269 while latest < required_containers {
2270 latest = monitor.recv().await.expect("event missing");
2271 }
2272 }));
2273 }
2274 join_all(finalizers).await;
2275
2276 fn separated(n: usize, a: usize, b: usize) -> bool {
2278 let m = n / 2;
2279 (a < m && b >= m) || (a >= m && b < m)
2280 }
2281 link_validators(&mut oracle, &participants, Action::Unlink, Some(separated)).await;
2282
2283 context.sleep(Duration::from_secs(10)).await;
2285
2286 let mut finalizers = Vec::new();
2288 for reporter in reporters.iter_mut() {
2289 let (_, mut monitor) = reporter.subscribe().await;
2290 finalizers.push(
2291 context
2292 .with_label("finalizer")
2293 .spawn(move |context| async move {
2294 select! {
2295 _timeout = context.sleep(Duration::from_secs(60)) => {},
2296 _done = monitor.recv() => {
2297 panic!("engine should not notarize or finalize anything");
2298 },
2299 }
2300 }),
2301 );
2302 }
2303 join_all(finalizers).await;
2304
2305 link_validators(
2307 &mut oracle,
2308 &participants,
2309 Action::Link(link),
2310 Some(separated),
2311 )
2312 .await;
2313
2314 let mut finalizers = Vec::new();
2316 for reporter in reporters.iter_mut() {
2317 let (mut latest, mut monitor) = reporter.subscribe().await;
2318 let required = latest.saturating_add(ViewDelta::new(required_containers.get()));
2319 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2320 while latest < required {
2321 latest = monitor.recv().await.expect("event missing");
2322 }
2323 }));
2324 }
2325 join_all(finalizers).await;
2326
2327 for reporter in reporters.iter() {
2329 reporter.assert_no_faults();
2331
2332 reporter.assert_no_invalid();
2334 }
2335
2336 let blocked = oracle.blocked().await.unwrap();
2338 assert!(blocked.is_empty());
2339 });
2340 }
2341
2342 #[test_group("slow")]
2343 #[test_traced]
2344 fn test_partition() {
2345 partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2346 partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2347 partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2348 partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2349 partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2350 partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2351 partition::<_, _, RoundRobin>(ed25519::fixture);
2352 partition::<_, _, RoundRobin>(secp256r1::fixture);
2353 }
2354
2355 fn slow_and_lossy_links<S, F, L>(seed: u64, mut fixture: F) -> String
2356 where
2357 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2358 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2359 L: Elector<S>,
2360 {
2361 let n = 5;
2363 let required_containers = View::new(50);
2364 let activity_timeout = ViewDelta::new(10);
2365 let skip_timeout = ViewDelta::new(5);
2366 let namespace = b"consensus".to_vec();
2367 let cfg = deterministic::Config::new()
2368 .with_seed(seed)
2369 .with_timeout(Some(Duration::from_secs(5_000)));
2370 let executor = deterministic::Runner::new(cfg);
2371 executor.start(|mut context| async move {
2372 let Fixture {
2374 participants,
2375 schemes,
2376 ..
2377 } = fixture(&mut context, &namespace, n);
2378 let mut oracle =
2379 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
2380 let mut registrations = register_validators(&mut oracle, &participants).await;
2381
2382 let degraded_link = Link {
2384 latency: Duration::from_millis(200),
2385 jitter: Duration::from_millis(150),
2386 success_rate: 0.5,
2387 };
2388 link_validators(
2389 &mut oracle,
2390 &participants,
2391 Action::Link(degraded_link),
2392 None,
2393 )
2394 .await;
2395
2396 let elector = L::default();
2398 let relay = Arc::new(mocks::relay::Relay::new());
2399 let mut reporters = Vec::new();
2400 let mut engine_handlers = Vec::new();
2401 for (idx, validator) in participants.iter().enumerate() {
2402 let context = context.with_label(&format!("validator_{}", *validator));
2404
2405 let reporter_config = mocks::reporter::Config {
2407 participants: participants.clone().try_into().unwrap(),
2408 scheme: schemes[idx].clone(),
2409 elector: elector.clone(),
2410 };
2411 let reporter =
2412 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2413 reporters.push(reporter.clone());
2414 let application_cfg = mocks::application::Config {
2415 hasher: Sha256::default(),
2416 relay: relay.clone(),
2417 me: validator.clone(),
2418 propose_latency: (10.0, 5.0),
2419 verify_latency: (10.0, 5.0),
2420 certify_latency: (10.0, 5.0),
2421 should_certify: mocks::application::Certifier::Sometimes,
2422 };
2423 let (actor, application) = mocks::application::Application::new(
2424 context.with_label("application"),
2425 application_cfg,
2426 );
2427 actor.start();
2428 let blocker = oracle.control(validator.clone());
2429 let cfg = config::Config {
2430 scheme: schemes[idx].clone(),
2431 elector: elector.clone(),
2432 blocker,
2433 automaton: application.clone(),
2434 relay: application.clone(),
2435 reporter: reporter.clone(),
2436 strategy: Sequential,
2437 partition: validator.to_string(),
2438 mailbox_size: 1024,
2439 epoch: Epoch::new(333),
2440 leader_timeout: Duration::from_secs(1),
2441 certification_timeout: Duration::from_secs(2),
2442 timeout_retry: Duration::from_secs(10),
2443 fetch_timeout: Duration::from_secs(1),
2444 activity_timeout,
2445 skip_timeout,
2446 fetch_concurrent: 4,
2447 replay_buffer: NZUsize!(1024 * 1024),
2448 write_buffer: NZUsize!(1024 * 1024),
2449 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2450 forwarding: ForwardingPolicy::Disabled,
2451 };
2452 let engine = Engine::new(context.with_label("engine"), cfg);
2453
2454 let (pending, recovered, resolver) = registrations
2456 .remove(validator)
2457 .expect("validator should be registered");
2458 engine_handlers.push(engine.start(pending, recovered, resolver));
2459 }
2460
2461 let mut finalizers = Vec::new();
2463 for reporter in reporters.iter_mut() {
2464 let (mut latest, mut monitor) = reporter.subscribe().await;
2465 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2466 while latest < required_containers {
2467 latest = monitor.recv().await.expect("event missing");
2468 }
2469 }));
2470 }
2471 join_all(finalizers).await;
2472
2473 for reporter in reporters.iter() {
2475 reporter.assert_no_faults();
2477
2478 reporter.assert_no_invalid();
2480 }
2481
2482 let blocked = oracle.blocked().await.unwrap();
2484 assert!(blocked.is_empty());
2485
2486 context.auditor().state()
2487 })
2488 }
2489
2490 #[test_group("slow")]
2491 #[test_traced]
2492 fn test_slow_and_lossy_links() {
2493 slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinPk, _>);
2494 slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinSig, _>);
2495 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinPk, _>);
2496 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinSig, _>);
2497 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinPk, _>);
2498 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinSig, _>);
2499 slow_and_lossy_links::<_, _, RoundRobin>(0, ed25519::fixture);
2500 slow_and_lossy_links::<_, _, RoundRobin>(0, secp256r1::fixture);
2501 }
2502
2503 #[test_group("slow")]
2504 #[test_traced]
2505 fn test_determinism() {
2506 for seed in 1..6 {
2509 let ts_vrf_pk_state_1 = slow_and_lossy_links::<_, _, Random>(
2510 seed,
2511 bls12381_threshold_vrf::fixture::<MinPk, _>,
2512 );
2513 let ts_vrf_pk_state_2 = slow_and_lossy_links::<_, _, Random>(
2514 seed,
2515 bls12381_threshold_vrf::fixture::<MinPk, _>,
2516 );
2517 assert_eq!(ts_vrf_pk_state_1, ts_vrf_pk_state_2);
2518
2519 let ts_vrf_sig_state_1 = slow_and_lossy_links::<_, _, Random>(
2520 seed,
2521 bls12381_threshold_vrf::fixture::<MinSig, _>,
2522 );
2523 let ts_vrf_sig_state_2 = slow_and_lossy_links::<_, _, Random>(
2524 seed,
2525 bls12381_threshold_vrf::fixture::<MinSig, _>,
2526 );
2527 assert_eq!(ts_vrf_sig_state_1, ts_vrf_sig_state_2);
2528
2529 let ts_std_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2530 seed,
2531 bls12381_threshold_std::fixture::<MinPk, _>,
2532 );
2533 let ts_std_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2534 seed,
2535 bls12381_threshold_std::fixture::<MinPk, _>,
2536 );
2537 assert_eq!(ts_std_pk_state_1, ts_std_pk_state_2);
2538
2539 let ts_std_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2540 seed,
2541 bls12381_threshold_std::fixture::<MinSig, _>,
2542 );
2543 let ts_std_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2544 seed,
2545 bls12381_threshold_std::fixture::<MinSig, _>,
2546 );
2547 assert_eq!(ts_std_sig_state_1, ts_std_sig_state_2);
2548
2549 let ms_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2550 seed,
2551 bls12381_multisig::fixture::<MinPk, _>,
2552 );
2553 let ms_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2554 seed,
2555 bls12381_multisig::fixture::<MinPk, _>,
2556 );
2557 assert_eq!(ms_pk_state_1, ms_pk_state_2);
2558
2559 let ms_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2560 seed,
2561 bls12381_multisig::fixture::<MinSig, _>,
2562 );
2563 let ms_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2564 seed,
2565 bls12381_multisig::fixture::<MinSig, _>,
2566 );
2567 assert_eq!(ms_sig_state_1, ms_sig_state_2);
2568
2569 let ed_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2570 let ed_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2571 assert_eq!(ed_state_1, ed_state_2);
2572
2573 let secp_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2574 let secp_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2575 assert_eq!(secp_state_1, secp_state_2);
2576
2577 let states = [
2578 ("threshold-vrf-minpk", ts_vrf_pk_state_1),
2579 ("threshold-vrf-minsig", ts_vrf_sig_state_1),
2580 ("threshold-std-minpk", ts_std_pk_state_1),
2581 ("threshold-std-minsig", ts_std_sig_state_1),
2582 ("multisig-minpk", ms_pk_state_1),
2583 ("multisig-minsig", ms_sig_state_1),
2584 ("ed25519", ed_state_1),
2585 ("secp256r1", secp_state_1),
2586 ];
2587
2588 for pair in states.windows(2) {
2590 assert_ne!(
2591 pair[0].1, pair[1].1,
2592 "state {} equals state {}",
2593 pair[0].0, pair[1].0
2594 );
2595 }
2596 }
2597 }
2598
2599 fn conflicter<S, F, L>(seed: u64, mut fixture: F)
2600 where
2601 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2602 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2603 L: Elector<S>,
2604 {
2605 let n = 4;
2607 let required_containers = View::new(50);
2608 let activity_timeout = ViewDelta::new(10);
2609 let skip_timeout = ViewDelta::new(5);
2610 let namespace = b"consensus".to_vec();
2611 let cfg = deterministic::Config::new()
2612 .with_seed(seed)
2613 .with_timeout(Some(Duration::from_secs(30)));
2614 let executor = deterministic::Runner::new(cfg);
2615 executor.start(|mut context| async move {
2616 let Fixture {
2618 participants,
2619 schemes,
2620 ..
2621 } = fixture(&mut context, &namespace, n);
2622 let mut oracle =
2623 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
2624 let mut registrations = register_validators(&mut oracle, &participants).await;
2625
2626 let link = Link {
2628 latency: Duration::from_millis(10),
2629 jitter: Duration::from_millis(1),
2630 success_rate: 1.0,
2631 };
2632 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2633
2634 let elector = L::default();
2636 let relay = Arc::new(mocks::relay::Relay::new());
2637 let mut reporters = Vec::new();
2638 for (idx_scheme, validator) in participants.iter().enumerate() {
2639 let context = context.with_label(&format!("validator_{}", *validator));
2641
2642 let reporter_config = mocks::reporter::Config {
2644 participants: participants.clone().try_into().unwrap(),
2645 scheme: schemes[idx_scheme].clone(),
2646 elector: elector.clone(),
2647 };
2648 let reporter =
2649 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2650 let (pending, recovered, resolver) = registrations
2651 .remove(validator)
2652 .expect("validator should be registered");
2653 if idx_scheme == 0 {
2654 let cfg = mocks::conflicter::Config {
2655 scheme: schemes[idx_scheme].clone(),
2656 };
2657
2658 let engine: mocks::conflicter::Conflicter<_, _, Sha256> =
2659 mocks::conflicter::Conflicter::new(
2660 context.with_label("byzantine_engine"),
2661 cfg,
2662 );
2663 engine.start(pending);
2664 } else {
2665 reporters.push(reporter.clone());
2666 let application_cfg = mocks::application::Config {
2667 hasher: Sha256::default(),
2668 relay: relay.clone(),
2669 me: validator.clone(),
2670 propose_latency: (10.0, 5.0),
2671 verify_latency: (10.0, 5.0),
2672 certify_latency: (10.0, 5.0),
2673 should_certify: mocks::application::Certifier::Sometimes,
2674 };
2675 let (actor, application) = mocks::application::Application::new(
2676 context.with_label("application"),
2677 application_cfg,
2678 );
2679 actor.start();
2680 let blocker = oracle.control(validator.clone());
2681 let cfg = config::Config {
2682 scheme: schemes[idx_scheme].clone(),
2683 elector: elector.clone(),
2684 blocker,
2685 automaton: application.clone(),
2686 relay: application.clone(),
2687 reporter: reporter.clone(),
2688 strategy: Sequential,
2689 partition: validator.to_string(),
2690 mailbox_size: 1024,
2691 epoch: Epoch::new(333),
2692 leader_timeout: Duration::from_secs(1),
2693 certification_timeout: Duration::from_secs(2),
2694 timeout_retry: Duration::from_secs(10),
2695 fetch_timeout: Duration::from_secs(1),
2696 activity_timeout,
2697 skip_timeout,
2698 fetch_concurrent: 4,
2699 replay_buffer: NZUsize!(1024 * 1024),
2700 write_buffer: NZUsize!(1024 * 1024),
2701 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2702 forwarding: ForwardingPolicy::Disabled,
2703 };
2704 let engine = Engine::new(context.with_label("engine"), cfg);
2705 engine.start(pending, recovered, resolver);
2706 }
2707 }
2708
2709 let mut finalizers = Vec::new();
2711 for reporter in reporters.iter_mut() {
2712 let (mut latest, mut monitor) = reporter.subscribe().await;
2713 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2714 while latest < required_containers {
2715 latest = monitor.recv().await.expect("event missing");
2716 }
2717 }));
2718 }
2719 join_all(finalizers).await;
2720
2721 let byz = &participants[0];
2723 let mut count_conflicting = 0;
2724 for reporter in reporters.iter() {
2725 {
2727 let faults = reporter.faults.lock();
2728 assert_eq!(faults.len(), 1);
2729 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2730 for (_, faults) in faulter.iter() {
2731 for fault in faults.iter() {
2732 match fault {
2733 Activity::ConflictingNotarize(_) => {
2734 count_conflicting += 1;
2735 }
2736 Activity::ConflictingFinalize(_) => {
2737 count_conflicting += 1;
2738 }
2739 _ => panic!("unexpected fault: {fault:?}"),
2740 }
2741 }
2742 }
2743 }
2744
2745 reporter.assert_no_invalid();
2747 }
2748 assert!(count_conflicting > 0);
2749
2750 let blocked = oracle.blocked().await.unwrap();
2752 assert!(!blocked.is_empty());
2753 for (a, b) in blocked {
2754 assert_ne!(&a, byz);
2755 assert_eq!(&b, byz);
2756 }
2757 });
2758 }
2759
2760 #[test_group("slow")]
2761 #[test_traced]
2762 fn test_conflicter() {
2763 for seed in 0..5 {
2764 conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
2765 conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
2766 conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
2767 conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
2768 conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2769 conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2770 conflicter::<_, _, RoundRobin>(seed, ed25519::fixture);
2771 conflicter::<_, _, RoundRobin>(seed, secp256r1::fixture);
2772 }
2773 }
2774
2775 fn invalid<S, F, L>(seed: u64, mut fixture: F)
2776 where
2777 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2778 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2779 L: Elector<S>,
2780 {
2781 let n = 4;
2783 let required_containers = View::new(50);
2784 let activity_timeout = ViewDelta::new(10);
2785 let skip_timeout = ViewDelta::new(5);
2786 let namespace = b"consensus".to_vec();
2787 let cfg = deterministic::Config::new()
2788 .with_seed(seed)
2789 .with_timeout(Some(Duration::from_secs(30)));
2790 let executor = deterministic::Runner::new(cfg);
2791 executor.start(|mut context| async move {
2792 let Fixture {
2794 participants,
2795 schemes,
2796 ..
2797 } = fixture(&mut context, &namespace, n);
2798
2799 let schemes: Vec<_> = schemes
2800 .into_iter()
2801 .enumerate()
2802 .map(|(idx, scheme)| {
2803 let is_byzantine = idx == 0;
2804 let behavior = if is_byzantine {
2805 wrapped::Behavior::CorruptSignature
2806 } else {
2807 wrapped::Behavior::Honest
2808 };
2809 wrapped::Scheme::new(scheme, behavior)
2810 })
2811 .collect();
2812
2813 let mut oracle =
2814 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
2815 let mut registrations = register_validators(&mut oracle, &participants).await;
2816
2817 let link = Link {
2819 latency: Duration::from_millis(10),
2820 jitter: Duration::from_millis(1),
2821 success_rate: 1.0,
2822 };
2823 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2824
2825 let elector = wrapped::Config(L::default());
2827 let relay = Arc::new(mocks::relay::Relay::new());
2828 let mut reporters = Vec::new();
2829 for (idx_scheme, validator) in participants.iter().enumerate() {
2830 let context = context.with_label(&format!("validator_{}", *validator));
2832
2833 let reporter_config = mocks::reporter::Config {
2834 participants: participants.clone().try_into().unwrap(),
2835 scheme: schemes[idx_scheme].clone(),
2836 elector: elector.clone(),
2837 };
2838 let reporter =
2839 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2840 reporters.push(reporter.clone());
2841
2842 let application_cfg = mocks::application::Config {
2843 hasher: Sha256::default(),
2844 relay: relay.clone(),
2845 me: validator.clone(),
2846 propose_latency: (10.0, 5.0),
2847 verify_latency: (10.0, 5.0),
2848 certify_latency: (10.0, 5.0),
2849 should_certify: mocks::application::Certifier::Sometimes,
2850 };
2851 let (actor, application) = mocks::application::Application::new(
2852 context.with_label("application"),
2853 application_cfg,
2854 );
2855 actor.start();
2856 let blocker = oracle.control(validator.clone());
2857 let cfg = config::Config {
2858 scheme: schemes[idx_scheme].clone(),
2859 elector: elector.clone(),
2860 blocker,
2861 automaton: application.clone(),
2862 relay: application.clone(),
2863 reporter: reporter.clone(),
2864 strategy: Sequential,
2865 partition: validator.clone().to_string(),
2866 mailbox_size: 1024,
2867 epoch: Epoch::new(333),
2868 leader_timeout: Duration::from_secs(1),
2869 certification_timeout: Duration::from_secs(2),
2870 timeout_retry: Duration::from_secs(10),
2871 fetch_timeout: Duration::from_secs(1),
2872 activity_timeout,
2873 skip_timeout,
2874 fetch_concurrent: 4,
2875 replay_buffer: NZUsize!(1024 * 1024),
2876 write_buffer: NZUsize!(1024 * 1024),
2877 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2878 forwarding: ForwardingPolicy::Disabled,
2879 };
2880 let engine = Engine::new(context.with_label("engine"), cfg);
2881 let (pending, recovered, resolver) = registrations
2882 .remove(validator)
2883 .expect("validator should be registered");
2884 engine.start(pending, recovered, resolver);
2885 }
2886
2887 let mut finalizers = Vec::new();
2891 for reporter in reporters.iter_mut().skip(1) {
2892 let (mut latest, mut monitor) = reporter.subscribe().await;
2893 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2894 while latest < required_containers {
2895 latest = monitor.recv().await.expect("event missing");
2896 }
2897 }));
2898 }
2899 join_all(finalizers).await;
2900
2901 for (i, reporter) in reporters.iter().enumerate() {
2903 reporter.assert_no_faults();
2905
2906 assert!(*reporter.invalid_votes.lock() > 0);
2909
2910 let is_byzantine = i == 0;
2913 if is_byzantine {
2914 assert!(*reporter.invalid_certificates.lock() > 0);
2915 } else {
2916 assert_eq!(*reporter.invalid_certificates.lock(), 0);
2917 }
2918 }
2919
2920 let blocked = oracle.blocked().await.unwrap();
2923 assert!(blocked.len() >= participants.len() - 1);
2924 let byz = &participants[0];
2925 for (_, b) in blocked {
2926 assert_eq!(&b, byz);
2928 }
2929 });
2930 }
2931
2932 #[test_group("slow")]
2933 #[test_traced]
2934 fn test_invalid() {
2935 for seed in 0..5 {
2936 invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
2937 invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
2938 invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
2939 invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
2940 invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2941 invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2942 invalid::<_, _, RoundRobin>(seed, ed25519::fixture);
2943 invalid::<_, _, RoundRobin>(seed, secp256r1::fixture);
2944 }
2945 }
2946
2947 fn received_certificates_are_reported<S, F, L>(seed: u64, mut fixture: F)
2948 where
2949 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2950 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2951 L: Elector<S>,
2952 {
2953 let n = 4;
2954 let required_containers = View::new(10);
2955 let activity_timeout = ViewDelta::new(10);
2956 let skip_timeout = ViewDelta::new(5);
2957 let namespace = b"consensus".to_vec();
2958 let cfg = deterministic::Config::new()
2959 .with_seed(seed)
2960 .with_timeout(Some(Duration::from_secs(30)));
2961 let executor = deterministic::Runner::new(cfg);
2962 executor.start(|mut context| async move {
2963 let Fixture {
2964 participants,
2965 schemes,
2966 ..
2967 } = fixture(&mut context, &namespace, n);
2968
2969 let mut oracle =
2970 start_test_network_with_peers(context.clone(), participants.clone(), false).await;
2971 let mut registrations = register_validators(&mut oracle, &participants).await;
2972
2973 let link = Link {
2978 latency: Duration::from_millis(100),
2979 jitter: Duration::from_millis(1),
2980 success_rate: 1.0,
2981 };
2982 fn link_graph(_: usize, i: usize, j: usize) -> bool {
2983 if i == 0 || j == 0 {
2984 return i == 1 || j == 1;
2985 }
2986 true
2987 }
2988 link_validators(
2989 &mut oracle,
2990 &participants,
2991 Action::Link(link),
2992 Some(link_graph),
2993 )
2994 .await;
2995
2996 let elector = L::default();
2997 let relay = Arc::new(mocks::relay::Relay::new());
2998 let mut reporters = Vec::new();
2999 for (idx_scheme, validator) in participants.iter().enumerate() {
3000 let context = context.with_label(&format!("validator_{}", *validator));
3001 let reporter_config = mocks::reporter::Config {
3002 participants: participants.clone().try_into().unwrap(),
3003 scheme: schemes[idx_scheme].clone(),
3004 elector: elector.clone(),
3005 };
3006 let reporter =
3007 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3008 reporters.push(reporter.clone());
3009
3010 let application_cfg = mocks::application::Config {
3011 hasher: Sha256::default(),
3012 relay: relay.clone(),
3013 me: validator.clone(),
3014 propose_latency: (10.0, 5.0),
3015 verify_latency: (10.0, 5.0),
3016 certify_latency: (10.0, 5.0),
3017 should_certify: mocks::application::Certifier::Sometimes,
3018 };
3019 let (actor, application) = mocks::application::Application::new(
3020 context.with_label("application"),
3021 application_cfg,
3022 );
3023 actor.start();
3024 let blocker = oracle.control(validator.clone());
3025 let cfg = config::Config {
3026 scheme: schemes[idx_scheme].clone(),
3027 elector: elector.clone(),
3028 blocker,
3029 automaton: application.clone(),
3030 relay: application.clone(),
3031 reporter: reporter.clone(),
3032 strategy: Sequential,
3033 partition: validator.clone().to_string(),
3034 mailbox_size: 1024,
3035 epoch: Epoch::new(333),
3036 leader_timeout: Duration::from_secs(1),
3037 certification_timeout: Duration::from_secs(2),
3038 timeout_retry: Duration::from_secs(10),
3039 fetch_timeout: Duration::from_secs(1),
3040 activity_timeout,
3041 skip_timeout,
3042 fetch_concurrent: 4,
3043 replay_buffer: NZUsize!(1024 * 1024),
3044 write_buffer: NZUsize!(1024 * 1024),
3045 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3046 forwarding: ForwardingPolicy::Disabled,
3047 };
3048 let engine = Engine::new(context.with_label("engine"), cfg);
3049 let (pending, recovered, resolver) = registrations
3050 .remove(validator)
3051 .expect("validator should be registered");
3052 engine.start(pending, recovered, resolver);
3053 }
3054 let excluded_reporter = reporters[0].clone();
3056 let mut honest_reporter = reporters[1].clone();
3057 let (mut honest_latest, mut honest_monitor) = honest_reporter.subscribe().await;
3058 while honest_latest < required_containers {
3059 honest_latest = honest_monitor.recv().await.expect("event missing");
3060 }
3061
3062 context.sleep(Duration::from_secs(1)).await;
3064
3065 let honest_notarized = {
3068 let notarizations = honest_reporter.notarizations.lock();
3069 View::range(View::new(1), required_containers.next())
3070 .filter(|view| notarizations.contains_key(view))
3071 .count()
3072 };
3073 let excluded_notarized = {
3074 let notarizations = excluded_reporter.notarizations.lock();
3075 View::range(View::new(1), required_containers.next())
3076 .filter(|view| notarizations.contains_key(view))
3077 .count()
3078 };
3079 assert!(
3080 excluded_notarized >= honest_notarized.saturating_sub(2),
3081 "honest_notarized: {honest_notarized}, excluded_notarized: {excluded_notarized}"
3082 );
3083
3084 let honest_finalized = {
3085 let finalizations = honest_reporter.finalizations.lock();
3086 View::range(View::new(1), required_containers.next())
3087 .filter(|view| finalizations.contains_key(view))
3088 .count()
3089 };
3090 let excluded_finalized = {
3091 let finalizations = excluded_reporter.finalizations.lock();
3092 View::range(View::new(1), required_containers.next())
3093 .filter(|view| finalizations.contains_key(view))
3094 .count()
3095 };
3096 assert!(
3097 excluded_finalized >= honest_finalized.saturating_sub(2),
3098 "honest_finalized: {honest_finalized}, excluded_finalized: {excluded_finalized}"
3099 );
3100 });
3101 }
3102
3103 #[test_group("slow")]
3105 #[test_traced]
3106 fn test_received_certificates_are_reported() {
3107 received_certificates_are_reported::<_, _, Random>(
3108 0,
3109 bls12381_threshold_vrf::fixture::<MinPk, _>,
3110 );
3111 received_certificates_are_reported::<_, _, Random>(
3112 0,
3113 bls12381_threshold_vrf::fixture::<MinSig, _>,
3114 );
3115 received_certificates_are_reported::<_, _, RoundRobin>(
3116 0,
3117 bls12381_threshold_std::fixture::<MinPk, _>,
3118 );
3119 received_certificates_are_reported::<_, _, RoundRobin>(
3120 0,
3121 bls12381_threshold_std::fixture::<MinSig, _>,
3122 );
3123 received_certificates_are_reported::<_, _, RoundRobin>(
3124 0,
3125 bls12381_multisig::fixture::<MinPk, _>,
3126 );
3127 received_certificates_are_reported::<_, _, RoundRobin>(
3128 0,
3129 bls12381_multisig::fixture::<MinSig, _>,
3130 );
3131 received_certificates_are_reported::<_, _, RoundRobin>(0, ed25519::fixture);
3132 received_certificates_are_reported::<_, _, RoundRobin>(0, secp256r1::fixture);
3133 }
3134
3135 fn impersonator<S, F, L>(seed: u64, mut fixture: F)
3136 where
3137 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3138 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3139 L: Elector<S>,
3140 {
3141 let n = 4;
3143 let required_containers = View::new(50);
3144 let activity_timeout = ViewDelta::new(10);
3145 let skip_timeout = ViewDelta::new(5);
3146 let namespace = b"consensus".to_vec();
3147 let cfg = deterministic::Config::new()
3148 .with_seed(seed)
3149 .with_timeout(Some(Duration::from_secs(30)));
3150 let executor = deterministic::Runner::new(cfg);
3151 executor.start(|mut context| async move {
3152 let Fixture {
3154 participants,
3155 schemes,
3156 ..
3157 } = fixture(&mut context, &namespace, n);
3158 let mut oracle =
3159 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
3160 let mut registrations = register_validators(&mut oracle, &participants).await;
3161
3162 let link = Link {
3164 latency: Duration::from_millis(10),
3165 jitter: Duration::from_millis(1),
3166 success_rate: 1.0,
3167 };
3168 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3169
3170 let elector = L::default();
3172 let relay = Arc::new(mocks::relay::Relay::new());
3173 let mut reporters = Vec::new();
3174 for (idx_scheme, validator) in participants.iter().enumerate() {
3175 let context = context.with_label(&format!("validator_{}", *validator));
3177
3178 let reporter_config = mocks::reporter::Config {
3180 participants: participants.clone().try_into().unwrap(),
3181 scheme: schemes[idx_scheme].clone(),
3182 elector: elector.clone(),
3183 };
3184 let reporter =
3185 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3186 let (pending, recovered, resolver) = registrations
3187 .remove(validator)
3188 .expect("validator should be registered");
3189 if idx_scheme == 0 {
3190 let cfg = mocks::impersonator::Config {
3191 scheme: schemes[idx_scheme].clone(),
3192 };
3193
3194 let engine: mocks::impersonator::Impersonator<_, _, Sha256> =
3195 mocks::impersonator::Impersonator::new(
3196 context.with_label("byzantine_engine"),
3197 cfg,
3198 );
3199 engine.start(pending);
3200 } else {
3201 reporters.push(reporter.clone());
3202 let application_cfg = mocks::application::Config {
3203 hasher: Sha256::default(),
3204 relay: relay.clone(),
3205 me: validator.clone(),
3206 propose_latency: (10.0, 5.0),
3207 verify_latency: (10.0, 5.0),
3208 certify_latency: (10.0, 5.0),
3209 should_certify: mocks::application::Certifier::Sometimes,
3210 };
3211 let (actor, application) = mocks::application::Application::new(
3212 context.with_label("application"),
3213 application_cfg,
3214 );
3215 actor.start();
3216 let blocker = oracle.control(validator.clone());
3217 let cfg = config::Config {
3218 scheme: schemes[idx_scheme].clone(),
3219 elector: elector.clone(),
3220 blocker,
3221 automaton: application.clone(),
3222 relay: application.clone(),
3223 reporter: reporter.clone(),
3224 strategy: Sequential,
3225 partition: validator.clone().to_string(),
3226 mailbox_size: 1024,
3227 epoch: Epoch::new(333),
3228 leader_timeout: Duration::from_secs(1),
3229 certification_timeout: Duration::from_secs(2),
3230 timeout_retry: Duration::from_secs(10),
3231 fetch_timeout: Duration::from_secs(1),
3232 activity_timeout,
3233 skip_timeout,
3234 fetch_concurrent: 4,
3235 replay_buffer: NZUsize!(1024 * 1024),
3236 write_buffer: NZUsize!(1024 * 1024),
3237 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3238 forwarding: ForwardingPolicy::Disabled,
3239 };
3240 let engine = Engine::new(context.with_label("engine"), cfg);
3241 engine.start(pending, recovered, resolver);
3242 }
3243 }
3244
3245 let mut finalizers = Vec::new();
3247 for reporter in reporters.iter_mut() {
3248 let (mut latest, mut monitor) = reporter.subscribe().await;
3249 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3250 while latest < required_containers {
3251 latest = monitor.recv().await.expect("event missing");
3252 }
3253 }));
3254 }
3255 join_all(finalizers).await;
3256
3257 let byz = &participants[0];
3259 for reporter in reporters.iter() {
3260 reporter.assert_no_faults();
3262
3263 reporter.assert_no_invalid();
3265 }
3266
3267 let blocked = oracle.blocked().await.unwrap();
3269 assert!(!blocked.is_empty());
3270 for (a, b) in blocked {
3271 assert_ne!(&a, byz);
3272 assert_eq!(&b, byz);
3273 }
3274 });
3275 }
3276
3277 #[test_group("slow")]
3278 #[test_traced]
3279 fn test_impersonator() {
3280 for seed in 0..5 {
3281 impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3282 impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3283 impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3284 impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3285 impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3286 impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3287 impersonator::<_, _, RoundRobin>(seed, ed25519::fixture);
3288 impersonator::<_, _, RoundRobin>(seed, secp256r1::fixture);
3289 }
3290 }
3291
3292 fn equivocator<S, F, L>(seed: u64, mut fixture: F) -> bool
3293 where
3294 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3295 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3296 L: Elector<S>,
3297 {
3298 let n = 7;
3300 let required_containers = View::new(50);
3301 let activity_timeout = ViewDelta::new(10);
3302 let skip_timeout = ViewDelta::new(5);
3303 let namespace = b"consensus".to_vec();
3304 let cfg = deterministic::Config::new()
3305 .with_seed(seed)
3306 .with_timeout(Some(Duration::from_secs(60)));
3307 let executor = deterministic::Runner::new(cfg);
3308 executor.start(|mut context| async move {
3309 let Fixture {
3311 participants,
3312 schemes,
3313 ..
3314 } = fixture(&mut context, &namespace, n);
3315 let mut oracle =
3316 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
3317 let mut registrations = register_validators(&mut oracle, &participants).await;
3318
3319 let link = Link {
3321 latency: Duration::from_millis(10),
3322 jitter: Duration::from_millis(1),
3323 success_rate: 1.0,
3324 };
3325 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3326
3327 let elector = L::default();
3329 let mut engines = Vec::new();
3330 let relay = Arc::new(mocks::relay::Relay::new());
3331 let mut reporters = Vec::new();
3332 for (idx_scheme, validator) in participants.iter().enumerate() {
3333 let context = context.with_label(&format!("validator_{}", *validator));
3335
3336 let reporter_config = mocks::reporter::Config {
3338 participants: participants.clone().try_into().unwrap(),
3339 scheme: schemes[idx_scheme].clone(),
3340 elector: elector.clone(),
3341 };
3342 let reporter =
3343 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3344 reporters.push(reporter.clone());
3345 let (pending, recovered, resolver) = registrations
3346 .remove(validator)
3347 .expect("validator should be registered");
3348 if idx_scheme == 0 {
3349 let cfg = mocks::equivocator::Config {
3350 scheme: schemes[idx_scheme].clone(),
3351 epoch: Epoch::new(333),
3352 relay: relay.clone(),
3353 hasher: Sha256::default(),
3354 elector: elector.clone(),
3355 };
3356
3357 let engine = mocks::equivocator::Equivocator::new(
3358 context.with_label("byzantine_engine"),
3359 cfg,
3360 );
3361 engines.push(engine.start(pending, recovered));
3362 } else {
3363 let application_cfg = mocks::application::Config {
3364 hasher: Sha256::default(),
3365 relay: relay.clone(),
3366 me: validator.clone(),
3367 propose_latency: (10.0, 5.0),
3368 verify_latency: (10.0, 5.0),
3369 certify_latency: (10.0, 5.0),
3370 should_certify: mocks::application::Certifier::Sometimes,
3371 };
3372 let (actor, application) = mocks::application::Application::new(
3373 context.with_label("application"),
3374 application_cfg,
3375 );
3376 actor.start();
3377 let blocker = oracle.control(validator.clone());
3378 let cfg = config::Config {
3379 scheme: schemes[idx_scheme].clone(),
3380 elector: elector.clone(),
3381 blocker,
3382 automaton: application.clone(),
3383 relay: application.clone(),
3384 reporter: reporter.clone(),
3385 strategy: Sequential,
3386 partition: validator.to_string(),
3387 mailbox_size: 1024,
3388 epoch: Epoch::new(333),
3389 leader_timeout: Duration::from_secs(1),
3390 certification_timeout: Duration::from_secs(2),
3391 timeout_retry: Duration::from_secs(10),
3392 fetch_timeout: Duration::from_secs(1),
3393 activity_timeout,
3394 skip_timeout,
3395 fetch_concurrent: 4,
3396 replay_buffer: NZUsize!(1024 * 1024),
3397 write_buffer: NZUsize!(1024 * 1024),
3398 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3399 forwarding: ForwardingPolicy::Disabled,
3400 };
3401 let engine = Engine::new(context.with_label("engine"), cfg);
3402 engines.push(engine.start(pending, recovered, resolver));
3403 }
3404 }
3405
3406 let mut finalizers = Vec::new();
3408 for reporter in reporters.iter_mut().skip(1) {
3409 let (mut latest, mut monitor) = reporter.subscribe().await;
3410 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3411 while latest < required_containers {
3412 latest = monitor.recv().await.expect("event missing");
3413 }
3414 }));
3415 }
3416 join_all(finalizers).await;
3417
3418 let idx = context.gen_range(1..engines.len()); let validator = &participants[idx];
3421 let handle = engines.remove(idx);
3422 handle.abort();
3423 let _ = handle.await;
3424 reporters.remove(idx);
3425 info!(idx, ?validator, "aborted validator");
3426
3427 let mut finalizers = Vec::new();
3429 for reporter in reporters.iter_mut().skip(1) {
3430 let (mut latest, mut monitor) = reporter.subscribe().await;
3431 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3432 while latest < View::new(required_containers.get() * 2) {
3433 latest = monitor.recv().await.expect("event missing");
3434 }
3435 }));
3436 }
3437 join_all(finalizers).await;
3438
3439 info!(idx, ?validator, "restarting validator");
3441 let context = context.with_label(&format!("validator_{}_restarted", *validator));
3442
3443 let reporter_config = mocks::reporter::Config {
3445 participants: participants.clone().try_into().unwrap(),
3446 scheme: schemes[idx].clone(),
3447 elector: elector.clone(),
3448 };
3449 let reporter =
3450 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3451 let (pending, recovered, resolver) =
3452 register_validator(&mut oracle, validator.clone()).await;
3453 reporters.push(reporter.clone());
3454 let application_cfg = mocks::application::Config {
3455 hasher: Sha256::default(),
3456 relay: relay.clone(),
3457 me: validator.clone(),
3458 propose_latency: (10.0, 5.0),
3459 verify_latency: (10.0, 5.0),
3460 certify_latency: (10.0, 5.0),
3461 should_certify: mocks::application::Certifier::Sometimes,
3462 };
3463 let (actor, application) = mocks::application::Application::new(
3464 context.with_label("application"),
3465 application_cfg,
3466 );
3467 actor.start();
3468 let blocker = oracle.control(validator.clone());
3469 let cfg = config::Config {
3470 scheme: schemes[idx].clone(),
3471 elector: elector.clone(),
3472 blocker,
3473 automaton: application.clone(),
3474 relay: application.clone(),
3475 reporter: reporter.clone(),
3476 strategy: Sequential,
3477 partition: validator.to_string(),
3478 mailbox_size: 1024,
3479 epoch: Epoch::new(333),
3480 leader_timeout: Duration::from_secs(1),
3481 certification_timeout: Duration::from_secs(2),
3482 timeout_retry: Duration::from_secs(10),
3483 fetch_timeout: Duration::from_secs(1),
3484 activity_timeout,
3485 skip_timeout,
3486 fetch_concurrent: 4,
3487 replay_buffer: NZUsize!(1024 * 1024),
3488 write_buffer: NZUsize!(1024 * 1024),
3489 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3490 forwarding: ForwardingPolicy::Disabled,
3491 };
3492 let engine = Engine::new(context.with_label("engine"), cfg);
3493 engine.start(pending, recovered, resolver);
3494
3495 let mut finalizers = Vec::new();
3497 for reporter in reporters.iter_mut().skip(1) {
3498 let (mut latest, mut monitor) = reporter.subscribe().await;
3499 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3500 while latest < View::new(required_containers.get() * 3) {
3501 latest = monitor.recv().await.expect("event missing");
3502 }
3503 }));
3504 }
3505 join_all(finalizers).await;
3506
3507 let byz = &participants[0];
3511 let blocked = oracle.blocked().await.unwrap();
3512 for (a, b) in &blocked {
3513 assert_ne!(a, byz);
3514 assert_eq!(b, byz);
3515 }
3516 !blocked.is_empty()
3517 })
3518 }
3519
3520 #[test_group("slow")]
3521 #[test_traced]
3522 fn test_equivocator_bls12381_threshold_vrf_min_pk() {
3523 let detected = (0..5).any(|seed| {
3524 equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>)
3525 });
3526 assert!(
3527 detected,
3528 "expected at least one seed to detect equivocation"
3529 );
3530 }
3531
3532 #[test_group("slow")]
3533 #[test_traced]
3534 fn test_equivocator_bls12381_threshold_vrf_min_sig() {
3535 let detected = (0..5).any(|seed| {
3536 equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>)
3537 });
3538 assert!(
3539 detected,
3540 "expected at least one seed to detect equivocation"
3541 );
3542 }
3543
3544 #[test_group("slow")]
3545 #[test_traced]
3546 fn test_equivocator_bls12381_threshold_std_min_pk() {
3547 let detected = (0..5).any(|seed| {
3548 equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>)
3549 });
3550 assert!(
3551 detected,
3552 "expected at least one seed to detect equivocation"
3553 );
3554 }
3555
3556 #[test_group("slow")]
3557 #[test_traced]
3558 fn test_equivocator_bls12381_threshold_std_min_sig() {
3559 let detected = (0..5).any(|seed| {
3560 equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>)
3561 });
3562 assert!(
3563 detected,
3564 "expected at least one seed to detect equivocation"
3565 );
3566 }
3567
3568 #[test_group("slow")]
3569 #[test_traced]
3570 fn test_equivocator_bls12381_multisig_min_pk() {
3571 let detected = (0..5).any(|seed| {
3572 equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>)
3573 });
3574 assert!(
3575 detected,
3576 "expected at least one seed to detect equivocation"
3577 );
3578 }
3579
3580 #[test_group("slow")]
3581 #[test_traced]
3582 fn test_equivocator_bls12381_multisig_min_sig() {
3583 let detected = (0..5).any(|seed| {
3584 equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>)
3585 });
3586 assert!(
3587 detected,
3588 "expected at least one seed to detect equivocation"
3589 );
3590 }
3591
3592 #[test_group("slow")]
3593 #[test_traced]
3594 fn test_equivocator_ed25519() {
3595 let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, ed25519::fixture));
3596 assert!(
3597 detected,
3598 "expected at least one seed to detect equivocation"
3599 );
3600 }
3601
3602 #[test_group("slow")]
3603 #[test_traced]
3604 fn test_equivocator_secp256r1() {
3605 let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, secp256r1::fixture));
3606 assert!(
3607 detected,
3608 "expected at least one seed to detect equivocation"
3609 );
3610 }
3611
3612 fn reconfigurer<S, F, L>(seed: u64, mut fixture: F)
3613 where
3614 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3615 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3616 L: Elector<S>,
3617 {
3618 let n = 4;
3620 let required_containers = View::new(50);
3621 let activity_timeout = ViewDelta::new(10);
3622 let skip_timeout = ViewDelta::new(5);
3623 let namespace = b"consensus".to_vec();
3624 let cfg = deterministic::Config::new()
3625 .with_seed(seed)
3626 .with_timeout(Some(Duration::from_secs(30)));
3627 let executor = deterministic::Runner::new(cfg);
3628 executor.start(|mut context| async move {
3629 let Fixture {
3631 participants,
3632 schemes,
3633 ..
3634 } = fixture(&mut context, &namespace, n);
3635 let mut oracle =
3636 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
3637 let mut registrations = register_validators(&mut oracle, &participants).await;
3638
3639 let link = Link {
3641 latency: Duration::from_millis(10),
3642 jitter: Duration::from_millis(1),
3643 success_rate: 1.0,
3644 };
3645 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3646
3647 let elector = L::default();
3649 let relay = Arc::new(mocks::relay::Relay::new());
3650 let mut reporters = Vec::new();
3651 for (idx_scheme, validator) in participants.iter().enumerate() {
3652 let context = context.with_label(&format!("validator_{}", *validator));
3654
3655 let reporter_config = mocks::reporter::Config {
3657 participants: participants.clone().try_into().unwrap(),
3658 scheme: schemes[idx_scheme].clone(),
3659 elector: elector.clone(),
3660 };
3661 let reporter =
3662 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3663 let (pending, recovered, resolver) = registrations
3664 .remove(validator)
3665 .expect("validator should be registered");
3666 if idx_scheme == 0 {
3667 let cfg = mocks::reconfigurer::Config {
3668 scheme: schemes[idx_scheme].clone(),
3669 };
3670 let engine: mocks::reconfigurer::Reconfigurer<_, _, Sha256> =
3671 mocks::reconfigurer::Reconfigurer::new(
3672 context.with_label("byzantine_engine"),
3673 cfg,
3674 );
3675 engine.start(pending);
3676 } else {
3677 reporters.push(reporter.clone());
3678 let application_cfg = mocks::application::Config {
3679 hasher: Sha256::default(),
3680 relay: relay.clone(),
3681 me: validator.clone(),
3682 propose_latency: (10.0, 5.0),
3683 verify_latency: (10.0, 5.0),
3684 certify_latency: (10.0, 5.0),
3685 should_certify: mocks::application::Certifier::Sometimes,
3686 };
3687 let (actor, application) = mocks::application::Application::new(
3688 context.with_label("application"),
3689 application_cfg,
3690 );
3691 actor.start();
3692 let blocker = oracle.control(validator.clone());
3693 let cfg = config::Config {
3694 scheme: schemes[idx_scheme].clone(),
3695 elector: elector.clone(),
3696 blocker,
3697 automaton: application.clone(),
3698 relay: application.clone(),
3699 reporter: reporter.clone(),
3700 strategy: Sequential,
3701 partition: validator.to_string(),
3702 mailbox_size: 1024,
3703 epoch: Epoch::new(333),
3704 leader_timeout: Duration::from_secs(1),
3705 certification_timeout: Duration::from_secs(2),
3706 timeout_retry: Duration::from_secs(10),
3707 fetch_timeout: Duration::from_secs(1),
3708 activity_timeout,
3709 skip_timeout,
3710 fetch_concurrent: 4,
3711 replay_buffer: NZUsize!(1024 * 1024),
3712 write_buffer: NZUsize!(1024 * 1024),
3713 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3714 forwarding: ForwardingPolicy::Disabled,
3715 };
3716 let engine = Engine::new(context.with_label("engine"), cfg);
3717 engine.start(pending, recovered, resolver);
3718 }
3719 }
3720
3721 let mut finalizers = Vec::new();
3723 for reporter in reporters.iter_mut() {
3724 let (mut latest, mut monitor) = reporter.subscribe().await;
3725 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3726 while latest < required_containers {
3727 latest = monitor.recv().await.expect("event missing");
3728 }
3729 }));
3730 }
3731 join_all(finalizers).await;
3732
3733 let byz = &participants[0];
3735 for reporter in reporters.iter() {
3736 reporter.assert_no_faults();
3738
3739 reporter.assert_no_invalid();
3741 }
3742
3743 let blocked = oracle.blocked().await.unwrap();
3745 assert!(!blocked.is_empty());
3746 for (a, b) in blocked {
3747 assert_ne!(&a, byz);
3748 assert_eq!(&b, byz);
3749 }
3750 });
3751 }
3752
3753 #[test_group("slow")]
3754 #[test_traced]
3755 fn test_reconfigurer() {
3756 for seed in 0..5 {
3757 reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3758 reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3759 reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3760 reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3761 reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3762 reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3763 reconfigurer::<_, _, RoundRobin>(seed, ed25519::fixture);
3764 reconfigurer::<_, _, RoundRobin>(seed, secp256r1::fixture);
3765 }
3766 }
3767
3768 fn nuller<S, F, L>(seed: u64, mut fixture: F)
3769 where
3770 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3771 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3772 L: Elector<S>,
3773 {
3774 let n = 4;
3776 let required_containers = View::new(50);
3777 let activity_timeout = ViewDelta::new(10);
3778 let skip_timeout = ViewDelta::new(5);
3779 let namespace = b"consensus".to_vec();
3780 let cfg = deterministic::Config::new()
3781 .with_seed(seed)
3782 .with_timeout(Some(Duration::from_secs(30)));
3783 let executor = deterministic::Runner::new(cfg);
3784 executor.start(|mut context| async move {
3785 let Fixture {
3787 participants,
3788 schemes,
3789 ..
3790 } = fixture(&mut context, &namespace, n);
3791 let mut oracle =
3792 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
3793 let mut registrations = register_validators(&mut oracle, &participants).await;
3794
3795 let link = Link {
3797 latency: Duration::from_millis(10),
3798 jitter: Duration::from_millis(1),
3799 success_rate: 1.0,
3800 };
3801 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3802
3803 let elector = L::default();
3805 let relay = Arc::new(mocks::relay::Relay::new());
3806 let mut reporters = Vec::new();
3807 for (idx_scheme, validator) in participants.iter().enumerate() {
3808 let context = context.with_label(&format!("validator_{}", *validator));
3810
3811 let reporter_config = mocks::reporter::Config {
3813 participants: participants.clone().try_into().unwrap(),
3814 scheme: schemes[idx_scheme].clone(),
3815 elector: elector.clone(),
3816 };
3817 let reporter =
3818 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3819 let (pending, recovered, resolver) = registrations
3820 .remove(validator)
3821 .expect("validator should be registered");
3822 if idx_scheme == 0 {
3823 let cfg = mocks::nuller::Config {
3824 scheme: schemes[idx_scheme].clone(),
3825 };
3826 let engine: mocks::nuller::Nuller<_, _, Sha256> =
3827 mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
3828 engine.start(pending);
3829 } else {
3830 reporters.push(reporter.clone());
3831 let application_cfg = mocks::application::Config {
3832 hasher: Sha256::default(),
3833 relay: relay.clone(),
3834 me: validator.clone(),
3835 propose_latency: (10.0, 5.0),
3836 verify_latency: (10.0, 5.0),
3837 certify_latency: (10.0, 5.0),
3838 should_certify: mocks::application::Certifier::Sometimes,
3839 };
3840 let (actor, application) = mocks::application::Application::new(
3841 context.with_label("application"),
3842 application_cfg,
3843 );
3844 actor.start();
3845 let blocker = oracle.control(validator.clone());
3846 let cfg = config::Config {
3847 scheme: schemes[idx_scheme].clone(),
3848 elector: elector.clone(),
3849 blocker,
3850 automaton: application.clone(),
3851 relay: application.clone(),
3852 reporter: reporter.clone(),
3853 strategy: Sequential,
3854 partition: validator.clone().to_string(),
3855 mailbox_size: 1024,
3856 epoch: Epoch::new(333),
3857 leader_timeout: Duration::from_secs(1),
3858 certification_timeout: Duration::from_secs(2),
3859 timeout_retry: Duration::from_secs(10),
3860 fetch_timeout: Duration::from_secs(1),
3861 activity_timeout,
3862 skip_timeout,
3863 fetch_concurrent: 4,
3864 replay_buffer: NZUsize!(1024 * 1024),
3865 write_buffer: NZUsize!(1024 * 1024),
3866 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3867 forwarding: ForwardingPolicy::Disabled,
3868 };
3869 let engine = Engine::new(context.with_label("engine"), cfg);
3870 engine.start(pending, recovered, resolver);
3871 }
3872 }
3873
3874 let mut finalizers = Vec::new();
3876 for reporter in reporters.iter_mut() {
3877 let (mut latest, mut monitor) = reporter.subscribe().await;
3878 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3879 while latest < required_containers {
3880 latest = monitor.recv().await.expect("event missing");
3881 }
3882 }));
3883 }
3884 join_all(finalizers).await;
3885
3886 let byz = &participants[0];
3888 let mut count_nullify_and_finalize = 0;
3889 for reporter in reporters.iter() {
3890 {
3892 let faults = reporter.faults.lock();
3893 assert_eq!(faults.len(), 1);
3894 let faulter = faults.get(byz).expect("byzantine party is not faulter");
3895 for (_, faults) in faulter.iter() {
3896 for fault in faults.iter() {
3897 match fault {
3898 Activity::NullifyFinalize(_) => {
3899 count_nullify_and_finalize += 1;
3900 }
3901 _ => panic!("unexpected fault: {fault:?}"),
3902 }
3903 }
3904 }
3905 }
3906
3907 reporter.assert_no_invalid();
3909 }
3910 assert!(count_nullify_and_finalize > 0);
3911
3912 let blocked = oracle.blocked().await.unwrap();
3914 assert!(!blocked.is_empty());
3915 for (a, b) in blocked {
3916 assert_ne!(&a, byz);
3917 assert_eq!(&b, byz);
3918 }
3919 });
3920 }
3921
3922 #[test_group("slow")]
3923 #[test_traced]
3924 fn test_nuller() {
3925 for seed in 0..5 {
3926 nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3927 nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3928 nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3929 nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3930 nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3931 nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3932 nuller::<_, _, RoundRobin>(seed, ed25519::fixture);
3933 nuller::<_, _, RoundRobin>(seed, secp256r1::fixture);
3934 }
3935 }
3936
3937 fn outdated<S, F, L>(seed: u64, mut fixture: F)
3938 where
3939 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3940 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3941 L: Elector<S>,
3942 {
3943 let n = 4;
3945 let required_containers = View::new(100);
3946 let activity_timeout = ViewDelta::new(10);
3947 let skip_timeout = ViewDelta::new(5);
3948 let namespace = b"consensus".to_vec();
3949 let cfg = deterministic::Config::new()
3950 .with_seed(seed)
3951 .with_timeout(Some(Duration::from_secs(30)));
3952 let executor = deterministic::Runner::new(cfg);
3953 executor.start(|mut context| async move {
3954 let Fixture {
3956 participants,
3957 schemes,
3958 ..
3959 } = fixture(&mut context, &namespace, n);
3960 let mut oracle =
3961 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
3962 let mut registrations = register_validators(&mut oracle, &participants).await;
3963
3964 let link = Link {
3966 latency: Duration::from_millis(10),
3967 jitter: Duration::from_millis(1),
3968 success_rate: 1.0,
3969 };
3970 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3971
3972 let elector = L::default();
3974 let relay = Arc::new(mocks::relay::Relay::new());
3975 let mut reporters = Vec::new();
3976 for (idx_scheme, validator) in participants.iter().enumerate() {
3977 let context = context.with_label(&format!("validator_{}", *validator));
3979
3980 let reporter_config = mocks::reporter::Config {
3982 participants: participants.clone().try_into().unwrap(),
3983 scheme: schemes[idx_scheme].clone(),
3984 elector: elector.clone(),
3985 };
3986 let reporter =
3987 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3988 let (pending, recovered, resolver) = registrations
3989 .remove(validator)
3990 .expect("validator should be registered");
3991 if idx_scheme == 0 {
3992 let cfg = mocks::outdated::Config {
3993 scheme: schemes[idx_scheme].clone(),
3994 view_delta: ViewDelta::new(activity_timeout.get().saturating_mul(4)),
3995 };
3996 let engine: mocks::outdated::Outdated<_, _, Sha256> =
3997 mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
3998 engine.start(pending);
3999 } else {
4000 reporters.push(reporter.clone());
4001 let application_cfg = mocks::application::Config {
4002 hasher: Sha256::default(),
4003 relay: relay.clone(),
4004 me: validator.clone(),
4005 propose_latency: (10.0, 5.0),
4006 verify_latency: (10.0, 5.0),
4007 certify_latency: (10.0, 5.0),
4008 should_certify: mocks::application::Certifier::Sometimes,
4009 };
4010 let (actor, application) = mocks::application::Application::new(
4011 context.with_label("application"),
4012 application_cfg,
4013 );
4014 actor.start();
4015 let blocker = oracle.control(validator.clone());
4016 let cfg = config::Config {
4017 scheme: schemes[idx_scheme].clone(),
4018 elector: elector.clone(),
4019 blocker,
4020 automaton: application.clone(),
4021 relay: application.clone(),
4022 reporter: reporter.clone(),
4023 strategy: Sequential,
4024 partition: validator.clone().to_string(),
4025 mailbox_size: 1024,
4026 epoch: Epoch::new(333),
4027 leader_timeout: Duration::from_secs(1),
4028 certification_timeout: Duration::from_secs(2),
4029 timeout_retry: Duration::from_secs(10),
4030 fetch_timeout: Duration::from_secs(1),
4031 activity_timeout,
4032 skip_timeout,
4033 fetch_concurrent: 4,
4034 replay_buffer: NZUsize!(1024 * 1024),
4035 write_buffer: NZUsize!(1024 * 1024),
4036 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4037 forwarding: ForwardingPolicy::Disabled,
4038 };
4039 let engine = Engine::new(context.with_label("engine"), cfg);
4040 engine.start(pending, recovered, resolver);
4041 }
4042 }
4043
4044 let mut finalizers = Vec::new();
4046 for reporter in reporters.iter_mut() {
4047 let (mut latest, mut monitor) = reporter.subscribe().await;
4048 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4049 while latest < required_containers {
4050 latest = monitor.recv().await.expect("event missing");
4051 }
4052 }));
4053 }
4054 join_all(finalizers).await;
4055
4056 for reporter in reporters.iter() {
4058 reporter.assert_no_faults();
4060
4061 reporter.assert_no_invalid();
4063 }
4064
4065 let blocked = oracle.blocked().await.unwrap();
4067 assert!(blocked.is_empty());
4068 });
4069 }
4070
4071 #[test_group("slow")]
4072 #[test_traced]
4073 fn test_outdated() {
4074 for seed in 0..5 {
4075 outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
4076 outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
4077 outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
4078 outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
4079 outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
4080 outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
4081 outdated::<_, _, RoundRobin>(seed, ed25519::fixture);
4082 outdated::<_, _, RoundRobin>(seed, secp256r1::fixture);
4083 }
4084 }
4085
4086 fn run_1k<S, F, L>(mut fixture: F)
4087 where
4088 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4089 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4090 L: Elector<S>,
4091 {
4092 let n = 10;
4094 let required_containers = View::new(1_000);
4095 let activity_timeout = ViewDelta::new(10);
4096 let skip_timeout = ViewDelta::new(5);
4097 let namespace = b"consensus".to_vec();
4098 let cfg = deterministic::Config::new();
4099 let executor = deterministic::Runner::new(cfg);
4100 executor.start(|mut context| async move {
4101 let Fixture {
4103 participants,
4104 schemes,
4105 ..
4106 } = fixture(&mut context, &namespace, n);
4107 let mut oracle =
4108 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
4109 let mut registrations = register_validators(&mut oracle, &participants).await;
4110
4111 let link = Link {
4113 latency: Duration::from_millis(80),
4114 jitter: Duration::from_millis(10),
4115 success_rate: 0.98,
4116 };
4117 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4118
4119 let elector = L::default();
4121 let relay = Arc::new(mocks::relay::Relay::new());
4122 let mut reporters = Vec::new();
4123 let mut engine_handlers = Vec::new();
4124 for (idx, validator) in participants.iter().enumerate() {
4125 let context = context.with_label(&format!("validator_{}", *validator));
4127
4128 let reporter_config = mocks::reporter::Config {
4130 participants: participants.clone().try_into().unwrap(),
4131 scheme: schemes[idx].clone(),
4132 elector: elector.clone(),
4133 };
4134 let reporter =
4135 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4136 reporters.push(reporter.clone());
4137 let application_cfg = mocks::application::Config {
4138 hasher: Sha256::default(),
4139 relay: relay.clone(),
4140 me: validator.clone(),
4141 propose_latency: (100.0, 50.0),
4142 verify_latency: (50.0, 40.0),
4143 certify_latency: (50.0, 40.0),
4144 should_certify: mocks::application::Certifier::Sometimes,
4145 };
4146 let (actor, application) = mocks::application::Application::new(
4147 context.with_label("application"),
4148 application_cfg,
4149 );
4150 actor.start();
4151 let blocker = oracle.control(validator.clone());
4152 let cfg = config::Config {
4153 scheme: schemes[idx].clone(),
4154 elector: elector.clone(),
4155 blocker,
4156 automaton: application.clone(),
4157 relay: application.clone(),
4158 reporter: reporter.clone(),
4159 strategy: Sequential,
4160 partition: validator.to_string(),
4161 mailbox_size: 1024,
4162 epoch: Epoch::new(333),
4163 leader_timeout: Duration::from_secs(1),
4164 certification_timeout: Duration::from_secs(2),
4165 timeout_retry: Duration::from_secs(10),
4166 fetch_timeout: Duration::from_secs(1),
4167 activity_timeout,
4168 skip_timeout,
4169 fetch_concurrent: 4,
4170 replay_buffer: NZUsize!(1024 * 1024),
4171 write_buffer: NZUsize!(1024 * 1024),
4172 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4173 forwarding: ForwardingPolicy::Disabled,
4174 };
4175 let engine = Engine::new(context.with_label("engine"), cfg);
4176
4177 let (pending, recovered, resolver) = registrations
4179 .remove(validator)
4180 .expect("validator should be registered");
4181 engine_handlers.push(engine.start(pending, recovered, resolver));
4182 }
4183
4184 let mut finalizers = Vec::new();
4186 for reporter in reporters.iter_mut() {
4187 let (mut latest, mut monitor) = reporter.subscribe().await;
4188 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4189 while latest < required_containers {
4190 latest = monitor.recv().await.expect("event missing");
4191 }
4192 }));
4193 }
4194 join_all(finalizers).await;
4195
4196 for reporter in reporters.iter() {
4198 reporter.assert_no_faults();
4200
4201 reporter.assert_no_invalid();
4203 }
4204
4205 let blocked = oracle.blocked().await.unwrap();
4207 assert!(blocked.is_empty());
4208 })
4209 }
4210
4211 #[test_group("slow")]
4212 #[test_traced]
4213 fn test_1k_bls12381_threshold_vrf_min_pk() {
4214 run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
4215 }
4216
4217 #[test_group("slow")]
4218 #[test_traced]
4219 fn test_1k_bls12381_threshold_vrf_min_sig() {
4220 run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
4221 }
4222
4223 #[test_group("slow")]
4224 #[test_traced]
4225 fn test_1k_bls12381_threshold_std_min_pk() {
4226 run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
4227 }
4228
4229 #[test_group("slow")]
4230 #[test_traced]
4231 fn test_1k_bls12381_threshold_std_min_sig() {
4232 run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
4233 }
4234
4235 #[test_group("slow")]
4236 #[test_traced]
4237 fn test_1k_bls12381_multisig_min_pk() {
4238 run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4239 }
4240
4241 #[test_group("slow")]
4242 #[test_traced]
4243 fn test_1k_bls12381_multisig_min_sig() {
4244 run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
4245 }
4246
4247 #[test_group("slow")]
4248 #[test_traced]
4249 fn test_1k_ed25519() {
4250 run_1k::<_, _, RoundRobin>(ed25519::fixture);
4251 }
4252
4253 #[test_group("slow")]
4254 #[test_traced]
4255 fn test_1k_secp256r1() {
4256 run_1k::<_, _, RoundRobin>(secp256r1::fixture);
4257 }
4258
4259 fn engine_shutdown<S, F, L>(seed: u64, mut fixture: F, graceful: bool)
4260 where
4261 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4262 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4263 L: Elector<S>,
4264 {
4265 let n = 1;
4266 let namespace = b"consensus".to_vec();
4267 let cfg = deterministic::Config::default()
4268 .with_seed(seed)
4269 .with_timeout(Some(Duration::from_secs(10)));
4270 let executor = deterministic::Runner::new(cfg);
4271 executor.start(|mut context| async move {
4272 let Fixture {
4274 participants,
4275 schemes,
4276 ..
4277 } = fixture(&mut context, &namespace, n);
4278 let mut oracle =
4279 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
4280 let mut registrations = register_validators(&mut oracle, &participants).await;
4281
4282 let link = Link {
4284 latency: Duration::from_millis(1),
4285 jitter: Duration::from_millis(0),
4286 success_rate: 1.0,
4287 };
4288 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4289
4290 let elector = L::default();
4292 let reporter_config = mocks::reporter::Config {
4293 participants: participants.clone().try_into().unwrap(),
4294 scheme: schemes[0].clone(),
4295 elector: elector.clone(),
4296 };
4297 let reporter =
4298 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4299 let relay = Arc::new(mocks::relay::Relay::new());
4300 let application_cfg = mocks::application::Config {
4301 hasher: Sha256::default(),
4302 relay: relay.clone(),
4303 me: participants[0].clone(),
4304 propose_latency: (1.0, 0.0),
4305 verify_latency: (1.0, 0.0),
4306 certify_latency: (1.0, 0.0),
4307 should_certify: mocks::application::Certifier::Sometimes,
4308 };
4309 let (actor, application) = mocks::application::Application::new(
4310 context.with_label("application"),
4311 application_cfg,
4312 );
4313 actor.start();
4314 let blocker = oracle.control(participants[0].clone());
4315 let cfg = config::Config {
4316 scheme: schemes[0].clone(),
4317 elector: elector.clone(),
4318 blocker,
4319 automaton: application.clone(),
4320 relay: application.clone(),
4321 reporter: reporter.clone(),
4322 strategy: Sequential,
4323 partition: participants[0].clone().to_string(),
4324 mailbox_size: 64,
4325 epoch: Epoch::new(333),
4326 leader_timeout: Duration::from_millis(50),
4327 certification_timeout: Duration::from_millis(100),
4328 timeout_retry: Duration::from_millis(250),
4329 fetch_timeout: Duration::from_millis(50),
4330 activity_timeout: ViewDelta::new(4),
4331 skip_timeout: ViewDelta::new(2),
4332 fetch_concurrent: 4,
4333 replay_buffer: NZUsize!(1024 * 16),
4334 write_buffer: NZUsize!(1024 * 16),
4335 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4336 forwarding: ForwardingPolicy::Disabled,
4337 };
4338 let engine = Engine::new(context.with_label("engine"), cfg);
4339
4340 let (pending, recovered, resolver) = registrations
4342 .remove(&participants[0])
4343 .expect("validator should be registered");
4344 let handle = engine.start(pending, recovered, resolver);
4345
4346 context.sleep(Duration::from_millis(1000)).await;
4348
4349 let running_before = count_running_tasks(&context, "engine");
4351 assert!(
4352 running_before > 0,
4353 "at least one engine task should be running"
4354 );
4355
4356 context.sleep(Duration::from_millis(1500)).await;
4358 assert!(
4359 count_running_tasks(&context, "engine") > 0,
4360 "engine tasks should still be running"
4361 );
4362
4363 let running_after = if graceful {
4365 let metrics_context = context.clone();
4366 let result = context.stop(0, Some(Duration::from_secs(5))).await;
4367 assert!(
4368 result.is_ok(),
4369 "graceful shutdown should complete: {result:?}"
4370 );
4371 count_running_tasks(&metrics_context, "engine")
4372 } else {
4373 handle.abort();
4374 let _ = handle.await; context.sleep(Duration::from_millis(1000)).await;
4378 count_running_tasks(&context, "engine")
4379 };
4380 assert_eq!(
4381 running_after, 0,
4382 "all engine tasks should be stopped, but {running_after} still running"
4383 );
4384 });
4385 }
4386
4387 #[test_group("slow")]
4388 #[test_traced]
4389 fn test_children_shutdown_on_engine_abort() {
4390 for seed in 0..10 {
4391 engine_shutdown::<_, _, Random>(
4392 seed,
4393 bls12381_threshold_vrf::fixture::<MinPk, _>,
4394 false,
4395 );
4396 engine_shutdown::<_, _, Random>(
4397 seed,
4398 bls12381_threshold_vrf::fixture::<MinSig, _>,
4399 false,
4400 );
4401 engine_shutdown::<_, _, RoundRobin>(
4402 seed,
4403 bls12381_threshold_std::fixture::<MinPk, _>,
4404 false,
4405 );
4406 engine_shutdown::<_, _, RoundRobin>(
4407 seed,
4408 bls12381_threshold_std::fixture::<MinSig, _>,
4409 false,
4410 );
4411 engine_shutdown::<_, _, RoundRobin>(
4412 seed,
4413 bls12381_multisig::fixture::<MinPk, _>,
4414 false,
4415 );
4416 engine_shutdown::<_, _, RoundRobin>(
4417 seed,
4418 bls12381_multisig::fixture::<MinSig, _>,
4419 false,
4420 );
4421 engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, false);
4422 engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, false);
4423 }
4424 }
4425
4426 #[test_group("slow")]
4427 #[test_traced]
4428 fn test_graceful_shutdown() {
4429 for seed in 0..10 {
4430 engine_shutdown::<_, _, Random>(
4431 seed,
4432 bls12381_threshold_vrf::fixture::<MinPk, _>,
4433 true,
4434 );
4435 engine_shutdown::<_, _, Random>(
4436 seed,
4437 bls12381_threshold_vrf::fixture::<MinSig, _>,
4438 true,
4439 );
4440 engine_shutdown::<_, _, RoundRobin>(
4441 seed,
4442 bls12381_threshold_std::fixture::<MinPk, _>,
4443 true,
4444 );
4445 engine_shutdown::<_, _, RoundRobin>(
4446 seed,
4447 bls12381_threshold_std::fixture::<MinSig, _>,
4448 true,
4449 );
4450 engine_shutdown::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>, true);
4451 engine_shutdown::<_, _, RoundRobin>(
4452 seed,
4453 bls12381_multisig::fixture::<MinSig, _>,
4454 true,
4455 );
4456 engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, true);
4457 engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, true);
4458 }
4459 }
4460
4461 fn attributable_reporter_filtering<S, F, L>(mut fixture: F)
4462 where
4463 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4464 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4465 L: Elector<S>,
4466 {
4467 let n = 3;
4468 let required_containers = View::new(10);
4469 let activity_timeout = ViewDelta::new(10);
4470 let skip_timeout = ViewDelta::new(5);
4471 let namespace = b"consensus".to_vec();
4472 let executor = deterministic::Runner::timed(Duration::from_secs(30));
4473 executor.start(|mut context| async move {
4474 let Fixture {
4476 participants,
4477 schemes,
4478 ..
4479 } = fixture(&mut context, &namespace, n);
4480 let mut oracle =
4481 start_test_network_with_peers(context.clone(), participants.clone(), false).await;
4482 let mut registrations = register_validators(&mut oracle, &participants).await;
4483
4484 let link = Link {
4486 latency: Duration::from_millis(10),
4487 jitter: Duration::from_millis(1),
4488 success_rate: 1.0,
4489 };
4490 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4491
4492 let elector = L::default();
4494 let relay = Arc::new(mocks::relay::Relay::new());
4495 let mut reporters = Vec::new();
4496 for (idx, validator) in participants.iter().enumerate() {
4497 let context = context.with_label(&format!("validator_{}", *validator));
4498
4499 let reporter_config = mocks::reporter::Config {
4500 participants: participants.clone().try_into().unwrap(),
4501 scheme: schemes[idx].clone(),
4502 elector: elector.clone(),
4503 };
4504 let mock_reporter = mocks::reporter::Reporter::new(
4505 context.with_label("mock_reporter"),
4506 reporter_config,
4507 );
4508
4509 let attributable_reporter = scheme::reporter::AttributableReporter::new(
4511 context.with_label("rng"),
4512 schemes[idx].clone(),
4513 mock_reporter.clone(),
4514 Sequential,
4515 true, );
4517 reporters.push(mock_reporter.clone());
4518
4519 let application_cfg = mocks::application::Config {
4520 hasher: Sha256::default(),
4521 relay: relay.clone(),
4522 me: validator.clone(),
4523 propose_latency: (10.0, 5.0),
4524 verify_latency: (10.0, 5.0),
4525 certify_latency: (10.0, 5.0),
4526 should_certify: mocks::application::Certifier::Sometimes,
4527 };
4528 let (actor, application) = mocks::application::Application::new(
4529 context.with_label("application"),
4530 application_cfg,
4531 );
4532 actor.start();
4533 let blocker = oracle.control(validator.clone());
4534 let cfg = config::Config {
4535 scheme: schemes[idx].clone(),
4536 elector: elector.clone(),
4537 blocker,
4538 automaton: application.clone(),
4539 relay: application.clone(),
4540 reporter: attributable_reporter,
4541 strategy: Sequential,
4542 partition: validator.to_string(),
4543 mailbox_size: 1024,
4544 epoch: Epoch::new(333),
4545 leader_timeout: Duration::from_secs(1),
4546 certification_timeout: Duration::from_secs(2),
4547 timeout_retry: Duration::from_secs(10),
4548 fetch_timeout: Duration::from_secs(1),
4549 activity_timeout,
4550 skip_timeout,
4551 fetch_concurrent: 4,
4552 replay_buffer: NZUsize!(1024 * 1024),
4553 write_buffer: NZUsize!(1024 * 1024),
4554 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4555 forwarding: ForwardingPolicy::Disabled,
4556 };
4557 let engine = Engine::new(context.with_label("engine"), cfg);
4558
4559 let (pending, recovered, resolver) = registrations
4561 .remove(validator)
4562 .expect("validator should be registered");
4563 engine.start(pending, recovered, resolver);
4564 }
4565
4566 let mut finalizers = Vec::new();
4568 for reporter in reporters.iter_mut() {
4569 let (mut latest, mut monitor) = reporter.subscribe().await;
4570 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4571 while latest < required_containers {
4572 latest = monitor.recv().await.expect("event missing");
4573 }
4574 }));
4575 }
4576 join_all(finalizers).await;
4577
4578 for reporter in reporters.iter() {
4580 reporter.assert_no_faults();
4582
4583 reporter.assert_no_invalid();
4585
4586 {
4588 let notarizations = reporter.notarizations.lock();
4589 let finalizations = reporter.finalizations.lock();
4590 assert!(
4591 !notarizations.is_empty() || !finalizations.is_empty(),
4592 "Certificates should be reported"
4593 );
4594 }
4595
4596 let notarizes = reporter.notarizes.lock();
4598 let last_view = notarizes.keys().max().cloned().unwrap_or_default();
4599 for (view, payloads) in notarizes.iter() {
4600 if *view == last_view {
4601 continue; }
4603
4604 let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4605
4606 if S::is_attributable() {
4608 assert!(signers > 1, "view {view}: {signers}");
4609 } else {
4610 assert_eq!(signers, 0);
4612 }
4613 }
4614
4615 let finalizes = reporter.finalizes.lock();
4617 for (_, payloads) in finalizes.iter() {
4618 let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4619
4620 if S::is_attributable() {
4622 assert!(signers > 1);
4623 } else {
4624 assert_eq!(signers, 0);
4626 }
4627 }
4628 }
4629
4630 let blocked = oracle.blocked().await.unwrap();
4632 assert!(blocked.is_empty());
4633 });
4634 }
4635
4636 #[test_traced]
4637 fn test_attributable_reporter_filtering() {
4638 attributable_reporter_filtering::<_, _, Random>(
4639 bls12381_threshold_vrf::fixture::<MinPk, _>,
4640 );
4641 attributable_reporter_filtering::<_, _, Random>(
4642 bls12381_threshold_vrf::fixture::<MinSig, _>,
4643 );
4644 attributable_reporter_filtering::<_, _, RoundRobin>(
4645 bls12381_threshold_std::fixture::<MinPk, _>,
4646 );
4647 attributable_reporter_filtering::<_, _, RoundRobin>(
4648 bls12381_threshold_std::fixture::<MinSig, _>,
4649 );
4650 attributable_reporter_filtering::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4651 attributable_reporter_filtering::<_, _, RoundRobin>(
4652 bls12381_multisig::fixture::<MinSig, _>,
4653 );
4654 attributable_reporter_filtering::<_, _, RoundRobin>(ed25519::fixture);
4655 attributable_reporter_filtering::<_, _, RoundRobin>(secp256r1::fixture);
4656 }
4657
4658 fn split_views_no_lockup<S, F, L>(mut fixture: F)
4659 where
4660 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4661 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4662 L: Elector<S>,
4663 {
4664 enum ParticipantType {
4677 Group1, Group2, Ignorant, Byzantine, }
4682 let get_type = |idx: usize| -> ParticipantType {
4683 match idx {
4684 0..3 => ParticipantType::Group1,
4685 3..6 => ParticipantType::Group2,
4686 6 => ParticipantType::Ignorant,
4687 7..10 => ParticipantType::Byzantine,
4688 _ => unreachable!(),
4689 }
4690 };
4691
4692 let n = 10;
4694 let quorum = quorum(n) as usize;
4695 assert_eq!(quorum, 7);
4696 let activity_timeout = ViewDelta::new(10);
4697 let skip_timeout = ViewDelta::new(5);
4698 let namespace = b"consensus".to_vec();
4699 let executor = deterministic::Runner::timed(Duration::from_secs(300));
4700 executor.start(|mut context| async move {
4701 let Fixture {
4703 participants,
4704 schemes,
4705 ..
4706 } = fixture(&mut context, &namespace, n);
4707 let mut oracle =
4708 start_test_network_with_peers(context.clone(), participants.clone(), false).await;
4709 let mut registrations = register_validators(&mut oracle, &participants).await;
4710
4711 let build_finalization = |proposal: &Proposal<D>| -> TFinalization<_, D> {
4715 let votes: Vec<_> = (0..=quorum)
4716 .map(|i| TFinalize::sign(&schemes[i], proposal.clone()).unwrap())
4717 .collect();
4718 TFinalization::from_finalizes(&schemes[0], &votes, &Sequential)
4719 .expect("finalization quorum")
4720 };
4721 let build_notarization = |proposal: &Proposal<D>| -> TNotarization<_, D> {
4723 let votes: Vec<_> = (0..=quorum)
4724 .map(|i| TNotarize::sign(&schemes[i], proposal.clone()).unwrap())
4725 .collect();
4726 TNotarization::from_notarizes(&schemes[0], &votes, &Sequential)
4727 .expect("notarization quorum")
4728 };
4729 let build_nullification = |round: Round| -> TNullification<_> {
4730 let votes: Vec<_> = (0..=quorum)
4731 .map(|i| TNullify::sign::<D>(&schemes[i], round).unwrap())
4732 .collect();
4733 TNullification::from_nullifies(&schemes[0], &votes, &Sequential)
4734 .expect("nullification quorum")
4735 };
4736 let f_view = 1;
4738 let round_f = Round::new(Epoch::new(333), View::new(f_view));
4739 let payload_b0 = Sha256::hash(b"B_F");
4740 let proposal_b0 = Proposal::new(round_f, View::new(f_view - 1), payload_b0);
4741 let payload_b1a = Sha256::hash(b"B_G1");
4742 let proposal_b1a = Proposal::new(
4743 Round::new(Epoch::new(333), View::new(f_view + 1)),
4744 View::new(f_view),
4745 payload_b1a,
4746 );
4747 let payload_b1b = Sha256::hash(b"B_G2");
4748 let proposal_b1b = Proposal::new(
4749 Round::new(Epoch::new(333), View::new(f_view + 2)),
4750 View::new(f_view),
4751 payload_b1b,
4752 );
4753
4754 let b0_notarization = build_notarization(&proposal_b0);
4756 let b0_finalization = build_finalization(&proposal_b0);
4757 let b1a_notarization = build_notarization(&proposal_b1a);
4759 let b1b_notarization = build_notarization(&proposal_b1b);
4760 let null_a = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 1)));
4762 let null_b = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 2)));
4763
4764 let injector_pk = PrivateKey::from_seed(1_000_000).public_key();
4766 let (mut injector_sender, _inj_certificate_receiver) = oracle
4767 .control(injector_pk.clone())
4768 .register(1, TEST_QUOTA)
4769 .await
4770 .unwrap();
4771
4772 let link = Link {
4774 latency: Duration::from_millis(10),
4775 jitter: Duration::from_millis(0),
4776 success_rate: 1.0,
4777 };
4778 for p in participants.iter() {
4779 oracle
4780 .add_link(injector_pk.clone(), p.clone(), link.clone())
4781 .await
4782 .unwrap();
4783 }
4784 oracle
4785 .manager()
4786 .track(
4787 1,
4788 TrackedPeers::new(
4789 Set::from_iter_dedup(participants.iter().cloned()),
4790 Set::from_iter_dedup(std::slice::from_ref(&injector_pk).iter().cloned()),
4791 ),
4792 )
4793 .await;
4794 context.sleep(Duration::from_millis(10)).await;
4795
4796 let msg = Certificate::<_, D>::Notarization(b0_notarization).encode();
4800 injector_sender
4801 .send(Recipients::All, msg, true)
4802 .await
4803 .unwrap();
4804 let msg = Certificate::<_, D>::Finalization(b0_finalization).encode();
4805 injector_sender
4806 .send(Recipients::All, msg, true)
4807 .await
4808 .unwrap();
4809 let notarization_msg = Certificate::<_, D>::Notarization(b1a_notarization);
4811 let nullification_msg = Certificate::<_, D>::Nullification(null_a.clone());
4812 for (i, participant) in participants.iter().enumerate() {
4813 let recipient = Recipients::One(participant.clone());
4814 let msg = match get_type(i) {
4815 ParticipantType::Group1 => notarization_msg.encode(),
4816 _ => nullification_msg.encode(),
4817 };
4818 injector_sender.send(recipient, msg, true).await.unwrap();
4819 }
4820 let notarization_msg = Certificate::<_, D>::Notarization(b1b_notarization);
4822 let nullification_msg = Certificate::<_, D>::Nullification(null_b.clone());
4823 for (i, participant) in participants.iter().enumerate() {
4824 let recipient = Recipients::One(participant.clone());
4825 let msg = match get_type(i) {
4826 ParticipantType::Group2 => notarization_msg.encode(),
4827 _ => nullification_msg.encode(),
4828 };
4829 injector_sender.send(recipient, msg, true).await.unwrap();
4830 }
4831
4832 let elector = L::default();
4838 let relay = Arc::new(mocks::relay::Relay::new());
4839 let mut honest_reporters = Vec::new();
4840 for (idx, validator) in participants.iter().enumerate() {
4841 let (pending, recovered, resolver) = registrations
4842 .remove(validator)
4843 .expect("validator should be registered");
4844 let participant_type = get_type(idx);
4845 if matches!(participant_type, ParticipantType::Byzantine) {
4846 let cfg = mocks::nullify_only::Config {
4848 scheme: schemes[idx].clone(),
4849 };
4850 let engine: mocks::nullify_only::NullifyOnly<_, _, Sha256> =
4851 mocks::nullify_only::NullifyOnly::new(
4852 context.with_label(&format!("byzantine_{}", *validator)),
4853 cfg,
4854 );
4855 engine.start(pending);
4856 drop(recovered);
4858 drop(resolver);
4859 } else {
4860 let reporter_config = mocks::reporter::Config {
4862 participants: participants.clone().try_into().unwrap(),
4863 scheme: schemes[idx].clone(),
4864 elector: elector.clone(),
4865 };
4866 let reporter = mocks::reporter::Reporter::new(
4867 context.with_label(&format!("reporter_{}", *validator)),
4868 reporter_config,
4869 );
4870 honest_reporters.push(reporter.clone());
4871
4872 let application_cfg = mocks::application::Config {
4873 hasher: Sha256::default(),
4874 relay: relay.clone(),
4875 me: validator.clone(),
4876 propose_latency: (250.0, 50.0), verify_latency: (10.0, 5.0),
4878 certify_latency: (10.0, 5.0),
4879 should_certify: mocks::application::Certifier::Sometimes,
4880 };
4881 let (actor, application) = mocks::application::Application::new(
4882 context.with_label(&format!("application_{}", *validator)),
4883 application_cfg,
4884 );
4885 actor.start();
4886 let blocker = oracle.control(validator.clone());
4887 let cfg = config::Config {
4888 scheme: schemes[idx].clone(),
4889 elector: elector.clone(),
4890 blocker,
4891 automaton: application.clone(),
4892 relay: application.clone(),
4893 reporter: reporter.clone(),
4894 strategy: Sequential,
4895 partition: validator.to_string(),
4896 mailbox_size: 1024,
4897 epoch: Epoch::new(333),
4898 leader_timeout: Duration::from_secs(10),
4899 certification_timeout: Duration::from_secs(10),
4900 timeout_retry: Duration::from_secs(10),
4901 fetch_timeout: Duration::from_secs(1),
4902 activity_timeout,
4903 skip_timeout,
4904 fetch_concurrent: 4,
4905 replay_buffer: NZUsize!(1024 * 1024),
4906 write_buffer: NZUsize!(1024 * 1024),
4907 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4908 forwarding: ForwardingPolicy::Disabled,
4909 };
4910 let engine =
4911 Engine::new(context.with_label(&format!("engine_{}", *validator)), cfg);
4912 engine.start(pending, recovered, resolver);
4913 }
4914 }
4915
4916 context.sleep(Duration::from_secs(2)).await;
4918
4919 let view = View::new(f_view);
4924 for reporter in honest_reporters.iter() {
4925 let finalizations = reporter.finalizations.lock();
4926 assert!(finalizations.contains_key(&view));
4927 }
4928
4929 let view = View::new(f_view + 1);
4933 for (i, reporter) in honest_reporters.iter().enumerate() {
4934 let finalizations = reporter.finalizations.lock();
4935 assert!(!finalizations.contains_key(&view));
4936 let nullifications = reporter.nullifications.lock();
4937 let notarizations = reporter.notarizations.lock();
4938 match get_type(i) {
4939 ParticipantType::Group1 => {
4940 assert!(notarizations.contains_key(&view));
4941 assert!(!nullifications.contains_key(&view));
4942 }
4943 _ => {
4944 assert!(nullifications.contains_key(&view));
4945 assert!(!notarizations.contains_key(&view));
4946 }
4947 }
4948 }
4949
4950 let view = View::new(f_view + 2);
4954 for (i, reporter) in honest_reporters.iter().enumerate() {
4955 let finalizations = reporter.finalizations.lock();
4956 assert!(!finalizations.contains_key(&view));
4957 let nullifications = reporter.nullifications.lock();
4958 let notarizations = reporter.notarizations.lock();
4959 match get_type(i) {
4960 ParticipantType::Group2 => {
4961 assert!(notarizations.contains_key(&view));
4962 assert!(!nullifications.contains_key(&view));
4963 }
4964 _ => {
4965 assert!(nullifications.contains_key(&view));
4966 assert!(!notarizations.contains_key(&view));
4967 }
4968 }
4969 }
4970
4971 let next_view = View::new(f_view + 3);
4973 for (i, reporter) in honest_reporters.iter().enumerate() {
4974 let nullifies = reporter.nullifies.lock();
4975 assert!(!nullifies.contains_key(&next_view), "reporter {i}");
4976 }
4977
4978 link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
4982
4983 {
4985 let target = View::new(f_view + 3);
4986 let mut finalizers = Vec::new();
4987 for reporter in honest_reporters.iter_mut() {
4988 let (mut latest, mut monitor) = reporter.subscribe().await;
4989 finalizers.push(context.with_label("resume_finalizer").spawn(
4990 move |_| async move {
4991 while latest < target {
4992 latest = monitor.recv().await.expect("event missing");
4993 }
4994 },
4995 ));
4996 }
4997 join_all(finalizers).await;
4998 }
4999
5000 for reporter in honest_reporters.iter() {
5002 reporter.assert_no_faults();
5003 reporter.assert_no_invalid();
5004 }
5005 let blocked = oracle.blocked().await.unwrap();
5006 assert!(blocked.is_empty(), "blocked peers: {blocked:?}");
5007 });
5008 }
5009
5010 #[test_group("slow")]
5011 #[test_traced]
5012 fn test_split_views_no_lockup() {
5013 split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
5014 split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
5015 split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
5016 split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
5017 split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
5018 split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
5019 split_views_no_lockup::<_, _, RoundRobin>(ed25519::fixture);
5020 split_views_no_lockup::<_, _, RoundRobin>(secp256r1::fixture);
5021 }
5022
5023 fn tle<V, L>()
5024 where
5025 V: Variant,
5026 L: Elector<bls12381_threshold_vrf::Scheme<PublicKey, V>>,
5027 {
5028 let n = 4;
5030 let namespace = b"consensus".to_vec();
5031 let activity_timeout = ViewDelta::new(100);
5032 let skip_timeout = ViewDelta::new(50);
5033 let executor = deterministic::Runner::timed(Duration::from_secs(30));
5034 executor.start(|mut context| async move {
5035 let Fixture {
5037 participants,
5038 schemes,
5039 ..
5040 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, &namespace, n);
5041 let mut oracle =
5042 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
5043 let mut registrations = register_validators(&mut oracle, &participants).await;
5044
5045 let link = Link {
5047 latency: Duration::from_millis(10),
5048 jitter: Duration::from_millis(5),
5049 success_rate: 1.0,
5050 };
5051 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5052
5053 let elector = L::default();
5055 let relay = Arc::new(mocks::relay::Relay::new());
5056 let mut reporters = Vec::new();
5057 let mut engine_handlers = Vec::new();
5058 let monitor_reporter = Arc::new(Mutex::new(None));
5059 for (idx, validator) in participants.iter().enumerate() {
5060 let context = context.with_label(&format!("validator_{}", *validator));
5062
5063 let reporter_config = mocks::reporter::Config {
5065 participants: participants.clone().try_into().unwrap(),
5066 scheme: schemes[idx].clone(),
5067 elector: elector.clone(),
5068 };
5069 let reporter =
5070 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5071 reporters.push(reporter.clone());
5072 if idx == 0 {
5073 *monitor_reporter.lock() = Some(reporter.clone());
5074 }
5075
5076 let application_cfg = mocks::application::Config {
5078 hasher: Sha256::default(),
5079 relay: relay.clone(),
5080 me: validator.clone(),
5081 propose_latency: (10.0, 5.0),
5082 verify_latency: (10.0, 5.0),
5083 certify_latency: (10.0, 5.0),
5084 should_certify: mocks::application::Certifier::Sometimes,
5085 };
5086 let (actor, application) = mocks::application::Application::new(
5087 context.with_label("application"),
5088 application_cfg,
5089 );
5090 actor.start();
5091 let blocker = oracle.control(validator.clone());
5092 let cfg = config::Config {
5093 scheme: schemes[idx].clone(),
5094 elector: elector.clone(),
5095 blocker,
5096 automaton: application.clone(),
5097 relay: application.clone(),
5098 reporter: reporter.clone(),
5099 strategy: Sequential,
5100 partition: validator.to_string(),
5101 mailbox_size: 1024,
5102 epoch: Epoch::new(333),
5103 leader_timeout: Duration::from_millis(100),
5104 certification_timeout: Duration::from_millis(200),
5105 timeout_retry: Duration::from_millis(500),
5106 fetch_timeout: Duration::from_millis(100),
5107 activity_timeout,
5108 skip_timeout,
5109 fetch_concurrent: 4,
5110 replay_buffer: NZUsize!(1024 * 1024),
5111 write_buffer: NZUsize!(1024 * 1024),
5112 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5113 forwarding: ForwardingPolicy::Disabled,
5114 };
5115 let engine = Engine::new(context.with_label("engine"), cfg);
5116
5117 let (pending, recovered, resolver) = registrations
5119 .remove(validator)
5120 .expect("validator should be registered");
5121 engine_handlers.push(engine.start(pending, recovered, resolver));
5122 }
5123
5124 let target = Round::new(Epoch::new(333), View::new(10)); let message = b"Secret message for future view10"; let ciphertext = schemes[0].encrypt(&mut context, target, *message);
5130
5131 let reporter = monitor_reporter.lock().clone().unwrap();
5133 loop {
5134 context.sleep(Duration::from_millis(100)).await;
5136 let notarizations = reporter.notarizations.lock();
5137 let Some(notarization) = notarizations.get(&target.view()) else {
5138 continue;
5139 };
5140
5141 let seed = notarization.seed();
5143 let decrypted = seed
5144 .decrypt(&ciphertext)
5145 .expect("Decryption should succeed with valid seed signature");
5146 assert_eq!(
5147 message,
5148 decrypted.as_ref(),
5149 "Decrypted message should match original message"
5150 );
5151 break;
5152 }
5153 });
5154 }
5155
5156 #[test_traced]
5157 fn test_tle() {
5158 tle::<MinPk, Random>();
5159 tle::<MinSig, Random>();
5160 }
5161
5162 fn hailstorm<S, F, L>(
5163 seed: u64,
5164 shutdowns: usize,
5165 interval: ViewDelta,
5166 mut fixture: F,
5167 ) -> String
5168 where
5169 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5170 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5171 L: Elector<S>,
5172 {
5173 let n = 5;
5175 let activity_timeout = ViewDelta::new(10);
5176 let skip_timeout = ViewDelta::new(5);
5177 let namespace = b"consensus".to_vec();
5178 let cfg = deterministic::Config::new().with_seed(seed);
5179 let executor = deterministic::Runner::new(cfg);
5180 executor.start(|mut context| async move {
5181 let Fixture {
5183 participants,
5184 schemes,
5185 ..
5186 } = fixture(&mut context, &namespace, n);
5187 let mut oracle =
5188 start_test_network_with_peers(context.clone(), participants.clone(), true).await;
5189 let mut registrations = register_validators(&mut oracle, &participants).await;
5190
5191 let link = Link {
5193 latency: Duration::from_millis(10),
5194 jitter: Duration::from_millis(1),
5195 success_rate: 1.0,
5196 };
5197 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5198
5199 let elector = L::default();
5201 let relay = Arc::new(mocks::relay::Relay::new());
5202 let mut reporters = BTreeMap::new();
5203 let mut engine_handlers = BTreeMap::new();
5204 for (idx, validator) in participants.iter().enumerate() {
5205 let context = context.with_label(&format!("validator_{}", *validator));
5207
5208 let reporter_config = mocks::reporter::Config {
5210 participants: participants.clone().try_into().unwrap(),
5211 scheme: schemes[idx].clone(),
5212 elector: elector.clone(),
5213 };
5214 let reporter =
5215 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5216 reporters.insert(idx, reporter.clone());
5217 let application_cfg = mocks::application::Config {
5218 hasher: Sha256::default(),
5219 relay: relay.clone(),
5220 me: validator.clone(),
5221 propose_latency: (10.0, 5.0),
5222 verify_latency: (10.0, 5.0),
5223 certify_latency: (10.0, 5.0),
5224 should_certify: mocks::application::Certifier::Sometimes,
5225 };
5226 let (actor, application) = mocks::application::Application::new(
5227 context.with_label("application"),
5228 application_cfg,
5229 );
5230 actor.start();
5231 let blocker = oracle.control(validator.clone());
5232 let cfg = config::Config {
5233 scheme: schemes[idx].clone(),
5234 elector: elector.clone(),
5235 blocker,
5236 automaton: application.clone(),
5237 relay: application.clone(),
5238 reporter: reporter.clone(),
5239 strategy: Sequential,
5240 partition: validator.to_string(),
5241 mailbox_size: 1024,
5242 epoch: Epoch::new(333),
5243 leader_timeout: Duration::from_secs(1),
5244 certification_timeout: Duration::from_secs(2),
5245 timeout_retry: Duration::from_secs(10),
5246 fetch_timeout: Duration::from_secs(1),
5247 activity_timeout,
5248 skip_timeout,
5249 fetch_concurrent: 4,
5250 replay_buffer: NZUsize!(1024 * 1024),
5251 write_buffer: NZUsize!(1024 * 1024),
5252 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5253 forwarding: ForwardingPolicy::Disabled,
5254 };
5255 let engine = Engine::new(context.with_label("engine"), cfg);
5256
5257 let (pending, recovered, resolver) = registrations
5259 .remove(validator)
5260 .expect("validator should be registered");
5261 engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5262 }
5263
5264 let mut target = View::zero();
5266 for i in 0..shutdowns {
5267 target = target.saturating_add(interval);
5269
5270 let mut finalizers = Vec::new();
5272 for (_, reporter) in reporters.iter_mut() {
5273 let (mut latest, mut monitor) = reporter.subscribe().await;
5274 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5275 while latest < target {
5276 latest = monitor.recv().await.expect("event missing");
5277 }
5278 }));
5279 }
5280 join_all(finalizers).await;
5281 target = target.saturating_add(interval);
5282
5283 let idx = context.gen_range(0..engine_handlers.len());
5285 let validator = &participants[idx];
5286 let handle = engine_handlers.remove(&idx).unwrap();
5287 handle.abort();
5288 let _ = handle.await;
5289 let selected_reporter = reporters.remove(&idx).unwrap();
5290 info!(idx, ?validator, "shutdown validator");
5291
5292 let mut finalizers = Vec::new();
5294 for (_, reporter) in reporters.iter_mut() {
5295 let (mut latest, mut monitor) = reporter.subscribe().await;
5296 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5297 while latest < target {
5298 latest = monitor.recv().await.expect("event missing");
5299 }
5300 }));
5301 }
5302 join_all(finalizers).await;
5303 target = target.saturating_add(interval);
5304
5305 info!(idx, ?validator, "restarting validator");
5307 let context =
5308 context.with_label(&format!("validator_{}_restarted_{}", *validator, i));
5309
5310 let (pending, recovered, resolver) =
5312 register_validator(&mut oracle, validator.clone()).await;
5313 let application_cfg = mocks::application::Config {
5314 hasher: Sha256::default(),
5315 relay: relay.clone(),
5316 me: validator.clone(),
5317 propose_latency: (10.0, 5.0),
5318 verify_latency: (10.0, 5.0),
5319 certify_latency: (10.0, 5.0),
5320 should_certify: mocks::application::Certifier::Sometimes,
5321 };
5322 let (actor, application) = mocks::application::Application::new(
5323 context.with_label("application"),
5324 application_cfg,
5325 );
5326 actor.start();
5327 reporters.insert(idx, selected_reporter.clone());
5328 let blocker = oracle.control(validator.clone());
5329 let cfg = config::Config {
5330 scheme: schemes[idx].clone(),
5331 elector: elector.clone(),
5332 blocker,
5333 automaton: application.clone(),
5334 relay: application.clone(),
5335 reporter: selected_reporter,
5336 strategy: Sequential,
5337 partition: validator.to_string(),
5338 mailbox_size: 1024,
5339 epoch: Epoch::new(333),
5340 leader_timeout: Duration::from_secs(1),
5341 certification_timeout: Duration::from_secs(2),
5342 timeout_retry: Duration::from_secs(10),
5343 fetch_timeout: Duration::from_secs(1),
5344 activity_timeout,
5345 skip_timeout,
5346 fetch_concurrent: 4,
5347 replay_buffer: NZUsize!(1024 * 1024),
5348 write_buffer: NZUsize!(1024 * 1024),
5349 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5350 forwarding: ForwardingPolicy::Disabled,
5351 };
5352 let engine = Engine::new(context.with_label("engine"), cfg);
5353 engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5354
5355 let mut finalizers = Vec::new();
5357 for (_, reporter) in reporters.iter_mut() {
5358 let (mut latest, mut monitor) = reporter.subscribe().await;
5359 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5360 while latest < target {
5361 latest = monitor.recv().await.expect("event missing");
5362 }
5363 }));
5364 }
5365 join_all(finalizers).await;
5366 info!(idx, ?validator, "validator recovered");
5367 }
5368
5369 let latest_complete = target.saturating_sub(activity_timeout);
5371 for (_, reporter) in reporters.iter() {
5372 reporter.assert_no_faults();
5374
5375 reporter.assert_no_invalid();
5377
5378 let mut notarized = HashMap::new();
5380 let mut finalized = HashMap::new();
5381 {
5382 let notarizes = reporter.notarizes.lock();
5383 for view in View::range(View::new(1), latest_complete) {
5384 let Some(payloads) = notarizes.get(&view) else {
5386 continue;
5387 };
5388 if payloads.len() > 1 {
5389 panic!("view: {view}");
5390 }
5391 let (digest, _) = payloads.iter().next().unwrap();
5392 notarized.insert(view, *digest);
5393 }
5394 }
5395 {
5396 let notarizations = reporter.notarizations.lock();
5397 for view in View::range(View::new(1), latest_complete) {
5398 let Some(notarization) = notarizations.get(&view) else {
5400 continue;
5401 };
5402 let Some(digest) = notarized.get(&view) else {
5403 continue;
5404 };
5405 assert_eq!(¬arization.proposal.payload, digest);
5406 }
5407 }
5408 {
5409 let finalizes = reporter.finalizes.lock();
5410 for view in View::range(View::new(1), latest_complete) {
5411 let Some(payloads) = finalizes.get(&view) else {
5413 continue;
5414 };
5415 if payloads.len() > 1 {
5416 panic!("view: {view}");
5417 }
5418 let (digest, _) = payloads.iter().next().unwrap();
5419 finalized.insert(view, *digest);
5420
5421 if view > latest_complete {
5423 continue;
5424 }
5425
5426 let nullifies = reporter.nullifies.lock();
5428 let Some(nullifies) = nullifies.get(&view) else {
5429 continue;
5430 };
5431 for (_, finalizers) in payloads.iter() {
5432 for finalizer in finalizers.iter() {
5433 if nullifies.contains(finalizer) {
5434 panic!("should not nullify and finalize at same view");
5435 }
5436 }
5437 }
5438 }
5439 }
5440 {
5441 let finalizations = reporter.finalizations.lock();
5442 for view in View::range(View::new(1), latest_complete) {
5443 let Some(finalization) = finalizations.get(&view) else {
5445 continue;
5446 };
5447 let Some(digest) = finalized.get(&view) else {
5448 continue;
5449 };
5450 assert_eq!(&finalization.proposal.payload, digest);
5451 }
5452 }
5453 }
5454
5455 let blocked = oracle.blocked().await.unwrap();
5457 assert!(blocked.is_empty());
5458
5459 context.auditor().state()
5461 })
5462 }
5463
5464 #[test_group("slow")]
5465 #[test_traced]
5466 fn test_hailstorm_bls12381_threshold_vrf_min_pk() {
5467 assert_eq!(
5468 hailstorm::<_, _, Random>(
5469 0,
5470 10,
5471 ViewDelta::new(15),
5472 bls12381_threshold_vrf::fixture::<MinPk, _>
5473 ),
5474 hailstorm::<_, _, Random>(
5475 0,
5476 10,
5477 ViewDelta::new(15),
5478 bls12381_threshold_vrf::fixture::<MinPk, _>
5479 ),
5480 );
5481 }
5482
5483 #[test_group("slow")]
5484 #[test_traced]
5485 fn test_hailstorm_bls12381_threshold_vrf_min_sig() {
5486 assert_eq!(
5487 hailstorm::<_, _, Random>(
5488 0,
5489 10,
5490 ViewDelta::new(15),
5491 bls12381_threshold_vrf::fixture::<MinSig, _>
5492 ),
5493 hailstorm::<_, _, Random>(
5494 0,
5495 10,
5496 ViewDelta::new(15),
5497 bls12381_threshold_vrf::fixture::<MinSig, _>
5498 ),
5499 );
5500 }
5501
5502 #[test_group("slow")]
5503 #[test_traced]
5504 fn test_hailstorm_bls12381_threshold_std_min_pk() {
5505 assert_eq!(
5506 hailstorm::<_, _, RoundRobin>(
5507 0,
5508 10,
5509 ViewDelta::new(15),
5510 bls12381_threshold_std::fixture::<MinPk, _>
5511 ),
5512 hailstorm::<_, _, RoundRobin>(
5513 0,
5514 10,
5515 ViewDelta::new(15),
5516 bls12381_threshold_std::fixture::<MinPk, _>
5517 ),
5518 );
5519 }
5520
5521 #[test_group("slow")]
5522 #[test_traced]
5523 fn test_hailstorm_bls12381_threshold_std_min_sig() {
5524 assert_eq!(
5525 hailstorm::<_, _, RoundRobin>(
5526 0,
5527 10,
5528 ViewDelta::new(15),
5529 bls12381_threshold_std::fixture::<MinSig, _>
5530 ),
5531 hailstorm::<_, _, RoundRobin>(
5532 0,
5533 10,
5534 ViewDelta::new(15),
5535 bls12381_threshold_std::fixture::<MinSig, _>
5536 ),
5537 );
5538 }
5539
5540 #[test_group("slow")]
5541 #[test_traced]
5542 fn test_hailstorm_bls12381_multisig_min_pk() {
5543 assert_eq!(
5544 hailstorm::<_, _, RoundRobin>(
5545 0,
5546 10,
5547 ViewDelta::new(15),
5548 bls12381_multisig::fixture::<MinPk, _>
5549 ),
5550 hailstorm::<_, _, RoundRobin>(
5551 0,
5552 10,
5553 ViewDelta::new(15),
5554 bls12381_multisig::fixture::<MinPk, _>
5555 ),
5556 );
5557 }
5558
5559 #[test_group("slow")]
5560 #[test_traced]
5561 fn test_hailstorm_bls12381_multisig_min_sig() {
5562 assert_eq!(
5563 hailstorm::<_, _, RoundRobin>(
5564 0,
5565 10,
5566 ViewDelta::new(15),
5567 bls12381_multisig::fixture::<MinSig, _>
5568 ),
5569 hailstorm::<_, _, RoundRobin>(
5570 0,
5571 10,
5572 ViewDelta::new(15),
5573 bls12381_multisig::fixture::<MinSig, _>
5574 ),
5575 );
5576 }
5577
5578 #[test_group("slow")]
5579 #[test_traced]
5580 fn test_hailstorm_ed25519() {
5581 assert_eq!(
5582 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture),
5583 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture)
5584 );
5585 }
5586
5587 #[test_group("slow")]
5588 #[test_traced]
5589 fn test_hailstorm_secp256r1() {
5590 assert_eq!(
5591 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture),
5592 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture)
5593 );
5594 }
5595
5596 #[derive(Clone, Copy, Debug)]
5633 struct TwinsCampaign {
5634 n: u32,
5635 rounds: usize,
5636 mode: twins::Mode,
5637 max_cases: usize,
5638 trailing_finalizations: usize,
5639 }
5640
5641 fn twins_campaign<S, F, L>(
5642 rng: &mut StdRng,
5643 campaign: TwinsCampaign,
5644 link: Link,
5645 mut fixture: F,
5646 ) where
5647 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5648 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5649 L: Elector<S>,
5650 {
5651 let n = campaign.n;
5652 let faults = N3f1::max_faults(n) as usize;
5653 let cases = twins::cases(
5654 rng,
5655 twins::Framework {
5656 participants: n as usize,
5657 faults,
5658 rounds: campaign.rounds,
5659 mode: campaign.mode,
5660 max_cases: campaign.max_cases,
5661 },
5662 );
5663 assert!(
5664 !cases.is_empty(),
5665 "twins campaign should generate at least one case"
5666 );
5667 for case in cases {
5668 let scenario = case.scenario.clone();
5669 let twin_indices = case.compromised.clone();
5670 assert_eq!(
5671 twin_indices.len(),
5672 faults,
5673 "unexpected twins count for n={n} (expected f={faults})",
5674 );
5675
5676 let activity_timeout = ViewDelta::new(10);
5677 let skip_timeout = ViewDelta::new(5);
5678 let namespace = b"consensus".to_vec();
5679 let link = link.clone();
5680 let trailing_finalizations = campaign.trailing_finalizations;
5681 let mut case_fixture =
5682 |ctx: &mut deterministic::Context, ns: &[u8], n: u32| fixture(ctx, ns, n);
5683 let cfg = deterministic::Config::new()
5684 .with_rng(Box::new(StdRng::from_rng(&mut *rng).unwrap()));
5685 let executor = deterministic::Runner::new(cfg);
5686 executor.start(|mut context| async move {
5687 let Fixture {
5688 participants,
5689 schemes,
5690 ..
5691 } = case_fixture(&mut context, &namespace, n);
5692 let participants: Arc<[_]> = participants.into();
5693 let mut oracle = start_test_network_with_peers(
5694 context.clone(),
5695 participants.iter().cloned(),
5696 false,
5697 )
5698 .await;
5699 let mut registrations = register_validators(&mut oracle, &participants).await;
5700 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5701
5702 let elector = TwinsElector::new(L::default(), &scenario, n as usize);
5703 let relay = Arc::new(mocks::relay::Relay::new());
5704 let mut reporters = Vec::new();
5705 let mut engine_handlers = Vec::new();
5706 let twin_index_set: HashSet<usize> = twin_indices.iter().copied().collect();
5707
5708 for idx in twin_indices.iter().copied() {
5710 let validator = &participants[idx];
5711 let (
5712 (vote_sender, vote_receiver),
5713 (certificate_sender, certificate_receiver),
5714 (_resolver_sender, _resolver_receiver),
5715 ) = registrations
5716 .remove(validator)
5717 .expect("validator should be registered");
5718
5719 let make_vote_forwarder = || {
5720 let participants = participants.clone();
5721 let scenario = scenario.clone();
5722 move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
5723 let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5724 let (primary, secondary) =
5725 scenario.partitions(msg.view(), participants.as_ref());
5726 match origin {
5727 SplitOrigin::Primary => Some(Recipients::Some(primary)),
5728 SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5729 }
5730 }
5731 };
5732 let make_certificate_forwarder = || {
5733 let codec = schemes[idx].certificate_codec_config();
5734 let participants = participants.clone();
5735 let scenario = scenario.clone();
5736 move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
5737 let msg: Certificate<S, D> =
5738 Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5739 let (primary, secondary) =
5740 scenario.partitions(msg.view(), participants.as_ref());
5741 match origin {
5742 SplitOrigin::Primary => Some(Recipients::Some(primary)),
5743 SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5744 }
5745 }
5746 };
5747 let make_vote_router = || {
5748 let participants = participants.clone();
5749 let scenario = scenario.clone();
5750 move |(sender, message): &(_, IoBuf)| {
5751 let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5752 scenario.route(msg.view(), sender, participants.as_ref())
5753 }
5754 };
5755 let make_certificate_router = || {
5756 let codec = schemes[idx].certificate_codec_config();
5757 let participants = participants.clone();
5758 let scenario = scenario.clone();
5759 move |(sender, message): &(_, IoBuf)| {
5760 let msg: Certificate<S, D> =
5761 Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5762 scenario.route(msg.view(), sender, participants.as_ref())
5763 }
5764 };
5765 let (vote_sender_primary, vote_sender_secondary) =
5766 vote_sender.split_with(make_vote_forwarder());
5767 let (vote_receiver_primary, vote_receiver_secondary) = vote_receiver
5768 .split_with(
5769 context.with_label(&format!("pending_split_{idx}")),
5770 make_vote_router(),
5771 );
5772 let (certificate_sender_primary, certificate_sender_secondary) =
5773 certificate_sender.split_with(make_certificate_forwarder());
5774 let (certificate_receiver_primary, certificate_receiver_secondary) =
5775 certificate_receiver.split_with(
5776 context.with_label(&format!("recovered_split_{idx}")),
5777 make_certificate_router(),
5778 );
5779
5780 for (twin_label, pending, recovered) in [
5781 (
5782 "primary",
5783 (vote_sender_primary, vote_receiver_primary),
5784 (certificate_sender_primary, certificate_receiver_primary),
5785 ),
5786 (
5787 "secondary",
5788 (vote_sender_secondary, vote_receiver_secondary),
5789 (certificate_sender_secondary, certificate_receiver_secondary),
5790 ),
5791 ] {
5792 let label = format!("twin_{idx}_{twin_label}");
5793 let context = context.with_label(&label);
5794
5795 let reporter_config = mocks::reporter::Config {
5796 participants: participants.as_ref().try_into().unwrap(),
5797 scheme: schemes[idx].clone(),
5798 elector: elector.clone(),
5799 };
5800 let reporter = mocks::reporter::Reporter::new(
5801 context.with_label("reporter"),
5802 reporter_config,
5803 );
5804 reporters.push(reporter.clone());
5805
5806 let application_cfg = mocks::application::Config {
5807 hasher: Sha256::default(),
5808 relay: relay.clone(),
5809 me: validator.clone(),
5810 propose_latency: (10.0, 5.0),
5811 verify_latency: (10.0, 5.0),
5812 certify_latency: (10.0, 5.0),
5813 should_certify: mocks::application::Certifier::Sometimes,
5814 };
5815 let (actor, application) = mocks::application::Application::new(
5816 context.with_label("application"),
5817 application_cfg,
5818 );
5819 actor.start();
5820
5821 let blocker = oracle.control(validator.clone());
5822 let cfg = config::Config {
5823 scheme: schemes[idx].clone(),
5824 elector: elector.clone(),
5825 blocker,
5826 automaton: application.clone(),
5827 relay: application.clone(),
5828 reporter: reporter.clone(),
5829 strategy: Sequential,
5830 partition: label,
5831 mailbox_size: 1024,
5832 epoch: Epoch::new(333),
5833 leader_timeout: Duration::from_secs(1),
5834 certification_timeout: Duration::from_millis(1_500),
5835 timeout_retry: Duration::from_secs(10),
5836 fetch_timeout: Duration::from_secs(1),
5837 activity_timeout,
5838 skip_timeout,
5839 fetch_concurrent: 4,
5840 replay_buffer: NZUsize!(1024 * 1024),
5841 write_buffer: NZUsize!(1024 * 1024),
5842 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5843 forwarding: ForwardingPolicy::Disabled,
5844 };
5845 let engine = Engine::new(context.with_label("engine"), cfg);
5846 engine_handlers.push(engine.start(
5847 pending,
5848 recovered,
5849 inert_channel(participants.as_ref()),
5850 ));
5851 }
5852 }
5853
5854 let honest_start = reporters.len();
5856 for (idx, validator) in participants.iter().enumerate() {
5857 if twin_index_set.contains(&idx) {
5858 continue;
5859 }
5860
5861 let label = format!("honest_{idx}");
5862 let context = context.with_label(&label);
5863
5864 let reporter_config = mocks::reporter::Config {
5865 participants: participants.as_ref().try_into().unwrap(),
5866 scheme: schemes[idx].clone(),
5867 elector: elector.clone(),
5868 };
5869 let reporter = mocks::reporter::Reporter::new(
5870 context.with_label("reporter"),
5871 reporter_config,
5872 );
5873 reporters.push(reporter.clone());
5874
5875 let application_cfg = mocks::application::Config {
5876 hasher: Sha256::default(),
5877 relay: relay.clone(),
5878 me: validator.clone(),
5879 propose_latency: (10.0, 5.0),
5880 verify_latency: (10.0, 5.0),
5881 certify_latency: (10.0, 5.0),
5882 should_certify: mocks::application::Certifier::Sometimes,
5883 };
5884 let (actor, application) = mocks::application::Application::new(
5885 context.with_label("application"),
5886 application_cfg,
5887 );
5888 actor.start();
5889
5890 let blocker = oracle.control(validator.clone());
5891 let cfg = config::Config {
5892 scheme: schemes[idx].clone(),
5893 elector: elector.clone(),
5894 blocker,
5895 automaton: application.clone(),
5896 relay: application.clone(),
5897 reporter: reporter.clone(),
5898 strategy: Sequential,
5899 partition: label,
5900 mailbox_size: 1024,
5901 epoch: Epoch::new(333),
5902 leader_timeout: Duration::from_secs(1),
5903 certification_timeout: Duration::from_millis(1_500),
5904 timeout_retry: Duration::from_secs(10),
5905 fetch_timeout: Duration::from_secs(1),
5906 activity_timeout,
5907 skip_timeout,
5908 fetch_concurrent: 4,
5909 replay_buffer: NZUsize!(1024 * 1024),
5910 write_buffer: NZUsize!(1024 * 1024),
5911 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5912 forwarding: ForwardingPolicy::Disabled,
5913 };
5914 let engine = Engine::new(context.with_label("engine"), cfg);
5915
5916 let (
5917 (pending_sender, pending_receiver),
5918 (recovered_sender, recovered_receiver),
5919 _,
5920 ) = registrations
5921 .remove(validator)
5922 .expect("validator should be registered");
5923 engine_handlers.push(engine.start(
5924 (pending_sender, pending_receiver),
5925 (recovered_sender, recovered_receiver),
5926 inert_channel(participants.as_ref()),
5927 ));
5928 }
5929
5930 let prefix_end = View::new(scenario.rounds().len() as u64);
5940 let mut finalizers = Vec::new();
5941 for (i, reporter) in reporters.iter_mut().skip(honest_start).enumerate() {
5942 let (_latest, mut monitor) = reporter.subscribe().await;
5943 let required = trailing_finalizations;
5944 let label = format!("finalizer_{i}");
5945 finalizers.push(context.with_label(&label).spawn(move |_| async move {
5946 let mut count = 0usize;
5947 while count < required {
5948 let view = monitor.recv().await.expect("event missing");
5949 if view > prefix_end {
5950 count += 1;
5951 }
5952 }
5953 }));
5954 }
5955 join_all(finalizers).await;
5956
5957 let mut finalized_at_view: BTreeMap<View, D> = BTreeMap::new();
5959 for reporter in reporters.iter().skip(honest_start) {
5960 let finalizations = reporter.finalizations.lock();
5961 for (view, finalization) in finalizations.iter() {
5962 let digest = finalization.proposal.payload;
5963 if let Some(existing) = finalized_at_view.get(view) {
5964 assert_eq!(
5965 existing, &digest,
5966 "safety violation: conflicting finalizations at view {view}"
5967 );
5968 } else {
5969 finalized_at_view.insert(*view, digest);
5970 }
5971 }
5972 }
5973
5974 for reporter in reporters.iter().skip(honest_start) {
5976 reporter.assert_no_invalid();
5977 }
5978
5979 let twin_identities: HashSet<_> = twin_indices
5981 .iter()
5982 .map(|idx| participants[*idx].clone())
5983 .collect();
5984 let mut notarized_by_honest_signer: BTreeMap<View, HashMap<PublicKey, D>> =
5985 BTreeMap::new();
5986 let mut finalized_by_honest_signer: BTreeMap<View, HashMap<PublicKey, D>> =
5987 BTreeMap::new();
5988 for reporter in reporters.iter().skip(honest_start) {
5989 let notarizes = reporter.notarizes.lock();
5990 for (view, payloads) in notarizes.iter() {
5991 let signers = notarized_by_honest_signer.entry(*view).or_default();
5992 for (digest, payload_signers) in payloads.iter() {
5993 for signer in payload_signers.iter() {
5994 if twin_identities.contains(signer) {
5995 continue;
5996 }
5997 if let Some(existing) = signers.insert(signer.clone(), *digest) {
5998 assert_eq!(
5999 existing, *digest,
6000 "honest signer produced conflicting notarizes at view {view}"
6001 );
6002 }
6003 }
6004 }
6005 }
6006
6007 let finalizes = reporter.finalizes.lock();
6008 for (view, payloads) in finalizes.iter() {
6009 let signers = finalized_by_honest_signer.entry(*view).or_default();
6010 for (digest, payload_signers) in payloads.iter() {
6011 for signer in payload_signers.iter() {
6012 if twin_identities.contains(signer) {
6013 continue;
6014 }
6015 if let Some(existing) = signers.insert(signer.clone(), *digest) {
6016 assert_eq!(
6017 existing, *digest,
6018 "honest signer produced conflicting finalizes at view {view}"
6019 );
6020 }
6021 }
6022 }
6023 }
6024 }
6025
6026 for reporter in reporters.iter().skip(honest_start) {
6028 let faults = reporter.faults.lock();
6029 for (faulter, _) in faults.iter() {
6030 assert!(
6031 twin_identities.contains(faulter),
6032 "fault from non-twin participant"
6033 );
6034 }
6035 }
6036
6037 let blocked = oracle.blocked().await.unwrap();
6038 for (_, faulter) in blocked {
6039 assert!(
6040 twin_identities.contains(&faulter),
6041 "blocked peer attributed to non-twin participant"
6042 );
6043 }
6044 });
6045 }
6046 }
6047
6048 const TWINS_CAMPAIGN: TwinsCampaign = TwinsCampaign {
6049 n: 5,
6050 rounds: 3,
6051 mode: twins::Mode::Sampled,
6052 max_cases: 20,
6053 trailing_finalizations: 10,
6054 };
6055
6056 const TWINS_LINK: Link = Link {
6057 latency: Duration::from_millis(500),
6058 jitter: Duration::from_millis(500),
6059 success_rate: 1.0,
6060 };
6061
6062 #[test_group("slow")]
6063 #[test_traced("INFO")]
6064 fn test_twins_sampled() {
6065 for link in [
6066 Link {
6067 latency: Duration::from_millis(10),
6068 jitter: Duration::from_millis(10),
6069 success_rate: 1.0,
6070 },
6071 TWINS_LINK,
6072 ] {
6073 twins_campaign::<_, _, RoundRobin>(
6074 &mut test_rng(),
6075 TWINS_CAMPAIGN,
6076 link,
6077 scheme_mocks::fixture,
6078 );
6079 }
6080 }
6081
6082 #[test_group("slow")]
6083 #[test_traced("INFO")]
6084 fn test_twins_sustained() {
6085 let campaign = TwinsCampaign {
6086 mode: twins::Mode::Sustained,
6087 ..TWINS_CAMPAIGN
6088 };
6089 for link in [
6090 Link {
6091 latency: Duration::from_millis(10),
6092 jitter: Duration::from_millis(10),
6093 success_rate: 1.0,
6094 },
6095 TWINS_LINK,
6096 ] {
6097 twins_campaign::<_, _, RoundRobin>(
6098 &mut test_rng(),
6099 campaign,
6100 link,
6101 scheme_mocks::fixture,
6102 );
6103 }
6104 }
6105
6106 #[test_group("slow")]
6107 #[test_traced("INFO")]
6108 fn test_twins_large_sampled() {
6109 let campaign = TwinsCampaign {
6110 n: 10,
6111 rounds: 5,
6112 ..TWINS_CAMPAIGN
6113 };
6114 twins_campaign::<_, _, RoundRobin>(
6115 &mut test_rng(),
6116 campaign,
6117 TWINS_LINK,
6118 scheme_mocks::fixture,
6119 );
6120 }
6121
6122 #[test_group("slow")]
6123 #[test_traced("INFO")]
6124 fn test_twins_large_sustained() {
6125 let campaign = TwinsCampaign {
6126 n: 10,
6127 rounds: 5,
6128 mode: twins::Mode::Sustained,
6129 ..TWINS_CAMPAIGN
6130 };
6131 twins_campaign::<_, _, RoundRobin>(
6132 &mut test_rng(),
6133 campaign,
6134 TWINS_LINK,
6135 scheme_mocks::fixture,
6136 );
6137 }
6138
6139 #[test_group("slow")]
6140 #[test_traced("INFO")]
6141 fn test_twins_multisig_min_pk() {
6142 twins_campaign::<_, _, RoundRobin>(
6143 &mut test_rng(),
6144 TWINS_CAMPAIGN,
6145 TWINS_LINK,
6146 bls12381_multisig::fixture::<MinPk, _>,
6147 );
6148 }
6149
6150 #[test_group("slow")]
6151 #[test_traced("INFO")]
6152 fn test_twins_multisig_min_sig() {
6153 twins_campaign::<_, _, RoundRobin>(
6154 &mut test_rng(),
6155 TWINS_CAMPAIGN,
6156 TWINS_LINK,
6157 bls12381_multisig::fixture::<MinSig, _>,
6158 );
6159 }
6160
6161 #[test_group("slow")]
6162 #[test_traced("INFO")]
6163 fn test_twins_threshold_vrf_min_pk() {
6164 twins_campaign::<_, _, Random>(
6165 &mut test_rng(),
6166 TWINS_CAMPAIGN,
6167 TWINS_LINK,
6168 bls12381_threshold_vrf::fixture::<MinPk, _>,
6169 );
6170 }
6171
6172 #[test_group("slow")]
6173 #[test_traced("INFO")]
6174 fn test_twins_threshold_vrf_min_sig() {
6175 twins_campaign::<_, _, Random>(
6176 &mut test_rng(),
6177 TWINS_CAMPAIGN,
6178 TWINS_LINK,
6179 bls12381_threshold_vrf::fixture::<MinSig, _>,
6180 );
6181 }
6182
6183 #[test_group("slow")]
6184 #[test_traced("INFO")]
6185 fn test_twins_threshold_std_min_pk() {
6186 twins_campaign::<_, _, RoundRobin>(
6187 &mut test_rng(),
6188 TWINS_CAMPAIGN,
6189 TWINS_LINK,
6190 bls12381_threshold_std::fixture::<MinPk, _>,
6191 );
6192 }
6193
6194 #[test_group("slow")]
6195 #[test_traced("INFO")]
6196 fn test_twins_threshold_std_min_sig() {
6197 twins_campaign::<_, _, RoundRobin>(
6198 &mut test_rng(),
6199 TWINS_CAMPAIGN,
6200 TWINS_LINK,
6201 bls12381_threshold_std::fixture::<MinSig, _>,
6202 );
6203 }
6204
6205 #[test_group("slow")]
6206 #[test_traced("INFO")]
6207 fn test_twins_ed25519() {
6208 twins_campaign::<_, _, RoundRobin>(
6209 &mut test_rng(),
6210 TWINS_CAMPAIGN,
6211 TWINS_LINK,
6212 ed25519::fixture,
6213 );
6214 }
6215
6216 #[test_group("slow")]
6217 #[test_traced("INFO")]
6218 fn test_twins_secp256r1() {
6219 twins_campaign::<_, _, RoundRobin>(
6220 &mut test_rng(),
6221 TWINS_CAMPAIGN,
6222 TWINS_LINK,
6223 secp256r1::fixture,
6224 );
6225 }
6226}