1pub mod elector;
262pub mod scheme;
263pub mod types;
264
265cfg_if::cfg_if! {
266 if #[cfg(not(target_arch = "wasm32"))] {
267 mod actors;
268 pub mod config;
269 pub use config::Config;
270 mod engine;
271 pub use engine::Engine;
272 mod metrics;
273 }
274}
275
276#[cfg(any(test, feature = "fuzz"))]
277pub mod mocks;
278
279#[cfg(not(target_arch = "wasm32"))]
280use crate::types::{View, ViewDelta};
281
282#[cfg(not(target_arch = "wasm32"))]
284pub(crate) const fn min_active(activity_timeout: ViewDelta, last_finalized: View) -> View {
285 last_finalized.saturating_sub(activity_timeout)
286}
287
288#[cfg(not(target_arch = "wasm32"))]
292pub(crate) fn interesting(
293 activity_timeout: ViewDelta,
294 last_finalized: View,
295 current: View,
296 pending: View,
297 allow_future: bool,
298) -> bool {
299 if pending.is_zero() {
301 return false;
302 }
303 if pending < min_active(activity_timeout, last_finalized) {
304 return false;
305 }
306 if !allow_future && pending > current.next() {
307 return false;
308 }
309 true
310}
311
/// Test-only helper: quorum size for `n` participants as computed by `N3f1` through the
/// `Faults` trait.
#[cfg(test)]
pub(crate) fn quorum(n: u32) -> u32 {
    <commonware_utils::N3f1 as commonware_utils::Faults>::quorum(n)
}
319
320#[cfg(test)]
321mod tests {
322 use super::*;
323 use crate::{
324 simplex::{
325 elector::{Config as Elector, Random, RoundRobin},
326 mocks::twins::Strategy,
327 scheme::{
328 bls12381_multisig,
329 bls12381_threshold::{
330 standard as bls12381_threshold_std,
331 vrf::{self as bls12381_threshold_vrf, Seedable},
332 },
333 ed25519, secp256r1, Scheme,
334 },
335 types::{
336 Certificate, Finalization as TFinalization, Finalize as TFinalize,
337 Notarization as TNotarization, Notarize as TNotarize,
338 Nullification as TNullification, Nullify as TNullify, Proposal, Vote,
339 },
340 },
341 types::{Epoch, Round},
342 Monitor, Viewable,
343 };
344 use commonware_codec::{Decode, DecodeExt, Encode};
345 use commonware_cryptography::{
346 bls12381::primitives::variant::{MinPk, MinSig, Variant},
347 certificate::mocks::Fixture,
348 ed25519::{PrivateKey, PublicKey},
349 sha256::{Digest as Sha256Digest, Digest as D},
350 Hasher as _, Sha256, Signer as _,
351 };
352 use commonware_macros::{select, test_group, test_traced};
353 use commonware_p2p::{
354 simulated::{Config, Link, Network, Oracle, Receiver, Sender, SplitOrigin, SplitTarget},
355 Recipients, Sender as _,
356 };
357 use commonware_parallel::Sequential;
358 use commonware_runtime::{
359 buffer::paged::CacheRef, count_running_tasks, deterministic, Clock, IoBuf, Metrics, Quota,
360 Runner, Spawner,
361 };
362 use commonware_utils::{test_rng, Faults, N3f1, NZUsize, NZU16};
363 use engine::Engine;
364 use futures::future::join_all;
365 use rand::{rngs::StdRng, Rng as _};
366 use std::{
367 collections::{BTreeMap, HashMap},
368 num::{NonZeroU16, NonZeroU32, NonZeroUsize},
369 sync::{Arc, Mutex},
370 time::Duration,
371 };
372 use tracing::{debug, info, warn};
373 use types::Activity;
374
    // First argument handed to `CacheRef::new` when building each engine's `page_cache`.
    // NOTE(review): presumably a size in bytes; confirm against `CacheRef`.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    // Second argument handed to `CacheRef::new`.
    // NOTE(review): presumably the number of cached pages; confirm against `CacheRef`.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
    // Quota passed to every `control.register(..)` call in these tests, built from the largest
    // `NonZeroU32` so the per-second limit is as high as the type allows.
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
378
379 #[test]
380 fn test_interesting() {
381 let activity_timeout = ViewDelta::new(10);
382
383 assert!(!interesting(
385 activity_timeout,
386 View::zero(),
387 View::zero(),
388 View::zero(),
389 false
390 ));
391 assert!(!interesting(
392 activity_timeout,
393 View::zero(),
394 View::new(1),
395 View::zero(),
396 true
397 ));
398
399 assert!(!interesting(
401 activity_timeout,
402 View::new(20),
403 View::new(25),
404 View::new(5), false
406 ));
407
408 assert!(interesting(
410 activity_timeout,
411 View::new(20),
412 View::new(25),
413 View::new(10), false
415 ));
416
417 assert!(!interesting(
419 activity_timeout,
420 View::new(20),
421 View::new(25),
422 View::new(27),
423 false
424 ));
425
426 assert!(interesting(
428 activity_timeout,
429 View::new(20),
430 View::new(25),
431 View::new(27),
432 true
433 ));
434
435 assert!(interesting(
437 activity_timeout,
438 View::new(20),
439 View::new(25),
440 View::new(26),
441 false
442 ));
443
444 assert!(interesting(
446 activity_timeout,
447 View::new(20),
448 View::new(25),
449 View::new(22),
450 false
451 ));
452
453 assert!(interesting(
456 activity_timeout,
457 View::zero(),
458 View::new(5),
459 View::new(1),
460 false
461 ));
462 }
463
464 async fn register_validator(
466 oracle: &mut Oracle<PublicKey, deterministic::Context>,
467 validator: PublicKey,
468 ) -> (
469 (
470 Sender<PublicKey, deterministic::Context>,
471 Receiver<PublicKey>,
472 ),
473 (
474 Sender<PublicKey, deterministic::Context>,
475 Receiver<PublicKey>,
476 ),
477 (
478 Sender<PublicKey, deterministic::Context>,
479 Receiver<PublicKey>,
480 ),
481 ) {
482 let control = oracle.control(validator.clone());
483 let (vote_sender, vote_receiver) = control.register(0, TEST_QUOTA).await.unwrap();
484 let (certificate_sender, certificate_receiver) =
485 control.register(1, TEST_QUOTA).await.unwrap();
486 let (resolver_sender, resolver_receiver) = control.register(2, TEST_QUOTA).await.unwrap();
487 (
488 (vote_sender, vote_receiver),
489 (certificate_sender, certificate_receiver),
490 (resolver_sender, resolver_receiver),
491 )
492 }
493
494 async fn register_validators(
496 oracle: &mut Oracle<PublicKey, deterministic::Context>,
497 validators: &[PublicKey],
498 ) -> HashMap<
499 PublicKey,
500 (
501 (
502 Sender<PublicKey, deterministic::Context>,
503 Receiver<PublicKey>,
504 ),
505 (
506 Sender<PublicKey, deterministic::Context>,
507 Receiver<PublicKey>,
508 ),
509 (
510 Sender<PublicKey, deterministic::Context>,
511 Receiver<PublicKey>,
512 ),
513 ),
514 > {
515 let mut registrations = HashMap::new();
516 for validator in validators.iter() {
517 let registration = register_validator(oracle, validator.clone()).await;
518 registrations.insert(validator.clone(), registration);
519 }
520 registrations
521 }
522
523 enum Action {
525 Link(Link),
526 Update(Link), Unlink,
528 }
529
530 async fn link_validators(
536 oracle: &mut Oracle<PublicKey, deterministic::Context>,
537 validators: &[PublicKey],
538 action: Action,
539 restrict_to: Option<fn(usize, usize, usize) -> bool>,
540 ) {
541 for (i1, v1) in validators.iter().enumerate() {
542 for (i2, v2) in validators.iter().enumerate() {
543 if v2 == v1 {
545 continue;
546 }
547
548 if let Some(f) = restrict_to {
550 if !f(validators.len(), i1, i2) {
551 continue;
552 }
553 }
554
555 match action {
557 Action::Update(_) | Action::Unlink => {
558 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
559 }
560 _ => {}
561 }
562
563 match action {
565 Action::Link(ref link) | Action::Update(ref link) => {
566 oracle
567 .add_link(v1.clone(), v2.clone(), link.clone())
568 .await
569 .unwrap();
570 }
571 _ => {}
572 }
573 }
574 }
575 }
576
577 fn count_nonzero_metric_lines(encoded: &str, patterns: &[&str]) -> u32 {
579 encoded
580 .lines()
581 .filter(|line| patterns.iter().all(|p| line.contains(p)))
582 .filter(|line| {
583 line.split_whitespace()
584 .last()
585 .and_then(|s| s.parse::<u64>().ok())
586 .is_some_and(|n| n > 0)
587 })
588 .count() as u32
589 }
590
591 fn all_online<S, F, L>(mut fixture: F)
592 where
593 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
594 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
595 L: Elector<S>,
596 {
597 let n = 5;
599 let quorum = quorum(n) as usize;
600 let required_containers = View::new(100);
601 let activity_timeout = ViewDelta::new(10);
602 let skip_timeout = ViewDelta::new(5);
603 let namespace = b"consensus".to_vec();
604 let executor = deterministic::Runner::timed(Duration::from_secs(300));
605 executor.start(|mut context| async move {
606 let (network, mut oracle) = Network::new(
608 context.with_label("network"),
609 Config {
610 max_size: 1024 * 1024,
611 disconnect_on_block: true,
612 tracked_peer_sets: None,
613 },
614 );
615
616 network.start();
618
619 let Fixture {
621 participants,
622 schemes,
623 ..
624 } = fixture(&mut context, &namespace, n);
625 let mut registrations = register_validators(&mut oracle, &participants).await;
626
627 let link = Link {
629 latency: Duration::from_millis(10),
630 jitter: Duration::from_millis(1),
631 success_rate: 1.0,
632 };
633 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
634
635 let elector = L::default();
637 let relay = Arc::new(mocks::relay::Relay::new());
638 let mut reporters = Vec::new();
639 let mut engine_handlers = Vec::new();
640 for (idx, validator) in participants.iter().enumerate() {
641 let context = context.with_label(&format!("validator_{}", *validator));
643
644 let reporter_config = mocks::reporter::Config {
646 participants: participants.clone().try_into().unwrap(),
647 scheme: schemes[idx].clone(),
648 elector: elector.clone(),
649 };
650 let reporter =
651 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
652 reporters.push(reporter.clone());
653 let application_cfg = mocks::application::Config {
654 hasher: Sha256::default(),
655 relay: relay.clone(),
656 me: validator.clone(),
657 propose_latency: (10.0, 5.0),
658 verify_latency: (10.0, 5.0),
659 certify_latency: (10.0, 5.0),
660 should_certify: mocks::application::Certifier::Sometimes,
661 };
662 let (actor, application) = mocks::application::Application::new(
663 context.with_label("application"),
664 application_cfg,
665 );
666 actor.start();
667 let blocker = oracle.control(validator.clone());
668 let cfg = config::Config {
669 scheme: schemes[idx].clone(),
670 elector: elector.clone(),
671 blocker,
672 automaton: application.clone(),
673 relay: application.clone(),
674 reporter: reporter.clone(),
675 strategy: Sequential,
676 partition: validator.to_string(),
677 mailbox_size: 1024,
678 epoch: Epoch::new(333),
679 leader_timeout: Duration::from_secs(1),
680 notarization_timeout: Duration::from_secs(2),
681 nullify_retry: Duration::from_secs(10),
682 fetch_timeout: Duration::from_secs(1),
683 activity_timeout,
684 skip_timeout,
685 fetch_concurrent: 4,
686 replay_buffer: NZUsize!(1024 * 1024),
687 write_buffer: NZUsize!(1024 * 1024),
688 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
689 };
690 let engine = Engine::new(context.with_label("engine"), cfg);
691
692 let (pending, recovered, resolver) = registrations
694 .remove(validator)
695 .expect("validator should be registered");
696 engine_handlers.push(engine.start(pending, recovered, resolver));
697 }
698
699 let mut finalizers = Vec::new();
701 for reporter in reporters.iter_mut() {
702 let (mut latest, mut monitor) = reporter.subscribe().await;
703 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
704 while latest < required_containers {
705 latest = monitor.recv().await.expect("event missing");
706 }
707 }));
708 }
709 join_all(finalizers).await;
710
711 let latest_complete = required_containers.saturating_sub(activity_timeout);
713 for reporter in reporters.iter() {
714 {
716 let faults = reporter.faults.lock().unwrap();
717 assert!(faults.is_empty());
718 }
719
720 {
722 let invalid = reporter.invalid.lock().unwrap();
723 assert_eq!(*invalid, 0);
724 }
725
726 {
728 let certified = reporter.certified.lock().unwrap();
729 for view in View::range(View::new(1), latest_complete) {
730 if !certified.contains(&view) {
732 panic!("view: {view}");
733 }
734 }
735 }
736
737 let mut notarized = HashMap::new();
739 let mut finalized = HashMap::new();
740 {
741 let notarizes = reporter.notarizes.lock().unwrap();
742 for view in View::range(View::new(1), latest_complete) {
743 let Some(payloads) = notarizes.get(&view) else {
745 continue;
746 };
747 if payloads.len() > 1 {
748 panic!("view: {view}");
749 }
750 let (digest, notarizers) = payloads.iter().next().unwrap();
751 notarized.insert(view, *digest);
752
753 if notarizers.len() < quorum {
754 panic!("view: {view}");
757 }
758 }
759 }
760 {
761 let notarizations = reporter.notarizations.lock().unwrap();
762 for view in View::range(View::new(1), latest_complete) {
763 let Some(notarization) = notarizations.get(&view) else {
765 continue;
766 };
767 let Some(digest) = notarized.get(&view) else {
768 continue;
769 };
770 assert_eq!(¬arization.proposal.payload, digest);
771 }
772 }
773 {
774 let finalizes = reporter.finalizes.lock().unwrap();
775 for view in View::range(View::new(1), latest_complete) {
776 let Some(payloads) = finalizes.get(&view) else {
778 continue;
779 };
780 if payloads.len() > 1 {
781 panic!("view: {view}");
782 }
783 let (digest, finalizers) = payloads.iter().next().unwrap();
784 finalized.insert(view, *digest);
785
786 if view > latest_complete {
788 continue;
789 }
790
791 if finalizers.len() < quorum {
793 panic!("view: {view}");
796 }
797
798 let nullifies = reporter.nullifies.lock().unwrap();
800 let Some(nullifies) = nullifies.get(&view) else {
801 continue;
802 };
803 for (_, finalizers) in payloads.iter() {
804 for finalizer in finalizers.iter() {
805 if nullifies.contains(finalizer) {
806 panic!("should not nullify and finalize at same view");
807 }
808 }
809 }
810 }
811 }
812 {
813 let finalizations = reporter.finalizations.lock().unwrap();
814 for view in View::range(View::new(1), latest_complete) {
815 let Some(finalization) = finalizations.get(&view) else {
817 continue;
818 };
819 let Some(digest) = finalized.get(&view) else {
820 continue;
821 };
822 assert_eq!(&finalization.proposal.payload, digest);
823 }
824 }
825 }
826
827 let blocked = oracle.blocked().await.unwrap();
829 assert!(blocked.is_empty());
830 });
831 }
832
    /// Runs `all_online` once per signing-scheme fixture.
    ///
    /// The VRF threshold fixtures are paired with the `Random` elector; every other fixture
    /// uses `RoundRobin`.
    #[test_traced]
    fn test_all_online() {
        all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
        all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
        all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
        all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
        all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
        all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
        all_online::<_, _, RoundRobin>(ed25519::fixture);
        all_online::<_, _, RoundRobin>(secp256r1::fixture);
    }
844
    /// Runs five active validators on a network that additionally contains one extra
    /// "observer" identity, waits until every reporter's monitor yields a view of at least
    /// `required_containers`, and asserts that no faults, invalid messages, or blocked peers
    /// were recorded.
    ///
    /// `fixture` builds the participants, their signing schemes, and a `verifier` scheme;
    /// `L` selects the leader elector.
    fn observer<S, F, L>(mut fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Test parameters
        let n_active = 5;
        let required_containers = View::new(100);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(300));
        executor.start(|mut context| async move {
            // Create the simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: None,
                },
            );

            // Start the network
            network.start();

            // Build the active participants, their schemes, and the verifier scheme
            let Fixture {
                participants,
                schemes,
                verifier,
                ..
            } = fixture(&mut context, &namespace, n_active);

            // Derive the observer identity from a fixed seed
            let private_key_observer = PrivateKey::from_seed(n_active as u64);
            let public_key_observer = private_key_observer.public_key();

            // Register the active participants and the observer with the network
            let mut all_validators = participants.clone();
            all_validators.push(public_key_observer.clone());
            all_validators.sort();
            let mut registrations = register_validators(&mut oracle, &all_validators).await;

            // Link all peers, including the observer
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;

            // Create engines
            let elector = L::default();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();

            // NOTE(review): this iterates `participants`, not `all_validators`. Unless the
            // fixture happens to contain the observer's key, `is_observer` is never true and
            // no engine is started for the observer. Confirm whether that is intended.
            for (idx, validator) in participants.iter().enumerate() {
                let is_observer = *validator == public_key_observer;

                let context = context.with_label(&format!("validator_{}", *validator));

                // The observer would use the verifier scheme; participants use their own scheme
                let signing = if is_observer {
                    verifier.clone()
                } else {
                    schemes[idx].clone()
                };
                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: signing.clone(),
                    elector: elector.clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                reporters.push(reporter.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                    certify_latency: (10.0, 5.0),
                    should_certify: mocks::application::Certifier::Sometimes,
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: signing.clone(),
                    elector: elector.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    strategy: Sequential,
                    partition: validator.to_string(),
                    mailbox_size: 1024,
                    epoch: Epoch::new(333),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    fetch_concurrent: 4,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start the engine with the channels registered for this validator
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine.start(pending, recovered, resolver);
            }

            // Wait until every reporter's monitor has yielded a view >= `required_containers`
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Sanity checks
            for reporter in reporters.iter() {
                // No faults may have been recorded
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }
                // No invalid messages may have been recorded
                {
                    let invalid = reporter.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }

                // No peer may have been blocked (queried once per reporter)
                let blocked = oracle.blocked().await.unwrap();
                assert!(blocked.is_empty());
            }
        });
    }
999
    /// Runs `observer` once per signing-scheme fixture.
    ///
    /// The VRF threshold fixtures are paired with the `Random` elector; every other fixture
    /// uses `RoundRobin`.
    #[test_traced]
    fn test_observer() {
        observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
        observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
        observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
        observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
        observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
        observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
        observer::<_, _, RoundRobin>(ed25519::fixture);
        observer::<_, _, RoundRobin>(secp256r1::fixture);
    }
1011
    /// Repeatedly starts five validators, interrupts the run after a random delay, and
    /// resumes from the runner's checkpoint until one run lets every reporter's monitor
    /// yield a view of at least `required_containers`.
    ///
    /// Reporters from interrupted runs are kept in `supervised`; when a run completes, each
    /// of them is checked for recorded faults. Every run also asserts that no peer was
    /// blocked.
    ///
    /// `fixture` builds the participants and schemes from a `StdRng`; `L` selects the
    /// leader elector.
    fn unclean_shutdown<S, F, L>(mut fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut StdRng, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Test parameters
        let n = 5;
        let required_containers = View::new(100);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();

        // State shared across restarts: the restart counter and the reporters of every
        // interrupted run
        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
        let supervised = Arc::new(Mutex::new(Vec::new()));
        let mut prev_checkpoint = None;

        // Build the participants once, outside of any runner
        let mut rng = test_rng();
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut rng, &namespace, n);

        // Relay created once, outside the restart loop
        let relay = Arc::new(mocks::relay::Relay::<Sha256Digest, S::PublicKey>::new());

        loop {
            // Per-run clones moved into the closure below
            let rng = rng.clone();
            let participants = participants.clone();
            let schemes = schemes.clone();
            let shutdowns = shutdowns.clone();
            let supervised = supervised.clone();
            let relay = relay.clone();
            relay.deregister_all();
            let f = |mut context: deterministic::Context| async move {
                // Create the simulated network
                let (network, mut oracle) = Network::new(
                    context.with_label("network"),
                    Config {
                        max_size: 1024 * 1024,
                        disconnect_on_block: true,
                        tracked_peer_sets: None,
                    },
                );

                // Start the network
                network.start();

                // Register participants
                let mut registrations = register_validators(&mut oracle, &participants).await;

                // Link every validator to every other validator
                let link = Link {
                    latency: Duration::from_millis(50),
                    jitter: Duration::from_millis(50),
                    success_rate: 1.0,
                };
                link_validators(&mut oracle, &participants, Action::Link(link), None).await;

                // Create engines
                let elector = L::default();
                // NOTE(review): this binding shadows the shared `relay` that was cloned and
                // `deregister_all`'d just before the closure, so the applications below
                // appear to use a fresh relay on every run. Confirm which one is intended.
                let relay = Arc::new(mocks::relay::Relay::new());
                let mut reporters = HashMap::new();
                let mut engine_handlers = Vec::new();
                for (idx, validator) in participants.iter().enumerate() {
                    let context = context.with_label(&format!("validator_{}", *validator));

                    // Configure the reporter and application mocks
                    let reporter_config = mocks::reporter::Config {
                        participants: participants.clone().try_into().unwrap(),
                        scheme: schemes[idx].clone(),
                        elector: elector.clone(),
                    };
                    // The reporter is built from a clone of the outer test RNG here
                    let reporter = mocks::reporter::Reporter::new(rng.clone(), reporter_config);
                    reporters.insert(validator.clone(), reporter.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                        certify_latency: (10.0, 5.0),
                        should_certify: mocks::application::Certifier::Sometimes,
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();

                    // Configure and start the engine; the partition name is the validator's
                    // key, so it is the same on every restart
                    let blocker = oracle.control(validator.clone());
                    let cfg = config::Config {
                        scheme: schemes[idx].clone(),
                        elector: elector.clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        strategy: Sequential,
                        partition: validator.to_string(),
                        mailbox_size: 1024,
                        epoch: Epoch::new(333),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        fetch_concurrent: 4,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    };
                    let engine = Engine::new(context.with_label("engine"), cfg);

                    let (pending, recovered, resolver) = registrations
                        .remove(validator)
                        .expect("validator should be registered");
                    engine_handlers.push(engine.start(pending, recovered, resolver));
                }

                // One task per reporter, each waiting for a view >= `required_containers`
                let mut finalizers = Vec::new();
                for (_, reporter) in reporters.iter_mut() {
                    let (mut latest, mut monitor) = reporter.subscribe().await;
                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                        while latest < required_containers {
                            latest = monitor.recv().await.expect("event missing");
                        }
                    }));
                }

                // Race a random delay (100ms..2s) against all finalizer tasks completing
                let wait =
                    context.gen_range(Duration::from_millis(100)..Duration::from_millis(2_000));
                let result = select! {
                    _ = context.sleep(wait) => {
                        // The delay fired first: count the restart and keep this run's
                        // reporters so they can be inspected once a later run completes
                        {
                            let mut shutdowns = shutdowns.lock().unwrap();
                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
                            *shutdowns += 1;
                        }
                        supervised.lock().unwrap().push(reporters);
                        false
                    },
                    _ = join_all(finalizers) => {
                        // All finalizer tasks finished: no reporter kept from an earlier,
                        // interrupted run may have recorded a fault
                        let supervised = supervised.lock().unwrap();
                        for reporters in supervised.iter() {
                            for (_, reporter) in reporters.iter() {
                                let faults = reporter.faults.lock().unwrap();
                                assert!(faults.is_empty());
                            }
                        }
                        true
                    },
                };

                // No peer may have been blocked, whether or not the run completed
                let blocked = oracle.blocked().await.unwrap();
                assert!(blocked.is_empty());

                result
            };

            // The first iteration starts a fresh timed runner; later iterations build the
            // runner from the checkpoint returned by the previous run
            let (complete, checkpoint) = prev_checkpoint
                .map_or_else(
                    || deterministic::Runner::timed(Duration::from_secs(180)),
                    deterministic::Runner::from,
                )
                .start_and_recover(f);

            // Stop once a run reached the target
            if complete {
                break;
            }

            prev_checkpoint = Some(checkpoint);
        }
    }
1198
    /// Runs `unclean_shutdown` once per signing-scheme fixture (tagged as part of the
    /// "slow" test group).
    ///
    /// The VRF threshold fixtures are paired with the `Random` elector; every other fixture
    /// uses `RoundRobin`.
    #[test_group("slow")]
    #[test_traced]
    fn test_unclean_shutdown() {
        unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
        unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
        unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
        unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
        unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
        unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
        unclean_shutdown::<_, _, RoundRobin>(ed25519::fixture);
        unclean_shutdown::<_, _, RoundRobin>(secp256r1::fixture);
    }
1211
    /// Starts three of four validators, lets them reach `required_containers`, then brings
    /// up the remaining validator (index 0) late and waits for its monitor to yield a view
    /// of at least `required_containers` as well.
    ///
    /// Before the late validator starts, the links between the running validators are
    /// slowed down, validator 1 is disconnected, and validator 0 is connected to the others
    /// (except validator 1). The test ends by asserting that no peer was blocked.
    ///
    /// `fixture` builds the participants and schemes; `L` selects the leader elector.
    fn backfill<S, F, L>(mut fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Test parameters
        let n = 4;
        let required_containers = View::new(100);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(240));
        executor.start(|mut context| async move {
            // Create the simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: None,
                },
            );

            // Start the network
            network.start();

            // Build participants and register them with the network
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, &namespace, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link every pair that does not involve validator 0
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &participants,
                Action::Link(link),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Create engines for everyone except validator 0
            let elector = L::default();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx_scheme, validator) in participants.iter().enumerate() {
                // Validator 0 is started later
                if idx_scheme == 0 {
                    continue;
                }

                let context = context.with_label(&format!("validator_{}", *validator));

                // Configure the reporter and application mocks
                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: schemes[idx_scheme].clone(),
                    elector: elector.clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                reporters.push(reporter.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                    certify_latency: (10.0, 5.0),
                    should_certify: mocks::application::Certifier::Sometimes,
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();

                // Configure and start the engine
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx_scheme].clone(),
                    elector: elector.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    strategy: Sequential,
                    partition: validator.to_string(),
                    mailbox_size: 1024,
                    epoch: Epoch::new(333),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    fetch_concurrent: 4,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait until every running reporter's monitor has yielded a view >=
            // `required_containers`
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Replace the links between the running validators with 3-second-latency links
            let link = Link {
                latency: Duration::from_secs(3),
                jitter: Duration::from_millis(0),
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &participants,
                Action::Update(link.clone()),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Let the network run on the slow links for 60 seconds
            context.sleep(Duration::from_secs(60)).await;

            // Disconnect validator 1 from the other running validators
            link_validators(
                &mut oracle,
                &participants,
                Action::Unlink,
                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
            )
            .await;

            // Prepare the late validator (index 0)
            let me = participants[0].clone();
            let context = context.with_label(&format!("validator_{me}"));

            // Connect validator 0 to everyone except validator 1, still using the slow link
            link_validators(
                &mut oracle,
                &participants,
                Action::Link(link),
                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
            )
            .await;

            // Replace every link that does not involve validator 1 with a fast link
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(3),
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &participants,
                Action::Update(link),
                Some(|_, i, j| ![i, j].contains(&1usize)),
            )
            .await;

            // Configure the reporter and application mocks for validator 0
            let reporter_config = mocks::reporter::Config {
                participants: participants.clone().try_into().unwrap(),
                scheme: schemes[0].clone(),
                elector: elector.clone(),
            };
            let mut reporter =
                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
            reporters.push(reporter.clone());
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                me: me.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
                certify_latency: (10.0, 5.0),
                should_certify: mocks::application::Certifier::Sometimes,
            };
            let (actor, application) = mocks::application::Application::new(
                context.with_label("application"),
                application_cfg,
            );
            actor.start();

            // Configure and start the engine for validator 0
            let blocker = oracle.control(me.clone());
            let cfg = config::Config {
                scheme: schemes[0].clone(),
                elector: elector.clone(),
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: reporter.clone(),
                strategy: Sequential,
                partition: me.to_string(),
                mailbox_size: 1024,
                epoch: Epoch::new(333),
                leader_timeout: Duration::from_secs(1),
                notarization_timeout: Duration::from_secs(2),
                nullify_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                fetch_concurrent: 4,
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            };
            let engine = Engine::new(context.with_label("engine"), cfg);

            let (pending, recovered, resolver) = registrations
                .remove(&me)
                .expect("validator should be registered");
            engine_handlers.push(engine.start(pending, recovered, resolver));

            // Wait until the late validator's monitor yields a view >= `required_containers`
            let (mut latest, mut monitor) = reporter.subscribe().await;
            while latest < required_containers {
                latest = monitor.recv().await.expect("event missing");
            }

            // No peer may have been blocked
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }
1460
    /// Runs the `backfill` scenario once per signing-scheme fixture listed below.
    ///
    /// The two VRF threshold fixtures are paired with the `Random` elector; every other
    /// fixture is paired with `RoundRobin`.
    #[test_traced]
    fn test_backfill() {
        // BLS12-381 threshold (VRF) fixtures, both variants
        backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
        backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
        // BLS12-381 threshold (standard) fixtures, both variants
        backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
        backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
        // BLS12-381 multisig fixtures, both variants
        backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
        backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
        // Remaining fixtures
        backfill::<_, _, RoundRobin>(ed25519::fixture);
        backfill::<_, _, RoundRobin>(secp256r1::fixture);
    }
1472
1473 fn one_offline<S, F, L>(mut fixture: F)
1474 where
1475 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1476 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1477 L: Elector<S>,
1478 {
1479 let n = 5;
1481 let quorum = quorum(n) as usize;
1482 let required_containers = View::new(100);
1483 let activity_timeout = ViewDelta::new(10);
1484 let skip_timeout = ViewDelta::new(5);
1485 let max_exceptions = 10;
1486 let namespace = b"consensus".to_vec();
1487 let executor = deterministic::Runner::timed(Duration::from_secs(300));
1488 executor.start(|mut context| async move {
1489 let (network, mut oracle) = Network::new(
1491 context.with_label("network"),
1492 Config {
1493 max_size: 1024 * 1024,
1494 disconnect_on_block: true,
1495 tracked_peer_sets: None,
1496 },
1497 );
1498
1499 network.start();
1501
1502 let Fixture {
1504 participants,
1505 schemes,
1506 ..
1507 } = fixture(&mut context, &namespace, n);
1508 let mut registrations = register_validators(&mut oracle, &participants).await;
1509
1510 let link = Link {
1512 latency: Duration::from_millis(10),
1513 jitter: Duration::from_millis(1),
1514 success_rate: 1.0,
1515 };
1516 link_validators(
1517 &mut oracle,
1518 &participants,
1519 Action::Link(link),
1520 Some(|_, i, j| ![i, j].contains(&0usize)),
1521 )
1522 .await;
1523
1524 let elector = L::default();
1526 let relay = Arc::new(mocks::relay::Relay::new());
1527 let mut reporters = Vec::new();
1528 let mut engine_handlers = Vec::new();
1529 for (idx_scheme, validator) in participants.iter().enumerate() {
1530 if idx_scheme == 0 {
1532 continue;
1533 }
1534
1535 let context = context.with_label(&format!("validator_{}", *validator));
1537
1538 let reporter_config = mocks::reporter::Config {
1540 participants: participants.clone().try_into().unwrap(),
1541 scheme: schemes[idx_scheme].clone(),
1542 elector: elector.clone(),
1543 };
1544 let reporter =
1545 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1546 reporters.push(reporter.clone());
1547 let application_cfg = mocks::application::Config {
1548 hasher: Sha256::default(),
1549 relay: relay.clone(),
1550 me: validator.clone(),
1551 propose_latency: (10.0, 5.0),
1552 verify_latency: (10.0, 5.0),
1553 certify_latency: (10.0, 5.0),
1554 should_certify: mocks::application::Certifier::Sometimes,
1555 };
1556 let (actor, application) = mocks::application::Application::new(
1557 context.with_label("application"),
1558 application_cfg,
1559 );
1560 actor.start();
1561 let blocker = oracle.control(validator.clone());
1562 let cfg = config::Config {
1563 scheme: schemes[idx_scheme].clone(),
1564 elector: elector.clone(),
1565 blocker,
1566 automaton: application.clone(),
1567 relay: application.clone(),
1568 reporter: reporter.clone(),
1569 strategy: Sequential,
1570 partition: validator.to_string(),
1571 mailbox_size: 1024,
1572 epoch: Epoch::new(333),
1573 leader_timeout: Duration::from_secs(1),
1574 notarization_timeout: Duration::from_secs(2),
1575 nullify_retry: Duration::from_secs(10),
1576 fetch_timeout: Duration::from_secs(1),
1577 activity_timeout,
1578 skip_timeout,
1579 fetch_concurrent: 4,
1580 replay_buffer: NZUsize!(1024 * 1024),
1581 write_buffer: NZUsize!(1024 * 1024),
1582 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1583 };
1584 let engine = Engine::new(context.with_label("engine"), cfg);
1585
1586 let (pending, recovered, resolver) = registrations
1588 .remove(validator)
1589 .expect("validator should be registered");
1590 engine_handlers.push(engine.start(pending, recovered, resolver));
1591 }
1592
1593 let mut finalizers = Vec::new();
1595 for reporter in reporters.iter_mut() {
1596 let (mut latest, mut monitor) = reporter.subscribe().await;
1597 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1598 while latest < required_containers {
1599 latest = monitor.recv().await.expect("event missing");
1600 }
1601 }));
1602 }
1603 join_all(finalizers).await;
1604
1605 let exceptions = 0;
1607 let offline = &participants[0];
1608 for reporter in reporters.iter() {
1609 {
1611 let faults = reporter.faults.lock().unwrap();
1612 assert!(faults.is_empty());
1613 }
1614
1615 {
1617 let invalid = reporter.invalid.lock().unwrap();
1618 assert_eq!(*invalid, 0);
1619 }
1620
1621 let mut exceptions = 0;
1623 {
1624 let notarizes = reporter.notarizes.lock().unwrap();
1625 for (view, payloads) in notarizes.iter() {
1626 for (_, participants) in payloads.iter() {
1627 if participants.contains(offline) {
1628 panic!("view: {view}");
1629 }
1630 }
1631 }
1632 }
1633 {
1634 let nullifies = reporter.nullifies.lock().unwrap();
1635 for (view, participants) in nullifies.iter() {
1636 if participants.contains(offline) {
1637 panic!("view: {view}");
1638 }
1639 }
1640 }
1641 {
1642 let finalizes = reporter.finalizes.lock().unwrap();
1643 for (view, payloads) in finalizes.iter() {
1644 for (_, finalizers) in payloads.iter() {
1645 if finalizers.contains(offline) {
1646 panic!("view: {view}");
1647 }
1648 }
1649 }
1650 }
1651
1652 let mut offline_views = Vec::new();
1654 {
1655 let leaders = reporter.leaders.lock().unwrap();
1656 for (view, leader) in leaders.iter() {
1657 if leader == offline {
1658 offline_views.push(*view);
1659 }
1660 }
1661 }
1662 assert!(!offline_views.is_empty());
1663
1664 {
1666 let nullifies = reporter.nullifies.lock().unwrap();
1667 for view in offline_views.iter() {
1668 let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1669 if nullifies < quorum {
1670 warn!("missing expected view nullifies: {}", view);
1671 exceptions += 1;
1672 }
1673 }
1674 }
1675 {
1676 let nullifications = reporter.nullifications.lock().unwrap();
1677 for view in offline_views.iter() {
1678 if !nullifications.contains_key(view) {
1679 warn!("missing expected view nullifies: {}", view);
1680 exceptions += 1;
1681 }
1682 }
1683 }
1684
1685 assert!(exceptions <= max_exceptions);
1687 }
1688 assert!(exceptions <= max_exceptions);
1689
1690 let blocked = oracle.blocked().await.unwrap();
1692 assert!(blocked.is_empty());
1693
1694 let encoded = context.encode();
1696 let peer_label = format!("peer=\"{}\"", offline);
1697 for metric in ["_skips_per_leader", "_nullifications_per_leader"] {
1698 assert_eq!(
1699 count_nonzero_metric_lines(&encoded, &[metric, &peer_label]),
1700 n - 1,
1701 "expected all online nodes to record {metric} for offline leader"
1702 );
1703 }
1704 });
1705 }
1706
    /// Runs the `one_offline` scenario once per signing-scheme fixture listed below.
    ///
    /// The two VRF threshold fixtures are paired with the `Random` elector; every other
    /// fixture is paired with `RoundRobin`.
    #[test_traced]
    fn test_one_offline() {
        // BLS12-381 threshold (VRF) fixtures, both variants
        one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
        one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
        // BLS12-381 threshold (standard) fixtures, both variants
        one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
        one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
        // BLS12-381 multisig fixtures, both variants
        one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
        one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
        // Remaining fixtures
        one_offline::<_, _, RoundRobin>(ed25519::fixture);
        one_offline::<_, _, RoundRobin>(secp256r1::fixture);
    }
1718
1719 fn slow_validator<S, F, L>(mut fixture: F)
1720 where
1721 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1722 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1723 L: Elector<S>,
1724 {
1725 let n = 5;
1727 let required_containers = View::new(50);
1728 let activity_timeout = ViewDelta::new(10);
1729 let skip_timeout = ViewDelta::new(5);
1730 let namespace = b"consensus".to_vec();
1731 let executor = deterministic::Runner::timed(Duration::from_secs(300));
1732 executor.start(|mut context| async move {
1733 let (network, mut oracle) = Network::new(
1735 context.with_label("network"),
1736 Config {
1737 max_size: 1024 * 1024,
1738 disconnect_on_block: true,
1739 tracked_peer_sets: None,
1740 },
1741 );
1742
1743 network.start();
1745
1746 let Fixture {
1748 participants,
1749 schemes,
1750 ..
1751 } = fixture(&mut context, &namespace, n);
1752 let mut registrations = register_validators(&mut oracle, &participants).await;
1753
1754 let link = Link {
1756 latency: Duration::from_millis(10),
1757 jitter: Duration::from_millis(1),
1758 success_rate: 1.0,
1759 };
1760 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1761
1762 let elector = L::default();
1764 let relay = Arc::new(mocks::relay::Relay::new());
1765 let mut reporters = Vec::new();
1766 let mut engine_handlers = Vec::new();
1767 for (idx_scheme, validator) in participants.iter().enumerate() {
1768 let context = context.with_label(&format!("validator_{}", *validator));
1770
1771 let reporter_config = mocks::reporter::Config {
1773 participants: participants.clone().try_into().unwrap(),
1774 scheme: schemes[idx_scheme].clone(),
1775 elector: elector.clone(),
1776 };
1777 let reporter =
1778 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1779 reporters.push(reporter.clone());
1780 let application_cfg = if idx_scheme == 0 {
1781 mocks::application::Config {
1782 hasher: Sha256::default(),
1783 relay: relay.clone(),
1784 me: validator.clone(),
1785 propose_latency: (10_000.0, 0.0),
1786 verify_latency: (10_000.0, 5.0),
1787 certify_latency: (10_000.0, 5.0),
1788 should_certify: mocks::application::Certifier::Sometimes,
1789 }
1790 } else {
1791 mocks::application::Config {
1792 hasher: Sha256::default(),
1793 relay: relay.clone(),
1794 me: validator.clone(),
1795 propose_latency: (10.0, 5.0),
1796 verify_latency: (10.0, 5.0),
1797 certify_latency: (10.0, 5.0),
1798 should_certify: mocks::application::Certifier::Sometimes,
1799 }
1800 };
1801 let (actor, application) = mocks::application::Application::new(
1802 context.with_label("application"),
1803 application_cfg,
1804 );
1805 actor.start();
1806 let blocker = oracle.control(validator.clone());
1807 let cfg = config::Config {
1808 scheme: schemes[idx_scheme].clone(),
1809 elector: elector.clone(),
1810 blocker,
1811 automaton: application.clone(),
1812 relay: application.clone(),
1813 reporter: reporter.clone(),
1814 strategy: Sequential,
1815 partition: validator.to_string(),
1816 mailbox_size: 1024,
1817 epoch: Epoch::new(333),
1818 leader_timeout: Duration::from_secs(1),
1819 notarization_timeout: Duration::from_secs(2),
1820 nullify_retry: Duration::from_secs(10),
1821 fetch_timeout: Duration::from_secs(1),
1822 activity_timeout,
1823 skip_timeout,
1824 fetch_concurrent: 4,
1825 replay_buffer: NZUsize!(1024 * 1024),
1826 write_buffer: NZUsize!(1024 * 1024),
1827 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1828 };
1829 let engine = Engine::new(context.with_label("engine"), cfg);
1830
1831 let (pending, recovered, resolver) = registrations
1833 .remove(validator)
1834 .expect("validator should be registered");
1835 engine_handlers.push(engine.start(pending, recovered, resolver));
1836 }
1837
1838 let mut finalizers = Vec::new();
1840 for reporter in reporters.iter_mut() {
1841 let (mut latest, mut monitor) = reporter.subscribe().await;
1842 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1843 while latest < required_containers {
1844 latest = monitor.recv().await.expect("event missing");
1845 }
1846 }));
1847 }
1848 join_all(finalizers).await;
1849
1850 let slow = &participants[0];
1852 for reporter in reporters.iter() {
1853 {
1855 let faults = reporter.faults.lock().unwrap();
1856 assert!(faults.is_empty());
1857 }
1858
1859 {
1861 let invalid = reporter.invalid.lock().unwrap();
1862 assert_eq!(*invalid, 0);
1863 }
1864
1865 let mut observed = false;
1867 {
1868 let notarizes = reporter.notarizes.lock().unwrap();
1869 for (_, payloads) in notarizes.iter() {
1870 for (_, participants) in payloads.iter() {
1871 if participants.contains(slow) {
1872 observed = true;
1873 break;
1874 }
1875 }
1876 }
1877 }
1878 {
1879 let finalizes = reporter.finalizes.lock().unwrap();
1880 for (_, payloads) in finalizes.iter() {
1881 for (_, finalizers) in payloads.iter() {
1882 if finalizers.contains(slow) {
1883 observed = true;
1884 break;
1885 }
1886 }
1887 }
1888 }
1889 assert!(observed);
1890 }
1891
1892 let blocked = oracle.blocked().await.unwrap();
1894 assert!(blocked.is_empty());
1895 });
1896 }
1897
    /// Runs the `slow_validator` scenario once per signing-scheme fixture listed below.
    ///
    /// The two VRF threshold fixtures are paired with the `Random` elector; every other
    /// fixture is paired with `RoundRobin`.
    #[test_traced]
    fn test_slow_validator() {
        // BLS12-381 threshold (VRF) fixtures, both variants
        slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
        slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
        // BLS12-381 threshold (standard) fixtures, both variants
        slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
        slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
        // BLS12-381 multisig fixtures, both variants
        slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
        slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
        // Remaining fixtures
        slow_validator::<_, _, RoundRobin>(ed25519::fixture);
        slow_validator::<_, _, RoundRobin>(secp256r1::fixture);
    }
1909
1910 fn all_recovery<S, F, L>(mut fixture: F)
1911 where
1912 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1913 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1914 L: Elector<S>,
1915 {
1916 let n = 5;
1918 let required_containers = View::new(100);
1919 let activity_timeout = ViewDelta::new(10);
1920 let skip_timeout = ViewDelta::new(2);
1921 let namespace = b"consensus".to_vec();
1922 let executor = deterministic::Runner::timed(Duration::from_secs(1800));
1923 executor.start(|mut context| async move {
1924 let (network, mut oracle) = Network::new(
1926 context.with_label("network"),
1927 Config {
1928 max_size: 1024 * 1024,
1929 disconnect_on_block: false,
1930 tracked_peer_sets: None,
1931 },
1932 );
1933
1934 network.start();
1936
1937 let Fixture {
1939 participants,
1940 schemes,
1941 ..
1942 } = fixture(&mut context, &namespace, n);
1943 let mut registrations = register_validators(&mut oracle, &participants).await;
1944
1945 let link = Link {
1947 latency: Duration::from_secs(3),
1948 jitter: Duration::from_millis(0),
1949 success_rate: 1.0,
1950 };
1951 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1952
1953 let elector = L::default();
1955 let relay = Arc::new(mocks::relay::Relay::new());
1956 let mut reporters = Vec::new();
1957 let mut engine_handlers = Vec::new();
1958 for (idx, validator) in participants.iter().enumerate() {
1959 let context = context.with_label(&format!("validator_{}", *validator));
1961
1962 let reporter_config = mocks::reporter::Config {
1964 participants: participants.clone().try_into().unwrap(),
1965 scheme: schemes[idx].clone(),
1966 elector: elector.clone(),
1967 };
1968 let reporter =
1969 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1970 reporters.push(reporter.clone());
1971 let application_cfg = mocks::application::Config {
1972 hasher: Sha256::default(),
1973 relay: relay.clone(),
1974 me: validator.clone(),
1975 propose_latency: (10.0, 5.0),
1976 verify_latency: (10.0, 5.0),
1977 certify_latency: (10.0, 5.0),
1978 should_certify: mocks::application::Certifier::Sometimes,
1979 };
1980 let (actor, application) = mocks::application::Application::new(
1981 context.with_label("application"),
1982 application_cfg,
1983 );
1984 actor.start();
1985 let blocker = oracle.control(validator.clone());
1986 let cfg = config::Config {
1987 scheme: schemes[idx].clone(),
1988 elector: elector.clone(),
1989 blocker,
1990 automaton: application.clone(),
1991 relay: application.clone(),
1992 reporter: reporter.clone(),
1993 strategy: Sequential,
1994 partition: validator.to_string(),
1995 mailbox_size: 1024,
1996 epoch: Epoch::new(333),
1997 leader_timeout: Duration::from_secs(1),
1998 notarization_timeout: Duration::from_secs(2),
1999 nullify_retry: Duration::from_secs(10),
2000 fetch_timeout: Duration::from_secs(1),
2001 activity_timeout,
2002 skip_timeout,
2003 fetch_concurrent: 4,
2004 replay_buffer: NZUsize!(1024 * 1024),
2005 write_buffer: NZUsize!(1024 * 1024),
2006 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2007 };
2008 let engine = Engine::new(context.with_label("engine"), cfg);
2009
2010 let (pending, recovered, resolver) = registrations
2012 .remove(validator)
2013 .expect("validator should be registered");
2014 engine_handlers.push(engine.start(pending, recovered, resolver));
2015 }
2016
2017 let mut finalizers = Vec::new();
2019 for reporter in reporters.iter_mut() {
2020 let (_, mut monitor) = reporter.subscribe().await;
2021 finalizers.push(
2022 context
2023 .with_label("finalizer")
2024 .spawn(move |context| async move {
2025 select! {
2026 _timeout = context.sleep(Duration::from_secs(60)) => {},
2027 _done = monitor.recv() => {
2028 panic!("engine should not notarize or finalize anything");
2029 },
2030 }
2031 }),
2032 );
2033 }
2034 join_all(finalizers).await;
2035
2036 link_validators(&mut oracle, &participants, Action::Unlink, None).await;
2038
2039 context.sleep(Duration::from_secs(60)).await;
2041
2042 let mut latest = View::zero();
2044 for reporter in reporters.iter() {
2045 let nullifies = reporter.nullifies.lock().unwrap();
2046 let max = nullifies.keys().max().unwrap();
2047 if *max > latest {
2048 latest = *max;
2049 }
2050 }
2051
2052 let link = Link {
2054 latency: Duration::from_millis(10),
2055 jitter: Duration::from_millis(1),
2056 success_rate: 1.0,
2057 };
2058 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2059
2060 let mut finalizers = Vec::new();
2062 for reporter in reporters.iter_mut() {
2063 let (mut latest, mut monitor) = reporter.subscribe().await;
2064 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2065 while latest < required_containers {
2066 latest = monitor.recv().await.expect("event missing");
2067 }
2068 }));
2069 }
2070 join_all(finalizers).await;
2071
2072 for reporter in reporters.iter() {
2074 {
2076 let faults = reporter.faults.lock().unwrap();
2077 assert!(faults.is_empty());
2078 }
2079
2080 {
2082 let invalid = reporter.invalid.lock().unwrap();
2083 assert_eq!(*invalid, 0);
2084 }
2085
2086 {
2091 let mut found = 0;
2095 let notarizations = reporter.notarizations.lock().unwrap();
2096 for view in View::range(latest, latest.saturating_add(activity_timeout)) {
2097 if notarizations.contains_key(&view) {
2098 found += 1;
2099 }
2100 }
2101 assert!(
2102 found >= activity_timeout.get().saturating_sub(2),
2103 "found: {found}"
2104 );
2105 }
2106 }
2107
2108 let blocked = oracle.blocked().await.unwrap();
2110 assert!(blocked.is_empty());
2111 });
2112 }
2113
    /// Runs the `all_recovery` scenario once per signing-scheme fixture listed below.
    ///
    /// The two VRF threshold fixtures are paired with the `Random` elector; every other
    /// fixture is paired with `RoundRobin`.
    #[test_traced]
    fn test_all_recovery() {
        // BLS12-381 threshold (VRF) fixtures, both variants
        all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
        all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
        // BLS12-381 threshold (standard) fixtures, both variants
        all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
        all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
        // BLS12-381 multisig fixtures, both variants
        all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
        all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
        // Remaining fixtures
        all_recovery::<_, _, RoundRobin>(ed25519::fixture);
        all_recovery::<_, _, RoundRobin>(secp256r1::fixture);
    }
2125
2126 fn partition<S, F, L>(mut fixture: F)
2127 where
2128 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2129 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2130 L: Elector<S>,
2131 {
2132 let n = 10;
2134 let required_containers = View::new(50);
2135 let activity_timeout = ViewDelta::new(10);
2136 let skip_timeout = ViewDelta::new(5);
2137 let namespace = b"consensus".to_vec();
2138 let executor = deterministic::Runner::timed(Duration::from_secs(900));
2139 executor.start(|mut context| async move {
2140 let (network, mut oracle) = Network::new(
2142 context.with_label("network"),
2143 Config {
2144 max_size: 1024 * 1024,
2145 disconnect_on_block: false,
2146 tracked_peer_sets: None,
2147 },
2148 );
2149
2150 network.start();
2152
2153 let Fixture {
2155 participants,
2156 schemes,
2157 ..
2158 } = fixture(&mut context, &namespace, n);
2159 let mut registrations = register_validators(&mut oracle, &participants).await;
2160
2161 let link = Link {
2163 latency: Duration::from_millis(10),
2164 jitter: Duration::from_millis(1),
2165 success_rate: 1.0,
2166 };
2167 link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
2168
2169 let elector = L::default();
2171 let relay = Arc::new(mocks::relay::Relay::new());
2172 let mut reporters = Vec::new();
2173 let mut engine_handlers = Vec::new();
2174 for (idx, validator) in participants.iter().enumerate() {
2175 let context = context.with_label(&format!("validator_{}", *validator));
2177
2178 let reporter_config = mocks::reporter::Config {
2180 participants: participants.clone().try_into().unwrap(),
2181 scheme: schemes[idx].clone(),
2182 elector: elector.clone(),
2183 };
2184 let reporter =
2185 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2186 reporters.push(reporter.clone());
2187 let application_cfg = mocks::application::Config {
2188 hasher: Sha256::default(),
2189 relay: relay.clone(),
2190 me: validator.clone(),
2191 propose_latency: (10.0, 5.0),
2192 verify_latency: (10.0, 5.0),
2193 certify_latency: (10.0, 5.0),
2194 should_certify: mocks::application::Certifier::Sometimes,
2195 };
2196 let (actor, application) = mocks::application::Application::new(
2197 context.with_label("application"),
2198 application_cfg,
2199 );
2200 actor.start();
2201 let blocker = oracle.control(validator.clone());
2202 let cfg = config::Config {
2203 scheme: schemes[idx].clone(),
2204 elector: elector.clone(),
2205 blocker,
2206 automaton: application.clone(),
2207 relay: application.clone(),
2208 reporter: reporter.clone(),
2209 strategy: Sequential,
2210 partition: validator.to_string(),
2211 mailbox_size: 1024,
2212 epoch: Epoch::new(333),
2213 leader_timeout: Duration::from_secs(1),
2214 notarization_timeout: Duration::from_secs(2),
2215 nullify_retry: Duration::from_secs(10),
2216 fetch_timeout: Duration::from_secs(1),
2217 activity_timeout,
2218 skip_timeout,
2219 fetch_concurrent: 4,
2220 replay_buffer: NZUsize!(1024 * 1024),
2221 write_buffer: NZUsize!(1024 * 1024),
2222 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2223 };
2224 let engine = Engine::new(context.with_label("engine"), cfg);
2225
2226 let (pending, recovered, resolver) = registrations
2228 .remove(validator)
2229 .expect("validator should be registered");
2230 engine_handlers.push(engine.start(pending, recovered, resolver));
2231 }
2232
2233 let mut finalizers = Vec::new();
2235 for reporter in reporters.iter_mut() {
2236 let (mut latest, mut monitor) = reporter.subscribe().await;
2237 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2238 while latest < required_containers {
2239 latest = monitor.recv().await.expect("event missing");
2240 }
2241 }));
2242 }
2243 join_all(finalizers).await;
2244
2245 fn separated(n: usize, a: usize, b: usize) -> bool {
2247 let m = n / 2;
2248 (a < m && b >= m) || (a >= m && b < m)
2249 }
2250 link_validators(&mut oracle, &participants, Action::Unlink, Some(separated)).await;
2251
2252 context.sleep(Duration::from_secs(10)).await;
2254
2255 let mut finalizers = Vec::new();
2257 for reporter in reporters.iter_mut() {
2258 let (_, mut monitor) = reporter.subscribe().await;
2259 finalizers.push(
2260 context
2261 .with_label("finalizer")
2262 .spawn(move |context| async move {
2263 select! {
2264 _timeout = context.sleep(Duration::from_secs(60)) => {},
2265 _done = monitor.recv() => {
2266 panic!("engine should not notarize or finalize anything");
2267 },
2268 }
2269 }),
2270 );
2271 }
2272 join_all(finalizers).await;
2273
2274 link_validators(
2276 &mut oracle,
2277 &participants,
2278 Action::Link(link),
2279 Some(separated),
2280 )
2281 .await;
2282
2283 let mut finalizers = Vec::new();
2285 for reporter in reporters.iter_mut() {
2286 let (mut latest, mut monitor) = reporter.subscribe().await;
2287 let required = latest.saturating_add(ViewDelta::new(required_containers.get()));
2288 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2289 while latest < required {
2290 latest = monitor.recv().await.expect("event missing");
2291 }
2292 }));
2293 }
2294 join_all(finalizers).await;
2295
2296 for reporter in reporters.iter() {
2298 {
2300 let faults = reporter.faults.lock().unwrap();
2301 assert!(faults.is_empty());
2302 }
2303
2304 {
2306 let invalid = reporter.invalid.lock().unwrap();
2307 assert_eq!(*invalid, 0);
2308 }
2309 }
2310
2311 let blocked = oracle.blocked().await.unwrap();
2313 assert!(blocked.is_empty());
2314 });
2315 }
2316
    /// Runs the `partition` scenario once per signing-scheme fixture listed below.
    ///
    /// The test is tagged with the `"slow"` test group. The two VRF threshold fixtures are
    /// paired with the `Random` elector; every other fixture is paired with `RoundRobin`.
    #[test_group("slow")]
    #[test_traced]
    fn test_partition() {
        // BLS12-381 threshold (VRF) fixtures, both variants
        partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
        partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
        // BLS12-381 threshold (standard) fixtures, both variants
        partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
        partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
        // BLS12-381 multisig fixtures, both variants
        partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
        partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
        // Remaining fixtures
        partition::<_, _, RoundRobin>(ed25519::fixture);
        partition::<_, _, RoundRobin>(secp256r1::fixture);
    }
2329
2330 fn slow_and_lossy_links<S, F, L>(seed: u64, mut fixture: F) -> String
2331 where
2332 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2333 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2334 L: Elector<S>,
2335 {
2336 let n = 5;
2338 let required_containers = View::new(50);
2339 let activity_timeout = ViewDelta::new(10);
2340 let skip_timeout = ViewDelta::new(5);
2341 let namespace = b"consensus".to_vec();
2342 let cfg = deterministic::Config::new()
2343 .with_seed(seed)
2344 .with_timeout(Some(Duration::from_secs(5_000)));
2345 let executor = deterministic::Runner::new(cfg);
2346 executor.start(|mut context| async move {
2347 let (network, mut oracle) = Network::new(
2349 context.with_label("network"),
2350 Config {
2351 max_size: 1024 * 1024,
2352 disconnect_on_block: false,
2353 tracked_peer_sets: None,
2354 },
2355 );
2356
2357 network.start();
2359
2360 let Fixture {
2362 participants,
2363 schemes,
2364 ..
2365 } = fixture(&mut context, &namespace, n);
2366 let mut registrations = register_validators(&mut oracle, &participants).await;
2367
2368 let degraded_link = Link {
2370 latency: Duration::from_millis(200),
2371 jitter: Duration::from_millis(150),
2372 success_rate: 0.5,
2373 };
2374 link_validators(
2375 &mut oracle,
2376 &participants,
2377 Action::Link(degraded_link),
2378 None,
2379 )
2380 .await;
2381
2382 let elector = L::default();
2384 let relay = Arc::new(mocks::relay::Relay::new());
2385 let mut reporters = Vec::new();
2386 let mut engine_handlers = Vec::new();
2387 for (idx, validator) in participants.iter().enumerate() {
2388 let context = context.with_label(&format!("validator_{}", *validator));
2390
2391 let reporter_config = mocks::reporter::Config {
2393 participants: participants.clone().try_into().unwrap(),
2394 scheme: schemes[idx].clone(),
2395 elector: elector.clone(),
2396 };
2397 let reporter =
2398 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2399 reporters.push(reporter.clone());
2400 let application_cfg = mocks::application::Config {
2401 hasher: Sha256::default(),
2402 relay: relay.clone(),
2403 me: validator.clone(),
2404 propose_latency: (10.0, 5.0),
2405 verify_latency: (10.0, 5.0),
2406 certify_latency: (10.0, 5.0),
2407 should_certify: mocks::application::Certifier::Sometimes,
2408 };
2409 let (actor, application) = mocks::application::Application::new(
2410 context.with_label("application"),
2411 application_cfg,
2412 );
2413 actor.start();
2414 let blocker = oracle.control(validator.clone());
2415 let cfg = config::Config {
2416 scheme: schemes[idx].clone(),
2417 elector: elector.clone(),
2418 blocker,
2419 automaton: application.clone(),
2420 relay: application.clone(),
2421 reporter: reporter.clone(),
2422 strategy: Sequential,
2423 partition: validator.to_string(),
2424 mailbox_size: 1024,
2425 epoch: Epoch::new(333),
2426 leader_timeout: Duration::from_secs(1),
2427 notarization_timeout: Duration::from_secs(2),
2428 nullify_retry: Duration::from_secs(10),
2429 fetch_timeout: Duration::from_secs(1),
2430 activity_timeout,
2431 skip_timeout,
2432 fetch_concurrent: 4,
2433 replay_buffer: NZUsize!(1024 * 1024),
2434 write_buffer: NZUsize!(1024 * 1024),
2435 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2436 };
2437 let engine = Engine::new(context.with_label("engine"), cfg);
2438
2439 let (pending, recovered, resolver) = registrations
2441 .remove(validator)
2442 .expect("validator should be registered");
2443 engine_handlers.push(engine.start(pending, recovered, resolver));
2444 }
2445
2446 let mut finalizers = Vec::new();
2448 for reporter in reporters.iter_mut() {
2449 let (mut latest, mut monitor) = reporter.subscribe().await;
2450 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2451 while latest < required_containers {
2452 latest = monitor.recv().await.expect("event missing");
2453 }
2454 }));
2455 }
2456 join_all(finalizers).await;
2457
2458 for reporter in reporters.iter() {
2460 {
2462 let faults = reporter.faults.lock().unwrap();
2463 assert!(faults.is_empty());
2464 }
2465
2466 {
2468 let invalid = reporter.invalid.lock().unwrap();
2469 assert_eq!(*invalid, 0);
2470 }
2471 }
2472
2473 let blocked = oracle.blocked().await.unwrap();
2475 assert!(blocked.is_empty());
2476
2477 context.auditor().state()
2478 })
2479 }
2480
    /// Runs the `slow_and_lossy_links` scenario with seed `0` once per signing-scheme
    /// fixture listed below, discarding the returned auditor state.
    ///
    /// The two VRF threshold fixtures are paired with the `Random` elector; every other
    /// fixture is paired with `RoundRobin`.
    #[test_traced]
    fn test_slow_and_lossy_links() {
        // BLS12-381 threshold (VRF) fixtures, both variants
        slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinPk, _>);
        slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinSig, _>);
        // BLS12-381 threshold (standard) fixtures, both variants
        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinPk, _>);
        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinSig, _>);
        // BLS12-381 multisig fixtures, both variants
        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinPk, _>);
        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinSig, _>);
        // Remaining fixtures
        slow_and_lossy_links::<_, _, RoundRobin>(0, ed25519::fixture);
        slow_and_lossy_links::<_, _, RoundRobin>(0, secp256r1::fixture);
    }
2492
2493 #[test_group("slow")]
2494 #[test_traced]
2495 fn test_determinism() {
2496 for seed in 1..6 {
2499 let ts_vrf_pk_state_1 = slow_and_lossy_links::<_, _, Random>(
2500 seed,
2501 bls12381_threshold_vrf::fixture::<MinPk, _>,
2502 );
2503 let ts_vrf_pk_state_2 = slow_and_lossy_links::<_, _, Random>(
2504 seed,
2505 bls12381_threshold_vrf::fixture::<MinPk, _>,
2506 );
2507 assert_eq!(ts_vrf_pk_state_1, ts_vrf_pk_state_2);
2508
2509 let ts_vrf_sig_state_1 = slow_and_lossy_links::<_, _, Random>(
2510 seed,
2511 bls12381_threshold_vrf::fixture::<MinSig, _>,
2512 );
2513 let ts_vrf_sig_state_2 = slow_and_lossy_links::<_, _, Random>(
2514 seed,
2515 bls12381_threshold_vrf::fixture::<MinSig, _>,
2516 );
2517 assert_eq!(ts_vrf_sig_state_1, ts_vrf_sig_state_2);
2518
2519 let ts_std_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2520 seed,
2521 bls12381_threshold_std::fixture::<MinPk, _>,
2522 );
2523 let ts_std_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2524 seed,
2525 bls12381_threshold_std::fixture::<MinPk, _>,
2526 );
2527 assert_eq!(ts_std_pk_state_1, ts_std_pk_state_2);
2528
2529 let ts_std_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2530 seed,
2531 bls12381_threshold_std::fixture::<MinSig, _>,
2532 );
2533 let ts_std_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2534 seed,
2535 bls12381_threshold_std::fixture::<MinSig, _>,
2536 );
2537 assert_eq!(ts_std_sig_state_1, ts_std_sig_state_2);
2538
2539 let ms_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2540 seed,
2541 bls12381_multisig::fixture::<MinPk, _>,
2542 );
2543 let ms_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2544 seed,
2545 bls12381_multisig::fixture::<MinPk, _>,
2546 );
2547 assert_eq!(ms_pk_state_1, ms_pk_state_2);
2548
2549 let ms_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2550 seed,
2551 bls12381_multisig::fixture::<MinSig, _>,
2552 );
2553 let ms_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2554 seed,
2555 bls12381_multisig::fixture::<MinSig, _>,
2556 );
2557 assert_eq!(ms_sig_state_1, ms_sig_state_2);
2558
2559 let ed_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2560 let ed_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2561 assert_eq!(ed_state_1, ed_state_2);
2562
2563 let secp_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2564 let secp_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2565 assert_eq!(secp_state_1, secp_state_2);
2566
2567 let states = [
2568 ("threshold-vrf-minpk", ts_vrf_pk_state_1),
2569 ("threshold-vrf-minsig", ts_vrf_sig_state_1),
2570 ("threshold-std-minpk", ts_std_pk_state_1),
2571 ("threshold-std-minsig", ts_std_sig_state_1),
2572 ("multisig-minpk", ms_pk_state_1),
2573 ("multisig-minsig", ms_sig_state_1),
2574 ("ed25519", ed_state_1),
2575 ("secp256r1", secp_state_1),
2576 ];
2577
2578 for pair in states.windows(2) {
2580 assert_ne!(
2581 pair[0].1, pair[1].1,
2582 "state {} equals state {}",
2583 pair[0].0, pair[1].0
2584 );
2585 }
2586 }
2587 }
2588
2589 fn conflicter<S, F, L>(seed: u64, mut fixture: F)
2590 where
2591 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2592 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2593 L: Elector<S>,
2594 {
2595 let n = 4;
2597 let required_containers = View::new(50);
2598 let activity_timeout = ViewDelta::new(10);
2599 let skip_timeout = ViewDelta::new(5);
2600 let namespace = b"consensus".to_vec();
2601 let cfg = deterministic::Config::new()
2602 .with_seed(seed)
2603 .with_timeout(Some(Duration::from_secs(30)));
2604 let executor = deterministic::Runner::new(cfg);
2605 executor.start(|mut context| async move {
2606 let (network, mut oracle) = Network::new(
2608 context.with_label("network"),
2609 Config {
2610 max_size: 1024 * 1024,
2611 disconnect_on_block: false,
2612 tracked_peer_sets: None,
2613 },
2614 );
2615
2616 network.start();
2618
2619 let Fixture {
2621 participants,
2622 schemes,
2623 ..
2624 } = fixture(&mut context, &namespace, n);
2625 let mut registrations = register_validators(&mut oracle, &participants).await;
2626
2627 let link = Link {
2629 latency: Duration::from_millis(10),
2630 jitter: Duration::from_millis(1),
2631 success_rate: 1.0,
2632 };
2633 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2634
2635 let elector = L::default();
2637 let relay = Arc::new(mocks::relay::Relay::new());
2638 let mut reporters = Vec::new();
2639 for (idx_scheme, validator) in participants.iter().enumerate() {
2640 let context = context.with_label(&format!("validator_{}", *validator));
2642
2643 let reporter_config = mocks::reporter::Config {
2645 participants: participants.clone().try_into().unwrap(),
2646 scheme: schemes[idx_scheme].clone(),
2647 elector: elector.clone(),
2648 };
2649 let reporter =
2650 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2651 let (pending, recovered, resolver) = registrations
2652 .remove(validator)
2653 .expect("validator should be registered");
2654 if idx_scheme == 0 {
2655 let cfg = mocks::conflicter::Config {
2656 scheme: schemes[idx_scheme].clone(),
2657 };
2658
2659 let engine: mocks::conflicter::Conflicter<_, _, Sha256> =
2660 mocks::conflicter::Conflicter::new(
2661 context.with_label("byzantine_engine"),
2662 cfg,
2663 );
2664 engine.start(pending);
2665 } else {
2666 reporters.push(reporter.clone());
2667 let application_cfg = mocks::application::Config {
2668 hasher: Sha256::default(),
2669 relay: relay.clone(),
2670 me: validator.clone(),
2671 propose_latency: (10.0, 5.0),
2672 verify_latency: (10.0, 5.0),
2673 certify_latency: (10.0, 5.0),
2674 should_certify: mocks::application::Certifier::Sometimes,
2675 };
2676 let (actor, application) = mocks::application::Application::new(
2677 context.with_label("application"),
2678 application_cfg,
2679 );
2680 actor.start();
2681 let blocker = oracle.control(validator.clone());
2682 let cfg = config::Config {
2683 scheme: schemes[idx_scheme].clone(),
2684 elector: elector.clone(),
2685 blocker,
2686 automaton: application.clone(),
2687 relay: application.clone(),
2688 reporter: reporter.clone(),
2689 strategy: Sequential,
2690 partition: validator.to_string(),
2691 mailbox_size: 1024,
2692 epoch: Epoch::new(333),
2693 leader_timeout: Duration::from_secs(1),
2694 notarization_timeout: Duration::from_secs(2),
2695 nullify_retry: Duration::from_secs(10),
2696 fetch_timeout: Duration::from_secs(1),
2697 activity_timeout,
2698 skip_timeout,
2699 fetch_concurrent: 4,
2700 replay_buffer: NZUsize!(1024 * 1024),
2701 write_buffer: NZUsize!(1024 * 1024),
2702 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2703 };
2704 let engine = Engine::new(context.with_label("engine"), cfg);
2705 engine.start(pending, recovered, resolver);
2706 }
2707 }
2708
2709 let mut finalizers = Vec::new();
2711 for reporter in reporters.iter_mut() {
2712 let (mut latest, mut monitor) = reporter.subscribe().await;
2713 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2714 while latest < required_containers {
2715 latest = monitor.recv().await.expect("event missing");
2716 }
2717 }));
2718 }
2719 join_all(finalizers).await;
2720
2721 let byz = &participants[0];
2723 let mut count_conflicting = 0;
2724 for reporter in reporters.iter() {
2725 {
2727 let faults = reporter.faults.lock().unwrap();
2728 assert_eq!(faults.len(), 1);
2729 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2730 for (_, faults) in faulter.iter() {
2731 for fault in faults.iter() {
2732 match fault {
2733 Activity::ConflictingNotarize(_) => {
2734 count_conflicting += 1;
2735 }
2736 Activity::ConflictingFinalize(_) => {
2737 count_conflicting += 1;
2738 }
2739 _ => panic!("unexpected fault: {fault:?}"),
2740 }
2741 }
2742 }
2743 }
2744
2745 {
2747 let invalid = reporter.invalid.lock().unwrap();
2748 assert_eq!(*invalid, 0);
2749 }
2750 }
2751 assert!(count_conflicting > 0);
2752
2753 let blocked = oracle.blocked().await.unwrap();
2755 assert!(!blocked.is_empty());
2756 for (a, b) in blocked {
2757 assert_ne!(&a, byz);
2758 assert_eq!(&b, byz);
2759 }
2760 });
2761 }
2762
2763 #[test_group("slow")]
2764 #[test_traced]
2765 fn test_conflicter() {
2766 for seed in 0..5 {
2767 conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
2768 conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
2769 conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
2770 conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
2771 conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2772 conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2773 conflicter::<_, _, RoundRobin>(seed, ed25519::fixture);
2774 conflicter::<_, _, RoundRobin>(seed, secp256r1::fixture);
2775 }
2776 }
2777
2778 fn invalid<S, F, L>(seed: u64, mut fixture: F)
2779 where
2780 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2781 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2782 L: Elector<S>,
2783 {
2784 let n = 4;
2786 let required_containers = View::new(50);
2787 let activity_timeout = ViewDelta::new(10);
2788 let skip_timeout = ViewDelta::new(5);
2789 let namespace = b"consensus".to_vec();
2790 let cfg = deterministic::Config::new()
2791 .with_seed(seed)
2792 .with_timeout(Some(Duration::from_secs(30)));
2793 let executor = deterministic::Runner::new(cfg);
2794 executor.start(|mut context| async move {
2795 let (network, mut oracle) = Network::new(
2797 context.with_label("network"),
2798 Config {
2799 max_size: 1024 * 1024,
2800 disconnect_on_block: false,
2801 tracked_peer_sets: None,
2802 },
2803 );
2804
2805 network.start();
2807
2808 let Fixture {
2810 participants,
2811 schemes,
2812 ..
2813 } = fixture(&mut context, &namespace, n);
2814
2815 let Fixture {
2817 schemes: wrong_schemes,
2818 ..
2819 } = fixture(&mut context, b"wrong-namespace", n);
2820
2821 let mut registrations = register_validators(&mut oracle, &participants).await;
2822
2823 let link = Link {
2825 latency: Duration::from_millis(10),
2826 jitter: Duration::from_millis(1),
2827 success_rate: 1.0,
2828 };
2829 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2830
2831 let elector = L::default();
2833 let relay = Arc::new(mocks::relay::Relay::new());
2834 let mut reporters = Vec::new();
2835 for (idx_scheme, validator) in participants.iter().enumerate() {
2836 let context = context.with_label(&format!("validator_{}", *validator));
2838
2839 let scheme = if idx_scheme == 0 {
2842 wrong_schemes[idx_scheme].clone()
2843 } else {
2844 schemes[idx_scheme].clone()
2845 };
2846
2847 let reporter_config = mocks::reporter::Config {
2848 participants: participants.clone().try_into().unwrap(),
2849 scheme: schemes[idx_scheme].clone(),
2850 elector: elector.clone(),
2851 };
2852 let reporter =
2853 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2854 reporters.push(reporter.clone());
2855
2856 let application_cfg = mocks::application::Config {
2857 hasher: Sha256::default(),
2858 relay: relay.clone(),
2859 me: validator.clone(),
2860 propose_latency: (10.0, 5.0),
2861 verify_latency: (10.0, 5.0),
2862 certify_latency: (10.0, 5.0),
2863 should_certify: mocks::application::Certifier::Sometimes,
2864 };
2865 let (actor, application) = mocks::application::Application::new(
2866 context.with_label("application"),
2867 application_cfg,
2868 );
2869 actor.start();
2870 let blocker = oracle.control(validator.clone());
2871 let cfg = config::Config {
2872 scheme,
2873 elector: elector.clone(),
2874 blocker,
2875 automaton: application.clone(),
2876 relay: application.clone(),
2877 reporter: reporter.clone(),
2878 strategy: Sequential,
2879 partition: validator.clone().to_string(),
2880 mailbox_size: 1024,
2881 epoch: Epoch::new(333),
2882 leader_timeout: Duration::from_secs(1),
2883 notarization_timeout: Duration::from_secs(2),
2884 nullify_retry: Duration::from_secs(10),
2885 fetch_timeout: Duration::from_secs(1),
2886 activity_timeout,
2887 skip_timeout,
2888 fetch_concurrent: 4,
2889 replay_buffer: NZUsize!(1024 * 1024),
2890 write_buffer: NZUsize!(1024 * 1024),
2891 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2892 };
2893 let engine = Engine::new(context.with_label("engine"), cfg);
2894 let (pending, recovered, resolver) = registrations
2895 .remove(validator)
2896 .expect("validator should be registered");
2897 engine.start(pending, recovered, resolver);
2898 }
2899
2900 let mut finalizers = Vec::new();
2902 for reporter in reporters.iter_mut().skip(1) {
2903 let (mut latest, mut monitor) = reporter.subscribe().await;
2904 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2905 while latest < required_containers {
2906 latest = monitor.recv().await.expect("event missing");
2907 }
2908 }));
2909 }
2910 join_all(finalizers).await;
2911
2912 let mut invalid_count = 0;
2914 for reporter in reporters.iter().skip(1) {
2915 {
2917 let faults = reporter.faults.lock().unwrap();
2918 assert!(faults.is_empty());
2919 }
2920
2921 {
2923 let invalid = reporter.invalid.lock().unwrap();
2924 if *invalid > 0 {
2925 invalid_count += 1;
2926 }
2927 }
2928 }
2929
2930 assert_eq!(invalid_count, n - 1);
2932
2933 let blocked = oracle.blocked().await.unwrap();
2935 assert!(!blocked.is_empty());
2936 for (a, b) in blocked {
2937 if a != participants[0] {
2938 assert_eq!(b, participants[0]);
2939 }
2940 }
2941 });
2942 }
2943
2944 #[test_group("slow")]
2945 #[test_traced]
2946 fn test_invalid() {
2947 for seed in 0..5 {
2948 invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
2949 invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
2950 invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
2951 invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
2952 invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2953 invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2954 invalid::<_, _, RoundRobin>(seed, ed25519::fixture);
2955 invalid::<_, _, RoundRobin>(seed, secp256r1::fixture);
2956 }
2957 }
2958
2959 fn impersonator<S, F, L>(seed: u64, mut fixture: F)
2960 where
2961 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2962 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2963 L: Elector<S>,
2964 {
2965 let n = 4;
2967 let required_containers = View::new(50);
2968 let activity_timeout = ViewDelta::new(10);
2969 let skip_timeout = ViewDelta::new(5);
2970 let namespace = b"consensus".to_vec();
2971 let cfg = deterministic::Config::new()
2972 .with_seed(seed)
2973 .with_timeout(Some(Duration::from_secs(30)));
2974 let executor = deterministic::Runner::new(cfg);
2975 executor.start(|mut context| async move {
2976 let (network, mut oracle) = Network::new(
2978 context.with_label("network"),
2979 Config {
2980 max_size: 1024 * 1024,
2981 disconnect_on_block: false,
2982 tracked_peer_sets: None,
2983 },
2984 );
2985
2986 network.start();
2988
2989 let Fixture {
2991 participants,
2992 schemes,
2993 ..
2994 } = fixture(&mut context, &namespace, n);
2995 let mut registrations = register_validators(&mut oracle, &participants).await;
2996
2997 let link = Link {
2999 latency: Duration::from_millis(10),
3000 jitter: Duration::from_millis(1),
3001 success_rate: 1.0,
3002 };
3003 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3004
3005 let elector = L::default();
3007 let relay = Arc::new(mocks::relay::Relay::new());
3008 let mut reporters = Vec::new();
3009 for (idx_scheme, validator) in participants.iter().enumerate() {
3010 let context = context.with_label(&format!("validator_{}", *validator));
3012
3013 let reporter_config = mocks::reporter::Config {
3015 participants: participants.clone().try_into().unwrap(),
3016 scheme: schemes[idx_scheme].clone(),
3017 elector: elector.clone(),
3018 };
3019 let reporter =
3020 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3021 let (pending, recovered, resolver) = registrations
3022 .remove(validator)
3023 .expect("validator should be registered");
3024 if idx_scheme == 0 {
3025 let cfg = mocks::impersonator::Config {
3026 scheme: schemes[idx_scheme].clone(),
3027 };
3028
3029 let engine: mocks::impersonator::Impersonator<_, _, Sha256> =
3030 mocks::impersonator::Impersonator::new(
3031 context.with_label("byzantine_engine"),
3032 cfg,
3033 );
3034 engine.start(pending);
3035 } else {
3036 reporters.push(reporter.clone());
3037 let application_cfg = mocks::application::Config {
3038 hasher: Sha256::default(),
3039 relay: relay.clone(),
3040 me: validator.clone(),
3041 propose_latency: (10.0, 5.0),
3042 verify_latency: (10.0, 5.0),
3043 certify_latency: (10.0, 5.0),
3044 should_certify: mocks::application::Certifier::Sometimes,
3045 };
3046 let (actor, application) = mocks::application::Application::new(
3047 context.with_label("application"),
3048 application_cfg,
3049 );
3050 actor.start();
3051 let blocker = oracle.control(validator.clone());
3052 let cfg = config::Config {
3053 scheme: schemes[idx_scheme].clone(),
3054 elector: elector.clone(),
3055 blocker,
3056 automaton: application.clone(),
3057 relay: application.clone(),
3058 reporter: reporter.clone(),
3059 strategy: Sequential,
3060 partition: validator.clone().to_string(),
3061 mailbox_size: 1024,
3062 epoch: Epoch::new(333),
3063 leader_timeout: Duration::from_secs(1),
3064 notarization_timeout: Duration::from_secs(2),
3065 nullify_retry: Duration::from_secs(10),
3066 fetch_timeout: Duration::from_secs(1),
3067 activity_timeout,
3068 skip_timeout,
3069 fetch_concurrent: 4,
3070 replay_buffer: NZUsize!(1024 * 1024),
3071 write_buffer: NZUsize!(1024 * 1024),
3072 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3073 };
3074 let engine = Engine::new(context.with_label("engine"), cfg);
3075 engine.start(pending, recovered, resolver);
3076 }
3077 }
3078
3079 let mut finalizers = Vec::new();
3081 for reporter in reporters.iter_mut() {
3082 let (mut latest, mut monitor) = reporter.subscribe().await;
3083 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3084 while latest < required_containers {
3085 latest = monitor.recv().await.expect("event missing");
3086 }
3087 }));
3088 }
3089 join_all(finalizers).await;
3090
3091 let byz = &participants[0];
3093 for reporter in reporters.iter() {
3094 {
3096 let faults = reporter.faults.lock().unwrap();
3097 assert!(faults.is_empty());
3098 }
3099
3100 {
3102 let invalid = reporter.invalid.lock().unwrap();
3103 assert_eq!(*invalid, 0);
3104 }
3105 }
3106
3107 let blocked = oracle.blocked().await.unwrap();
3109 assert!(!blocked.is_empty());
3110 for (a, b) in blocked {
3111 assert_ne!(&a, byz);
3112 assert_eq!(&b, byz);
3113 }
3114 });
3115 }
3116
3117 #[test_group("slow")]
3118 #[test_traced]
3119 fn test_impersonator() {
3120 for seed in 0..5 {
3121 impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3122 impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3123 impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3124 impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3125 impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3126 impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3127 impersonator::<_, _, RoundRobin>(seed, ed25519::fixture);
3128 impersonator::<_, _, RoundRobin>(seed, secp256r1::fixture);
3129 }
3130 }
3131
3132 fn equivocator<S, F, L>(seed: u64, mut fixture: F) -> bool
3133 where
3134 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3135 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3136 L: Elector<S>,
3137 {
3138 let n = 7;
3140 let required_containers = View::new(50);
3141 let activity_timeout = ViewDelta::new(10);
3142 let skip_timeout = ViewDelta::new(5);
3143 let namespace = b"consensus".to_vec();
3144 let cfg = deterministic::Config::new()
3145 .with_seed(seed)
3146 .with_timeout(Some(Duration::from_secs(60)));
3147 let executor = deterministic::Runner::new(cfg);
3148 executor.start(|mut context| async move {
3149 let (network, mut oracle) = Network::new(
3151 context.with_label("network"),
3152 Config {
3153 max_size: 1024 * 1024,
3154 disconnect_on_block: false,
3155 tracked_peer_sets: None,
3156 },
3157 );
3158
3159 network.start();
3161
3162 let Fixture {
3164 participants,
3165 schemes,
3166 ..
3167 } = fixture(&mut context, &namespace, n);
3168 let mut registrations = register_validators(&mut oracle, &participants).await;
3169
3170 let link = Link {
3172 latency: Duration::from_millis(10),
3173 jitter: Duration::from_millis(1),
3174 success_rate: 1.0,
3175 };
3176 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3177
3178 let elector = L::default();
3180 let mut engines = Vec::new();
3181 let relay = Arc::new(mocks::relay::Relay::new());
3182 let mut reporters = Vec::new();
3183 for (idx_scheme, validator) in participants.iter().enumerate() {
3184 let context = context.with_label(&format!("validator_{}", *validator));
3186
3187 let reporter_config = mocks::reporter::Config {
3189 participants: participants.clone().try_into().unwrap(),
3190 scheme: schemes[idx_scheme].clone(),
3191 elector: elector.clone(),
3192 };
3193 let reporter =
3194 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3195 reporters.push(reporter.clone());
3196 let (pending, recovered, resolver) = registrations
3197 .remove(validator)
3198 .expect("validator should be registered");
3199 if idx_scheme == 0 {
3200 let cfg = mocks::equivocator::Config {
3201 scheme: schemes[idx_scheme].clone(),
3202 epoch: Epoch::new(333),
3203 relay: relay.clone(),
3204 hasher: Sha256::default(),
3205 elector: elector.clone(),
3206 };
3207
3208 let engine = mocks::equivocator::Equivocator::new(
3209 context.with_label("byzantine_engine"),
3210 cfg,
3211 );
3212 engines.push(engine.start(pending, recovered));
3213 } else {
3214 let application_cfg = mocks::application::Config {
3215 hasher: Sha256::default(),
3216 relay: relay.clone(),
3217 me: validator.clone(),
3218 propose_latency: (10.0, 5.0),
3219 verify_latency: (10.0, 5.0),
3220 certify_latency: (10.0, 5.0),
3221 should_certify: mocks::application::Certifier::Sometimes,
3222 };
3223 let (actor, application) = mocks::application::Application::new(
3224 context.with_label("application"),
3225 application_cfg,
3226 );
3227 actor.start();
3228 let blocker = oracle.control(validator.clone());
3229 let cfg = config::Config {
3230 scheme: schemes[idx_scheme].clone(),
3231 elector: elector.clone(),
3232 blocker,
3233 automaton: application.clone(),
3234 relay: application.clone(),
3235 reporter: reporter.clone(),
3236 strategy: Sequential,
3237 partition: validator.to_string(),
3238 mailbox_size: 1024,
3239 epoch: Epoch::new(333),
3240 leader_timeout: Duration::from_secs(1),
3241 notarization_timeout: Duration::from_secs(2),
3242 nullify_retry: Duration::from_secs(10),
3243 fetch_timeout: Duration::from_secs(1),
3244 activity_timeout,
3245 skip_timeout,
3246 fetch_concurrent: 4,
3247 replay_buffer: NZUsize!(1024 * 1024),
3248 write_buffer: NZUsize!(1024 * 1024),
3249 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3250 };
3251 let engine = Engine::new(context.with_label("engine"), cfg);
3252 engines.push(engine.start(pending, recovered, resolver));
3253 }
3254 }
3255
3256 let mut finalizers = Vec::new();
3258 for reporter in reporters.iter_mut().skip(1) {
3259 let (mut latest, mut monitor) = reporter.subscribe().await;
3260 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3261 while latest < required_containers {
3262 latest = monitor.recv().await.expect("event missing");
3263 }
3264 }));
3265 }
3266 join_all(finalizers).await;
3267
3268 let idx = context.gen_range(1..engines.len()); let validator = &participants[idx];
3271 let handle = engines.remove(idx);
3272 handle.abort();
3273 let _ = handle.await;
3274 reporters.remove(idx);
3275 info!(idx, ?validator, "aborted validator");
3276
3277 let mut finalizers = Vec::new();
3279 for reporter in reporters.iter_mut().skip(1) {
3280 let (mut latest, mut monitor) = reporter.subscribe().await;
3281 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3282 while latest < View::new(required_containers.get() * 2) {
3283 latest = monitor.recv().await.expect("event missing");
3284 }
3285 }));
3286 }
3287 join_all(finalizers).await;
3288
3289 info!(idx, ?validator, "restarting validator");
3291 let context = context.with_label(&format!("validator_{}_restarted", *validator));
3292
3293 let reporter_config = mocks::reporter::Config {
3295 participants: participants.clone().try_into().unwrap(),
3296 scheme: schemes[idx].clone(),
3297 elector: elector.clone(),
3298 };
3299 let reporter =
3300 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3301 let (pending, recovered, resolver) =
3302 register_validator(&mut oracle, validator.clone()).await;
3303 reporters.push(reporter.clone());
3304 let application_cfg = mocks::application::Config {
3305 hasher: Sha256::default(),
3306 relay: relay.clone(),
3307 me: validator.clone(),
3308 propose_latency: (10.0, 5.0),
3309 verify_latency: (10.0, 5.0),
3310 certify_latency: (10.0, 5.0),
3311 should_certify: mocks::application::Certifier::Sometimes,
3312 };
3313 let (actor, application) = mocks::application::Application::new(
3314 context.with_label("application"),
3315 application_cfg,
3316 );
3317 actor.start();
3318 let blocker = oracle.control(validator.clone());
3319 let cfg = config::Config {
3320 scheme: schemes[idx].clone(),
3321 elector: elector.clone(),
3322 blocker,
3323 automaton: application.clone(),
3324 relay: application.clone(),
3325 reporter: reporter.clone(),
3326 strategy: Sequential,
3327 partition: validator.to_string(),
3328 mailbox_size: 1024,
3329 epoch: Epoch::new(333),
3330 leader_timeout: Duration::from_secs(1),
3331 notarization_timeout: Duration::from_secs(2),
3332 nullify_retry: Duration::from_secs(10),
3333 fetch_timeout: Duration::from_secs(1),
3334 activity_timeout,
3335 skip_timeout,
3336 fetch_concurrent: 4,
3337 replay_buffer: NZUsize!(1024 * 1024),
3338 write_buffer: NZUsize!(1024 * 1024),
3339 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3340 };
3341 let engine = Engine::new(context.with_label("engine"), cfg);
3342 engine.start(pending, recovered, resolver);
3343
3344 let mut finalizers = Vec::new();
3346 for reporter in reporters.iter_mut().skip(1) {
3347 let (mut latest, mut monitor) = reporter.subscribe().await;
3348 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3349 while latest < View::new(required_containers.get() * 3) {
3350 latest = monitor.recv().await.expect("event missing");
3351 }
3352 }));
3353 }
3354 join_all(finalizers).await;
3355
3356 let byz = &participants[0];
3360 let blocked = oracle.blocked().await.unwrap();
3361 for (a, b) in &blocked {
3362 assert_ne!(a, byz);
3363 assert_eq!(b, byz);
3364 }
3365 !blocked.is_empty()
3366 })
3367 }
3368
3369 #[test_group("slow")]
3370 #[test_traced]
3371 fn test_equivocator_bls12381_threshold_vrf_min_pk() {
3372 let detected = (0..5).any(|seed| {
3373 equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>)
3374 });
3375 assert!(
3376 detected,
3377 "expected at least one seed to detect equivocation"
3378 );
3379 }
3380
3381 #[test_group("slow")]
3382 #[test_traced]
3383 fn test_equivocator_bls12381_threshold_vrf_min_sig() {
3384 let detected = (0..5).any(|seed| {
3385 equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>)
3386 });
3387 assert!(
3388 detected,
3389 "expected at least one seed to detect equivocation"
3390 );
3391 }
3392
3393 #[test_group("slow")]
3394 #[test_traced]
3395 fn test_equivocator_bls12381_threshold_std_min_pk() {
3396 let detected = (0..5).any(|seed| {
3397 equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>)
3398 });
3399 assert!(
3400 detected,
3401 "expected at least one seed to detect equivocation"
3402 );
3403 }
3404
3405 #[test_group("slow")]
3406 #[test_traced]
3407 fn test_equivocator_bls12381_threshold_std_min_sig() {
3408 let detected = (0..5).any(|seed| {
3409 equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>)
3410 });
3411 assert!(
3412 detected,
3413 "expected at least one seed to detect equivocation"
3414 );
3415 }
3416
3417 #[test_group("slow")]
3418 #[test_traced]
3419 fn test_equivocator_bls12381_multisig_min_pk() {
3420 let detected = (0..5).any(|seed| {
3421 equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>)
3422 });
3423 assert!(
3424 detected,
3425 "expected at least one seed to detect equivocation"
3426 );
3427 }
3428
3429 #[test_group("slow")]
3430 #[test_traced]
3431 fn test_equivocator_bls12381_multisig_min_sig() {
3432 let detected = (0..5).any(|seed| {
3433 equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>)
3434 });
3435 assert!(
3436 detected,
3437 "expected at least one seed to detect equivocation"
3438 );
3439 }
3440
3441 #[test_group("slow")]
3442 #[test_traced]
3443 fn test_equivocator_ed25519() {
3444 let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, ed25519::fixture));
3445 assert!(
3446 detected,
3447 "expected at least one seed to detect equivocation"
3448 );
3449 }
3450
3451 #[test_group("slow")]
3452 #[test_traced]
3453 fn test_equivocator_secp256r1() {
3454 let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, secp256r1::fixture));
3455 assert!(
3456 detected,
3457 "expected at least one seed to detect equivocation"
3458 );
3459 }
3460
3461 fn reconfigurer<S, F, L>(seed: u64, mut fixture: F)
3462 where
3463 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3464 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3465 L: Elector<S>,
3466 {
3467 let n = 4;
3469 let required_containers = View::new(50);
3470 let activity_timeout = ViewDelta::new(10);
3471 let skip_timeout = ViewDelta::new(5);
3472 let namespace = b"consensus".to_vec();
3473 let cfg = deterministic::Config::new()
3474 .with_seed(seed)
3475 .with_timeout(Some(Duration::from_secs(30)));
3476 let executor = deterministic::Runner::new(cfg);
3477 executor.start(|mut context| async move {
3478 let (network, mut oracle) = Network::new(
3480 context.with_label("network"),
3481 Config {
3482 max_size: 1024 * 1024,
3483 disconnect_on_block: false,
3484 tracked_peer_sets: None,
3485 },
3486 );
3487
3488 network.start();
3490
3491 let Fixture {
3493 participants,
3494 schemes,
3495 ..
3496 } = fixture(&mut context, &namespace, n);
3497 let mut registrations = register_validators(&mut oracle, &participants).await;
3498
3499 let link = Link {
3501 latency: Duration::from_millis(10),
3502 jitter: Duration::from_millis(1),
3503 success_rate: 1.0,
3504 };
3505 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3506
3507 let elector = L::default();
3509 let relay = Arc::new(mocks::relay::Relay::new());
3510 let mut reporters = Vec::new();
3511 for (idx_scheme, validator) in participants.iter().enumerate() {
3512 let context = context.with_label(&format!("validator_{}", *validator));
3514
3515 let reporter_config = mocks::reporter::Config {
3517 participants: participants.clone().try_into().unwrap(),
3518 scheme: schemes[idx_scheme].clone(),
3519 elector: elector.clone(),
3520 };
3521 let reporter =
3522 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3523 let (pending, recovered, resolver) = registrations
3524 .remove(validator)
3525 .expect("validator should be registered");
3526 if idx_scheme == 0 {
3527 let cfg = mocks::reconfigurer::Config {
3528 scheme: schemes[idx_scheme].clone(),
3529 };
3530 let engine: mocks::reconfigurer::Reconfigurer<_, _, Sha256> =
3531 mocks::reconfigurer::Reconfigurer::new(
3532 context.with_label("byzantine_engine"),
3533 cfg,
3534 );
3535 engine.start(pending);
3536 } else {
3537 reporters.push(reporter.clone());
3538 let application_cfg = mocks::application::Config {
3539 hasher: Sha256::default(),
3540 relay: relay.clone(),
3541 me: validator.clone(),
3542 propose_latency: (10.0, 5.0),
3543 verify_latency: (10.0, 5.0),
3544 certify_latency: (10.0, 5.0),
3545 should_certify: mocks::application::Certifier::Sometimes,
3546 };
3547 let (actor, application) = mocks::application::Application::new(
3548 context.with_label("application"),
3549 application_cfg,
3550 );
3551 actor.start();
3552 let blocker = oracle.control(validator.clone());
3553 let cfg = config::Config {
3554 scheme: schemes[idx_scheme].clone(),
3555 elector: elector.clone(),
3556 blocker,
3557 automaton: application.clone(),
3558 relay: application.clone(),
3559 reporter: reporter.clone(),
3560 strategy: Sequential,
3561 partition: validator.to_string(),
3562 mailbox_size: 1024,
3563 epoch: Epoch::new(333),
3564 leader_timeout: Duration::from_secs(1),
3565 notarization_timeout: Duration::from_secs(2),
3566 nullify_retry: Duration::from_secs(10),
3567 fetch_timeout: Duration::from_secs(1),
3568 activity_timeout,
3569 skip_timeout,
3570 fetch_concurrent: 4,
3571 replay_buffer: NZUsize!(1024 * 1024),
3572 write_buffer: NZUsize!(1024 * 1024),
3573 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3574 };
3575 let engine = Engine::new(context.with_label("engine"), cfg);
3576 engine.start(pending, recovered, resolver);
3577 }
3578 }
3579
3580 let mut finalizers = Vec::new();
3582 for reporter in reporters.iter_mut() {
3583 let (mut latest, mut monitor) = reporter.subscribe().await;
3584 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3585 while latest < required_containers {
3586 latest = monitor.recv().await.expect("event missing");
3587 }
3588 }));
3589 }
3590 join_all(finalizers).await;
3591
3592 let byz = &participants[0];
3594 for reporter in reporters.iter() {
3595 {
3597 let faults = reporter.faults.lock().unwrap();
3598 assert!(faults.is_empty());
3599 }
3600
3601 {
3603 let invalid = reporter.invalid.lock().unwrap();
3604 assert_eq!(*invalid, 0);
3605 }
3606 }
3607
3608 let blocked = oracle.blocked().await.unwrap();
3610 assert!(!blocked.is_empty());
3611 for (a, b) in blocked {
3612 assert_ne!(&a, byz);
3613 assert_eq!(&b, byz);
3614 }
3615 });
3616 }
3617
3618 #[test_group("slow")]
3619 #[test_traced]
3620 fn test_reconfigurer() {
3621 for seed in 0..5 {
3622 reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3623 reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3624 reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3625 reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3626 reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3627 reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3628 reconfigurer::<_, _, RoundRobin>(seed, ed25519::fixture);
3629 reconfigurer::<_, _, RoundRobin>(seed, secp256r1::fixture);
3630 }
3631 }
3632
3633 fn nuller<S, F, L>(seed: u64, mut fixture: F)
3634 where
3635 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3636 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3637 L: Elector<S>,
3638 {
3639 let n = 4;
3641 let required_containers = View::new(50);
3642 let activity_timeout = ViewDelta::new(10);
3643 let skip_timeout = ViewDelta::new(5);
3644 let namespace = b"consensus".to_vec();
3645 let cfg = deterministic::Config::new()
3646 .with_seed(seed)
3647 .with_timeout(Some(Duration::from_secs(30)));
3648 let executor = deterministic::Runner::new(cfg);
3649 executor.start(|mut context| async move {
3650 let (network, mut oracle) = Network::new(
3652 context.with_label("network"),
3653 Config {
3654 max_size: 1024 * 1024,
3655 disconnect_on_block: false,
3656 tracked_peer_sets: None,
3657 },
3658 );
3659
3660 network.start();
3662
3663 let Fixture {
3665 participants,
3666 schemes,
3667 ..
3668 } = fixture(&mut context, &namespace, n);
3669 let mut registrations = register_validators(&mut oracle, &participants).await;
3670
3671 let link = Link {
3673 latency: Duration::from_millis(10),
3674 jitter: Duration::from_millis(1),
3675 success_rate: 1.0,
3676 };
3677 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3678
3679 let elector = L::default();
3681 let relay = Arc::new(mocks::relay::Relay::new());
3682 let mut reporters = Vec::new();
3683 for (idx_scheme, validator) in participants.iter().enumerate() {
3684 let context = context.with_label(&format!("validator_{}", *validator));
3686
3687 let reporter_config = mocks::reporter::Config {
3689 participants: participants.clone().try_into().unwrap(),
3690 scheme: schemes[idx_scheme].clone(),
3691 elector: elector.clone(),
3692 };
3693 let reporter =
3694 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3695 let (pending, recovered, resolver) = registrations
3696 .remove(validator)
3697 .expect("validator should be registered");
3698 if idx_scheme == 0 {
3699 let cfg = mocks::nuller::Config {
3700 scheme: schemes[idx_scheme].clone(),
3701 };
3702 let engine: mocks::nuller::Nuller<_, _, Sha256> =
3703 mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
3704 engine.start(pending);
3705 } else {
3706 reporters.push(reporter.clone());
3707 let application_cfg = mocks::application::Config {
3708 hasher: Sha256::default(),
3709 relay: relay.clone(),
3710 me: validator.clone(),
3711 propose_latency: (10.0, 5.0),
3712 verify_latency: (10.0, 5.0),
3713 certify_latency: (10.0, 5.0),
3714 should_certify: mocks::application::Certifier::Sometimes,
3715 };
3716 let (actor, application) = mocks::application::Application::new(
3717 context.with_label("application"),
3718 application_cfg,
3719 );
3720 actor.start();
3721 let blocker = oracle.control(validator.clone());
3722 let cfg = config::Config {
3723 scheme: schemes[idx_scheme].clone(),
3724 elector: elector.clone(),
3725 blocker,
3726 automaton: application.clone(),
3727 relay: application.clone(),
3728 reporter: reporter.clone(),
3729 strategy: Sequential,
3730 partition: validator.clone().to_string(),
3731 mailbox_size: 1024,
3732 epoch: Epoch::new(333),
3733 leader_timeout: Duration::from_secs(1),
3734 notarization_timeout: Duration::from_secs(2),
3735 nullify_retry: Duration::from_secs(10),
3736 fetch_timeout: Duration::from_secs(1),
3737 activity_timeout,
3738 skip_timeout,
3739 fetch_concurrent: 4,
3740 replay_buffer: NZUsize!(1024 * 1024),
3741 write_buffer: NZUsize!(1024 * 1024),
3742 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3743 };
3744 let engine = Engine::new(context.with_label("engine"), cfg);
3745 engine.start(pending, recovered, resolver);
3746 }
3747 }
3748
3749 let mut finalizers = Vec::new();
3751 for reporter in reporters.iter_mut() {
3752 let (mut latest, mut monitor) = reporter.subscribe().await;
3753 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3754 while latest < required_containers {
3755 latest = monitor.recv().await.expect("event missing");
3756 }
3757 }));
3758 }
3759 join_all(finalizers).await;
3760
3761 let byz = &participants[0];
3763 let mut count_nullify_and_finalize = 0;
3764 for reporter in reporters.iter() {
3765 {
3767 let faults = reporter.faults.lock().unwrap();
3768 assert_eq!(faults.len(), 1);
3769 let faulter = faults.get(byz).expect("byzantine party is not faulter");
3770 for (_, faults) in faulter.iter() {
3771 for fault in faults.iter() {
3772 match fault {
3773 Activity::NullifyFinalize(_) => {
3774 count_nullify_and_finalize += 1;
3775 }
3776 _ => panic!("unexpected fault: {fault:?}"),
3777 }
3778 }
3779 }
3780 }
3781
3782 {
3784 let invalid = reporter.invalid.lock().unwrap();
3785 assert_eq!(*invalid, 0);
3786 }
3787 }
3788 assert!(count_nullify_and_finalize > 0);
3789
3790 let blocked = oracle.blocked().await.unwrap();
3792 assert!(!blocked.is_empty());
3793 for (a, b) in blocked {
3794 assert_ne!(&a, byz);
3795 assert_eq!(&b, byz);
3796 }
3797 });
3798 }
3799
3800 #[test_group("slow")]
3801 #[test_traced]
3802 fn test_nuller() {
3803 for seed in 0..5 {
3804 nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3805 nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3806 nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3807 nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3808 nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3809 nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3810 nuller::<_, _, RoundRobin>(seed, ed25519::fixture);
3811 nuller::<_, _, RoundRobin>(seed, secp256r1::fixture);
3812 }
3813 }
3814
3815 fn outdated<S, F, L>(seed: u64, mut fixture: F)
3816 where
3817 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3818 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3819 L: Elector<S>,
3820 {
3821 let n = 4;
3823 let required_containers = View::new(100);
3824 let activity_timeout = ViewDelta::new(10);
3825 let skip_timeout = ViewDelta::new(5);
3826 let namespace = b"consensus".to_vec();
3827 let cfg = deterministic::Config::new()
3828 .with_seed(seed)
3829 .with_timeout(Some(Duration::from_secs(30)));
3830 let executor = deterministic::Runner::new(cfg);
3831 executor.start(|mut context| async move {
3832 let (network, mut oracle) = Network::new(
3834 context.with_label("network"),
3835 Config {
3836 max_size: 1024 * 1024,
3837 disconnect_on_block: false,
3838 tracked_peer_sets: None,
3839 },
3840 );
3841
3842 network.start();
3844
3845 let Fixture {
3847 participants,
3848 schemes,
3849 ..
3850 } = fixture(&mut context, &namespace, n);
3851 let mut registrations = register_validators(&mut oracle, &participants).await;
3852
3853 let link = Link {
3855 latency: Duration::from_millis(10),
3856 jitter: Duration::from_millis(1),
3857 success_rate: 1.0,
3858 };
3859 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3860
3861 let elector = L::default();
3863 let relay = Arc::new(mocks::relay::Relay::new());
3864 let mut reporters = Vec::new();
3865 for (idx_scheme, validator) in participants.iter().enumerate() {
3866 let context = context.with_label(&format!("validator_{}", *validator));
3868
3869 let reporter_config = mocks::reporter::Config {
3871 participants: participants.clone().try_into().unwrap(),
3872 scheme: schemes[idx_scheme].clone(),
3873 elector: elector.clone(),
3874 };
3875 let reporter =
3876 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3877 let (pending, recovered, resolver) = registrations
3878 .remove(validator)
3879 .expect("validator should be registered");
3880 if idx_scheme == 0 {
3881 let cfg = mocks::outdated::Config {
3882 scheme: schemes[idx_scheme].clone(),
3883 view_delta: ViewDelta::new(activity_timeout.get().saturating_mul(4)),
3884 };
3885 let engine: mocks::outdated::Outdated<_, _, Sha256> =
3886 mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
3887 engine.start(pending);
3888 } else {
3889 reporters.push(reporter.clone());
3890 let application_cfg = mocks::application::Config {
3891 hasher: Sha256::default(),
3892 relay: relay.clone(),
3893 me: validator.clone(),
3894 propose_latency: (10.0, 5.0),
3895 verify_latency: (10.0, 5.0),
3896 certify_latency: (10.0, 5.0),
3897 should_certify: mocks::application::Certifier::Sometimes,
3898 };
3899 let (actor, application) = mocks::application::Application::new(
3900 context.with_label("application"),
3901 application_cfg,
3902 );
3903 actor.start();
3904 let blocker = oracle.control(validator.clone());
3905 let cfg = config::Config {
3906 scheme: schemes[idx_scheme].clone(),
3907 elector: elector.clone(),
3908 blocker,
3909 automaton: application.clone(),
3910 relay: application.clone(),
3911 reporter: reporter.clone(),
3912 strategy: Sequential,
3913 partition: validator.clone().to_string(),
3914 mailbox_size: 1024,
3915 epoch: Epoch::new(333),
3916 leader_timeout: Duration::from_secs(1),
3917 notarization_timeout: Duration::from_secs(2),
3918 nullify_retry: Duration::from_secs(10),
3919 fetch_timeout: Duration::from_secs(1),
3920 activity_timeout,
3921 skip_timeout,
3922 fetch_concurrent: 4,
3923 replay_buffer: NZUsize!(1024 * 1024),
3924 write_buffer: NZUsize!(1024 * 1024),
3925 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3926 };
3927 let engine = Engine::new(context.with_label("engine"), cfg);
3928 engine.start(pending, recovered, resolver);
3929 }
3930 }
3931
3932 let mut finalizers = Vec::new();
3934 for reporter in reporters.iter_mut() {
3935 let (mut latest, mut monitor) = reporter.subscribe().await;
3936 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3937 while latest < required_containers {
3938 latest = monitor.recv().await.expect("event missing");
3939 }
3940 }));
3941 }
3942 join_all(finalizers).await;
3943
3944 for reporter in reporters.iter() {
3946 {
3948 let faults = reporter.faults.lock().unwrap();
3949 assert!(faults.is_empty());
3950 }
3951
3952 {
3954 let invalid = reporter.invalid.lock().unwrap();
3955 assert_eq!(*invalid, 0);
3956 }
3957 }
3958
3959 let blocked = oracle.blocked().await.unwrap();
3961 assert!(blocked.is_empty());
3962 });
3963 }
3964
3965 #[test_group("slow")]
3966 #[test_traced]
3967 fn test_outdated() {
3968 for seed in 0..5 {
3969 outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3970 outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3971 outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3972 outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3973 outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3974 outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3975 outdated::<_, _, RoundRobin>(seed, ed25519::fixture);
3976 outdated::<_, _, RoundRobin>(seed, secp256r1::fixture);
3977 }
3978 }
3979
    /// Runs a 10-participant, all-honest network over lossy links (80ms latency, 10ms
    /// jitter, 98% success rate) until every reporter's monitor reaches view 1,000.
    ///
    /// Afterwards asserts that no reporter recorded a fault or an invalid message and that
    /// nobody was blocked on the network.
    ///
    /// `fixture` builds the participants and their signing schemes for `n` validators.
    /// The runtime uses `deterministic::Config::new()` without an explicit seed or timeout.
    fn run_1k<S, F, L>(mut fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Scenario parameters.
        let n = 10;
        let required_containers = View::new(1_000);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new();
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create the simulated network.
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: false,
                    tracked_peer_sets: None,
                },
            );

            // Start the network.
            network.start();

            // Build participants/schemes and register them with the network.
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, &namespace, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Apply a lossy link configuration (2% of deliveries fail).
            let link = Link {
                latency: Duration::from_millis(80),
                jitter: Duration::from_millis(10),
                success_rate: 0.98,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create an honest engine for every participant.
            let elector = L::default();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            // NOTE(review): handles are collected but never read afterwards — presumably
            // only to keep them alive for the duration of the test; confirm.
            let mut engine_handlers = Vec::new();
            for (idx, validator) in participants.iter().enumerate() {
                // Per-validator context so labels are unique per participant.
                let context = context.with_label(&format!("validator_{}", *validator));

                // Reporter that records activity for the assertions below.
                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                reporters.push(reporter.clone());

                // Mock application, used as both automaton and relay by the engine.
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (100.0, 50.0),
                    verify_latency: (50.0, 40.0),
                    certify_latency: (50.0, 40.0),
                    should_certify: mocks::application::Certifier::Sometimes,
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    strategy: Sequential,
                    partition: validator.to_string(),
                    mailbox_size: 1024,
                    epoch: Epoch::new(333),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    fetch_concurrent: 4,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start the engine on this validator's registered channels.
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait until every reporter's monitor reaches `required_containers`.
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // No reporter may have seen faults or invalid messages.
            for reporter in reporters.iter() {
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                {
                    let invalid = reporter.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }
            }

            // Nobody may have been blocked.
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        })
    }
4120
4121 #[test_group("slow")]
4122 #[test_traced]
4123 fn test_1k_bls12381_threshold_vrf_min_pk() {
4124 run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
4125 }
4126
4127 #[test_group("slow")]
4128 #[test_traced]
4129 fn test_1k_bls12381_threshold_vrf_min_sig() {
4130 run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
4131 }
4132
4133 #[test_group("slow")]
4134 #[test_traced]
4135 fn test_1k_bls12381_threshold_std_min_pk() {
4136 run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
4137 }
4138
4139 #[test_group("slow")]
4140 #[test_traced]
4141 fn test_1k_bls12381_threshold_std_min_sig() {
4142 run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
4143 }
4144
4145 #[test_group("slow")]
4146 #[test_traced]
4147 fn test_1k_bls12381_multisig_min_pk() {
4148 run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4149 }
4150
4151 #[test_group("slow")]
4152 #[test_traced]
4153 fn test_1k_bls12381_multisig_min_sig() {
4154 run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
4155 }
4156
4157 #[test_group("slow")]
4158 #[test_traced]
4159 fn test_1k_ed25519() {
4160 run_1k::<_, _, RoundRobin>(ed25519::fixture);
4161 }
4162
4163 #[test_group("slow")]
4164 #[test_traced]
4165 fn test_1k_secp256r1() {
4166 run_1k::<_, _, RoundRobin>(secp256r1::fixture);
4167 }
4168
4169 fn engine_shutdown<S, F, L>(seed: u64, mut fixture: F, graceful: bool)
4170 where
4171 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4172 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4173 L: Elector<S>,
4174 {
4175 let n = 1;
4176 let namespace = b"consensus".to_vec();
4177 let cfg = deterministic::Config::default()
4178 .with_seed(seed)
4179 .with_timeout(Some(Duration::from_secs(10)));
4180 let executor = deterministic::Runner::new(cfg);
4181 executor.start(|mut context| async move {
4182 let (network, mut oracle) = Network::new(
4184 context.with_label("network"),
4185 Config {
4186 max_size: 1024 * 1024,
4187 disconnect_on_block: true,
4188 tracked_peer_sets: None,
4189 },
4190 );
4191
4192 network.start();
4194
4195 let Fixture {
4197 participants,
4198 schemes,
4199 ..
4200 } = fixture(&mut context, &namespace, n);
4201 let mut registrations = register_validators(&mut oracle, &participants).await;
4202
4203 let link = Link {
4205 latency: Duration::from_millis(1),
4206 jitter: Duration::from_millis(0),
4207 success_rate: 1.0,
4208 };
4209 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4210
4211 let elector = L::default();
4213 let reporter_config = mocks::reporter::Config {
4214 participants: participants.clone().try_into().unwrap(),
4215 scheme: schemes[0].clone(),
4216 elector: elector.clone(),
4217 };
4218 let reporter =
4219 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4220 let relay = Arc::new(mocks::relay::Relay::new());
4221 let application_cfg = mocks::application::Config {
4222 hasher: Sha256::default(),
4223 relay: relay.clone(),
4224 me: participants[0].clone(),
4225 propose_latency: (1.0, 0.0),
4226 verify_latency: (1.0, 0.0),
4227 certify_latency: (1.0, 0.0),
4228 should_certify: mocks::application::Certifier::Sometimes,
4229 };
4230 let (actor, application) = mocks::application::Application::new(
4231 context.with_label("application"),
4232 application_cfg,
4233 );
4234 actor.start();
4235 let blocker = oracle.control(participants[0].clone());
4236 let cfg = config::Config {
4237 scheme: schemes[0].clone(),
4238 elector: elector.clone(),
4239 blocker,
4240 automaton: application.clone(),
4241 relay: application.clone(),
4242 reporter: reporter.clone(),
4243 strategy: Sequential,
4244 partition: participants[0].clone().to_string(),
4245 mailbox_size: 64,
4246 epoch: Epoch::new(333),
4247 leader_timeout: Duration::from_millis(50),
4248 notarization_timeout: Duration::from_millis(100),
4249 nullify_retry: Duration::from_millis(250),
4250 fetch_timeout: Duration::from_millis(50),
4251 activity_timeout: ViewDelta::new(4),
4252 skip_timeout: ViewDelta::new(2),
4253 fetch_concurrent: 4,
4254 replay_buffer: NZUsize!(1024 * 16),
4255 write_buffer: NZUsize!(1024 * 16),
4256 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4257 };
4258 let engine = Engine::new(context.with_label("engine"), cfg);
4259
4260 let (pending, recovered, resolver) = registrations
4262 .remove(&participants[0])
4263 .expect("validator should be registered");
4264 let handle = engine.start(pending, recovered, resolver);
4265
4266 context.sleep(Duration::from_millis(1000)).await;
4268
4269 let running_before = count_running_tasks(&context, "engine");
4271 assert!(
4272 running_before > 0,
4273 "at least one engine task should be running"
4274 );
4275
4276 context.sleep(Duration::from_millis(1500)).await;
4278 assert!(
4279 count_running_tasks(&context, "engine") > 0,
4280 "engine tasks should still be running"
4281 );
4282
4283 let running_after = if graceful {
4285 let metrics_context = context.clone();
4286 let result = context.stop(0, Some(Duration::from_secs(5))).await;
4287 assert!(
4288 result.is_ok(),
4289 "graceful shutdown should complete: {result:?}"
4290 );
4291 count_running_tasks(&metrics_context, "engine")
4292 } else {
4293 handle.abort();
4294 let _ = handle.await; context.sleep(Duration::from_millis(1000)).await;
4298 count_running_tasks(&context, "engine")
4299 };
4300 assert_eq!(
4301 running_after, 0,
4302 "all engine tasks should be stopped, but {running_after} still running"
4303 );
4304 });
4305 }
4306
4307 #[test_traced]
4308 fn test_children_shutdown_on_engine_abort() {
4309 for seed in 0..10 {
4310 engine_shutdown::<_, _, Random>(
4311 seed,
4312 bls12381_threshold_vrf::fixture::<MinPk, _>,
4313 false,
4314 );
4315 engine_shutdown::<_, _, Random>(
4316 seed,
4317 bls12381_threshold_vrf::fixture::<MinSig, _>,
4318 false,
4319 );
4320 engine_shutdown::<_, _, RoundRobin>(
4321 seed,
4322 bls12381_threshold_std::fixture::<MinPk, _>,
4323 false,
4324 );
4325 engine_shutdown::<_, _, RoundRobin>(
4326 seed,
4327 bls12381_threshold_std::fixture::<MinSig, _>,
4328 false,
4329 );
4330 engine_shutdown::<_, _, RoundRobin>(
4331 seed,
4332 bls12381_multisig::fixture::<MinPk, _>,
4333 false,
4334 );
4335 engine_shutdown::<_, _, RoundRobin>(
4336 seed,
4337 bls12381_multisig::fixture::<MinSig, _>,
4338 false,
4339 );
4340 engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, false);
4341 engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, false);
4342 }
4343 }
4344
4345 #[test_traced]
4346 fn test_graceful_shutdown() {
4347 for seed in 0..10 {
4348 engine_shutdown::<_, _, Random>(
4349 seed,
4350 bls12381_threshold_vrf::fixture::<MinPk, _>,
4351 true,
4352 );
4353 engine_shutdown::<_, _, Random>(
4354 seed,
4355 bls12381_threshold_vrf::fixture::<MinSig, _>,
4356 true,
4357 );
4358 engine_shutdown::<_, _, RoundRobin>(
4359 seed,
4360 bls12381_threshold_std::fixture::<MinPk, _>,
4361 true,
4362 );
4363 engine_shutdown::<_, _, RoundRobin>(
4364 seed,
4365 bls12381_threshold_std::fixture::<MinSig, _>,
4366 true,
4367 );
4368 engine_shutdown::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>, true);
4369 engine_shutdown::<_, _, RoundRobin>(
4370 seed,
4371 bls12381_multisig::fixture::<MinSig, _>,
4372 true,
4373 );
4374 engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, true);
4375 engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, true);
4376 }
4377 }
4378
    /// Runs a 3-participant, all-honest network where each engine reports through a
    /// `scheme::reporter::AttributableReporter` wrapping a mock reporter, then inspects what
    /// reached the mock reporters once every monitor has reached view 10.
    ///
    /// Asserted per mock reporter:
    /// - no faults and no invalid messages,
    /// - at least one notarization or finalization certificate was recorded,
    /// - for every recorded notarize view except the highest one, and for every recorded
    ///   finalize view, the total signer count is `> 1` when `S::is_attributable()` and
    ///   exactly `0` otherwise.
    ///
    /// Finally asserts that nobody was blocked on the network.
    ///
    /// `fixture` builds the participants and their signing schemes for `n` validators.
    fn attributable_reporter_filtering<S, F, L>(mut fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Scenario parameters.
        let n = 3;
        let required_containers = View::new(10);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut context| async move {
            // Create and start the simulated network.
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: false,
                    tracked_peer_sets: None,
                },
            );
            network.start();

            // Build participants/schemes and register them with the network.
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, &namespace, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Apply a lossless 10ms (+/- 1ms jitter) link configuration.
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create an honest engine for every participant.
            let elector = L::default();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            for (idx, validator) in participants.iter().enumerate() {
                let context = context.with_label(&format!("validator_{}", *validator));

                // Mock reporter that records whatever the wrapper forwards to it.
                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                };
                let mock_reporter = mocks::reporter::Reporter::new(
                    context.with_label("mock_reporter"),
                    reporter_config,
                );

                // Wrap the mock reporter in the reporter under test.
                // NOTE(review): the meaning of the trailing `true` flag is not visible
                // here — presumably it enables verification; confirm against
                // `AttributableReporter::new`.
                let attributable_reporter = scheme::reporter::AttributableReporter::new(
                    context.with_label("rng"),
                    schemes[idx].clone(),
                    mock_reporter.clone(),
                    Sequential,
                    true,
                );
                // Keep the mock (not the wrapper) so assertions see the filtered output.
                reporters.push(mock_reporter.clone());

                // Mock application, used as both automaton and relay by the engine.
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                    certify_latency: (10.0, 5.0),
                    should_certify: mocks::application::Certifier::Sometimes,
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    // The engine reports through the wrapper.
                    reporter: attributable_reporter,
                    strategy: Sequential,
                    partition: validator.to_string(),
                    mailbox_size: 1024,
                    epoch: Epoch::new(333),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    fetch_concurrent: 4,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start the engine on this validator's registered channels.
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine.start(pending, recovered, resolver);
            }

            // Wait until every mock reporter's monitor reaches `required_containers`.
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Inspect what each mock reporter received.
            for reporter in reporters.iter() {
                // No faults.
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert!(faults.is_empty(), "No faults should be reported");
                }

                // No invalid messages.
                {
                    let invalid = reporter.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0, "No invalid signatures");
                }

                // Certificates must be forwarded regardless of attributability.
                {
                    let notarizations = reporter.notarizations.lock().unwrap();
                    let finalizations = reporter.finalizations.lock().unwrap();
                    assert!(
                        !notarizations.is_empty() || !finalizations.is_empty(),
                        "Certificates should be reported"
                    );
                }

                // Notarize votes: checked for every view except the highest recorded one.
                // NOTE(review): the highest view is presumably skipped because its votes
                // may still be incomplete when the test stops — confirm.
                let notarizes = reporter.notarizes.lock().unwrap();
                let last_view = notarizes.keys().max().cloned().unwrap_or_default();
                for (view, payloads) in notarizes.iter() {
                    if *view == last_view {
                        continue;
                    }

                    // Total signers across all payloads recorded for this view.
                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();

                    if S::is_attributable() {
                        // Attributable schemes: individual votes must come through.
                        assert!(signers > 1, "view {view}: {signers}");
                    } else {
                        // Non-attributable schemes: individual votes must be absent.
                        assert_eq!(signers, 0);
                    }
                }

                // Finalize votes: same expectation, applied to every recorded view.
                let finalizes = reporter.finalizes.lock().unwrap();
                for (_, payloads) in finalizes.iter() {
                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();

                    if S::is_attributable() {
                        assert!(signers > 1);
                    } else {
                        assert_eq!(signers, 0);
                    }
                }
            }

            // Nobody may have been blocked.
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }
4567
4568 #[test_traced]
4569 fn test_attributable_reporter_filtering() {
4570 attributable_reporter_filtering::<_, _, Random>(
4571 bls12381_threshold_vrf::fixture::<MinPk, _>,
4572 );
4573 attributable_reporter_filtering::<_, _, Random>(
4574 bls12381_threshold_vrf::fixture::<MinSig, _>,
4575 );
4576 attributable_reporter_filtering::<_, _, RoundRobin>(
4577 bls12381_threshold_std::fixture::<MinPk, _>,
4578 );
4579 attributable_reporter_filtering::<_, _, RoundRobin>(
4580 bls12381_threshold_std::fixture::<MinSig, _>,
4581 );
4582 attributable_reporter_filtering::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4583 attributable_reporter_filtering::<_, _, RoundRobin>(
4584 bls12381_multisig::fixture::<MinSig, _>,
4585 );
4586 attributable_reporter_filtering::<_, _, RoundRobin>(ed25519::fixture);
4587 attributable_reporter_filtering::<_, _, RoundRobin>(secp256r1::fixture);
4588 }
4589
    // Scenario: ten participants are handed conflicting certificates for two
    // consecutive views by an external "injector" peer, and the test then checks
    // that the honest participants still make progress once they are linked.
    //
    // Steps performed by this function:
    // 1. Start seven regular engines (indices 0..7) and three `NullifyOnly`
    //    mock engines (indices 7..10).
    // 2. Build certificates by hand for views `f_view`, `f_view + 1` and
    //    `f_view + 2`, and deliver a different mix to each participant group.
    // 3. Assert what each honest reporter recorded for those three views.
    // 4. Link the validators and wait until every honest reporter's monitor
    //    reaches `f_view + 3`.
    // 5. Assert that no faults, invalid messages or blocked peers were recorded.
    //
    // `fixture` builds the participants and signing schemes; `L` is the leader
    // elector used by both the reporters and the engines.
    fn split_views_no_lockup<S, F, L>(mut fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Role of a participant, derived purely from its index (see `get_type`):
        // - Group1: receives the notarization for `f_view + 1` and the
        //   nullification for `f_view + 2`.
        // - Group2: receives the nullification for `f_view + 1` and the
        //   notarization for `f_view + 2`.
        // - Ignorant: receives only nullifications for both views.
        // - Byzantine: runs the `NullifyOnly` mock instead of a real engine.
        enum ParticipantType {
            Group1,
            Group2,
            Ignorant,
            Byzantine,
        }
        // Maps a participant index (0..10) to its role; any other index is a bug.
        let get_type = |idx: usize| -> ParticipantType {
            match idx {
                0..3 => ParticipantType::Group1,
                3..6 => ParticipantType::Group2,
                6 => ParticipantType::Ignorant,
                7..10 => ParticipantType::Byzantine,
                _ => unreachable!(),
            }
        };

        let n = 10;
        // Shadows the `quorum` helper with its value for `n`; the scenario
        // relies on it being exactly 7.
        let quorum = quorum(n) as usize;
        assert_eq!(quorum, 7);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(300));
        executor.start(|mut context| async move {
            // Create the simulated network.
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: false,
                    tracked_peer_sets: None,
                },
            );
            network.start();

            // Build participants and schemes, then register their channels.
            // NOTE(review): `link_validators` is not called until after the
            // injected certificates are checked, so validators are presumably
            // isolated from each other until then; confirm that
            // `register_validators` adds no links.
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, &namespace, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Start one engine per participant; only non-Byzantine participants
            // get a reporter, collected in `honest_reporters` in index order.
            let elector = L::default();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut honest_reporters = Vec::new();
            for (idx, validator) in participants.iter().enumerate() {
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                let participant_type = get_type(idx);
                if matches!(participant_type, ParticipantType::Byzantine) {
                    // Byzantine participants run the `NullifyOnly` mock, which is
                    // started with the `pending` channel only.
                    let cfg = mocks::nullify_only::Config {
                        scheme: schemes[idx].clone(),
                    };
                    let engine: mocks::nullify_only::NullifyOnly<_, _, Sha256> =
                        mocks::nullify_only::NullifyOnly::new(
                            context.with_label(&format!("byzantine_{}", *validator)),
                            cfg,
                        );
                    engine.start(pending);
                    // The remaining channels are unused by the mock.
                    drop(recovered);
                    drop(resolver);
                } else {
                    // Honest participants run the real engine with a mock
                    // reporter and a mock application.
                    let reporter_config = mocks::reporter::Config {
                        participants: participants.clone().try_into().unwrap(),
                        scheme: schemes[idx].clone(),
                        elector: elector.clone(),
                    };
                    let reporter = mocks::reporter::Reporter::new(
                        context.with_label(&format!("reporter_{}", *validator)),
                        reporter_config,
                    );
                    honest_reporters.push(reporter.clone());

                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                        certify_latency: (10.0, 5.0),
                        should_certify: mocks::application::Certifier::Sometimes,
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label(&format!("application_{}", *validator)),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(validator.clone());
                    // Leader, notarization and nullify-retry timeouts (10s each)
                    // are longer than the 5s propagation sleep below.
                    let cfg = config::Config {
                        scheme: schemes[idx].clone(),
                        elector: elector.clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        strategy: Sequential,
                        partition: validator.to_string(),
                        mailbox_size: 1024,
                        epoch: Epoch::new(333),
                        leader_timeout: Duration::from_secs(10),
                        notarization_timeout: Duration::from_secs(10),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        fetch_concurrent: 4,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    };
                    let engine =
                        Engine::new(context.with_label(&format!("engine_{}", *validator)), cfg);
                    engine.start(pending, recovered, resolver);
                }
            }

            // Helpers that assemble certificates by signing with the schemes at
            // indices `0..=quorum` (i.e. `quorum + 1` votes) and aggregating
            // them with `schemes[0]`.
            let build_finalization = |proposal: &Proposal<D>| -> TFinalization<_, D> {
                let votes: Vec<_> = (0..=quorum)
                    .map(|i| TFinalize::sign(&schemes[i], proposal.clone()).unwrap())
                    .collect();
                TFinalization::from_finalizes(&schemes[0], &votes, &Sequential)
                    .expect("finalization quorum")
            };
            let build_notarization = |proposal: &Proposal<D>| -> TNotarization<_, D> {
                let votes: Vec<_> = (0..=quorum)
                    .map(|i| TNotarize::sign(&schemes[i], proposal.clone()).unwrap())
                    .collect();
                TNotarization::from_notarizes(&schemes[0], &votes, &Sequential)
                    .expect("notarization quorum")
            };
            let build_nullification = |round: Round| -> TNullification<_> {
                let votes: Vec<_> = (0..=quorum)
                    .map(|i| TNullify::sign::<D>(&schemes[i], round).unwrap())
                    .collect();
                TNullification::from_nullifies(&schemes[0], &votes, &Sequential)
                    .expect("nullification quorum")
            };

            // Proposals used by the scenario. The second argument to
            // `Proposal::new` is presumably the parent view (TODO confirm):
            // - `proposal_b0`  at view `f_view`,     built on `f_view - 1`
            // - `proposal_b1a` at view `f_view + 1`, built on `f_view`
            // - `proposal_b1b` at view `f_view + 2`, built on `f_view`
            let f_view = 1;
            let round_f = Round::new(Epoch::new(333), View::new(f_view));
            let payload_b0 = Sha256::hash(b"B_F");
            let proposal_b0 = Proposal::new(round_f, View::new(f_view - 1), payload_b0);
            let payload_b1a = Sha256::hash(b"B_G1");
            let proposal_b1a = Proposal::new(
                Round::new(Epoch::new(333), View::new(f_view + 1)),
                View::new(f_view),
                payload_b1a,
            );
            let payload_b1b = Sha256::hash(b"B_G2");
            let proposal_b1b = Proposal::new(
                Round::new(Epoch::new(333), View::new(f_view + 2)),
                View::new(f_view),
                payload_b1b,
            );

            // Certificates to inject: notarization + finalization for `f_view`,
            // and both a notarization and a nullification for each of
            // `f_view + 1` and `f_view + 2`.
            let b0_notarization = build_notarization(&proposal_b0);
            let b0_finalization = build_finalization(&proposal_b0);
            let b1a_notarization = build_notarization(&proposal_b1a);
            let b1b_notarization = build_notarization(&proposal_b1b);
            let null_a = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 1)));
            let null_b = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 2)));

            // Register an extra peer (not one of the participants) that is used
            // only to deliver the hand-built certificates.
            // NOTE(review): channel `1` is presumably the certificate channel,
            // judging by the receiver's name; confirm against
            // `register_validators`.
            let injector_pk = PrivateKey::from_seed(1_000_000).public_key();
            let (mut injector_sender, _inj_certificate_receiver) = oracle
                .control(injector_pk.clone())
                .register(1, TEST_QUOTA)
                .await
                .unwrap();

            // One-way links from the injector to every participant.
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(0),
                success_rate: 1.0,
            };
            for p in participants.iter() {
                oracle
                    .add_link(injector_pk.clone(), p.clone(), link.clone())
                    .await
                    .unwrap();
            }

            // View `f_view + 2`: Group2 gets the notarization, everyone else
            // (Group1, Ignorant, Byzantine) gets the nullification.
            let notarization_msg = Certificate::<_, D>::Notarization(b1b_notarization);
            let nullification_msg = Certificate::<_, D>::Nullification(null_b.clone());
            for (i, participant) in participants.iter().enumerate() {
                let recipient = Recipients::One(participant.clone());
                let msg = match get_type(i) {
                    ParticipantType::Group2 => notarization_msg.encode(),
                    _ => nullification_msg.encode(),
                };
                injector_sender.send(recipient, msg, true).await.unwrap();
            }
            // View `f_view + 1`: Group1 gets the notarization, everyone else
            // (Group2, Ignorant, Byzantine) gets the nullification.
            let notarization_msg = Certificate::<_, D>::Notarization(b1a_notarization);
            let nullification_msg = Certificate::<_, D>::Nullification(null_a.clone());
            for (i, participant) in participants.iter().enumerate() {
                let recipient = Recipients::One(participant.clone());
                let msg = match get_type(i) {
                    ParticipantType::Group1 => notarization_msg.encode(),
                    _ => nullification_msg.encode(),
                };
                injector_sender.send(recipient, msg, true).await.unwrap();
            }
            // View `f_view`: everyone gets the notarization and finalization.
            let msg = Certificate::<_, D>::Notarization(b0_notarization).encode();
            injector_sender
                .send(Recipients::All, msg, true)
                .await
                .unwrap();
            let msg = Certificate::<_, D>::Finalization(b0_finalization).encode();
            injector_sender
                .send(Recipients::All, msg, true)
                .await
                .unwrap();

            // Give the injected certificates time to be delivered and processed
            // (shorter than the 10s engine timeouts configured above).
            debug!("waiting for certificates to propagate");
            context.sleep(Duration::from_secs(5)).await;
            debug!("certificates propagated");

            // View `f_view`: every honest reporter recorded the finalization.
            let view = View::new(f_view);
            for reporter in honest_reporters.iter() {
                let finalizations = reporter.finalizations.lock().unwrap();
                assert!(finalizations.contains_key(&view));
            }

            // View `f_view + 1`: nobody finalized; Group1 recorded only the
            // notarization, the other honest participants only the
            // nullification. `honest_reporters` holds indices 0..7 in order, so
            // the enumeration index matches the participant index.
            let view = View::new(f_view + 1);
            for (i, reporter) in honest_reporters.iter().enumerate() {
                let finalizations = reporter.finalizations.lock().unwrap();
                assert!(!finalizations.contains_key(&view));
                let nullifications = reporter.nullifications.lock().unwrap();
                let notarizations = reporter.notarizations.lock().unwrap();
                match get_type(i) {
                    ParticipantType::Group1 => {
                        assert!(notarizations.contains_key(&view));
                        assert!(!nullifications.contains_key(&view));
                    }
                    _ => {
                        assert!(nullifications.contains_key(&view));
                        assert!(!notarizations.contains_key(&view));
                    }
                }
            }

            // View `f_view + 2`: nobody finalized; Group2 recorded only the
            // notarization, the other honest participants only the
            // nullification.
            let view = View::new(f_view + 2);
            for (i, reporter) in honest_reporters.iter().enumerate() {
                let finalizations = reporter.finalizations.lock().unwrap();
                assert!(!finalizations.contains_key(&view));
                let nullifications = reporter.nullifications.lock().unwrap();
                let notarizations = reporter.notarizations.lock().unwrap();
                match get_type(i) {
                    ParticipantType::Group2 => {
                        assert!(notarizations.contains_key(&view));
                        assert!(!nullifications.contains_key(&view));
                    }
                    _ => {
                        assert!(nullifications.contains_key(&view));
                        assert!(!notarizations.contains_key(&view));
                    }
                }
            }

            // View `f_view + 3`: no honest reporter has recorded a nullify vote
            // yet.
            let next_view = View::new(f_view + 3);
            for (i, reporter) in honest_reporters.iter().enumerate() {
                let nullifies = reporter.nullifies.lock().unwrap();
                assert!(!nullifies.contains_key(&next_view), "reporter {i}");
            }

            // Now connect the validators to each other.
            link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;

            // Wait until every honest reporter's monitor reports a view of at
            // least `f_view + 3`.
            // NOTE(review): the monitored value is presumably the latest
            // finalized view; confirm against `mocks::reporter`.
            {
                let target = View::new(f_view + 3);
                let mut finalizers = Vec::new();
                for reporter in honest_reporters.iter_mut() {
                    let (mut latest, mut monitor) = reporter.subscribe().await;
                    finalizers.push(context.with_label("resume_finalizer").spawn(
                        move |_| async move {
                            while latest < target {
                                latest = monitor.recv().await.expect("event missing");
                            }
                        },
                    ));
                }
                join_all(finalizers).await;
            }

            // No honest reporter observed a fault or an invalid message.
            for reporter in honest_reporters.iter() {
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }
                {
                    let invalid = reporter.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }
            }
            // No peer was blocked during the run.
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }
4951
4952 #[test_traced]
4953 fn test_split_views_no_lockup() {
4954 split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
4955 split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
4956 split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
4957 split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
4958 split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4959 split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
4960 split_views_no_lockup::<_, _, RoundRobin>(ed25519::fixture);
4961 split_views_no_lockup::<_, _, RoundRobin>(secp256r1::fixture);
4962 }
4963
4964 fn tle<V, L>()
4965 where
4966 V: Variant,
4967 L: Elector<bls12381_threshold_vrf::Scheme<PublicKey, V>>,
4968 {
4969 let n = 4;
4971 let namespace = b"consensus".to_vec();
4972 let activity_timeout = ViewDelta::new(100);
4973 let skip_timeout = ViewDelta::new(50);
4974 let executor = deterministic::Runner::timed(Duration::from_secs(30));
4975 executor.start(|mut context| async move {
4976 let (network, mut oracle) = Network::new(
4978 context.with_label("network"),
4979 Config {
4980 max_size: 1024 * 1024,
4981 disconnect_on_block: false,
4982 tracked_peer_sets: None,
4983 },
4984 );
4985
4986 network.start();
4988
4989 let Fixture {
4991 participants,
4992 schemes,
4993 ..
4994 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, &namespace, n);
4995 let mut registrations = register_validators(&mut oracle, &participants).await;
4996
4997 let link = Link {
4999 latency: Duration::from_millis(10),
5000 jitter: Duration::from_millis(5),
5001 success_rate: 1.0,
5002 };
5003 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5004
5005 let elector = L::default();
5007 let relay = Arc::new(mocks::relay::Relay::new());
5008 let mut reporters = Vec::new();
5009 let mut engine_handlers = Vec::new();
5010 let monitor_reporter = Arc::new(Mutex::new(None));
5011 for (idx, validator) in participants.iter().enumerate() {
5012 let context = context.with_label(&format!("validator_{}", *validator));
5014
5015 let reporter_config = mocks::reporter::Config {
5017 participants: participants.clone().try_into().unwrap(),
5018 scheme: schemes[idx].clone(),
5019 elector: elector.clone(),
5020 };
5021 let reporter =
5022 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5023 reporters.push(reporter.clone());
5024 if idx == 0 {
5025 *monitor_reporter.lock().unwrap() = Some(reporter.clone());
5026 }
5027
5028 let application_cfg = mocks::application::Config {
5030 hasher: Sha256::default(),
5031 relay: relay.clone(),
5032 me: validator.clone(),
5033 propose_latency: (10.0, 5.0),
5034 verify_latency: (10.0, 5.0),
5035 certify_latency: (10.0, 5.0),
5036 should_certify: mocks::application::Certifier::Sometimes,
5037 };
5038 let (actor, application) = mocks::application::Application::new(
5039 context.with_label("application"),
5040 application_cfg,
5041 );
5042 actor.start();
5043 let blocker = oracle.control(validator.clone());
5044 let cfg = config::Config {
5045 scheme: schemes[idx].clone(),
5046 elector: elector.clone(),
5047 blocker,
5048 automaton: application.clone(),
5049 relay: application.clone(),
5050 reporter: reporter.clone(),
5051 strategy: Sequential,
5052 partition: validator.to_string(),
5053 mailbox_size: 1024,
5054 epoch: Epoch::new(333),
5055 leader_timeout: Duration::from_millis(100),
5056 notarization_timeout: Duration::from_millis(200),
5057 nullify_retry: Duration::from_millis(500),
5058 fetch_timeout: Duration::from_millis(100),
5059 activity_timeout,
5060 skip_timeout,
5061 fetch_concurrent: 4,
5062 replay_buffer: NZUsize!(1024 * 1024),
5063 write_buffer: NZUsize!(1024 * 1024),
5064 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5065 };
5066 let engine = Engine::new(context.with_label("engine"), cfg);
5067
5068 let (pending, recovered, resolver) = registrations
5070 .remove(validator)
5071 .expect("validator should be registered");
5072 engine_handlers.push(engine.start(pending, recovered, resolver));
5073 }
5074
5075 let target = Round::new(Epoch::new(333), View::new(10)); let message = b"Secret message for future view10"; let ciphertext = schemes[0].encrypt(&mut context, target, *message);
5081
5082 let reporter = monitor_reporter.lock().unwrap().clone().unwrap();
5084 loop {
5085 context.sleep(Duration::from_millis(100)).await;
5087 let notarizations = reporter.notarizations.lock().unwrap();
5088 let Some(notarization) = notarizations.get(&target.view()) else {
5089 continue;
5090 };
5091
5092 let seed = notarization.seed();
5094 let decrypted = seed
5095 .decrypt(&ciphertext)
5096 .expect("Decryption should succeed with valid seed signature");
5097 assert_eq!(
5098 message,
5099 decrypted.as_ref(),
5100 "Decrypted message should match original message"
5101 );
5102 break;
5103 }
5104 });
5105 }
5106
5107 #[test_traced]
5108 fn test_tle() {
5109 tle::<MinPk, Random>();
5110 tle::<MinSig, Random>();
5111 }
5112
5113 fn hailstorm<S, F, L>(
5114 seed: u64,
5115 shutdowns: usize,
5116 interval: ViewDelta,
5117 mut fixture: F,
5118 ) -> String
5119 where
5120 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5121 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5122 L: Elector<S>,
5123 {
5124 let n = 5;
5126 let activity_timeout = ViewDelta::new(10);
5127 let skip_timeout = ViewDelta::new(5);
5128 let namespace = b"consensus".to_vec();
5129 let cfg = deterministic::Config::new().with_seed(seed);
5130 let executor = deterministic::Runner::new(cfg);
5131 executor.start(|mut context| async move {
5132 let (network, mut oracle) = Network::new(
5134 context.with_label("network"),
5135 Config {
5136 max_size: 1024 * 1024,
5137 disconnect_on_block: true,
5138 tracked_peer_sets: None,
5139 },
5140 );
5141
5142 network.start();
5144
5145 let Fixture {
5147 participants,
5148 schemes,
5149 ..
5150 } = fixture(&mut context, &namespace, n);
5151 let mut registrations = register_validators(&mut oracle, &participants).await;
5152
5153 let link = Link {
5155 latency: Duration::from_millis(10),
5156 jitter: Duration::from_millis(1),
5157 success_rate: 1.0,
5158 };
5159 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5160
5161 let elector = L::default();
5163 let relay = Arc::new(mocks::relay::Relay::new());
5164 let mut reporters = BTreeMap::new();
5165 let mut engine_handlers = BTreeMap::new();
5166 for (idx, validator) in participants.iter().enumerate() {
5167 let context = context.with_label(&format!("validator_{}", *validator));
5169
5170 let reporter_config = mocks::reporter::Config {
5172 participants: participants.clone().try_into().unwrap(),
5173 scheme: schemes[idx].clone(),
5174 elector: elector.clone(),
5175 };
5176 let reporter =
5177 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5178 reporters.insert(idx, reporter.clone());
5179 let application_cfg = mocks::application::Config {
5180 hasher: Sha256::default(),
5181 relay: relay.clone(),
5182 me: validator.clone(),
5183 propose_latency: (10.0, 5.0),
5184 verify_latency: (10.0, 5.0),
5185 certify_latency: (10.0, 5.0),
5186 should_certify: mocks::application::Certifier::Sometimes,
5187 };
5188 let (actor, application) = mocks::application::Application::new(
5189 context.with_label("application"),
5190 application_cfg,
5191 );
5192 actor.start();
5193 let blocker = oracle.control(validator.clone());
5194 let cfg = config::Config {
5195 scheme: schemes[idx].clone(),
5196 elector: elector.clone(),
5197 blocker,
5198 automaton: application.clone(),
5199 relay: application.clone(),
5200 reporter: reporter.clone(),
5201 strategy: Sequential,
5202 partition: validator.to_string(),
5203 mailbox_size: 1024,
5204 epoch: Epoch::new(333),
5205 leader_timeout: Duration::from_secs(1),
5206 notarization_timeout: Duration::from_secs(2),
5207 nullify_retry: Duration::from_secs(10),
5208 fetch_timeout: Duration::from_secs(1),
5209 activity_timeout,
5210 skip_timeout,
5211 fetch_concurrent: 4,
5212 replay_buffer: NZUsize!(1024 * 1024),
5213 write_buffer: NZUsize!(1024 * 1024),
5214 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5215 };
5216 let engine = Engine::new(context.with_label("engine"), cfg);
5217
5218 let (pending, recovered, resolver) = registrations
5220 .remove(validator)
5221 .expect("validator should be registered");
5222 engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5223 }
5224
5225 let mut target = View::zero();
5227 for i in 0..shutdowns {
5228 target = target.saturating_add(interval);
5230
5231 let mut finalizers = Vec::new();
5233 for (_, reporter) in reporters.iter_mut() {
5234 let (mut latest, mut monitor) = reporter.subscribe().await;
5235 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5236 while latest < target {
5237 latest = monitor.recv().await.expect("event missing");
5238 }
5239 }));
5240 }
5241 join_all(finalizers).await;
5242 target = target.saturating_add(interval);
5243
5244 let idx = context.gen_range(0..engine_handlers.len());
5246 let validator = &participants[idx];
5247 let handle = engine_handlers.remove(&idx).unwrap();
5248 handle.abort();
5249 let _ = handle.await;
5250 let selected_reporter = reporters.remove(&idx).unwrap();
5251 info!(idx, ?validator, "shutdown validator");
5252
5253 let mut finalizers = Vec::new();
5255 for (_, reporter) in reporters.iter_mut() {
5256 let (mut latest, mut monitor) = reporter.subscribe().await;
5257 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5258 while latest < target {
5259 latest = monitor.recv().await.expect("event missing");
5260 }
5261 }));
5262 }
5263 join_all(finalizers).await;
5264 target = target.saturating_add(interval);
5265
5266 info!(idx, ?validator, "restarting validator");
5268 let context =
5269 context.with_label(&format!("validator_{}_restarted_{}", *validator, i));
5270
5271 let (pending, recovered, resolver) =
5273 register_validator(&mut oracle, validator.clone()).await;
5274 let application_cfg = mocks::application::Config {
5275 hasher: Sha256::default(),
5276 relay: relay.clone(),
5277 me: validator.clone(),
5278 propose_latency: (10.0, 5.0),
5279 verify_latency: (10.0, 5.0),
5280 certify_latency: (10.0, 5.0),
5281 should_certify: mocks::application::Certifier::Sometimes,
5282 };
5283 let (actor, application) = mocks::application::Application::new(
5284 context.with_label("application"),
5285 application_cfg,
5286 );
5287 actor.start();
5288 reporters.insert(idx, selected_reporter.clone());
5289 let blocker = oracle.control(validator.clone());
5290 let cfg = config::Config {
5291 scheme: schemes[idx].clone(),
5292 elector: elector.clone(),
5293 blocker,
5294 automaton: application.clone(),
5295 relay: application.clone(),
5296 reporter: selected_reporter,
5297 strategy: Sequential,
5298 partition: validator.to_string(),
5299 mailbox_size: 1024,
5300 epoch: Epoch::new(333),
5301 leader_timeout: Duration::from_secs(1),
5302 notarization_timeout: Duration::from_secs(2),
5303 nullify_retry: Duration::from_secs(10),
5304 fetch_timeout: Duration::from_secs(1),
5305 activity_timeout,
5306 skip_timeout,
5307 fetch_concurrent: 4,
5308 replay_buffer: NZUsize!(1024 * 1024),
5309 write_buffer: NZUsize!(1024 * 1024),
5310 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5311 };
5312 let engine = Engine::new(context.with_label("engine"), cfg);
5313 engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5314
5315 let mut finalizers = Vec::new();
5317 for (_, reporter) in reporters.iter_mut() {
5318 let (mut latest, mut monitor) = reporter.subscribe().await;
5319 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5320 while latest < target {
5321 latest = monitor.recv().await.expect("event missing");
5322 }
5323 }));
5324 }
5325 join_all(finalizers).await;
5326 info!(idx, ?validator, "validator recovered");
5327 }
5328
5329 let latest_complete = target.saturating_sub(activity_timeout);
5331 for (_, reporter) in reporters.iter() {
5332 {
5334 let faults = reporter.faults.lock().unwrap();
5335 assert!(faults.is_empty());
5336 }
5337
5338 {
5340 let invalid = reporter.invalid.lock().unwrap();
5341 assert_eq!(*invalid, 0);
5342 }
5343
5344 let mut notarized = HashMap::new();
5346 let mut finalized = HashMap::new();
5347 {
5348 let notarizes = reporter.notarizes.lock().unwrap();
5349 for view in View::range(View::new(1), latest_complete) {
5350 let Some(payloads) = notarizes.get(&view) else {
5352 continue;
5353 };
5354 if payloads.len() > 1 {
5355 panic!("view: {view}");
5356 }
5357 let (digest, _) = payloads.iter().next().unwrap();
5358 notarized.insert(view, *digest);
5359 }
5360 }
5361 {
5362 let notarizations = reporter.notarizations.lock().unwrap();
5363 for view in View::range(View::new(1), latest_complete) {
5364 let Some(notarization) = notarizations.get(&view) else {
5366 continue;
5367 };
5368 let Some(digest) = notarized.get(&view) else {
5369 continue;
5370 };
5371 assert_eq!(¬arization.proposal.payload, digest);
5372 }
5373 }
5374 {
5375 let finalizes = reporter.finalizes.lock().unwrap();
5376 for view in View::range(View::new(1), latest_complete) {
5377 let Some(payloads) = finalizes.get(&view) else {
5379 continue;
5380 };
5381 if payloads.len() > 1 {
5382 panic!("view: {view}");
5383 }
5384 let (digest, _) = payloads.iter().next().unwrap();
5385 finalized.insert(view, *digest);
5386
5387 if view > latest_complete {
5389 continue;
5390 }
5391
5392 let nullifies = reporter.nullifies.lock().unwrap();
5394 let Some(nullifies) = nullifies.get(&view) else {
5395 continue;
5396 };
5397 for (_, finalizers) in payloads.iter() {
5398 for finalizer in finalizers.iter() {
5399 if nullifies.contains(finalizer) {
5400 panic!("should not nullify and finalize at same view");
5401 }
5402 }
5403 }
5404 }
5405 }
5406 {
5407 let finalizations = reporter.finalizations.lock().unwrap();
5408 for view in View::range(View::new(1), latest_complete) {
5409 let Some(finalization) = finalizations.get(&view) else {
5411 continue;
5412 };
5413 let Some(digest) = finalized.get(&view) else {
5414 continue;
5415 };
5416 assert_eq!(&finalization.proposal.payload, digest);
5417 }
5418 }
5419 }
5420
5421 let blocked = oracle.blocked().await.unwrap();
5423 assert!(blocked.is_empty());
5424
5425 context.auditor().state()
5427 })
5428 }
5429
5430 #[test_group("slow")]
5431 #[test_traced]
5432 fn test_hailstorm_bls12381_threshold_vrf_min_pk() {
5433 assert_eq!(
5434 hailstorm::<_, _, Random>(
5435 0,
5436 10,
5437 ViewDelta::new(15),
5438 bls12381_threshold_vrf::fixture::<MinPk, _>
5439 ),
5440 hailstorm::<_, _, Random>(
5441 0,
5442 10,
5443 ViewDelta::new(15),
5444 bls12381_threshold_vrf::fixture::<MinPk, _>
5445 ),
5446 );
5447 }
5448
5449 #[test_group("slow")]
5450 #[test_traced]
5451 fn test_hailstorm_bls12381_threshold_vrf_min_sig() {
5452 assert_eq!(
5453 hailstorm::<_, _, Random>(
5454 0,
5455 10,
5456 ViewDelta::new(15),
5457 bls12381_threshold_vrf::fixture::<MinSig, _>
5458 ),
5459 hailstorm::<_, _, Random>(
5460 0,
5461 10,
5462 ViewDelta::new(15),
5463 bls12381_threshold_vrf::fixture::<MinSig, _>
5464 ),
5465 );
5466 }
5467
5468 #[test_group("slow")]
5469 #[test_traced]
5470 fn test_hailstorm_bls12381_threshold_std_min_pk() {
5471 assert_eq!(
5472 hailstorm::<_, _, RoundRobin>(
5473 0,
5474 10,
5475 ViewDelta::new(15),
5476 bls12381_threshold_std::fixture::<MinPk, _>
5477 ),
5478 hailstorm::<_, _, RoundRobin>(
5479 0,
5480 10,
5481 ViewDelta::new(15),
5482 bls12381_threshold_std::fixture::<MinPk, _>
5483 ),
5484 );
5485 }
5486
5487 #[test_group("slow")]
5488 #[test_traced]
5489 fn test_hailstorm_bls12381_threshold_std_min_sig() {
5490 assert_eq!(
5491 hailstorm::<_, _, RoundRobin>(
5492 0,
5493 10,
5494 ViewDelta::new(15),
5495 bls12381_threshold_std::fixture::<MinSig, _>
5496 ),
5497 hailstorm::<_, _, RoundRobin>(
5498 0,
5499 10,
5500 ViewDelta::new(15),
5501 bls12381_threshold_std::fixture::<MinSig, _>
5502 ),
5503 );
5504 }
5505
5506 #[test_group("slow")]
5507 #[test_traced]
5508 fn test_hailstorm_bls12381_multisig_min_pk() {
5509 assert_eq!(
5510 hailstorm::<_, _, RoundRobin>(
5511 0,
5512 10,
5513 ViewDelta::new(15),
5514 bls12381_multisig::fixture::<MinPk, _>
5515 ),
5516 hailstorm::<_, _, RoundRobin>(
5517 0,
5518 10,
5519 ViewDelta::new(15),
5520 bls12381_multisig::fixture::<MinPk, _>
5521 ),
5522 );
5523 }
5524
5525 #[test_group("slow")]
5526 #[test_traced]
5527 fn test_hailstorm_bls12381_multisig_min_sig() {
5528 assert_eq!(
5529 hailstorm::<_, _, RoundRobin>(
5530 0,
5531 10,
5532 ViewDelta::new(15),
5533 bls12381_multisig::fixture::<MinSig, _>
5534 ),
5535 hailstorm::<_, _, RoundRobin>(
5536 0,
5537 10,
5538 ViewDelta::new(15),
5539 bls12381_multisig::fixture::<MinSig, _>
5540 ),
5541 );
5542 }
5543
5544 #[test_group("slow")]
5545 #[test_traced]
5546 fn test_hailstorm_ed25519() {
5547 assert_eq!(
5548 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture),
5549 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture)
5550 );
5551 }
5552
5553 #[test_group("slow")]
5554 #[test_traced]
5555 fn test_hailstorm_secp256r1() {
5556 assert_eq!(
5557 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture),
5558 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture)
5559 );
5560 }
5561
5562 fn twins<S, F, L>(seed: u64, n: u32, strategy: Strategy, link: Link, mut fixture: F)
5564 where
5565 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5566 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5567 L: Elector<S>,
5568 {
5569 let faults = N3f1::max_faults(n);
5570 let required_containers = View::new(100);
5571 let activity_timeout = ViewDelta::new(10);
5572 let skip_timeout = ViewDelta::new(5);
5573 let namespace = b"consensus".to_vec();
5574 let cfg = deterministic::Config::new()
5575 .with_seed(seed)
5576 .with_timeout(Some(Duration::from_secs(900)));
5577 let executor = deterministic::Runner::new(cfg);
5578 executor.start(|mut context| async move {
5579 let (network, mut oracle) = Network::new(
5580 context.with_label("network"),
5581 Config {
5582 max_size: 1024 * 1024,
5583 disconnect_on_block: false,
5584 tracked_peer_sets: None,
5585 },
5586 );
5587 network.start();
5588
5589 let Fixture {
5590 participants,
5591 schemes,
5592 ..
5593 } = fixture(&mut context, &namespace, n);
5594 let participants: Arc<[_]> = participants.into();
5595 let mut registrations = register_validators(&mut oracle, &participants).await;
5596 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5597
5598 let elector = L::default();
5601 let relay = Arc::new(mocks::relay::Relay::new());
5602 let mut reporters = Vec::new();
5603 let mut engine_handlers = Vec::new();
5604
5605 for (idx, validator) in participants.iter().enumerate().take(faults as usize) {
5607 let (
5608 (vote_sender, vote_receiver),
5609 (certificate_sender, certificate_receiver),
5610 (resolver_sender, resolver_receiver),
5611 ) = registrations
5612 .remove(validator)
5613 .expect("validator should be registered");
5614
5615 let make_vote_forwarder = || {
5617 let participants = participants.clone();
5618 move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
5619 let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5620 let (primary, secondary) =
5621 strategy.partitions(msg.view(), participants.as_ref());
5622 match origin {
5623 SplitOrigin::Primary => Some(Recipients::Some(primary)),
5624 SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5625 }
5626 }
5627 };
5628 let make_certificate_forwarder = || {
5630 let codec = schemes[idx].certificate_codec_config();
5631 let participants = participants.clone();
5632 move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
5633 let msg: Certificate<S, D> =
5634 Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5635 let (primary, secondary) =
5636 strategy.partitions(msg.view(), participants.as_ref());
5637 match origin {
5638 SplitOrigin::Primary => Some(Recipients::Some(primary)),
5639 SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5640 }
5641 }
5642 };
5643 let make_drop_forwarder =
5644 || move |_: SplitOrigin, _: &Recipients<_>, _: &IoBuf| None;
5645
5646 let make_vote_router = || {
5648 let participants = participants.clone();
5649 move |(sender, message): &(_, IoBuf)| {
5650 let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5651 strategy.route(msg.view(), sender, participants.as_ref())
5652 }
5653 };
5654 let make_certificate_router = || {
5656 let codec = schemes[idx].certificate_codec_config();
5657 let participants = participants.clone();
5658 move |(sender, message): &(_, IoBuf)| {
5659 let msg: Certificate<S, D> =
5660 Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5661 strategy.route(msg.view(), sender, participants.as_ref())
5662 }
5663 };
5664 let make_drop_router = || move |(_, _): &(_, _)| SplitTarget::None;
5665
5666 let (vote_sender_primary, vote_sender_secondary) =
5668 vote_sender.split_with(make_vote_forwarder());
5669 let (vote_receiver_primary, vote_receiver_secondary) = vote_receiver.split_with(
5670 context.with_label(&format!("pending_split_{idx}")),
5671 make_vote_router(),
5672 );
5673 let (certificate_sender_primary, certificate_sender_secondary) =
5674 certificate_sender.split_with(make_certificate_forwarder());
5675 let (certificate_receiver_primary, certificate_receiver_secondary) =
5676 certificate_receiver.split_with(
5677 context.with_label(&format!("recovered_split_{idx}")),
5678 make_certificate_router(),
5679 );
5680
5681 let (resolver_sender_primary, resolver_sender_secondary) =
5683 resolver_sender.split_with(make_drop_forwarder());
5684 let (resolver_receiver_primary, resolver_receiver_secondary) = resolver_receiver
5685 .split_with(
5686 context.with_label(&format!("resolver_split_{idx}")),
5687 make_drop_router(),
5688 );
5689
5690 for (twin_label, pending, recovered, resolver) in [
5691 (
5692 "primary",
5693 (vote_sender_primary, vote_receiver_primary),
5694 (certificate_sender_primary, certificate_receiver_primary),
5695 (resolver_sender_primary, resolver_receiver_primary),
5696 ),
5697 (
5698 "secondary",
5699 (vote_sender_secondary, vote_receiver_secondary),
5700 (certificate_sender_secondary, certificate_receiver_secondary),
5701 (resolver_sender_secondary, resolver_receiver_secondary),
5702 ),
5703 ] {
5704 let label = format!("twin_{idx}_{twin_label}");
5705 let context = context.with_label(&label);
5706
5707 let reporter_config = mocks::reporter::Config {
5708 participants: participants.as_ref().try_into().unwrap(),
5709 scheme: schemes[idx].clone(),
5710 elector: elector.clone(),
5711 };
5712 let reporter = mocks::reporter::Reporter::new(
5713 context.with_label("reporter"),
5714 reporter_config,
5715 );
5716 reporters.push(reporter.clone());
5717
5718 let application_cfg = mocks::application::Config {
5719 hasher: Sha256::default(),
5720 relay: relay.clone(),
5721 me: validator.clone(),
5722 propose_latency: (10.0, 5.0),
5723 verify_latency: (10.0, 5.0),
5724 certify_latency: (10.0, 5.0),
5725 should_certify: mocks::application::Certifier::Sometimes,
5726 };
5727 let (actor, application) = mocks::application::Application::new(
5728 context.with_label("application"),
5729 application_cfg,
5730 );
5731 actor.start();
5732
5733 let blocker = oracle.control(validator.clone());
5734 let cfg = config::Config {
5735 scheme: schemes[idx].clone(),
5736 elector: elector.clone(),
5737 blocker,
5738 automaton: application.clone(),
5739 relay: application.clone(),
5740 reporter: reporter.clone(),
5741 strategy: Sequential,
5742 partition: label,
5743 mailbox_size: 1024,
5744 epoch: Epoch::new(333),
5745 leader_timeout: Duration::from_secs(1),
5746 notarization_timeout: Duration::from_secs(2),
5747 nullify_retry: Duration::from_secs(10),
5748 fetch_timeout: Duration::from_secs(1),
5749 activity_timeout,
5750 skip_timeout,
5751 fetch_concurrent: 4,
5752 replay_buffer: NZUsize!(1024 * 1024),
5753 write_buffer: NZUsize!(1024 * 1024),
5754 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5755 };
5756 let engine = Engine::new(context.with_label("engine"), cfg);
5757 engine_handlers.push(engine.start(pending, recovered, resolver));
5758 }
5759 }
5760
5761 for (idx, validator) in participants.iter().enumerate().skip(faults as usize) {
5763 let label = format!("honest_{idx}");
5764 let context = context.with_label(&label);
5765
5766 let reporter_config = mocks::reporter::Config {
5767 participants: participants.as_ref().try_into().unwrap(),
5768 scheme: schemes[idx].clone(),
5769 elector: elector.clone(),
5770 };
5771 let reporter =
5772 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5773 reporters.push(reporter.clone());
5774
5775 let application_cfg = mocks::application::Config {
5776 hasher: Sha256::default(),
5777 relay: relay.clone(),
5778 me: validator.clone(),
5779 propose_latency: (10.0, 5.0),
5780 verify_latency: (10.0, 5.0),
5781 certify_latency: (10.0, 5.0),
5782 should_certify: mocks::application::Certifier::Sometimes,
5783 };
5784 let (actor, application) = mocks::application::Application::new(
5785 context.with_label("application"),
5786 application_cfg,
5787 );
5788 actor.start();
5789
5790 let blocker = oracle.control(validator.clone());
5791 let cfg = config::Config {
5792 scheme: schemes[idx].clone(),
5793 elector: elector.clone(),
5794 blocker,
5795 automaton: application.clone(),
5796 relay: application.clone(),
5797 reporter: reporter.clone(),
5798 strategy: Sequential,
5799 partition: label,
5800 mailbox_size: 1024,
5801 epoch: Epoch::new(333),
5802 leader_timeout: Duration::from_secs(1),
5803 notarization_timeout: Duration::from_secs(2),
5804 nullify_retry: Duration::from_secs(10),
5805 fetch_timeout: Duration::from_secs(1),
5806 activity_timeout,
5807 skip_timeout,
5808 fetch_concurrent: 4,
5809 replay_buffer: NZUsize!(1024 * 1024),
5810 write_buffer: NZUsize!(1024 * 1024),
5811 page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5812 };
5813 let engine = Engine::new(context.with_label("engine"), cfg);
5814
5815 let (pending, recovered, resolver) = registrations
5816 .remove(validator)
5817 .expect("validator should be registered");
5818 engine_handlers.push(engine.start(pending, recovered, resolver));
5819 }
5820
5821 let mut finalizers = Vec::new();
5823 for reporter in reporters.iter_mut() {
5824 let (mut latest, mut monitor) = reporter.subscribe().await;
5825 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5826 while latest < required_containers {
5827 latest = monitor.recv().await.expect("event missing");
5828 }
5829 }));
5830 }
5831 join_all(finalizers).await;
5832
5833 let honest_start = faults as usize * 2; let mut finalized_at_view: BTreeMap<View, D> = BTreeMap::new();
5836 for reporter in reporters.iter().skip(honest_start) {
5837 let finalizations = reporter.finalizations.lock().unwrap();
5838 for (view, finalization) in finalizations.iter() {
5839 let digest = finalization.proposal.payload;
5840 if let Some(existing) = finalized_at_view.get(view) {
5841 assert_eq!(
5842 existing, &digest,
5843 "safety violation: conflicting finalizations at view {view}"
5844 );
5845 } else {
5846 finalized_at_view.insert(*view, digest);
5847 }
5848 }
5849 }
5850
5851 for reporter in reporters.iter().skip(honest_start) {
5853 let invalid = reporter.invalid.lock().unwrap();
5854 assert_eq!(*invalid, 0, "invalid signatures detected");
5855 }
5856
5857 let twin_identities: Vec<_> = participants.iter().take(faults as usize).collect();
5859 for reporter in reporters.iter().skip(honest_start) {
5860 let faults = reporter.faults.lock().unwrap();
5861 for (faulter, _) in faults.iter() {
5862 assert!(
5863 twin_identities.contains(&faulter),
5864 "fault from non-twin participant"
5865 );
5866 }
5867 }
5868
5869 let blocked = oracle.blocked().await.unwrap();
5871 for (_, faulter) in blocked.iter() {
5872 assert!(
5873 twin_identities.contains(&faulter),
5874 "blocked connection from non-twin participant"
5875 );
5876 }
5877 });
5878 }
5879
5880 fn test_twins<S, F, L>(mut fixture: F)
5881 where
5882 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5883 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5884 L: Elector<S>,
5885 {
5886 for strategy in [
5887 Strategy::View,
5888 Strategy::Fixed(3),
5889 Strategy::Isolate(4),
5890 Strategy::Broadcast,
5891 Strategy::Shuffle,
5892 ] {
5893 for link in [
5894 Link {
5895 latency: Duration::from_millis(10),
5896 jitter: Duration::from_millis(1),
5897 success_rate: 1.0,
5898 },
5899 Link {
5900 latency: Duration::from_millis(200),
5901 jitter: Duration::from_millis(150),
5902 success_rate: 0.75,
5903 },
5904 ] {
5905 twins::<S, _, L>(0, 5, strategy, link, |namespace, context, n| {
5906 fixture(namespace, context, n)
5907 });
5908 }
5909 }
5910 }
5911
/// Runs the `test_twins` matrix with the BLS12-381 multisig fixture
/// (`MinPk` variant) and the `RoundRobin` elector.
///
/// Tagged as part of the "slow" test group.
#[test_group("slow")]
#[test_traced]
fn test_twins_multisig_min_pk() {
    test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
}
5917
/// Runs the `test_twins` matrix with the BLS12-381 multisig fixture
/// (`MinSig` variant) and the `RoundRobin` elector.
///
/// Tagged as part of the "slow" test group.
#[test_group("slow")]
#[test_traced]
fn test_twins_multisig_min_sig() {
    test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
}
5923
/// Runs the `test_twins` matrix with the BLS12-381 threshold VRF fixture
/// (`MinPk` variant) and the `Random` elector.
///
/// Tagged as part of the "slow" test group.
#[test_group("slow")]
#[test_traced]
fn test_twins_threshold_vrf_min_pk() {
    test_twins::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
}
5929
/// Runs the `test_twins` matrix with the BLS12-381 threshold VRF fixture
/// (`MinSig` variant) and the `Random` elector.
///
/// Tagged as part of the "slow" test group.
#[test_group("slow")]
#[test_traced]
fn test_twins_threshold_vrf_min_sig() {
    test_twins::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
}
5935
/// Runs the `test_twins` matrix with the standard BLS12-381 threshold fixture
/// (`MinPk` variant) and the `RoundRobin` elector.
///
/// Tagged as part of the "slow" test group.
#[test_group("slow")]
#[test_traced]
fn test_twins_threshold_std_min_pk() {
    test_twins::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
}
5941
/// Runs the `test_twins` matrix with the standard BLS12-381 threshold fixture
/// (`MinSig` variant) and the `RoundRobin` elector.
///
/// Tagged as part of the "slow" test group.
#[test_group("slow")]
#[test_traced]
fn test_twins_threshold_std_min_sig() {
    test_twins::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
}
5947
/// Runs the `test_twins` matrix with the ed25519 fixture and the
/// `RoundRobin` elector.
///
/// Tagged as part of the "slow" test group.
#[test_group("slow")]
#[test_traced]
fn test_twins_ed25519() {
    test_twins::<_, _, RoundRobin>(ed25519::fixture);
}
5953
/// Runs the `test_twins` matrix with the secp256r1 fixture and the
/// `RoundRobin` elector.
///
/// Tagged as part of the "slow" test group.
#[test_group("slow")]
#[test_traced]
fn test_twins_secp256r1() {
    test_twins::<_, _, RoundRobin>(secp256r1::fixture);
}
5959
5960 #[test_group("slow")]
5961 #[test_traced]
5962 fn test_twins_large_view() {
5963 twins::<_, _, Random>(
5964 0,
5965 10,
5966 Strategy::View,
5967 Link {
5968 latency: Duration::from_millis(200),
5969 jitter: Duration::from_millis(150),
5970 success_rate: 0.75,
5971 },
5972 bls12381_threshold_vrf::fixture::<MinPk, _>,
5973 );
5974 }
5975
5976 #[test_group("slow")]
5977 #[test_traced]
5978 fn test_twins_large_shuffle() {
5979 twins::<_, _, Random>(
5980 0,
5981 10,
5982 Strategy::Shuffle,
5983 Link {
5984 latency: Duration::from_millis(200),
5985 jitter: Duration::from_millis(150),
5986 success_rate: 0.75,
5987 },
5988 bls12381_threshold_vrf::fixture::<MinPk, _>,
5989 );
5990 }
5991}