1pub mod elector;
239pub mod scheme;
240pub mod types;
241
242cfg_if::cfg_if! {
243 if #[cfg(not(target_arch = "wasm32"))] {
244 mod actors;
245 pub mod config;
246 pub use config::Config;
247 mod engine;
248 pub use engine::Engine;
249 mod metrics;
250 }
251}
252
253#[cfg(any(test, feature = "fuzz"))]
254pub mod mocks;
255
256use crate::types::{View, ViewDelta};
257
258pub(crate) const fn min_active(activity_timeout: ViewDelta, last_finalized: View) -> View {
260 last_finalized.saturating_sub(activity_timeout)
261}
262
263pub(crate) fn interesting(
267 activity_timeout: ViewDelta,
268 last_finalized: View,
269 current: View,
270 pending: View,
271 allow_future: bool,
272) -> bool {
273 if pending.is_zero() {
275 return false;
276 }
277 if pending < min_active(activity_timeout, last_finalized) {
278 return false;
279 }
280 if !allow_future && pending > current.next() {
281 return false;
282 }
283 true
284}
285
286#[cfg(test)]
288pub(crate) fn quorum(n: u32) -> u32 {
289 use commonware_utils::{Faults, N3f1};
290
291 N3f1::quorum(n)
292}
293
294#[cfg(test)]
295mod tests {
296 use super::*;
297 use crate::{
298 simplex::{
299 elector::{Config as Elector, Random, RoundRobin},
300 mocks::twins::Strategy,
301 scheme::{
302 bls12381_multisig, bls12381_threshold, bls12381_threshold::Seedable, ed25519,
303 secp256r1, Scheme,
304 },
305 types::{
306 Certificate, Finalization as TFinalization, Finalize as TFinalize,
307 Notarization as TNotarization, Notarize as TNotarize,
308 Nullification as TNullification, Nullify as TNullify, Proposal, Vote,
309 },
310 },
311 types::{Epoch, Round},
312 Monitor, Viewable,
313 };
314 use bytes::Bytes;
315 use commonware_codec::{Decode, DecodeExt, Encode};
316 use commonware_cryptography::{
317 bls12381::primitives::variant::{MinPk, MinSig, Variant},
318 certificate::mocks::Fixture,
319 ed25519::{PrivateKey, PublicKey},
320 sha256::{Digest as Sha256Digest, Digest as D},
321 Hasher as _, Sha256, Signer as _,
322 };
323 use commonware_macros::{select, test_group, test_traced};
324 use commonware_p2p::{
325 simulated::{Config, Link, Network, Oracle, Receiver, Sender, SplitOrigin, SplitTarget},
326 Recipients, Sender as _,
327 };
328 use commonware_parallel::Sequential;
329 use commonware_runtime::{
330 buffer::PoolRef, count_running_tasks, deterministic, Clock, Metrics, Quota, Runner, Spawner,
331 };
332 use commonware_utils::{test_rng, Faults, N3f1, NZUsize, NZU16};
333 use engine::Engine;
334 use futures::{future::join_all, StreamExt};
335 use rand::{rngs::StdRng, Rng as _};
336 use std::{
337 collections::{BTreeMap, HashMap},
338 num::{NonZeroU16, NonZeroU32, NonZeroUsize},
339 sync::{Arc, Mutex},
340 time::Duration,
341 };
342 use tracing::{debug, info, warn};
343 use types::Activity;
344
345 const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
346 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
347 const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
348
349 #[test]
350 fn test_interesting() {
351 let activity_timeout = ViewDelta::new(10);
352
353 assert!(!interesting(
355 activity_timeout,
356 View::zero(),
357 View::zero(),
358 View::zero(),
359 false
360 ));
361 assert!(!interesting(
362 activity_timeout,
363 View::zero(),
364 View::new(1),
365 View::zero(),
366 true
367 ));
368
369 assert!(!interesting(
371 activity_timeout,
372 View::new(20),
373 View::new(25),
374 View::new(5), false
376 ));
377
378 assert!(interesting(
380 activity_timeout,
381 View::new(20),
382 View::new(25),
383 View::new(10), false
385 ));
386
387 assert!(!interesting(
389 activity_timeout,
390 View::new(20),
391 View::new(25),
392 View::new(27),
393 false
394 ));
395
396 assert!(interesting(
398 activity_timeout,
399 View::new(20),
400 View::new(25),
401 View::new(27),
402 true
403 ));
404
405 assert!(interesting(
407 activity_timeout,
408 View::new(20),
409 View::new(25),
410 View::new(26),
411 false
412 ));
413
414 assert!(interesting(
416 activity_timeout,
417 View::new(20),
418 View::new(25),
419 View::new(22),
420 false
421 ));
422
423 assert!(interesting(
426 activity_timeout,
427 View::zero(),
428 View::new(5),
429 View::new(1),
430 false
431 ));
432 }
433
434 async fn register_validator(
436 oracle: &mut Oracle<PublicKey, deterministic::Context>,
437 validator: PublicKey,
438 ) -> (
439 (
440 Sender<PublicKey, deterministic::Context>,
441 Receiver<PublicKey>,
442 ),
443 (
444 Sender<PublicKey, deterministic::Context>,
445 Receiver<PublicKey>,
446 ),
447 (
448 Sender<PublicKey, deterministic::Context>,
449 Receiver<PublicKey>,
450 ),
451 ) {
452 let control = oracle.control(validator.clone());
453 let (vote_sender, vote_receiver) = control.register(0, TEST_QUOTA).await.unwrap();
454 let (certificate_sender, certificate_receiver) =
455 control.register(1, TEST_QUOTA).await.unwrap();
456 let (resolver_sender, resolver_receiver) = control.register(2, TEST_QUOTA).await.unwrap();
457 (
458 (vote_sender, vote_receiver),
459 (certificate_sender, certificate_receiver),
460 (resolver_sender, resolver_receiver),
461 )
462 }
463
464 async fn register_validators(
466 oracle: &mut Oracle<PublicKey, deterministic::Context>,
467 validators: &[PublicKey],
468 ) -> HashMap<
469 PublicKey,
470 (
471 (
472 Sender<PublicKey, deterministic::Context>,
473 Receiver<PublicKey>,
474 ),
475 (
476 Sender<PublicKey, deterministic::Context>,
477 Receiver<PublicKey>,
478 ),
479 (
480 Sender<PublicKey, deterministic::Context>,
481 Receiver<PublicKey>,
482 ),
483 ),
484 > {
485 let mut registrations = HashMap::new();
486 for validator in validators.iter() {
487 let registration = register_validator(oracle, validator.clone()).await;
488 registrations.insert(validator.clone(), registration);
489 }
490 registrations
491 }
492
493 enum Action {
495 Link(Link),
496 Update(Link), Unlink,
498 }
499
500 async fn link_validators(
506 oracle: &mut Oracle<PublicKey, deterministic::Context>,
507 validators: &[PublicKey],
508 action: Action,
509 restrict_to: Option<fn(usize, usize, usize) -> bool>,
510 ) {
511 for (i1, v1) in validators.iter().enumerate() {
512 for (i2, v2) in validators.iter().enumerate() {
513 if v2 == v1 {
515 continue;
516 }
517
518 if let Some(f) = restrict_to {
520 if !f(validators.len(), i1, i2) {
521 continue;
522 }
523 }
524
525 match action {
527 Action::Update(_) | Action::Unlink => {
528 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
529 }
530 _ => {}
531 }
532
533 match action {
535 Action::Link(ref link) | Action::Update(ref link) => {
536 oracle
537 .add_link(v1.clone(), v2.clone(), link.clone())
538 .await
539 .unwrap();
540 }
541 _ => {}
542 }
543 }
544 }
545 }
546
547 fn all_online<S, F, L>(mut fixture: F)
548 where
549 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
550 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
551 L: Elector<S>,
552 {
553 let n = 5;
555 let quorum = quorum(n) as usize;
556 let required_containers = View::new(100);
557 let activity_timeout = ViewDelta::new(10);
558 let skip_timeout = ViewDelta::new(5);
559 let namespace = b"consensus".to_vec();
560 let executor = deterministic::Runner::timed(Duration::from_secs(300));
561 executor.start(|mut context| async move {
562 let (network, mut oracle) = Network::new(
564 context.with_label("network"),
565 Config {
566 max_size: 1024 * 1024,
567 disconnect_on_block: true,
568 tracked_peer_sets: None,
569 },
570 );
571
572 network.start();
574
575 let Fixture {
577 participants,
578 schemes,
579 ..
580 } = fixture(&mut context, &namespace, n);
581 let mut registrations = register_validators(&mut oracle, &participants).await;
582
583 let link = Link {
585 latency: Duration::from_millis(10),
586 jitter: Duration::from_millis(1),
587 success_rate: 1.0,
588 };
589 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
590
591 let elector = L::default();
593 let relay = Arc::new(mocks::relay::Relay::new());
594 let mut reporters = Vec::new();
595 let mut engine_handlers = Vec::new();
596 for (idx, validator) in participants.iter().enumerate() {
597 let context = context.with_label(&format!("validator_{}", *validator));
599
600 let reporter_config = mocks::reporter::Config {
602 participants: participants.clone().try_into().unwrap(),
603 scheme: schemes[idx].clone(),
604 elector: elector.clone(),
605 };
606 let reporter =
607 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
608 reporters.push(reporter.clone());
609 let application_cfg = mocks::application::Config {
610 hasher: Sha256::default(),
611 relay: relay.clone(),
612 me: validator.clone(),
613 propose_latency: (10.0, 5.0),
614 verify_latency: (10.0, 5.0),
615 certify_latency: (10.0, 5.0),
616 should_certify: mocks::application::Certifier::Sometimes,
617 };
618 let (actor, application) = mocks::application::Application::new(
619 context.with_label("application"),
620 application_cfg,
621 );
622 actor.start();
623 let blocker = oracle.control(validator.clone());
624 let cfg = config::Config {
625 scheme: schemes[idx].clone(),
626 elector: elector.clone(),
627 blocker,
628 automaton: application.clone(),
629 relay: application.clone(),
630 reporter: reporter.clone(),
631 strategy: Sequential,
632 partition: validator.to_string(),
633 mailbox_size: 1024,
634 epoch: Epoch::new(333),
635 leader_timeout: Duration::from_secs(1),
636 notarization_timeout: Duration::from_secs(2),
637 nullify_retry: Duration::from_secs(10),
638 fetch_timeout: Duration::from_secs(1),
639 activity_timeout,
640 skip_timeout,
641 fetch_concurrent: 4,
642 replay_buffer: NZUsize!(1024 * 1024),
643 write_buffer: NZUsize!(1024 * 1024),
644 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
645 };
646 let engine = Engine::new(context.with_label("engine"), cfg);
647
648 let (pending, recovered, resolver) = registrations
650 .remove(validator)
651 .expect("validator should be registered");
652 engine_handlers.push(engine.start(pending, recovered, resolver));
653 }
654
655 let mut finalizers = Vec::new();
657 for reporter in reporters.iter_mut() {
658 let (mut latest, mut monitor) = reporter.subscribe().await;
659 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
660 while latest < required_containers {
661 latest = monitor.next().await.expect("event missing");
662 }
663 }));
664 }
665 join_all(finalizers).await;
666
667 let latest_complete = required_containers.saturating_sub(activity_timeout);
669 for reporter in reporters.iter() {
670 {
672 let faults = reporter.faults.lock().unwrap();
673 assert!(faults.is_empty());
674 }
675
676 {
678 let invalid = reporter.invalid.lock().unwrap();
679 assert_eq!(*invalid, 0);
680 }
681
682 {
684 let certified = reporter.certified.lock().unwrap();
685 for view in View::range(View::new(1), latest_complete) {
686 if !certified.contains(&view) {
688 panic!("view: {view}");
689 }
690 }
691 }
692
693 let mut notarized = HashMap::new();
695 let mut finalized = HashMap::new();
696 {
697 let notarizes = reporter.notarizes.lock().unwrap();
698 for view in View::range(View::new(1), latest_complete) {
699 let Some(payloads) = notarizes.get(&view) else {
701 continue;
702 };
703 if payloads.len() > 1 {
704 panic!("view: {view}");
705 }
706 let (digest, notarizers) = payloads.iter().next().unwrap();
707 notarized.insert(view, *digest);
708
709 if notarizers.len() < quorum {
710 panic!("view: {view}");
713 }
714 }
715 }
716 {
717 let notarizations = reporter.notarizations.lock().unwrap();
718 for view in View::range(View::new(1), latest_complete) {
719 let Some(notarization) = notarizations.get(&view) else {
721 continue;
722 };
723 let Some(digest) = notarized.get(&view) else {
724 continue;
725 };
726 assert_eq!(¬arization.proposal.payload, digest);
727 }
728 }
729 {
730 let finalizes = reporter.finalizes.lock().unwrap();
731 for view in View::range(View::new(1), latest_complete) {
732 let Some(payloads) = finalizes.get(&view) else {
734 continue;
735 };
736 if payloads.len() > 1 {
737 panic!("view: {view}");
738 }
739 let (digest, finalizers) = payloads.iter().next().unwrap();
740 finalized.insert(view, *digest);
741
742 if view > latest_complete {
744 continue;
745 }
746
747 if finalizers.len() < quorum {
749 panic!("view: {view}");
752 }
753
754 let nullifies = reporter.nullifies.lock().unwrap();
756 let Some(nullifies) = nullifies.get(&view) else {
757 continue;
758 };
759 for (_, finalizers) in payloads.iter() {
760 for finalizer in finalizers.iter() {
761 if nullifies.contains(finalizer) {
762 panic!("should not nullify and finalize at same view");
763 }
764 }
765 }
766 }
767 }
768 {
769 let finalizations = reporter.finalizations.lock().unwrap();
770 for view in View::range(View::new(1), latest_complete) {
771 let Some(finalization) = finalizations.get(&view) else {
773 continue;
774 };
775 let Some(digest) = finalized.get(&view) else {
776 continue;
777 };
778 assert_eq!(&finalization.proposal.payload, digest);
779 }
780 }
781 }
782
783 let blocked = oracle.blocked().await.unwrap();
785 assert!(blocked.is_empty());
786 });
787 }
788
789 #[test_traced]
790 fn test_all_online() {
791 all_online::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
792 all_online::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
793 all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
794 all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
795 all_online::<_, _, RoundRobin>(ed25519::fixture);
796 all_online::<_, _, RoundRobin>(secp256r1::fixture);
797 }
798
799 fn observer<S, F, L>(mut fixture: F)
800 where
801 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
802 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
803 L: Elector<S>,
804 {
805 let n_active = 5;
807 let required_containers = View::new(100);
808 let activity_timeout = ViewDelta::new(10);
809 let skip_timeout = ViewDelta::new(5);
810 let namespace = b"consensus".to_vec();
811 let executor = deterministic::Runner::timed(Duration::from_secs(300));
812 executor.start(|mut context| async move {
813 let (network, mut oracle) = Network::new(
815 context.with_label("network"),
816 Config {
817 max_size: 1024 * 1024,
818 disconnect_on_block: true,
819 tracked_peer_sets: None,
820 },
821 );
822
823 network.start();
825
826 let Fixture {
828 participants,
829 schemes,
830 verifier,
831 ..
832 } = fixture(&mut context, &namespace, n_active);
833
834 let private_key_observer = PrivateKey::from_seed(n_active as u64);
836 let public_key_observer = private_key_observer.public_key();
837
838 let mut all_validators = participants.clone();
840 all_validators.push(public_key_observer.clone());
841 all_validators.sort();
842 let mut registrations = register_validators(&mut oracle, &all_validators).await;
843
844 let link = Link {
846 latency: Duration::from_millis(10),
847 jitter: Duration::from_millis(1),
848 success_rate: 1.0,
849 };
850 link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
851
852 let elector = L::default();
854 let relay = Arc::new(mocks::relay::Relay::new());
855 let mut reporters = Vec::new();
856
857 for (idx, validator) in participants.iter().enumerate() {
858 let is_observer = *validator == public_key_observer;
859
860 let context = context.with_label(&format!("validator_{}", *validator));
862
863 let signing = if is_observer {
865 verifier.clone()
866 } else {
867 schemes[idx].clone()
868 };
869 let reporter_config = mocks::reporter::Config {
870 participants: participants.clone().try_into().unwrap(),
871 scheme: signing.clone(),
872 elector: elector.clone(),
873 };
874 let reporter =
875 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
876 reporters.push(reporter.clone());
877 let application_cfg = mocks::application::Config {
878 hasher: Sha256::default(),
879 relay: relay.clone(),
880 me: validator.clone(),
881 propose_latency: (10.0, 5.0),
882 verify_latency: (10.0, 5.0),
883 certify_latency: (10.0, 5.0),
884 should_certify: mocks::application::Certifier::Sometimes,
885 };
886 let (actor, application) = mocks::application::Application::new(
887 context.with_label("application"),
888 application_cfg,
889 );
890 actor.start();
891 let blocker = oracle.control(validator.clone());
892 let cfg = config::Config {
893 scheme: signing.clone(),
894 elector: elector.clone(),
895 blocker,
896 automaton: application.clone(),
897 relay: application.clone(),
898 reporter: reporter.clone(),
899 strategy: Sequential,
900 partition: validator.to_string(),
901 mailbox_size: 1024,
902 epoch: Epoch::new(333),
903 leader_timeout: Duration::from_secs(1),
904 notarization_timeout: Duration::from_secs(2),
905 nullify_retry: Duration::from_secs(10),
906 fetch_timeout: Duration::from_secs(1),
907 activity_timeout,
908 skip_timeout,
909 fetch_concurrent: 4,
910 replay_buffer: NZUsize!(1024 * 1024),
911 write_buffer: NZUsize!(1024 * 1024),
912 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
913 };
914 let engine = Engine::new(context.with_label("engine"), cfg);
915
916 let (pending, recovered, resolver) = registrations
918 .remove(validator)
919 .expect("validator should be registered");
920 engine.start(pending, recovered, resolver);
921 }
922
923 let mut finalizers = Vec::new();
925 for reporter in reporters.iter_mut() {
926 let (mut latest, mut monitor) = reporter.subscribe().await;
927 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
928 while latest < required_containers {
929 latest = monitor.next().await.expect("event missing");
930 }
931 }));
932 }
933 join_all(finalizers).await;
934
935 for reporter in reporters.iter() {
937 {
939 let faults = reporter.faults.lock().unwrap();
940 assert!(faults.is_empty());
941 }
942 {
943 let invalid = reporter.invalid.lock().unwrap();
944 assert_eq!(*invalid, 0);
945 }
946
947 let blocked = oracle.blocked().await.unwrap();
949 assert!(blocked.is_empty());
950 }
951 });
952 }
953
954 #[test_traced]
955 fn test_observer() {
956 observer::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
957 observer::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
958 observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
959 observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
960 observer::<_, _, RoundRobin>(ed25519::fixture);
961 observer::<_, _, RoundRobin>(secp256r1::fixture);
962 }
963
964 fn unclean_shutdown<S, F, L>(mut fixture: F)
965 where
966 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
967 F: FnMut(&mut StdRng, &[u8], u32) -> Fixture<S>,
968 L: Elector<S>,
969 {
970 let n = 5;
972 let required_containers = View::new(100);
973 let activity_timeout = ViewDelta::new(10);
974 let skip_timeout = ViewDelta::new(5);
975 let namespace = b"consensus".to_vec();
976
977 let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
979 let supervised = Arc::new(Mutex::new(Vec::new()));
980 let mut prev_checkpoint = None;
981
982 let mut rng = test_rng();
984 let Fixture {
985 participants,
986 schemes,
987 ..
988 } = fixture(&mut rng, &namespace, n);
989
990 let relay = Arc::new(mocks::relay::Relay::<Sha256Digest, S::PublicKey>::new());
992
993 loop {
994 let rng = rng.clone();
995 let participants = participants.clone();
996 let schemes = schemes.clone();
997 let shutdowns = shutdowns.clone();
998 let supervised = supervised.clone();
999 let relay = relay.clone();
1000 relay.deregister_all(); let f = |mut context: deterministic::Context| async move {
1003 let (network, mut oracle) = Network::new(
1005 context.with_label("network"),
1006 Config {
1007 max_size: 1024 * 1024,
1008 disconnect_on_block: true,
1009 tracked_peer_sets: None,
1010 },
1011 );
1012
1013 network.start();
1015
1016 let mut registrations = register_validators(&mut oracle, &participants).await;
1018
1019 let link = Link {
1021 latency: Duration::from_millis(50),
1022 jitter: Duration::from_millis(50),
1023 success_rate: 1.0,
1024 };
1025 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1026
1027 let elector = L::default();
1029 let relay = Arc::new(mocks::relay::Relay::new());
1030 let mut reporters = HashMap::new();
1031 let mut engine_handlers = Vec::new();
1032 for (idx, validator) in participants.iter().enumerate() {
1033 let context = context.with_label(&format!("validator_{}", *validator));
1035
1036 let reporter_config = mocks::reporter::Config {
1038 participants: participants.clone().try_into().unwrap(),
1039 scheme: schemes[idx].clone(),
1040 elector: elector.clone(),
1041 };
1042 let reporter = mocks::reporter::Reporter::new(rng.clone(), reporter_config);
1043 reporters.insert(validator.clone(), reporter.clone());
1044 let application_cfg = mocks::application::Config {
1045 hasher: Sha256::default(),
1046 relay: relay.clone(),
1047 me: validator.clone(),
1048 propose_latency: (10.0, 5.0),
1049 verify_latency: (10.0, 5.0),
1050 certify_latency: (10.0, 5.0),
1051 should_certify: mocks::application::Certifier::Sometimes,
1052 };
1053 let (actor, application) = mocks::application::Application::new(
1054 context.with_label("application"),
1055 application_cfg,
1056 );
1057 actor.start();
1058 let blocker = oracle.control(validator.clone());
1059 let cfg = config::Config {
1060 scheme: schemes[idx].clone(),
1061 elector: elector.clone(),
1062 blocker,
1063 automaton: application.clone(),
1064 relay: application.clone(),
1065 reporter: reporter.clone(),
1066 strategy: Sequential,
1067 partition: validator.to_string(),
1068 mailbox_size: 1024,
1069 epoch: Epoch::new(333),
1070 leader_timeout: Duration::from_secs(1),
1071 notarization_timeout: Duration::from_secs(2),
1072 nullify_retry: Duration::from_secs(10),
1073 fetch_timeout: Duration::from_secs(1),
1074 activity_timeout,
1075 skip_timeout,
1076 fetch_concurrent: 4,
1077 replay_buffer: NZUsize!(1024 * 1024),
1078 write_buffer: NZUsize!(1024 * 1024),
1079 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1080 };
1081 let engine = Engine::new(context.with_label("engine"), cfg);
1082
1083 let (pending, recovered, resolver) = registrations
1085 .remove(validator)
1086 .expect("validator should be registered");
1087 engine_handlers.push(engine.start(pending, recovered, resolver));
1088 }
1089
1090 let mut finalizers = Vec::new();
1092 for (_, reporter) in reporters.iter_mut() {
1093 let (mut latest, mut monitor) = reporter.subscribe().await;
1094 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1095 while latest < required_containers {
1096 latest = monitor.next().await.expect("event missing");
1097 }
1098 }));
1099 }
1100
1101 let wait =
1103 context.gen_range(Duration::from_millis(100)..Duration::from_millis(2_000));
1104 let result = select! {
1105 _ = context.sleep(wait) => {
1106 {
1108 let mut shutdowns = shutdowns.lock().unwrap();
1109 debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
1110 *shutdowns += 1;
1111 }
1112 supervised.lock().unwrap().push(reporters);
1113 false
1114 },
1115 _ = join_all(finalizers) => {
1116 let supervised = supervised.lock().unwrap();
1118 for reporters in supervised.iter() {
1119 for (_, reporter) in reporters.iter() {
1120 let faults = reporter.faults.lock().unwrap();
1121 assert!(faults.is_empty());
1122 }
1123 }
1124 true
1125 }
1126 };
1127
1128 let blocked = oracle.blocked().await.unwrap();
1130 assert!(blocked.is_empty());
1131
1132 result
1133 };
1134
1135 let (complete, checkpoint) = prev_checkpoint
1136 .map_or_else(
1137 || deterministic::Runner::timed(Duration::from_secs(180)),
1138 deterministic::Runner::from,
1139 )
1140 .start_and_recover(f);
1141
1142 if complete {
1144 break;
1145 }
1146
1147 prev_checkpoint = Some(checkpoint);
1148 }
1149 }
1150
1151 #[test_group("slow")]
1152 #[test_traced]
1153 fn test_unclean_shutdown() {
1154 unclean_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1155 unclean_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1156 unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1157 unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1158 unclean_shutdown::<_, _, RoundRobin>(ed25519::fixture);
1159 unclean_shutdown::<_, _, RoundRobin>(secp256r1::fixture);
1160 }
1161
1162 fn backfill<S, F, L>(mut fixture: F)
1163 where
1164 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1165 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1166 L: Elector<S>,
1167 {
1168 let n = 4;
1170 let required_containers = View::new(100);
1171 let activity_timeout = ViewDelta::new(10);
1172 let skip_timeout = ViewDelta::new(5);
1173 let namespace = b"consensus".to_vec();
1174 let executor = deterministic::Runner::timed(Duration::from_secs(240));
1175 executor.start(|mut context| async move {
1176 let (network, mut oracle) = Network::new(
1178 context.with_label("network"),
1179 Config {
1180 max_size: 1024 * 1024,
1181 disconnect_on_block: true,
1182 tracked_peer_sets: None,
1183 },
1184 );
1185
1186 network.start();
1188
1189 let Fixture {
1191 participants,
1192 schemes,
1193 ..
1194 } = fixture(&mut context, &namespace, n);
1195 let mut registrations = register_validators(&mut oracle, &participants).await;
1196
1197 let link = Link {
1199 latency: Duration::from_millis(10),
1200 jitter: Duration::from_millis(1),
1201 success_rate: 1.0,
1202 };
1203 link_validators(
1204 &mut oracle,
1205 &participants,
1206 Action::Link(link),
1207 Some(|_, i, j| ![i, j].contains(&0usize)),
1208 )
1209 .await;
1210
1211 let elector = L::default();
1213 let relay = Arc::new(mocks::relay::Relay::new());
1214 let mut reporters = Vec::new();
1215 let mut engine_handlers = Vec::new();
1216 for (idx_scheme, validator) in participants.iter().enumerate() {
1217 if idx_scheme == 0 {
1219 continue;
1220 }
1221
1222 let context = context.with_label(&format!("validator_{}", *validator));
1224
1225 let reporter_config = mocks::reporter::Config {
1227 participants: participants.clone().try_into().unwrap(),
1228 scheme: schemes[idx_scheme].clone(),
1229 elector: elector.clone(),
1230 };
1231 let reporter =
1232 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1233 reporters.push(reporter.clone());
1234 let application_cfg = mocks::application::Config {
1235 hasher: Sha256::default(),
1236 relay: relay.clone(),
1237 me: validator.clone(),
1238 propose_latency: (10.0, 5.0),
1239 verify_latency: (10.0, 5.0),
1240 certify_latency: (10.0, 5.0),
1241 should_certify: mocks::application::Certifier::Sometimes,
1242 };
1243 let (actor, application) = mocks::application::Application::new(
1244 context.with_label("application"),
1245 application_cfg,
1246 );
1247 actor.start();
1248 let blocker = oracle.control(validator.clone());
1249 let cfg = config::Config {
1250 scheme: schemes[idx_scheme].clone(),
1251 elector: elector.clone(),
1252 blocker,
1253 automaton: application.clone(),
1254 relay: application.clone(),
1255 reporter: reporter.clone(),
1256 strategy: Sequential,
1257 partition: validator.to_string(),
1258 mailbox_size: 1024,
1259 epoch: Epoch::new(333),
1260 leader_timeout: Duration::from_secs(1),
1261 notarization_timeout: Duration::from_secs(2),
1262 nullify_retry: Duration::from_secs(10),
1263 fetch_timeout: Duration::from_secs(1),
1264 activity_timeout,
1265 skip_timeout,
1266 fetch_concurrent: 4,
1267 replay_buffer: NZUsize!(1024 * 1024),
1268 write_buffer: NZUsize!(1024 * 1024),
1269 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1270 };
1271 let engine = Engine::new(context.with_label("engine"), cfg);
1272
1273 let (pending, recovered, resolver) = registrations
1275 .remove(validator)
1276 .expect("validator should be registered");
1277 engine_handlers.push(engine.start(pending, recovered, resolver));
1278 }
1279
1280 let mut finalizers = Vec::new();
1282 for reporter in reporters.iter_mut() {
1283 let (mut latest, mut monitor) = reporter.subscribe().await;
1284 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1285 while latest < required_containers {
1286 latest = monitor.next().await.expect("event missing");
1287 }
1288 }));
1289 }
1290 join_all(finalizers).await;
1291
1292 let link = Link {
1294 latency: Duration::from_secs(3),
1295 jitter: Duration::from_millis(0),
1296 success_rate: 1.0,
1297 };
1298 link_validators(
1299 &mut oracle,
1300 &participants,
1301 Action::Update(link.clone()),
1302 Some(|_, i, j| ![i, j].contains(&0usize)),
1303 )
1304 .await;
1305
1306 context.sleep(Duration::from_secs(60)).await;
1308
1309 link_validators(
1311 &mut oracle,
1312 &participants,
1313 Action::Unlink,
1314 Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
1315 )
1316 .await;
1317
1318 let me = participants[0].clone();
1320 let context = context.with_label(&format!("validator_{me}"));
1321
1322 link_validators(
1324 &mut oracle,
1325 &participants,
1326 Action::Link(link),
1327 Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
1328 )
1329 .await;
1330
1331 let link = Link {
1333 latency: Duration::from_millis(10),
1334 jitter: Duration::from_millis(3),
1335 success_rate: 1.0,
1336 };
1337 link_validators(
1338 &mut oracle,
1339 &participants,
1340 Action::Update(link),
1341 Some(|_, i, j| ![i, j].contains(&1usize)),
1342 )
1343 .await;
1344
1345 let reporter_config = mocks::reporter::Config {
1347 participants: participants.clone().try_into().unwrap(),
1348 scheme: schemes[0].clone(),
1349 elector: elector.clone(),
1350 };
1351 let mut reporter =
1352 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1353 reporters.push(reporter.clone());
1354 let application_cfg = mocks::application::Config {
1355 hasher: Sha256::default(),
1356 relay: relay.clone(),
1357 me: me.clone(),
1358 propose_latency: (10.0, 5.0),
1359 verify_latency: (10.0, 5.0),
1360 certify_latency: (10.0, 5.0),
1361 should_certify: mocks::application::Certifier::Sometimes,
1362 };
1363 let (actor, application) = mocks::application::Application::new(
1364 context.with_label("application"),
1365 application_cfg,
1366 );
1367 actor.start();
1368 let blocker = oracle.control(me.clone());
1369 let cfg = config::Config {
1370 scheme: schemes[0].clone(),
1371 elector: elector.clone(),
1372 blocker,
1373 automaton: application.clone(),
1374 relay: application.clone(),
1375 reporter: reporter.clone(),
1376 strategy: Sequential,
1377 partition: me.to_string(),
1378 mailbox_size: 1024,
1379 epoch: Epoch::new(333),
1380 leader_timeout: Duration::from_secs(1),
1381 notarization_timeout: Duration::from_secs(2),
1382 nullify_retry: Duration::from_secs(10),
1383 fetch_timeout: Duration::from_secs(1),
1384 activity_timeout,
1385 skip_timeout,
1386 fetch_concurrent: 4,
1387 replay_buffer: NZUsize!(1024 * 1024),
1388 write_buffer: NZUsize!(1024 * 1024),
1389 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1390 };
1391 let engine = Engine::new(context.with_label("engine"), cfg);
1392
1393 let (pending, recovered, resolver) = registrations
1395 .remove(&me)
1396 .expect("validator should be registered");
1397 engine_handlers.push(engine.start(pending, recovered, resolver));
1398
1399 let (mut latest, mut monitor) = reporter.subscribe().await;
1401 while latest < required_containers {
1402 latest = monitor.next().await.expect("event missing");
1403 }
1404
1405 let blocked = oracle.blocked().await.unwrap();
1407 assert!(blocked.is_empty());
1408 });
1409 }
1410
1411 #[test_traced]
1412 fn test_backfill() {
1413 backfill::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1414 backfill::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1415 backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1416 backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1417 backfill::<_, _, RoundRobin>(ed25519::fixture);
1418 backfill::<_, _, RoundRobin>(secp256r1::fixture);
1419 }
1420
1421 fn one_offline<S, F, L>(mut fixture: F)
1422 where
1423 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1424 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1425 L: Elector<S>,
1426 {
1427 let n = 5;
1429 let quorum = quorum(n) as usize;
1430 let required_containers = View::new(100);
1431 let activity_timeout = ViewDelta::new(10);
1432 let skip_timeout = ViewDelta::new(5);
1433 let max_exceptions = 10;
1434 let namespace = b"consensus".to_vec();
1435 let executor = deterministic::Runner::timed(Duration::from_secs(300));
1436 executor.start(|mut context| async move {
1437 let (network, mut oracle) = Network::new(
1439 context.with_label("network"),
1440 Config {
1441 max_size: 1024 * 1024,
1442 disconnect_on_block: true,
1443 tracked_peer_sets: None,
1444 },
1445 );
1446
1447 network.start();
1449
1450 let Fixture {
1452 participants,
1453 schemes,
1454 ..
1455 } = fixture(&mut context, &namespace, n);
1456 let mut registrations = register_validators(&mut oracle, &participants).await;
1457
1458 let link = Link {
1460 latency: Duration::from_millis(10),
1461 jitter: Duration::from_millis(1),
1462 success_rate: 1.0,
1463 };
1464 link_validators(
1465 &mut oracle,
1466 &participants,
1467 Action::Link(link),
1468 Some(|_, i, j| ![i, j].contains(&0usize)),
1469 )
1470 .await;
1471
1472 let elector = L::default();
1474 let relay = Arc::new(mocks::relay::Relay::new());
1475 let mut reporters = Vec::new();
1476 let mut engine_handlers = Vec::new();
1477 for (idx_scheme, validator) in participants.iter().enumerate() {
1478 if idx_scheme == 0 {
1480 continue;
1481 }
1482
1483 let context = context.with_label(&format!("validator_{}", *validator));
1485
1486 let reporter_config = mocks::reporter::Config {
1488 participants: participants.clone().try_into().unwrap(),
1489 scheme: schemes[idx_scheme].clone(),
1490 elector: elector.clone(),
1491 };
1492 let reporter =
1493 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1494 reporters.push(reporter.clone());
1495 let application_cfg = mocks::application::Config {
1496 hasher: Sha256::default(),
1497 relay: relay.clone(),
1498 me: validator.clone(),
1499 propose_latency: (10.0, 5.0),
1500 verify_latency: (10.0, 5.0),
1501 certify_latency: (10.0, 5.0),
1502 should_certify: mocks::application::Certifier::Sometimes,
1503 };
1504 let (actor, application) = mocks::application::Application::new(
1505 context.with_label("application"),
1506 application_cfg,
1507 );
1508 actor.start();
1509 let blocker = oracle.control(validator.clone());
1510 let cfg = config::Config {
1511 scheme: schemes[idx_scheme].clone(),
1512 elector: elector.clone(),
1513 blocker,
1514 automaton: application.clone(),
1515 relay: application.clone(),
1516 reporter: reporter.clone(),
1517 strategy: Sequential,
1518 partition: validator.to_string(),
1519 mailbox_size: 1024,
1520 epoch: Epoch::new(333),
1521 leader_timeout: Duration::from_secs(1),
1522 notarization_timeout: Duration::from_secs(2),
1523 nullify_retry: Duration::from_secs(10),
1524 fetch_timeout: Duration::from_secs(1),
1525 activity_timeout,
1526 skip_timeout,
1527 fetch_concurrent: 4,
1528 replay_buffer: NZUsize!(1024 * 1024),
1529 write_buffer: NZUsize!(1024 * 1024),
1530 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1531 };
1532 let engine = Engine::new(context.with_label("engine"), cfg);
1533
1534 let (pending, recovered, resolver) = registrations
1536 .remove(validator)
1537 .expect("validator should be registered");
1538 engine_handlers.push(engine.start(pending, recovered, resolver));
1539 }
1540
1541 let mut finalizers = Vec::new();
1543 for reporter in reporters.iter_mut() {
1544 let (mut latest, mut monitor) = reporter.subscribe().await;
1545 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1546 while latest < required_containers {
1547 latest = monitor.next().await.expect("event missing");
1548 }
1549 }));
1550 }
1551 join_all(finalizers).await;
1552
1553 let exceptions = 0;
1555 let offline = &participants[0];
1556 for reporter in reporters.iter() {
1557 {
1559 let faults = reporter.faults.lock().unwrap();
1560 assert!(faults.is_empty());
1561 }
1562
1563 {
1565 let invalid = reporter.invalid.lock().unwrap();
1566 assert_eq!(*invalid, 0);
1567 }
1568
1569 let mut exceptions = 0;
1571 {
1572 let notarizes = reporter.notarizes.lock().unwrap();
1573 for (view, payloads) in notarizes.iter() {
1574 for (_, participants) in payloads.iter() {
1575 if participants.contains(offline) {
1576 panic!("view: {view}");
1577 }
1578 }
1579 }
1580 }
1581 {
1582 let nullifies = reporter.nullifies.lock().unwrap();
1583 for (view, participants) in nullifies.iter() {
1584 if participants.contains(offline) {
1585 panic!("view: {view}");
1586 }
1587 }
1588 }
1589 {
1590 let finalizes = reporter.finalizes.lock().unwrap();
1591 for (view, payloads) in finalizes.iter() {
1592 for (_, finalizers) in payloads.iter() {
1593 if finalizers.contains(offline) {
1594 panic!("view: {view}");
1595 }
1596 }
1597 }
1598 }
1599
1600 let mut offline_views = Vec::new();
1602 {
1603 let leaders = reporter.leaders.lock().unwrap();
1604 for (view, leader) in leaders.iter() {
1605 if leader == offline {
1606 offline_views.push(*view);
1607 }
1608 }
1609 }
1610 assert!(!offline_views.is_empty());
1611
1612 {
1614 let nullifies = reporter.nullifies.lock().unwrap();
1615 for view in offline_views.iter() {
1616 let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1617 if nullifies < quorum {
1618 warn!("missing expected view nullifies: {}", view);
1619 exceptions += 1;
1620 }
1621 }
1622 }
1623 {
1624 let nullifications = reporter.nullifications.lock().unwrap();
1625 for view in offline_views.iter() {
1626 if !nullifications.contains_key(view) {
1627 warn!("missing expected view nullifies: {}", view);
1628 exceptions += 1;
1629 }
1630 }
1631 }
1632
1633 assert!(exceptions <= max_exceptions);
1635 }
1636 assert!(exceptions <= max_exceptions);
1637
1638 let blocked = oracle.blocked().await.unwrap();
1640 assert!(blocked.is_empty());
1641
1642 let encoded = context.encode();
1644 let lines = encoded.lines();
1645 let mut skipped_views = 0;
1646 let mut nodes_skipping = 0;
1647 for line in lines {
1648 if line.contains("_skipped_views_total") {
1649 let parts: Vec<&str> = line.split_whitespace().collect();
1650 if let Some(number_str) = parts.last() {
1651 if let Ok(number) = number_str.parse::<u64>() {
1652 if number > 0 {
1653 nodes_skipping += 1;
1654 }
1655 if number > skipped_views {
1656 skipped_views = number;
1657 }
1658 }
1659 }
1660 }
1661 }
1662 assert!(
1663 skipped_views > 0,
1664 "expected skipped views to be greater than 0"
1665 );
1666 assert_eq!(
1667 nodes_skipping,
1668 n - 1,
1669 "expected all online nodes to be skipping views"
1670 );
1671 });
1672 }
1673
1674 #[test_traced]
1675 fn test_one_offline() {
1676 one_offline::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1677 one_offline::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1678 one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1679 one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1680 one_offline::<_, _, RoundRobin>(ed25519::fixture);
1681 one_offline::<_, _, RoundRobin>(secp256r1::fixture);
1682 }
1683
1684 fn slow_validator<S, F, L>(mut fixture: F)
1685 where
1686 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1687 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1688 L: Elector<S>,
1689 {
1690 let n = 5;
1692 let required_containers = View::new(50);
1693 let activity_timeout = ViewDelta::new(10);
1694 let skip_timeout = ViewDelta::new(5);
1695 let namespace = b"consensus".to_vec();
1696 let executor = deterministic::Runner::timed(Duration::from_secs(300));
1697 executor.start(|mut context| async move {
1698 let (network, mut oracle) = Network::new(
1700 context.with_label("network"),
1701 Config {
1702 max_size: 1024 * 1024,
1703 disconnect_on_block: true,
1704 tracked_peer_sets: None,
1705 },
1706 );
1707
1708 network.start();
1710
1711 let Fixture {
1713 participants,
1714 schemes,
1715 ..
1716 } = fixture(&mut context, &namespace, n);
1717 let mut registrations = register_validators(&mut oracle, &participants).await;
1718
1719 let link = Link {
1721 latency: Duration::from_millis(10),
1722 jitter: Duration::from_millis(1),
1723 success_rate: 1.0,
1724 };
1725 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1726
1727 let elector = L::default();
1729 let relay = Arc::new(mocks::relay::Relay::new());
1730 let mut reporters = Vec::new();
1731 let mut engine_handlers = Vec::new();
1732 for (idx_scheme, validator) in participants.iter().enumerate() {
1733 let context = context.with_label(&format!("validator_{}", *validator));
1735
1736 let reporter_config = mocks::reporter::Config {
1738 participants: participants.clone().try_into().unwrap(),
1739 scheme: schemes[idx_scheme].clone(),
1740 elector: elector.clone(),
1741 };
1742 let reporter =
1743 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1744 reporters.push(reporter.clone());
1745 let application_cfg = if idx_scheme == 0 {
1746 mocks::application::Config {
1747 hasher: Sha256::default(),
1748 relay: relay.clone(),
1749 me: validator.clone(),
1750 propose_latency: (10_000.0, 0.0),
1751 verify_latency: (10_000.0, 5.0),
1752 certify_latency: (10_000.0, 5.0),
1753 should_certify: mocks::application::Certifier::Sometimes,
1754 }
1755 } else {
1756 mocks::application::Config {
1757 hasher: Sha256::default(),
1758 relay: relay.clone(),
1759 me: validator.clone(),
1760 propose_latency: (10.0, 5.0),
1761 verify_latency: (10.0, 5.0),
1762 certify_latency: (10.0, 5.0),
1763 should_certify: mocks::application::Certifier::Sometimes,
1764 }
1765 };
1766 let (actor, application) = mocks::application::Application::new(
1767 context.with_label("application"),
1768 application_cfg,
1769 );
1770 actor.start();
1771 let blocker = oracle.control(validator.clone());
1772 let cfg = config::Config {
1773 scheme: schemes[idx_scheme].clone(),
1774 elector: elector.clone(),
1775 blocker,
1776 automaton: application.clone(),
1777 relay: application.clone(),
1778 reporter: reporter.clone(),
1779 strategy: Sequential,
1780 partition: validator.to_string(),
1781 mailbox_size: 1024,
1782 epoch: Epoch::new(333),
1783 leader_timeout: Duration::from_secs(1),
1784 notarization_timeout: Duration::from_secs(2),
1785 nullify_retry: Duration::from_secs(10),
1786 fetch_timeout: Duration::from_secs(1),
1787 activity_timeout,
1788 skip_timeout,
1789 fetch_concurrent: 4,
1790 replay_buffer: NZUsize!(1024 * 1024),
1791 write_buffer: NZUsize!(1024 * 1024),
1792 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1793 };
1794 let engine = Engine::new(context.with_label("engine"), cfg);
1795
1796 let (pending, recovered, resolver) = registrations
1798 .remove(validator)
1799 .expect("validator should be registered");
1800 engine_handlers.push(engine.start(pending, recovered, resolver));
1801 }
1802
1803 let mut finalizers = Vec::new();
1805 for reporter in reporters.iter_mut() {
1806 let (mut latest, mut monitor) = reporter.subscribe().await;
1807 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1808 while latest < required_containers {
1809 latest = monitor.next().await.expect("event missing");
1810 }
1811 }));
1812 }
1813 join_all(finalizers).await;
1814
1815 let slow = &participants[0];
1817 for reporter in reporters.iter() {
1818 {
1820 let faults = reporter.faults.lock().unwrap();
1821 assert!(faults.is_empty());
1822 }
1823
1824 {
1826 let invalid = reporter.invalid.lock().unwrap();
1827 assert_eq!(*invalid, 0);
1828 }
1829
1830 let mut observed = false;
1832 {
1833 let notarizes = reporter.notarizes.lock().unwrap();
1834 for (_, payloads) in notarizes.iter() {
1835 for (_, participants) in payloads.iter() {
1836 if participants.contains(slow) {
1837 observed = true;
1838 break;
1839 }
1840 }
1841 }
1842 }
1843 {
1844 let finalizes = reporter.finalizes.lock().unwrap();
1845 for (_, payloads) in finalizes.iter() {
1846 for (_, finalizers) in payloads.iter() {
1847 if finalizers.contains(slow) {
1848 observed = true;
1849 break;
1850 }
1851 }
1852 }
1853 }
1854 assert!(observed);
1855 }
1856
1857 let blocked = oracle.blocked().await.unwrap();
1859 assert!(blocked.is_empty());
1860 });
1861 }
1862
1863 #[test_traced]
1864 fn test_slow_validator() {
1865 slow_validator::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1866 slow_validator::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1867 slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1868 slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1869 slow_validator::<_, _, RoundRobin>(ed25519::fixture);
1870 slow_validator::<_, _, RoundRobin>(secp256r1::fixture);
1871 }
1872
1873 fn all_recovery<S, F, L>(mut fixture: F)
1874 where
1875 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1876 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1877 L: Elector<S>,
1878 {
1879 let n = 5;
1881 let required_containers = View::new(100);
1882 let activity_timeout = ViewDelta::new(10);
1883 let skip_timeout = ViewDelta::new(2);
1884 let namespace = b"consensus".to_vec();
1885 let executor = deterministic::Runner::timed(Duration::from_secs(1800));
1886 executor.start(|mut context| async move {
1887 let (network, mut oracle) = Network::new(
1889 context.with_label("network"),
1890 Config {
1891 max_size: 1024 * 1024,
1892 disconnect_on_block: false,
1893 tracked_peer_sets: None,
1894 },
1895 );
1896
1897 network.start();
1899
1900 let Fixture {
1902 participants,
1903 schemes,
1904 ..
1905 } = fixture(&mut context, &namespace, n);
1906 let mut registrations = register_validators(&mut oracle, &participants).await;
1907
1908 let link = Link {
1910 latency: Duration::from_secs(3),
1911 jitter: Duration::from_millis(0),
1912 success_rate: 1.0,
1913 };
1914 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1915
1916 let elector = L::default();
1918 let relay = Arc::new(mocks::relay::Relay::new());
1919 let mut reporters = Vec::new();
1920 let mut engine_handlers = Vec::new();
1921 for (idx, validator) in participants.iter().enumerate() {
1922 let context = context.with_label(&format!("validator_{}", *validator));
1924
1925 let reporter_config = mocks::reporter::Config {
1927 participants: participants.clone().try_into().unwrap(),
1928 scheme: schemes[idx].clone(),
1929 elector: elector.clone(),
1930 };
1931 let reporter =
1932 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1933 reporters.push(reporter.clone());
1934 let application_cfg = mocks::application::Config {
1935 hasher: Sha256::default(),
1936 relay: relay.clone(),
1937 me: validator.clone(),
1938 propose_latency: (10.0, 5.0),
1939 verify_latency: (10.0, 5.0),
1940 certify_latency: (10.0, 5.0),
1941 should_certify: mocks::application::Certifier::Sometimes,
1942 };
1943 let (actor, application) = mocks::application::Application::new(
1944 context.with_label("application"),
1945 application_cfg,
1946 );
1947 actor.start();
1948 let blocker = oracle.control(validator.clone());
1949 let cfg = config::Config {
1950 scheme: schemes[idx].clone(),
1951 elector: elector.clone(),
1952 blocker,
1953 automaton: application.clone(),
1954 relay: application.clone(),
1955 reporter: reporter.clone(),
1956 strategy: Sequential,
1957 partition: validator.to_string(),
1958 mailbox_size: 1024,
1959 epoch: Epoch::new(333),
1960 leader_timeout: Duration::from_secs(1),
1961 notarization_timeout: Duration::from_secs(2),
1962 nullify_retry: Duration::from_secs(10),
1963 fetch_timeout: Duration::from_secs(1),
1964 activity_timeout,
1965 skip_timeout,
1966 fetch_concurrent: 4,
1967 replay_buffer: NZUsize!(1024 * 1024),
1968 write_buffer: NZUsize!(1024 * 1024),
1969 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1970 };
1971 let engine = Engine::new(context.with_label("engine"), cfg);
1972
1973 let (pending, recovered, resolver) = registrations
1975 .remove(validator)
1976 .expect("validator should be registered");
1977 engine_handlers.push(engine.start(pending, recovered, resolver));
1978 }
1979
1980 let mut finalizers = Vec::new();
1982 for reporter in reporters.iter_mut() {
1983 let (_, mut monitor) = reporter.subscribe().await;
1984 finalizers.push(
1985 context
1986 .with_label("finalizer")
1987 .spawn(move |context| async move {
1988 select! {
1989 _timeout = context.sleep(Duration::from_secs(60)) => {},
1990 _done = monitor.next() => {
1991 panic!("engine should not notarize or finalize anything");
1992 }
1993 }
1994 }),
1995 );
1996 }
1997 join_all(finalizers).await;
1998
1999 link_validators(&mut oracle, &participants, Action::Unlink, None).await;
2001
2002 context.sleep(Duration::from_secs(60)).await;
2004
2005 let mut latest = View::zero();
2007 for reporter in reporters.iter() {
2008 let nullifies = reporter.nullifies.lock().unwrap();
2009 let max = nullifies.keys().max().unwrap();
2010 if *max > latest {
2011 latest = *max;
2012 }
2013 }
2014
2015 let link = Link {
2017 latency: Duration::from_millis(10),
2018 jitter: Duration::from_millis(1),
2019 success_rate: 1.0,
2020 };
2021 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2022
2023 let mut finalizers = Vec::new();
2025 for reporter in reporters.iter_mut() {
2026 let (mut latest, mut monitor) = reporter.subscribe().await;
2027 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2028 while latest < required_containers {
2029 latest = monitor.next().await.expect("event missing");
2030 }
2031 }));
2032 }
2033 join_all(finalizers).await;
2034
2035 for reporter in reporters.iter() {
2037 {
2039 let faults = reporter.faults.lock().unwrap();
2040 assert!(faults.is_empty());
2041 }
2042
2043 {
2045 let invalid = reporter.invalid.lock().unwrap();
2046 assert_eq!(*invalid, 0);
2047 }
2048
2049 {
2054 let mut found = 0;
2058 let notarizations = reporter.notarizations.lock().unwrap();
2059 for view in View::range(latest, latest.saturating_add(activity_timeout)) {
2060 if notarizations.contains_key(&view) {
2061 found += 1;
2062 }
2063 }
2064 assert!(
2065 found >= activity_timeout.get().saturating_sub(2),
2066 "found: {found}"
2067 );
2068 }
2069 }
2070
2071 let blocked = oracle.blocked().await.unwrap();
2073 assert!(blocked.is_empty());
2074 });
2075 }
2076
2077 #[test_traced]
2078 fn test_all_recovery() {
2079 all_recovery::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
2080 all_recovery::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
2081 all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2082 all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2083 all_recovery::<_, _, RoundRobin>(ed25519::fixture);
2084 all_recovery::<_, _, RoundRobin>(secp256r1::fixture);
2085 }
2086
2087 fn partition<S, F, L>(mut fixture: F)
2088 where
2089 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2090 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2091 L: Elector<S>,
2092 {
2093 let n = 10;
2095 let required_containers = View::new(50);
2096 let activity_timeout = ViewDelta::new(10);
2097 let skip_timeout = ViewDelta::new(5);
2098 let namespace = b"consensus".to_vec();
2099 let executor = deterministic::Runner::timed(Duration::from_secs(900));
2100 executor.start(|mut context| async move {
2101 let (network, mut oracle) = Network::new(
2103 context.with_label("network"),
2104 Config {
2105 max_size: 1024 * 1024,
2106 disconnect_on_block: false,
2107 tracked_peer_sets: None,
2108 },
2109 );
2110
2111 network.start();
2113
2114 let Fixture {
2116 participants,
2117 schemes,
2118 ..
2119 } = fixture(&mut context, &namespace, n);
2120 let mut registrations = register_validators(&mut oracle, &participants).await;
2121
2122 let link = Link {
2124 latency: Duration::from_millis(10),
2125 jitter: Duration::from_millis(1),
2126 success_rate: 1.0,
2127 };
2128 link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
2129
2130 let elector = L::default();
2132 let relay = Arc::new(mocks::relay::Relay::new());
2133 let mut reporters = Vec::new();
2134 let mut engine_handlers = Vec::new();
2135 for (idx, validator) in participants.iter().enumerate() {
2136 let context = context.with_label(&format!("validator_{}", *validator));
2138
2139 let reporter_config = mocks::reporter::Config {
2141 participants: participants.clone().try_into().unwrap(),
2142 scheme: schemes[idx].clone(),
2143 elector: elector.clone(),
2144 };
2145 let reporter =
2146 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2147 reporters.push(reporter.clone());
2148 let application_cfg = mocks::application::Config {
2149 hasher: Sha256::default(),
2150 relay: relay.clone(),
2151 me: validator.clone(),
2152 propose_latency: (10.0, 5.0),
2153 verify_latency: (10.0, 5.0),
2154 certify_latency: (10.0, 5.0),
2155 should_certify: mocks::application::Certifier::Sometimes,
2156 };
2157 let (actor, application) = mocks::application::Application::new(
2158 context.with_label("application"),
2159 application_cfg,
2160 );
2161 actor.start();
2162 let blocker = oracle.control(validator.clone());
2163 let cfg = config::Config {
2164 scheme: schemes[idx].clone(),
2165 elector: elector.clone(),
2166 blocker,
2167 automaton: application.clone(),
2168 relay: application.clone(),
2169 reporter: reporter.clone(),
2170 strategy: Sequential,
2171 partition: validator.to_string(),
2172 mailbox_size: 1024,
2173 epoch: Epoch::new(333),
2174 leader_timeout: Duration::from_secs(1),
2175 notarization_timeout: Duration::from_secs(2),
2176 nullify_retry: Duration::from_secs(10),
2177 fetch_timeout: Duration::from_secs(1),
2178 activity_timeout,
2179 skip_timeout,
2180 fetch_concurrent: 4,
2181 replay_buffer: NZUsize!(1024 * 1024),
2182 write_buffer: NZUsize!(1024 * 1024),
2183 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2184 };
2185 let engine = Engine::new(context.with_label("engine"), cfg);
2186
2187 let (pending, recovered, resolver) = registrations
2189 .remove(validator)
2190 .expect("validator should be registered");
2191 engine_handlers.push(engine.start(pending, recovered, resolver));
2192 }
2193
2194 let mut finalizers = Vec::new();
2196 for reporter in reporters.iter_mut() {
2197 let (mut latest, mut monitor) = reporter.subscribe().await;
2198 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2199 while latest < required_containers {
2200 latest = monitor.next().await.expect("event missing");
2201 }
2202 }));
2203 }
2204 join_all(finalizers).await;
2205
2206 fn separated(n: usize, a: usize, b: usize) -> bool {
2208 let m = n / 2;
2209 (a < m && b >= m) || (a >= m && b < m)
2210 }
2211 link_validators(&mut oracle, &participants, Action::Unlink, Some(separated)).await;
2212
2213 context.sleep(Duration::from_secs(10)).await;
2215
2216 let mut finalizers = Vec::new();
2218 for reporter in reporters.iter_mut() {
2219 let (_, mut monitor) = reporter.subscribe().await;
2220 finalizers.push(
2221 context
2222 .with_label("finalizer")
2223 .spawn(move |context| async move {
2224 select! {
2225 _timeout = context.sleep(Duration::from_secs(60)) => {},
2226 _done = monitor.next() => {
2227 panic!("engine should not notarize or finalize anything");
2228 }
2229 }
2230 }),
2231 );
2232 }
2233 join_all(finalizers).await;
2234
2235 link_validators(
2237 &mut oracle,
2238 &participants,
2239 Action::Link(link),
2240 Some(separated),
2241 )
2242 .await;
2243
2244 let mut finalizers = Vec::new();
2246 for reporter in reporters.iter_mut() {
2247 let (mut latest, mut monitor) = reporter.subscribe().await;
2248 let required = latest.saturating_add(ViewDelta::new(required_containers.get()));
2249 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2250 while latest < required {
2251 latest = monitor.next().await.expect("event missing");
2252 }
2253 }));
2254 }
2255 join_all(finalizers).await;
2256
2257 for reporter in reporters.iter() {
2259 {
2261 let faults = reporter.faults.lock().unwrap();
2262 assert!(faults.is_empty());
2263 }
2264
2265 {
2267 let invalid = reporter.invalid.lock().unwrap();
2268 assert_eq!(*invalid, 0);
2269 }
2270 }
2271
2272 let blocked = oracle.blocked().await.unwrap();
2274 assert!(blocked.is_empty());
2275 });
2276 }
2277
2278 #[test_group("slow")]
2279 #[test_traced]
2280 fn test_partition() {
2281 partition::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
2282 partition::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
2283 partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2284 partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2285 partition::<_, _, RoundRobin>(ed25519::fixture);
2286 partition::<_, _, RoundRobin>(secp256r1::fixture);
2287 }
2288
2289 fn slow_and_lossy_links<S, F, L>(seed: u64, mut fixture: F) -> String
2290 where
2291 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2292 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2293 L: Elector<S>,
2294 {
2295 let n = 5;
2297 let required_containers = View::new(50);
2298 let activity_timeout = ViewDelta::new(10);
2299 let skip_timeout = ViewDelta::new(5);
2300 let namespace = b"consensus".to_vec();
2301 let cfg = deterministic::Config::new()
2302 .with_seed(seed)
2303 .with_timeout(Some(Duration::from_secs(5_000)));
2304 let executor = deterministic::Runner::new(cfg);
2305 executor.start(|mut context| async move {
2306 let (network, mut oracle) = Network::new(
2308 context.with_label("network"),
2309 Config {
2310 max_size: 1024 * 1024,
2311 disconnect_on_block: false,
2312 tracked_peer_sets: None,
2313 },
2314 );
2315
2316 network.start();
2318
2319 let Fixture {
2321 participants,
2322 schemes,
2323 ..
2324 } = fixture(&mut context, &namespace, n);
2325 let mut registrations = register_validators(&mut oracle, &participants).await;
2326
2327 let degraded_link = Link {
2329 latency: Duration::from_millis(200),
2330 jitter: Duration::from_millis(150),
2331 success_rate: 0.5,
2332 };
2333 link_validators(
2334 &mut oracle,
2335 &participants,
2336 Action::Link(degraded_link),
2337 None,
2338 )
2339 .await;
2340
2341 let elector = L::default();
2343 let relay = Arc::new(mocks::relay::Relay::new());
2344 let mut reporters = Vec::new();
2345 let mut engine_handlers = Vec::new();
2346 for (idx, validator) in participants.iter().enumerate() {
2347 let context = context.with_label(&format!("validator_{}", *validator));
2349
2350 let reporter_config = mocks::reporter::Config {
2352 participants: participants.clone().try_into().unwrap(),
2353 scheme: schemes[idx].clone(),
2354 elector: elector.clone(),
2355 };
2356 let reporter =
2357 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2358 reporters.push(reporter.clone());
2359 let application_cfg = mocks::application::Config {
2360 hasher: Sha256::default(),
2361 relay: relay.clone(),
2362 me: validator.clone(),
2363 propose_latency: (10.0, 5.0),
2364 verify_latency: (10.0, 5.0),
2365 certify_latency: (10.0, 5.0),
2366 should_certify: mocks::application::Certifier::Sometimes,
2367 };
2368 let (actor, application) = mocks::application::Application::new(
2369 context.with_label("application"),
2370 application_cfg,
2371 );
2372 actor.start();
2373 let blocker = oracle.control(validator.clone());
2374 let cfg = config::Config {
2375 scheme: schemes[idx].clone(),
2376 elector: elector.clone(),
2377 blocker,
2378 automaton: application.clone(),
2379 relay: application.clone(),
2380 reporter: reporter.clone(),
2381 strategy: Sequential,
2382 partition: validator.to_string(),
2383 mailbox_size: 1024,
2384 epoch: Epoch::new(333),
2385 leader_timeout: Duration::from_secs(1),
2386 notarization_timeout: Duration::from_secs(2),
2387 nullify_retry: Duration::from_secs(10),
2388 fetch_timeout: Duration::from_secs(1),
2389 activity_timeout,
2390 skip_timeout,
2391 fetch_concurrent: 4,
2392 replay_buffer: NZUsize!(1024 * 1024),
2393 write_buffer: NZUsize!(1024 * 1024),
2394 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2395 };
2396 let engine = Engine::new(context.with_label("engine"), cfg);
2397
2398 let (pending, recovered, resolver) = registrations
2400 .remove(validator)
2401 .expect("validator should be registered");
2402 engine_handlers.push(engine.start(pending, recovered, resolver));
2403 }
2404
2405 let mut finalizers = Vec::new();
2407 for reporter in reporters.iter_mut() {
2408 let (mut latest, mut monitor) = reporter.subscribe().await;
2409 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2410 while latest < required_containers {
2411 latest = monitor.next().await.expect("event missing");
2412 }
2413 }));
2414 }
2415 join_all(finalizers).await;
2416
2417 for reporter in reporters.iter() {
2419 {
2421 let faults = reporter.faults.lock().unwrap();
2422 assert!(faults.is_empty());
2423 }
2424
2425 {
2427 let invalid = reporter.invalid.lock().unwrap();
2428 assert_eq!(*invalid, 0);
2429 }
2430 }
2431
2432 let blocked = oracle.blocked().await.unwrap();
2434 assert!(blocked.is_empty());
2435
2436 context.auditor().state()
2437 })
2438 }
2439
2440 #[test_traced]
2441 fn test_slow_and_lossy_links() {
2442 slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold::fixture::<MinPk, _>);
2443 slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold::fixture::<MinSig, _>);
2444 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinPk, _>);
2445 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinSig, _>);
2446 slow_and_lossy_links::<_, _, RoundRobin>(0, ed25519::fixture);
2447 slow_and_lossy_links::<_, _, RoundRobin>(0, secp256r1::fixture);
2448 }
2449
2450 #[test_group("slow")]
2451 #[test_traced]
2452 fn test_determinism() {
2453 for seed in 1..6 {
2456 let ts_pk_state_1 =
2457 slow_and_lossy_links::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2458 let ts_pk_state_2 =
2459 slow_and_lossy_links::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2460 assert_eq!(ts_pk_state_1, ts_pk_state_2);
2461
2462 let ts_sig_state_1 = slow_and_lossy_links::<_, _, Random>(
2463 seed,
2464 bls12381_threshold::fixture::<MinSig, _>,
2465 );
2466 let ts_sig_state_2 = slow_and_lossy_links::<_, _, Random>(
2467 seed,
2468 bls12381_threshold::fixture::<MinSig, _>,
2469 );
2470 assert_eq!(ts_sig_state_1, ts_sig_state_2);
2471
2472 let ms_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2473 seed,
2474 bls12381_multisig::fixture::<MinPk, _>,
2475 );
2476 let ms_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2477 seed,
2478 bls12381_multisig::fixture::<MinPk, _>,
2479 );
2480 assert_eq!(ms_pk_state_1, ms_pk_state_2);
2481
2482 let ms_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2483 seed,
2484 bls12381_multisig::fixture::<MinSig, _>,
2485 );
2486 let ms_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2487 seed,
2488 bls12381_multisig::fixture::<MinSig, _>,
2489 );
2490 assert_eq!(ms_sig_state_1, ms_sig_state_2);
2491
2492 let ed_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2493 let ed_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2494 assert_eq!(ed_state_1, ed_state_2);
2495
2496 let secp_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2497 let secp_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2498 assert_eq!(secp_state_1, secp_state_2);
2499
2500 let states = [
2501 ("threshold-minpk", ts_pk_state_1),
2502 ("threshold-minsig", ts_sig_state_1),
2503 ("multisig-minpk", ms_pk_state_1),
2504 ("multisig-minsig", ms_sig_state_1),
2505 ("ed25519", ed_state_1),
2506 ("secp256r1", secp_state_1),
2507 ];
2508
2509 for pair in states.windows(2) {
2511 assert_ne!(
2512 pair[0].1, pair[1].1,
2513 "state {} equals state {}",
2514 pair[0].0, pair[1].0
2515 );
2516 }
2517 }
2518 }
2519
2520 fn conflicter<S, F, L>(seed: u64, mut fixture: F)
2521 where
2522 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2523 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2524 L: Elector<S>,
2525 {
2526 let n = 4;
2528 let required_containers = View::new(50);
2529 let activity_timeout = ViewDelta::new(10);
2530 let skip_timeout = ViewDelta::new(5);
2531 let namespace = b"consensus".to_vec();
2532 let cfg = deterministic::Config::new()
2533 .with_seed(seed)
2534 .with_timeout(Some(Duration::from_secs(30)));
2535 let executor = deterministic::Runner::new(cfg);
2536 executor.start(|mut context| async move {
2537 let (network, mut oracle) = Network::new(
2539 context.with_label("network"),
2540 Config {
2541 max_size: 1024 * 1024,
2542 disconnect_on_block: false,
2543 tracked_peer_sets: None,
2544 },
2545 );
2546
2547 network.start();
2549
2550 let Fixture {
2552 participants,
2553 schemes,
2554 ..
2555 } = fixture(&mut context, &namespace, n);
2556 let mut registrations = register_validators(&mut oracle, &participants).await;
2557
2558 let link = Link {
2560 latency: Duration::from_millis(10),
2561 jitter: Duration::from_millis(1),
2562 success_rate: 1.0,
2563 };
2564 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2565
2566 let elector = L::default();
2568 let relay = Arc::new(mocks::relay::Relay::new());
2569 let mut reporters = Vec::new();
2570 for (idx_scheme, validator) in participants.iter().enumerate() {
2571 let context = context.with_label(&format!("validator_{}", *validator));
2573
2574 let reporter_config = mocks::reporter::Config {
2576 participants: participants.clone().try_into().unwrap(),
2577 scheme: schemes[idx_scheme].clone(),
2578 elector: elector.clone(),
2579 };
2580 let reporter =
2581 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2582 let (pending, recovered, resolver) = registrations
2583 .remove(validator)
2584 .expect("validator should be registered");
2585 if idx_scheme == 0 {
2586 let cfg = mocks::conflicter::Config {
2587 scheme: schemes[idx_scheme].clone(),
2588 };
2589
2590 let engine: mocks::conflicter::Conflicter<_, _, Sha256> =
2591 mocks::conflicter::Conflicter::new(
2592 context.with_label("byzantine_engine"),
2593 cfg,
2594 );
2595 engine.start(pending);
2596 } else {
2597 reporters.push(reporter.clone());
2598 let application_cfg = mocks::application::Config {
2599 hasher: Sha256::default(),
2600 relay: relay.clone(),
2601 me: validator.clone(),
2602 propose_latency: (10.0, 5.0),
2603 verify_latency: (10.0, 5.0),
2604 certify_latency: (10.0, 5.0),
2605 should_certify: mocks::application::Certifier::Sometimes,
2606 };
2607 let (actor, application) = mocks::application::Application::new(
2608 context.with_label("application"),
2609 application_cfg,
2610 );
2611 actor.start();
2612 let blocker = oracle.control(validator.clone());
2613 let cfg = config::Config {
2614 scheme: schemes[idx_scheme].clone(),
2615 elector: elector.clone(),
2616 blocker,
2617 automaton: application.clone(),
2618 relay: application.clone(),
2619 reporter: reporter.clone(),
2620 strategy: Sequential,
2621 partition: validator.to_string(),
2622 mailbox_size: 1024,
2623 epoch: Epoch::new(333),
2624 leader_timeout: Duration::from_secs(1),
2625 notarization_timeout: Duration::from_secs(2),
2626 nullify_retry: Duration::from_secs(10),
2627 fetch_timeout: Duration::from_secs(1),
2628 activity_timeout,
2629 skip_timeout,
2630 fetch_concurrent: 4,
2631 replay_buffer: NZUsize!(1024 * 1024),
2632 write_buffer: NZUsize!(1024 * 1024),
2633 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2634 };
2635 let engine = Engine::new(context.with_label("engine"), cfg);
2636 engine.start(pending, recovered, resolver);
2637 }
2638 }
2639
2640 let mut finalizers = Vec::new();
2642 for reporter in reporters.iter_mut() {
2643 let (mut latest, mut monitor) = reporter.subscribe().await;
2644 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2645 while latest < required_containers {
2646 latest = monitor.next().await.expect("event missing");
2647 }
2648 }));
2649 }
2650 join_all(finalizers).await;
2651
2652 let byz = &participants[0];
2654 let mut count_conflicting = 0;
2655 for reporter in reporters.iter() {
2656 {
2658 let faults = reporter.faults.lock().unwrap();
2659 assert_eq!(faults.len(), 1);
2660 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2661 for (_, faults) in faulter.iter() {
2662 for fault in faults.iter() {
2663 match fault {
2664 Activity::ConflictingNotarize(_) => {
2665 count_conflicting += 1;
2666 }
2667 Activity::ConflictingFinalize(_) => {
2668 count_conflicting += 1;
2669 }
2670 _ => panic!("unexpected fault: {fault:?}"),
2671 }
2672 }
2673 }
2674 }
2675
2676 {
2678 let invalid = reporter.invalid.lock().unwrap();
2679 assert_eq!(*invalid, 0);
2680 }
2681 }
2682 assert!(count_conflicting > 0);
2683
2684 let blocked = oracle.blocked().await.unwrap();
2686 assert!(!blocked.is_empty());
2687 for (a, b) in blocked {
2688 assert_ne!(&a, byz);
2689 assert_eq!(&b, byz);
2690 }
2691 });
2692 }
2693
2694 #[test_group("slow")]
2695 #[test_traced]
2696 fn test_conflicter() {
2697 for seed in 0..5 {
2698 conflicter::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2699 conflicter::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
2700 conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2701 conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2702 conflicter::<_, _, RoundRobin>(seed, ed25519::fixture);
2703 conflicter::<_, _, RoundRobin>(seed, secp256r1::fixture);
2704 }
2705 }
2706
2707 fn invalid<S, F, L>(seed: u64, mut fixture: F)
2708 where
2709 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2710 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2711 L: Elector<S>,
2712 {
2713 let n = 4;
2715 let required_containers = View::new(50);
2716 let activity_timeout = ViewDelta::new(10);
2717 let skip_timeout = ViewDelta::new(5);
2718 let namespace = b"consensus".to_vec();
2719 let cfg = deterministic::Config::new()
2720 .with_seed(seed)
2721 .with_timeout(Some(Duration::from_secs(30)));
2722 let executor = deterministic::Runner::new(cfg);
2723 executor.start(|mut context| async move {
2724 let (network, mut oracle) = Network::new(
2726 context.with_label("network"),
2727 Config {
2728 max_size: 1024 * 1024,
2729 disconnect_on_block: false,
2730 tracked_peer_sets: None,
2731 },
2732 );
2733
2734 network.start();
2736
2737 let Fixture {
2739 participants,
2740 schemes,
2741 ..
2742 } = fixture(&mut context, &namespace, n);
2743
2744 let Fixture {
2746 schemes: wrong_schemes,
2747 ..
2748 } = fixture(&mut context, b"wrong-namespace", n);
2749
2750 let mut registrations = register_validators(&mut oracle, &participants).await;
2751
2752 let link = Link {
2754 latency: Duration::from_millis(10),
2755 jitter: Duration::from_millis(1),
2756 success_rate: 1.0,
2757 };
2758 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2759
2760 let elector = L::default();
2762 let relay = Arc::new(mocks::relay::Relay::new());
2763 let mut reporters = Vec::new();
2764 for (idx_scheme, validator) in participants.iter().enumerate() {
2765 let context = context.with_label(&format!("validator_{}", *validator));
2767
2768 let scheme = if idx_scheme == 0 {
2771 wrong_schemes[idx_scheme].clone()
2772 } else {
2773 schemes[idx_scheme].clone()
2774 };
2775
2776 let reporter_config = mocks::reporter::Config {
2777 participants: participants.clone().try_into().unwrap(),
2778 scheme: schemes[idx_scheme].clone(),
2779 elector: elector.clone(),
2780 };
2781 let reporter =
2782 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2783 reporters.push(reporter.clone());
2784
2785 let application_cfg = mocks::application::Config {
2786 hasher: Sha256::default(),
2787 relay: relay.clone(),
2788 me: validator.clone(),
2789 propose_latency: (10.0, 5.0),
2790 verify_latency: (10.0, 5.0),
2791 certify_latency: (10.0, 5.0),
2792 should_certify: mocks::application::Certifier::Sometimes,
2793 };
2794 let (actor, application) = mocks::application::Application::new(
2795 context.with_label("application"),
2796 application_cfg,
2797 );
2798 actor.start();
2799 let blocker = oracle.control(validator.clone());
2800 let cfg = config::Config {
2801 scheme,
2802 elector: elector.clone(),
2803 blocker,
2804 automaton: application.clone(),
2805 relay: application.clone(),
2806 reporter: reporter.clone(),
2807 strategy: Sequential,
2808 partition: validator.clone().to_string(),
2809 mailbox_size: 1024,
2810 epoch: Epoch::new(333),
2811 leader_timeout: Duration::from_secs(1),
2812 notarization_timeout: Duration::from_secs(2),
2813 nullify_retry: Duration::from_secs(10),
2814 fetch_timeout: Duration::from_secs(1),
2815 activity_timeout,
2816 skip_timeout,
2817 fetch_concurrent: 4,
2818 replay_buffer: NZUsize!(1024 * 1024),
2819 write_buffer: NZUsize!(1024 * 1024),
2820 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2821 };
2822 let engine = Engine::new(context.with_label("engine"), cfg);
2823 let (pending, recovered, resolver) = registrations
2824 .remove(validator)
2825 .expect("validator should be registered");
2826 engine.start(pending, recovered, resolver);
2827 }
2828
2829 let mut finalizers = Vec::new();
2831 for reporter in reporters.iter_mut().skip(1) {
2832 let (mut latest, mut monitor) = reporter.subscribe().await;
2833 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2834 while latest < required_containers {
2835 latest = monitor.next().await.expect("event missing");
2836 }
2837 }));
2838 }
2839 join_all(finalizers).await;
2840
2841 let mut invalid_count = 0;
2843 for reporter in reporters.iter().skip(1) {
2844 {
2846 let faults = reporter.faults.lock().unwrap();
2847 assert!(faults.is_empty());
2848 }
2849
2850 {
2852 let invalid = reporter.invalid.lock().unwrap();
2853 if *invalid > 0 {
2854 invalid_count += 1;
2855 }
2856 }
2857 }
2858
2859 assert_eq!(invalid_count, n - 1);
2861
2862 let blocked = oracle.blocked().await.unwrap();
2864 assert!(!blocked.is_empty());
2865 for (a, b) in blocked {
2866 if a != participants[0] {
2867 assert_eq!(b, participants[0]);
2868 }
2869 }
2870 });
2871 }
2872
2873 #[test_group("slow")]
2874 #[test_traced]
2875 fn test_invalid() {
2876 for seed in 0..5 {
2877 invalid::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2878 invalid::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
2879 invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2880 invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2881 invalid::<_, _, RoundRobin>(seed, ed25519::fixture);
2882 invalid::<_, _, RoundRobin>(seed, secp256r1::fixture);
2883 }
2884 }
2885
2886 fn impersonator<S, F, L>(seed: u64, mut fixture: F)
2887 where
2888 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2889 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2890 L: Elector<S>,
2891 {
2892 let n = 4;
2894 let required_containers = View::new(50);
2895 let activity_timeout = ViewDelta::new(10);
2896 let skip_timeout = ViewDelta::new(5);
2897 let namespace = b"consensus".to_vec();
2898 let cfg = deterministic::Config::new()
2899 .with_seed(seed)
2900 .with_timeout(Some(Duration::from_secs(30)));
2901 let executor = deterministic::Runner::new(cfg);
2902 executor.start(|mut context| async move {
2903 let (network, mut oracle) = Network::new(
2905 context.with_label("network"),
2906 Config {
2907 max_size: 1024 * 1024,
2908 disconnect_on_block: false,
2909 tracked_peer_sets: None,
2910 },
2911 );
2912
2913 network.start();
2915
2916 let Fixture {
2918 participants,
2919 schemes,
2920 ..
2921 } = fixture(&mut context, &namespace, n);
2922 let mut registrations = register_validators(&mut oracle, &participants).await;
2923
2924 let link = Link {
2926 latency: Duration::from_millis(10),
2927 jitter: Duration::from_millis(1),
2928 success_rate: 1.0,
2929 };
2930 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2931
2932 let elector = L::default();
2934 let relay = Arc::new(mocks::relay::Relay::new());
2935 let mut reporters = Vec::new();
2936 for (idx_scheme, validator) in participants.iter().enumerate() {
2937 let context = context.with_label(&format!("validator_{}", *validator));
2939
2940 let reporter_config = mocks::reporter::Config {
2942 participants: participants.clone().try_into().unwrap(),
2943 scheme: schemes[idx_scheme].clone(),
2944 elector: elector.clone(),
2945 };
2946 let reporter =
2947 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2948 let (pending, recovered, resolver) = registrations
2949 .remove(validator)
2950 .expect("validator should be registered");
2951 if idx_scheme == 0 {
2952 let cfg = mocks::impersonator::Config {
2953 scheme: schemes[idx_scheme].clone(),
2954 };
2955
2956 let engine: mocks::impersonator::Impersonator<_, _, Sha256> =
2957 mocks::impersonator::Impersonator::new(
2958 context.with_label("byzantine_engine"),
2959 cfg,
2960 );
2961 engine.start(pending);
2962 } else {
2963 reporters.push(reporter.clone());
2964 let application_cfg = mocks::application::Config {
2965 hasher: Sha256::default(),
2966 relay: relay.clone(),
2967 me: validator.clone(),
2968 propose_latency: (10.0, 5.0),
2969 verify_latency: (10.0, 5.0),
2970 certify_latency: (10.0, 5.0),
2971 should_certify: mocks::application::Certifier::Sometimes,
2972 };
2973 let (actor, application) = mocks::application::Application::new(
2974 context.with_label("application"),
2975 application_cfg,
2976 );
2977 actor.start();
2978 let blocker = oracle.control(validator.clone());
2979 let cfg = config::Config {
2980 scheme: schemes[idx_scheme].clone(),
2981 elector: elector.clone(),
2982 blocker,
2983 automaton: application.clone(),
2984 relay: application.clone(),
2985 reporter: reporter.clone(),
2986 strategy: Sequential,
2987 partition: validator.clone().to_string(),
2988 mailbox_size: 1024,
2989 epoch: Epoch::new(333),
2990 leader_timeout: Duration::from_secs(1),
2991 notarization_timeout: Duration::from_secs(2),
2992 nullify_retry: Duration::from_secs(10),
2993 fetch_timeout: Duration::from_secs(1),
2994 activity_timeout,
2995 skip_timeout,
2996 fetch_concurrent: 4,
2997 replay_buffer: NZUsize!(1024 * 1024),
2998 write_buffer: NZUsize!(1024 * 1024),
2999 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3000 };
3001 let engine = Engine::new(context.with_label("engine"), cfg);
3002 engine.start(pending, recovered, resolver);
3003 }
3004 }
3005
3006 let mut finalizers = Vec::new();
3008 for reporter in reporters.iter_mut() {
3009 let (mut latest, mut monitor) = reporter.subscribe().await;
3010 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3011 while latest < required_containers {
3012 latest = monitor.next().await.expect("event missing");
3013 }
3014 }));
3015 }
3016 join_all(finalizers).await;
3017
3018 let byz = &participants[0];
3020 for reporter in reporters.iter() {
3021 {
3023 let faults = reporter.faults.lock().unwrap();
3024 assert!(faults.is_empty());
3025 }
3026
3027 {
3029 let invalid = reporter.invalid.lock().unwrap();
3030 assert_eq!(*invalid, 0);
3031 }
3032 }
3033
3034 let blocked = oracle.blocked().await.unwrap();
3036 assert!(!blocked.is_empty());
3037 for (a, b) in blocked {
3038 assert_ne!(&a, byz);
3039 assert_eq!(&b, byz);
3040 }
3041 });
3042 }
3043
3044 #[test_group("slow")]
3045 #[test_traced]
3046 fn test_impersonator() {
3047 for seed in 0..5 {
3048 impersonator::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
3049 impersonator::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
3050 impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3051 impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3052 impersonator::<_, _, RoundRobin>(seed, ed25519::fixture);
3053 impersonator::<_, _, RoundRobin>(seed, secp256r1::fixture);
3054 }
3055 }
3056
3057 fn equivocator<S, F, L>(seed: u64, mut fixture: F) -> bool
3058 where
3059 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3060 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3061 L: Elector<S>,
3062 {
3063 let n = 7;
3065 let required_containers = View::new(50);
3066 let activity_timeout = ViewDelta::new(10);
3067 let skip_timeout = ViewDelta::new(5);
3068 let namespace = b"consensus".to_vec();
3069 let cfg = deterministic::Config::new()
3070 .with_seed(seed)
3071 .with_timeout(Some(Duration::from_secs(60)));
3072 let executor = deterministic::Runner::new(cfg);
3073 executor.start(|mut context| async move {
3074 let (network, mut oracle) = Network::new(
3076 context.with_label("network"),
3077 Config {
3078 max_size: 1024 * 1024,
3079 disconnect_on_block: false,
3080 tracked_peer_sets: None,
3081 },
3082 );
3083
3084 network.start();
3086
3087 let Fixture {
3089 participants,
3090 schemes,
3091 ..
3092 } = fixture(&mut context, &namespace, n);
3093 let mut registrations = register_validators(&mut oracle, &participants).await;
3094
3095 let link = Link {
3097 latency: Duration::from_millis(10),
3098 jitter: Duration::from_millis(1),
3099 success_rate: 1.0,
3100 };
3101 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3102
3103 let elector = L::default();
3105 let mut engines = Vec::new();
3106 let relay = Arc::new(mocks::relay::Relay::new());
3107 let mut reporters = Vec::new();
3108 for (idx_scheme, validator) in participants.iter().enumerate() {
3109 let context = context.with_label(&format!("validator_{}", *validator));
3111
3112 let reporter_config = mocks::reporter::Config {
3114 participants: participants.clone().try_into().unwrap(),
3115 scheme: schemes[idx_scheme].clone(),
3116 elector: elector.clone(),
3117 };
3118 let reporter =
3119 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3120 reporters.push(reporter.clone());
3121 let (pending, recovered, resolver) = registrations
3122 .remove(validator)
3123 .expect("validator should be registered");
3124 if idx_scheme == 0 {
3125 let cfg = mocks::equivocator::Config {
3126 scheme: schemes[idx_scheme].clone(),
3127 epoch: Epoch::new(333),
3128 relay: relay.clone(),
3129 hasher: Sha256::default(),
3130 elector: elector.clone(),
3131 };
3132
3133 let engine = mocks::equivocator::Equivocator::new(
3134 context.with_label("byzantine_engine"),
3135 cfg,
3136 );
3137 engines.push(engine.start(pending, recovered));
3138 } else {
3139 let application_cfg = mocks::application::Config {
3140 hasher: Sha256::default(),
3141 relay: relay.clone(),
3142 me: validator.clone(),
3143 propose_latency: (10.0, 5.0),
3144 verify_latency: (10.0, 5.0),
3145 certify_latency: (10.0, 5.0),
3146 should_certify: mocks::application::Certifier::Sometimes,
3147 };
3148 let (actor, application) = mocks::application::Application::new(
3149 context.with_label("application"),
3150 application_cfg,
3151 );
3152 actor.start();
3153 let blocker = oracle.control(validator.clone());
3154 let cfg = config::Config {
3155 scheme: schemes[idx_scheme].clone(),
3156 elector: elector.clone(),
3157 blocker,
3158 automaton: application.clone(),
3159 relay: application.clone(),
3160 reporter: reporter.clone(),
3161 strategy: Sequential,
3162 partition: validator.to_string(),
3163 mailbox_size: 1024,
3164 epoch: Epoch::new(333),
3165 leader_timeout: Duration::from_secs(1),
3166 notarization_timeout: Duration::from_secs(2),
3167 nullify_retry: Duration::from_secs(10),
3168 fetch_timeout: Duration::from_secs(1),
3169 activity_timeout,
3170 skip_timeout,
3171 fetch_concurrent: 4,
3172 replay_buffer: NZUsize!(1024 * 1024),
3173 write_buffer: NZUsize!(1024 * 1024),
3174 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3175 };
3176 let engine = Engine::new(context.with_label("engine"), cfg);
3177 engines.push(engine.start(pending, recovered, resolver));
3178 }
3179 }
3180
3181 let mut finalizers = Vec::new();
3183 for reporter in reporters.iter_mut().skip(1) {
3184 let (mut latest, mut monitor) = reporter.subscribe().await;
3185 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3186 while latest < required_containers {
3187 latest = monitor.next().await.expect("event missing");
3188 }
3189 }));
3190 }
3191 join_all(finalizers).await;
3192
3193 let idx = context.gen_range(1..engines.len()); let validator = &participants[idx];
3196 let handle = engines.remove(idx);
3197 handle.abort();
3198 let _ = handle.await;
3199 reporters.remove(idx);
3200 info!(idx, ?validator, "aborted validator");
3201
3202 let mut finalizers = Vec::new();
3204 for reporter in reporters.iter_mut().skip(1) {
3205 let (mut latest, mut monitor) = reporter.subscribe().await;
3206 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3207 while latest < View::new(required_containers.get() * 2) {
3208 latest = monitor.next().await.expect("event missing");
3209 }
3210 }));
3211 }
3212 join_all(finalizers).await;
3213
3214 info!(idx, ?validator, "restarting validator");
3216 let context = context.with_label(&format!("validator_{}_restarted", *validator));
3217
3218 let reporter_config = mocks::reporter::Config {
3220 participants: participants.clone().try_into().unwrap(),
3221 scheme: schemes[idx].clone(),
3222 elector: elector.clone(),
3223 };
3224 let reporter =
3225 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3226 let (pending, recovered, resolver) =
3227 register_validator(&mut oracle, validator.clone()).await;
3228 reporters.push(reporter.clone());
3229 let application_cfg = mocks::application::Config {
3230 hasher: Sha256::default(),
3231 relay: relay.clone(),
3232 me: validator.clone(),
3233 propose_latency: (10.0, 5.0),
3234 verify_latency: (10.0, 5.0),
3235 certify_latency: (10.0, 5.0),
3236 should_certify: mocks::application::Certifier::Sometimes,
3237 };
3238 let (actor, application) = mocks::application::Application::new(
3239 context.with_label("application"),
3240 application_cfg,
3241 );
3242 actor.start();
3243 let blocker = oracle.control(validator.clone());
3244 let cfg = config::Config {
3245 scheme: schemes[idx].clone(),
3246 elector: elector.clone(),
3247 blocker,
3248 automaton: application.clone(),
3249 relay: application.clone(),
3250 reporter: reporter.clone(),
3251 strategy: Sequential,
3252 partition: validator.to_string(),
3253 mailbox_size: 1024,
3254 epoch: Epoch::new(333),
3255 leader_timeout: Duration::from_secs(1),
3256 notarization_timeout: Duration::from_secs(2),
3257 nullify_retry: Duration::from_secs(10),
3258 fetch_timeout: Duration::from_secs(1),
3259 activity_timeout,
3260 skip_timeout,
3261 fetch_concurrent: 4,
3262 replay_buffer: NZUsize!(1024 * 1024),
3263 write_buffer: NZUsize!(1024 * 1024),
3264 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3265 };
3266 let engine = Engine::new(context.with_label("engine"), cfg);
3267 engine.start(pending, recovered, resolver);
3268
3269 let mut finalizers = Vec::new();
3271 for reporter in reporters.iter_mut().skip(1) {
3272 let (mut latest, mut monitor) = reporter.subscribe().await;
3273 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3274 while latest < View::new(required_containers.get() * 3) {
3275 latest = monitor.next().await.expect("event missing");
3276 }
3277 }));
3278 }
3279 join_all(finalizers).await;
3280
3281 let byz = &participants[0];
3285 let blocked = oracle.blocked().await.unwrap();
3286 for (a, b) in &blocked {
3287 assert_ne!(a, byz);
3288 assert_eq!(b, byz);
3289 }
3290 !blocked.is_empty()
3291 })
3292 }
3293
3294 #[test_group("slow")]
3295 #[test_traced]
3296 fn test_equivocator_bls12381_threshold_min_pk() {
3297 let detected = (0..5)
3298 .any(|seed| equivocator::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>));
3299 assert!(
3300 detected,
3301 "expected at least one seed to detect equivocation"
3302 );
3303 }
3304
3305 #[test_group("slow")]
3306 #[test_traced]
3307 fn test_equivocator_bls12381_threshold_min_sig() {
3308 let detected = (0..5).any(|seed| {
3309 equivocator::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>)
3310 });
3311 assert!(
3312 detected,
3313 "expected at least one seed to detect equivocation"
3314 );
3315 }
3316
3317 #[test_group("slow")]
3318 #[test_traced]
3319 fn test_equivocator_bls12381_multisig_min_pk() {
3320 let detected = (0..5).any(|seed| {
3321 equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>)
3322 });
3323 assert!(
3324 detected,
3325 "expected at least one seed to detect equivocation"
3326 );
3327 }
3328
3329 #[test_group("slow")]
3330 #[test_traced]
3331 fn test_equivocator_bls12381_multisig_min_sig() {
3332 let detected = (0..5).any(|seed| {
3333 equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>)
3334 });
3335 assert!(
3336 detected,
3337 "expected at least one seed to detect equivocation"
3338 );
3339 }
3340
3341 #[test_group("slow")]
3342 #[test_traced]
3343 fn test_equivocator_ed25519() {
3344 let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, ed25519::fixture));
3345 assert!(
3346 detected,
3347 "expected at least one seed to detect equivocation"
3348 );
3349 }
3350
3351 #[test_group("slow")]
3352 #[test_traced]
3353 fn test_equivocator_secp256r1() {
3354 let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, secp256r1::fixture));
3355 assert!(
3356 detected,
3357 "expected at least one seed to detect equivocation"
3358 );
3359 }
3360
3361 fn reconfigurer<S, F, L>(seed: u64, mut fixture: F)
3362 where
3363 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3364 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3365 L: Elector<S>,
3366 {
3367 let n = 4;
3369 let required_containers = View::new(50);
3370 let activity_timeout = ViewDelta::new(10);
3371 let skip_timeout = ViewDelta::new(5);
3372 let namespace = b"consensus".to_vec();
3373 let cfg = deterministic::Config::new()
3374 .with_seed(seed)
3375 .with_timeout(Some(Duration::from_secs(30)));
3376 let executor = deterministic::Runner::new(cfg);
3377 executor.start(|mut context| async move {
3378 let (network, mut oracle) = Network::new(
3380 context.with_label("network"),
3381 Config {
3382 max_size: 1024 * 1024,
3383 disconnect_on_block: false,
3384 tracked_peer_sets: None,
3385 },
3386 );
3387
3388 network.start();
3390
3391 let Fixture {
3393 participants,
3394 schemes,
3395 ..
3396 } = fixture(&mut context, &namespace, n);
3397 let mut registrations = register_validators(&mut oracle, &participants).await;
3398
3399 let link = Link {
3401 latency: Duration::from_millis(10),
3402 jitter: Duration::from_millis(1),
3403 success_rate: 1.0,
3404 };
3405 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3406
3407 let elector = L::default();
3409 let relay = Arc::new(mocks::relay::Relay::new());
3410 let mut reporters = Vec::new();
3411 for (idx_scheme, validator) in participants.iter().enumerate() {
3412 let context = context.with_label(&format!("validator_{}", *validator));
3414
3415 let reporter_config = mocks::reporter::Config {
3417 participants: participants.clone().try_into().unwrap(),
3418 scheme: schemes[idx_scheme].clone(),
3419 elector: elector.clone(),
3420 };
3421 let reporter =
3422 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3423 let (pending, recovered, resolver) = registrations
3424 .remove(validator)
3425 .expect("validator should be registered");
3426 if idx_scheme == 0 {
3427 let cfg = mocks::reconfigurer::Config {
3428 scheme: schemes[idx_scheme].clone(),
3429 };
3430 let engine: mocks::reconfigurer::Reconfigurer<_, _, Sha256> =
3431 mocks::reconfigurer::Reconfigurer::new(
3432 context.with_label("byzantine_engine"),
3433 cfg,
3434 );
3435 engine.start(pending);
3436 } else {
3437 reporters.push(reporter.clone());
3438 let application_cfg = mocks::application::Config {
3439 hasher: Sha256::default(),
3440 relay: relay.clone(),
3441 me: validator.clone(),
3442 propose_latency: (10.0, 5.0),
3443 verify_latency: (10.0, 5.0),
3444 certify_latency: (10.0, 5.0),
3445 should_certify: mocks::application::Certifier::Sometimes,
3446 };
3447 let (actor, application) = mocks::application::Application::new(
3448 context.with_label("application"),
3449 application_cfg,
3450 );
3451 actor.start();
3452 let blocker = oracle.control(validator.clone());
3453 let cfg = config::Config {
3454 scheme: schemes[idx_scheme].clone(),
3455 elector: elector.clone(),
3456 blocker,
3457 automaton: application.clone(),
3458 relay: application.clone(),
3459 reporter: reporter.clone(),
3460 strategy: Sequential,
3461 partition: validator.to_string(),
3462 mailbox_size: 1024,
3463 epoch: Epoch::new(333),
3464 leader_timeout: Duration::from_secs(1),
3465 notarization_timeout: Duration::from_secs(2),
3466 nullify_retry: Duration::from_secs(10),
3467 fetch_timeout: Duration::from_secs(1),
3468 activity_timeout,
3469 skip_timeout,
3470 fetch_concurrent: 4,
3471 replay_buffer: NZUsize!(1024 * 1024),
3472 write_buffer: NZUsize!(1024 * 1024),
3473 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3474 };
3475 let engine = Engine::new(context.with_label("engine"), cfg);
3476 engine.start(pending, recovered, resolver);
3477 }
3478 }
3479
3480 let mut finalizers = Vec::new();
3482 for reporter in reporters.iter_mut() {
3483 let (mut latest, mut monitor) = reporter.subscribe().await;
3484 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3485 while latest < required_containers {
3486 latest = monitor.next().await.expect("event missing");
3487 }
3488 }));
3489 }
3490 join_all(finalizers).await;
3491
3492 let byz = &participants[0];
3494 for reporter in reporters.iter() {
3495 {
3497 let faults = reporter.faults.lock().unwrap();
3498 assert!(faults.is_empty());
3499 }
3500
3501 {
3503 let invalid = reporter.invalid.lock().unwrap();
3504 assert_eq!(*invalid, 0);
3505 }
3506 }
3507
3508 let blocked = oracle.blocked().await.unwrap();
3510 assert!(!blocked.is_empty());
3511 for (a, b) in blocked {
3512 assert_ne!(&a, byz);
3513 assert_eq!(&b, byz);
3514 }
3515 });
3516 }
3517
3518 #[test_group("slow")]
3519 #[test_traced]
3520 fn test_reconfigurer() {
3521 for seed in 0..5 {
3522 reconfigurer::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
3523 reconfigurer::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
3524 reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3525 reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3526 reconfigurer::<_, _, RoundRobin>(seed, ed25519::fixture);
3527 reconfigurer::<_, _, RoundRobin>(seed, secp256r1::fixture);
3528 }
3529 }
3530
3531 fn nuller<S, F, L>(seed: u64, mut fixture: F)
3532 where
3533 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3534 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3535 L: Elector<S>,
3536 {
3537 let n = 4;
3539 let required_containers = View::new(50);
3540 let activity_timeout = ViewDelta::new(10);
3541 let skip_timeout = ViewDelta::new(5);
3542 let namespace = b"consensus".to_vec();
3543 let cfg = deterministic::Config::new()
3544 .with_seed(seed)
3545 .with_timeout(Some(Duration::from_secs(30)));
3546 let executor = deterministic::Runner::new(cfg);
3547 executor.start(|mut context| async move {
3548 let (network, mut oracle) = Network::new(
3550 context.with_label("network"),
3551 Config {
3552 max_size: 1024 * 1024,
3553 disconnect_on_block: false,
3554 tracked_peer_sets: None,
3555 },
3556 );
3557
3558 network.start();
3560
3561 let Fixture {
3563 participants,
3564 schemes,
3565 ..
3566 } = fixture(&mut context, &namespace, n);
3567 let mut registrations = register_validators(&mut oracle, &participants).await;
3568
3569 let link = Link {
3571 latency: Duration::from_millis(10),
3572 jitter: Duration::from_millis(1),
3573 success_rate: 1.0,
3574 };
3575 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3576
3577 let elector = L::default();
3579 let relay = Arc::new(mocks::relay::Relay::new());
3580 let mut reporters = Vec::new();
3581 for (idx_scheme, validator) in participants.iter().enumerate() {
3582 let context = context.with_label(&format!("validator_{}", *validator));
3584
3585 let reporter_config = mocks::reporter::Config {
3587 participants: participants.clone().try_into().unwrap(),
3588 scheme: schemes[idx_scheme].clone(),
3589 elector: elector.clone(),
3590 };
3591 let reporter =
3592 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3593 let (pending, recovered, resolver) = registrations
3594 .remove(validator)
3595 .expect("validator should be registered");
3596 if idx_scheme == 0 {
3597 let cfg = mocks::nuller::Config {
3598 scheme: schemes[idx_scheme].clone(),
3599 };
3600 let engine: mocks::nuller::Nuller<_, _, Sha256> =
3601 mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
3602 engine.start(pending);
3603 } else {
3604 reporters.push(reporter.clone());
3605 let application_cfg = mocks::application::Config {
3606 hasher: Sha256::default(),
3607 relay: relay.clone(),
3608 me: validator.clone(),
3609 propose_latency: (10.0, 5.0),
3610 verify_latency: (10.0, 5.0),
3611 certify_latency: (10.0, 5.0),
3612 should_certify: mocks::application::Certifier::Sometimes,
3613 };
3614 let (actor, application) = mocks::application::Application::new(
3615 context.with_label("application"),
3616 application_cfg,
3617 );
3618 actor.start();
3619 let blocker = oracle.control(validator.clone());
3620 let cfg = config::Config {
3621 scheme: schemes[idx_scheme].clone(),
3622 elector: elector.clone(),
3623 blocker,
3624 automaton: application.clone(),
3625 relay: application.clone(),
3626 reporter: reporter.clone(),
3627 strategy: Sequential,
3628 partition: validator.clone().to_string(),
3629 mailbox_size: 1024,
3630 epoch: Epoch::new(333),
3631 leader_timeout: Duration::from_secs(1),
3632 notarization_timeout: Duration::from_secs(2),
3633 nullify_retry: Duration::from_secs(10),
3634 fetch_timeout: Duration::from_secs(1),
3635 activity_timeout,
3636 skip_timeout,
3637 fetch_concurrent: 4,
3638 replay_buffer: NZUsize!(1024 * 1024),
3639 write_buffer: NZUsize!(1024 * 1024),
3640 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3641 };
3642 let engine = Engine::new(context.with_label("engine"), cfg);
3643 engine.start(pending, recovered, resolver);
3644 }
3645 }
3646
3647 let mut finalizers = Vec::new();
3649 for reporter in reporters.iter_mut() {
3650 let (mut latest, mut monitor) = reporter.subscribe().await;
3651 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3652 while latest < required_containers {
3653 latest = monitor.next().await.expect("event missing");
3654 }
3655 }));
3656 }
3657 join_all(finalizers).await;
3658
3659 let byz = &participants[0];
3661 let mut count_nullify_and_finalize = 0;
3662 for reporter in reporters.iter() {
3663 {
3665 let faults = reporter.faults.lock().unwrap();
3666 assert_eq!(faults.len(), 1);
3667 let faulter = faults.get(byz).expect("byzantine party is not faulter");
3668 for (_, faults) in faulter.iter() {
3669 for fault in faults.iter() {
3670 match fault {
3671 Activity::NullifyFinalize(_) => {
3672 count_nullify_and_finalize += 1;
3673 }
3674 _ => panic!("unexpected fault: {fault:?}"),
3675 }
3676 }
3677 }
3678 }
3679
3680 {
3682 let invalid = reporter.invalid.lock().unwrap();
3683 assert_eq!(*invalid, 0);
3684 }
3685 }
3686 assert!(count_nullify_and_finalize > 0);
3687
3688 let blocked = oracle.blocked().await.unwrap();
3690 assert!(!blocked.is_empty());
3691 for (a, b) in blocked {
3692 assert_ne!(&a, byz);
3693 assert_eq!(&b, byz);
3694 }
3695 });
3696 }
3697
3698 #[test_group("slow")]
3699 #[test_traced]
3700 fn test_nuller() {
3701 for seed in 0..5 {
3702 nuller::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
3703 nuller::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
3704 nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3705 nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3706 nuller::<_, _, RoundRobin>(seed, ed25519::fixture);
3707 nuller::<_, _, RoundRobin>(seed, secp256r1::fixture);
3708 }
3709 }
3710
3711 fn outdated<S, F, L>(seed: u64, mut fixture: F)
3712 where
3713 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3714 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3715 L: Elector<S>,
3716 {
3717 let n = 4;
3719 let required_containers = View::new(100);
3720 let activity_timeout = ViewDelta::new(10);
3721 let skip_timeout = ViewDelta::new(5);
3722 let namespace = b"consensus".to_vec();
3723 let cfg = deterministic::Config::new()
3724 .with_seed(seed)
3725 .with_timeout(Some(Duration::from_secs(30)));
3726 let executor = deterministic::Runner::new(cfg);
3727 executor.start(|mut context| async move {
3728 let (network, mut oracle) = Network::new(
3730 context.with_label("network"),
3731 Config {
3732 max_size: 1024 * 1024,
3733 disconnect_on_block: false,
3734 tracked_peer_sets: None,
3735 },
3736 );
3737
3738 network.start();
3740
3741 let Fixture {
3743 participants,
3744 schemes,
3745 ..
3746 } = fixture(&mut context, &namespace, n);
3747 let mut registrations = register_validators(&mut oracle, &participants).await;
3748
3749 let link = Link {
3751 latency: Duration::from_millis(10),
3752 jitter: Duration::from_millis(1),
3753 success_rate: 1.0,
3754 };
3755 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3756
3757 let elector = L::default();
3759 let relay = Arc::new(mocks::relay::Relay::new());
3760 let mut reporters = Vec::new();
3761 for (idx_scheme, validator) in participants.iter().enumerate() {
3762 let context = context.with_label(&format!("validator_{}", *validator));
3764
3765 let reporter_config = mocks::reporter::Config {
3767 participants: participants.clone().try_into().unwrap(),
3768 scheme: schemes[idx_scheme].clone(),
3769 elector: elector.clone(),
3770 };
3771 let reporter =
3772 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3773 let (pending, recovered, resolver) = registrations
3774 .remove(validator)
3775 .expect("validator should be registered");
3776 if idx_scheme == 0 {
3777 let cfg = mocks::outdated::Config {
3778 scheme: schemes[idx_scheme].clone(),
3779 view_delta: ViewDelta::new(activity_timeout.get().saturating_mul(4)),
3780 };
3781 let engine: mocks::outdated::Outdated<_, _, Sha256> =
3782 mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
3783 engine.start(pending);
3784 } else {
3785 reporters.push(reporter.clone());
3786 let application_cfg = mocks::application::Config {
3787 hasher: Sha256::default(),
3788 relay: relay.clone(),
3789 me: validator.clone(),
3790 propose_latency: (10.0, 5.0),
3791 verify_latency: (10.0, 5.0),
3792 certify_latency: (10.0, 5.0),
3793 should_certify: mocks::application::Certifier::Sometimes,
3794 };
3795 let (actor, application) = mocks::application::Application::new(
3796 context.with_label("application"),
3797 application_cfg,
3798 );
3799 actor.start();
3800 let blocker = oracle.control(validator.clone());
3801 let cfg = config::Config {
3802 scheme: schemes[idx_scheme].clone(),
3803 elector: elector.clone(),
3804 blocker,
3805 automaton: application.clone(),
3806 relay: application.clone(),
3807 reporter: reporter.clone(),
3808 strategy: Sequential,
3809 partition: validator.clone().to_string(),
3810 mailbox_size: 1024,
3811 epoch: Epoch::new(333),
3812 leader_timeout: Duration::from_secs(1),
3813 notarization_timeout: Duration::from_secs(2),
3814 nullify_retry: Duration::from_secs(10),
3815 fetch_timeout: Duration::from_secs(1),
3816 activity_timeout,
3817 skip_timeout,
3818 fetch_concurrent: 4,
3819 replay_buffer: NZUsize!(1024 * 1024),
3820 write_buffer: NZUsize!(1024 * 1024),
3821 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3822 };
3823 let engine = Engine::new(context.with_label("engine"), cfg);
3824 engine.start(pending, recovered, resolver);
3825 }
3826 }
3827
3828 let mut finalizers = Vec::new();
3830 for reporter in reporters.iter_mut() {
3831 let (mut latest, mut monitor) = reporter.subscribe().await;
3832 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3833 while latest < required_containers {
3834 latest = monitor.next().await.expect("event missing");
3835 }
3836 }));
3837 }
3838 join_all(finalizers).await;
3839
3840 for reporter in reporters.iter() {
3842 {
3844 let faults = reporter.faults.lock().unwrap();
3845 assert!(faults.is_empty());
3846 }
3847
3848 {
3850 let invalid = reporter.invalid.lock().unwrap();
3851 assert_eq!(*invalid, 0);
3852 }
3853 }
3854
3855 let blocked = oracle.blocked().await.unwrap();
3857 assert!(blocked.is_empty());
3858 });
3859 }
3860
3861 #[test_group("slow")]
3862 #[test_traced]
3863 fn test_outdated() {
3864 for seed in 0..5 {
3865 outdated::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
3866 outdated::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
3867 outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3868 outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3869 outdated::<_, _, RoundRobin>(seed, ed25519::fixture);
3870 outdated::<_, _, RoundRobin>(seed, secp256r1::fixture);
3871 }
3872 }
3873
3874 fn run_1k<S, F, L>(mut fixture: F)
3875 where
3876 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3877 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3878 L: Elector<S>,
3879 {
3880 let n = 10;
3882 let required_containers = View::new(1_000);
3883 let activity_timeout = ViewDelta::new(10);
3884 let skip_timeout = ViewDelta::new(5);
3885 let namespace = b"consensus".to_vec();
3886 let cfg = deterministic::Config::new();
3887 let executor = deterministic::Runner::new(cfg);
3888 executor.start(|mut context| async move {
3889 let (network, mut oracle) = Network::new(
3891 context.with_label("network"),
3892 Config {
3893 max_size: 1024 * 1024,
3894 disconnect_on_block: false,
3895 tracked_peer_sets: None,
3896 },
3897 );
3898
3899 network.start();
3901
3902 let Fixture {
3904 participants,
3905 schemes,
3906 ..
3907 } = fixture(&mut context, &namespace, n);
3908 let mut registrations = register_validators(&mut oracle, &participants).await;
3909
3910 let link = Link {
3912 latency: Duration::from_millis(80),
3913 jitter: Duration::from_millis(10),
3914 success_rate: 0.98,
3915 };
3916 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3917
3918 let elector = L::default();
3920 let relay = Arc::new(mocks::relay::Relay::new());
3921 let mut reporters = Vec::new();
3922 let mut engine_handlers = Vec::new();
3923 for (idx, validator) in participants.iter().enumerate() {
3924 let context = context.with_label(&format!("validator_{}", *validator));
3926
3927 let reporter_config = mocks::reporter::Config {
3929 participants: participants.clone().try_into().unwrap(),
3930 scheme: schemes[idx].clone(),
3931 elector: elector.clone(),
3932 };
3933 let reporter =
3934 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3935 reporters.push(reporter.clone());
3936 let application_cfg = mocks::application::Config {
3937 hasher: Sha256::default(),
3938 relay: relay.clone(),
3939 me: validator.clone(),
3940 propose_latency: (100.0, 50.0),
3941 verify_latency: (50.0, 40.0),
3942 certify_latency: (50.0, 40.0),
3943 should_certify: mocks::application::Certifier::Sometimes,
3944 };
3945 let (actor, application) = mocks::application::Application::new(
3946 context.with_label("application"),
3947 application_cfg,
3948 );
3949 actor.start();
3950 let blocker = oracle.control(validator.clone());
3951 let cfg = config::Config {
3952 scheme: schemes[idx].clone(),
3953 elector: elector.clone(),
3954 blocker,
3955 automaton: application.clone(),
3956 relay: application.clone(),
3957 reporter: reporter.clone(),
3958 strategy: Sequential,
3959 partition: validator.to_string(),
3960 mailbox_size: 1024,
3961 epoch: Epoch::new(333),
3962 leader_timeout: Duration::from_secs(1),
3963 notarization_timeout: Duration::from_secs(2),
3964 nullify_retry: Duration::from_secs(10),
3965 fetch_timeout: Duration::from_secs(1),
3966 activity_timeout,
3967 skip_timeout,
3968 fetch_concurrent: 4,
3969 replay_buffer: NZUsize!(1024 * 1024),
3970 write_buffer: NZUsize!(1024 * 1024),
3971 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3972 };
3973 let engine = Engine::new(context.with_label("engine"), cfg);
3974
3975 let (pending, recovered, resolver) = registrations
3977 .remove(validator)
3978 .expect("validator should be registered");
3979 engine_handlers.push(engine.start(pending, recovered, resolver));
3980 }
3981
3982 let mut finalizers = Vec::new();
3984 for reporter in reporters.iter_mut() {
3985 let (mut latest, mut monitor) = reporter.subscribe().await;
3986 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3987 while latest < required_containers {
3988 latest = monitor.next().await.expect("event missing");
3989 }
3990 }));
3991 }
3992 join_all(finalizers).await;
3993
3994 for reporter in reporters.iter() {
3996 {
3998 let faults = reporter.faults.lock().unwrap();
3999 assert!(faults.is_empty());
4000 }
4001
4002 {
4004 let invalid = reporter.invalid.lock().unwrap();
4005 assert_eq!(*invalid, 0);
4006 }
4007 }
4008
4009 let blocked = oracle.blocked().await.unwrap();
4011 assert!(blocked.is_empty());
4012 })
4013 }
4014
4015 #[test_group("slow")]
4016 #[test_traced]
4017 fn test_1k_bls12381_threshold_min_pk() {
4018 run_1k::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
4019 }
4020
4021 #[test_group("slow")]
4022 #[test_traced]
4023 fn test_1k_bls12381_threshold_min_sig() {
4024 run_1k::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
4025 }
4026
4027 #[test_group("slow")]
4028 #[test_traced]
4029 fn test_1k_bls12381_multisig_min_pk() {
4030 run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4031 }
4032
4033 #[test_group("slow")]
4034 #[test_traced]
4035 fn test_1k_bls12381_multisig_min_sig() {
4036 run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
4037 }
4038
4039 #[test_group("slow")]
4040 #[test_traced]
4041 fn test_1k_ed25519() {
4042 run_1k::<_, _, RoundRobin>(ed25519::fixture);
4043 }
4044
4045 #[test_group("slow")]
4046 #[test_traced]
4047 fn test_1k_secp256r1() {
4048 run_1k::<_, _, RoundRobin>(secp256r1::fixture);
4049 }
4050
4051 fn engine_shutdown<S, F, L>(seed: u64, mut fixture: F, graceful: bool)
4052 where
4053 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4054 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4055 L: Elector<S>,
4056 {
4057 let n = 1;
4058 let namespace = b"consensus".to_vec();
4059 let cfg = deterministic::Config::default()
4060 .with_seed(seed)
4061 .with_timeout(Some(Duration::from_secs(10)));
4062 let executor = deterministic::Runner::new(cfg);
4063 executor.start(|mut context| async move {
4064 let (network, mut oracle) = Network::new(
4066 context.with_label("network"),
4067 Config {
4068 max_size: 1024 * 1024,
4069 disconnect_on_block: true,
4070 tracked_peer_sets: None,
4071 },
4072 );
4073
4074 network.start();
4076
4077 let Fixture {
4079 participants,
4080 schemes,
4081 ..
4082 } = fixture(&mut context, &namespace, n);
4083 let mut registrations = register_validators(&mut oracle, &participants).await;
4084
4085 let link = Link {
4087 latency: Duration::from_millis(1),
4088 jitter: Duration::from_millis(0),
4089 success_rate: 1.0,
4090 };
4091 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4092
4093 let elector = L::default();
4095 let reporter_config = mocks::reporter::Config {
4096 participants: participants.clone().try_into().unwrap(),
4097 scheme: schemes[0].clone(),
4098 elector: elector.clone(),
4099 };
4100 let reporter =
4101 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4102 let relay = Arc::new(mocks::relay::Relay::new());
4103 let application_cfg = mocks::application::Config {
4104 hasher: Sha256::default(),
4105 relay: relay.clone(),
4106 me: participants[0].clone(),
4107 propose_latency: (1.0, 0.0),
4108 verify_latency: (1.0, 0.0),
4109 certify_latency: (1.0, 0.0),
4110 should_certify: mocks::application::Certifier::Sometimes,
4111 };
4112 let (actor, application) = mocks::application::Application::new(
4113 context.with_label("application"),
4114 application_cfg,
4115 );
4116 actor.start();
4117 let blocker = oracle.control(participants[0].clone());
4118 let cfg = config::Config {
4119 scheme: schemes[0].clone(),
4120 elector: elector.clone(),
4121 blocker,
4122 automaton: application.clone(),
4123 relay: application.clone(),
4124 reporter: reporter.clone(),
4125 strategy: Sequential,
4126 partition: participants[0].clone().to_string(),
4127 mailbox_size: 64,
4128 epoch: Epoch::new(333),
4129 leader_timeout: Duration::from_millis(50),
4130 notarization_timeout: Duration::from_millis(100),
4131 nullify_retry: Duration::from_millis(250),
4132 fetch_timeout: Duration::from_millis(50),
4133 activity_timeout: ViewDelta::new(4),
4134 skip_timeout: ViewDelta::new(2),
4135 fetch_concurrent: 4,
4136 replay_buffer: NZUsize!(1024 * 16),
4137 write_buffer: NZUsize!(1024 * 16),
4138 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4139 };
4140 let engine = Engine::new(context.with_label("engine"), cfg);
4141
4142 let (pending, recovered, resolver) = registrations
4144 .remove(&participants[0])
4145 .expect("validator should be registered");
4146 let handle = engine.start(pending, recovered, resolver);
4147
4148 context.sleep(Duration::from_millis(1000)).await;
4150
4151 let running_before = count_running_tasks(&context, "engine");
4153 assert!(
4154 running_before > 0,
4155 "at least one engine task should be running"
4156 );
4157
4158 context.sleep(Duration::from_millis(1500)).await;
4160 assert!(
4161 count_running_tasks(&context, "engine") > 0,
4162 "engine tasks should still be running"
4163 );
4164
4165 let running_after = if graceful {
4167 let metrics_context = context.clone();
4168 let result = context.stop(0, Some(Duration::from_secs(5))).await;
4169 assert!(
4170 result.is_ok(),
4171 "graceful shutdown should complete: {result:?}"
4172 );
4173 count_running_tasks(&metrics_context, "engine")
4174 } else {
4175 handle.abort();
4176 let _ = handle.await; context.sleep(Duration::from_millis(1000)).await;
4180 count_running_tasks(&context, "engine")
4181 };
4182 assert_eq!(
4183 running_after, 0,
4184 "all engine tasks should be stopped, but {running_after} still running"
4185 );
4186 });
4187 }
4188
4189 #[test_traced]
4190 fn test_children_shutdown_on_engine_abort() {
4191 for seed in 0..10 {
4192 engine_shutdown::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>, false);
4193 engine_shutdown::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>, false);
4194 engine_shutdown::<_, _, RoundRobin>(
4195 seed,
4196 bls12381_multisig::fixture::<MinPk, _>,
4197 false,
4198 );
4199 engine_shutdown::<_, _, RoundRobin>(
4200 seed,
4201 bls12381_multisig::fixture::<MinSig, _>,
4202 false,
4203 );
4204 engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, false);
4205 engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, false);
4206 }
4207 }
4208
4209 #[test_traced]
4210 fn test_graceful_shutdown() {
4211 for seed in 0..10 {
4212 engine_shutdown::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>, true);
4213 engine_shutdown::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>, true);
4214 engine_shutdown::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>, true);
4215 engine_shutdown::<_, _, RoundRobin>(
4216 seed,
4217 bls12381_multisig::fixture::<MinSig, _>,
4218 true,
4219 );
4220 engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, true);
4221 engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, true);
4222 }
4223 }
4224
4225 fn attributable_reporter_filtering<S, F, L>(mut fixture: F)
4226 where
4227 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4228 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4229 L: Elector<S>,
4230 {
4231 let n = 3;
4232 let required_containers = View::new(10);
4233 let activity_timeout = ViewDelta::new(10);
4234 let skip_timeout = ViewDelta::new(5);
4235 let namespace = b"consensus".to_vec();
4236 let executor = deterministic::Runner::timed(Duration::from_secs(30));
4237 executor.start(|mut context| async move {
4238 let (network, mut oracle) = Network::new(
4240 context.with_label("network"),
4241 Config {
4242 max_size: 1024 * 1024,
4243 disconnect_on_block: false,
4244 tracked_peer_sets: None,
4245 },
4246 );
4247 network.start();
4248
4249 let Fixture {
4251 participants,
4252 schemes,
4253 ..
4254 } = fixture(&mut context, &namespace, n);
4255 let mut registrations = register_validators(&mut oracle, &participants).await;
4256
4257 let link = Link {
4259 latency: Duration::from_millis(10),
4260 jitter: Duration::from_millis(1),
4261 success_rate: 1.0,
4262 };
4263 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4264
4265 let elector = L::default();
4267 let relay = Arc::new(mocks::relay::Relay::new());
4268 let mut reporters = Vec::new();
4269 for (idx, validator) in participants.iter().enumerate() {
4270 let context = context.with_label(&format!("validator_{}", *validator));
4271
4272 let reporter_config = mocks::reporter::Config {
4273 participants: participants.clone().try_into().unwrap(),
4274 scheme: schemes[idx].clone(),
4275 elector: elector.clone(),
4276 };
4277 let mock_reporter = mocks::reporter::Reporter::new(
4278 context.with_label("mock_reporter"),
4279 reporter_config,
4280 );
4281
4282 let attributable_reporter = scheme::reporter::AttributableReporter::new(
4284 context.with_label("rng"),
4285 schemes[idx].clone(),
4286 mock_reporter.clone(),
4287 Sequential,
4288 true, );
4290 reporters.push(mock_reporter.clone());
4291
4292 let application_cfg = mocks::application::Config {
4293 hasher: Sha256::default(),
4294 relay: relay.clone(),
4295 me: validator.clone(),
4296 propose_latency: (10.0, 5.0),
4297 verify_latency: (10.0, 5.0),
4298 certify_latency: (10.0, 5.0),
4299 should_certify: mocks::application::Certifier::Sometimes,
4300 };
4301 let (actor, application) = mocks::application::Application::new(
4302 context.with_label("application"),
4303 application_cfg,
4304 );
4305 actor.start();
4306 let blocker = oracle.control(validator.clone());
4307 let cfg = config::Config {
4308 scheme: schemes[idx].clone(),
4309 elector: elector.clone(),
4310 blocker,
4311 automaton: application.clone(),
4312 relay: application.clone(),
4313 reporter: attributable_reporter,
4314 strategy: Sequential,
4315 partition: validator.to_string(),
4316 mailbox_size: 1024,
4317 epoch: Epoch::new(333),
4318 leader_timeout: Duration::from_secs(1),
4319 notarization_timeout: Duration::from_secs(2),
4320 nullify_retry: Duration::from_secs(10),
4321 fetch_timeout: Duration::from_secs(1),
4322 activity_timeout,
4323 skip_timeout,
4324 fetch_concurrent: 4,
4325 replay_buffer: NZUsize!(1024 * 1024),
4326 write_buffer: NZUsize!(1024 * 1024),
4327 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4328 };
4329 let engine = Engine::new(context.with_label("engine"), cfg);
4330
4331 let (pending, recovered, resolver) = registrations
4333 .remove(validator)
4334 .expect("validator should be registered");
4335 engine.start(pending, recovered, resolver);
4336 }
4337
4338 let mut finalizers = Vec::new();
4340 for reporter in reporters.iter_mut() {
4341 let (mut latest, mut monitor) = reporter.subscribe().await;
4342 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4343 while latest < required_containers {
4344 latest = monitor.next().await.expect("event missing");
4345 }
4346 }));
4347 }
4348 join_all(finalizers).await;
4349
4350 for reporter in reporters.iter() {
4352 {
4354 let faults = reporter.faults.lock().unwrap();
4355 assert!(faults.is_empty(), "No faults should be reported");
4356 }
4357
4358 {
4360 let invalid = reporter.invalid.lock().unwrap();
4361 assert_eq!(*invalid, 0, "No invalid signatures");
4362 }
4363
4364 {
4366 let notarizations = reporter.notarizations.lock().unwrap();
4367 let finalizations = reporter.finalizations.lock().unwrap();
4368 assert!(
4369 !notarizations.is_empty() || !finalizations.is_empty(),
4370 "Certificates should be reported"
4371 );
4372 }
4373
4374 let notarizes = reporter.notarizes.lock().unwrap();
4376 let last_view = notarizes.keys().max().cloned().unwrap_or_default();
4377 for (view, payloads) in notarizes.iter() {
4378 if *view == last_view {
4379 continue; }
4381
4382 let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4383
4384 if S::is_attributable() {
4386 assert!(signers > 1, "view {view}: {signers}");
4387 } else {
4388 assert_eq!(signers, 0);
4390 }
4391 }
4392
4393 let finalizes = reporter.finalizes.lock().unwrap();
4395 for (_, payloads) in finalizes.iter() {
4396 let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4397
4398 if S::is_attributable() {
4400 assert!(signers > 1);
4401 } else {
4402 assert_eq!(signers, 0);
4404 }
4405 }
4406 }
4407
4408 let blocked = oracle.blocked().await.unwrap();
4410 assert!(blocked.is_empty());
4411 });
4412 }
4413
4414 #[test_traced]
4415 fn test_attributable_reporter_filtering() {
4416 attributable_reporter_filtering::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
4417 attributable_reporter_filtering::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
4418 attributable_reporter_filtering::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4419 attributable_reporter_filtering::<_, _, RoundRobin>(
4420 bls12381_multisig::fixture::<MinSig, _>,
4421 );
4422 attributable_reporter_filtering::<_, _, RoundRobin>(ed25519::fixture);
4423 attributable_reporter_filtering::<_, _, RoundRobin>(secp256r1::fixture);
4424 }
4425
4426 fn split_views_no_lockup<S, F, L>(mut fixture: F)
4427 where
4428 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4429 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4430 L: Elector<S>,
4431 {
4432 enum ParticipantType {
4445 Group1, Group2, Ignorant, Byzantine, }
4450 let get_type = |idx: usize| -> ParticipantType {
4451 match idx {
4452 0..3 => ParticipantType::Group1,
4453 3..6 => ParticipantType::Group2,
4454 6 => ParticipantType::Ignorant,
4455 7..10 => ParticipantType::Byzantine,
4456 _ => unreachable!(),
4457 }
4458 };
4459
4460 let n = 10;
4462 let quorum = quorum(n) as usize;
4463 assert_eq!(quorum, 7);
4464 let activity_timeout = ViewDelta::new(10);
4465 let skip_timeout = ViewDelta::new(5);
4466 let namespace = b"consensus".to_vec();
4467 let executor = deterministic::Runner::timed(Duration::from_secs(300));
4468 executor.start(|mut context| async move {
4469 let (network, mut oracle) = Network::new(
4471 context.with_label("network"),
4472 Config {
4473 max_size: 1024 * 1024,
4474 disconnect_on_block: false,
4475 tracked_peer_sets: None,
4476 },
4477 );
4478 network.start();
4479
4480 let Fixture {
4482 participants,
4483 schemes,
4484 ..
4485 } = fixture(&mut context, &namespace, n);
4486 let mut registrations = register_validators(&mut oracle, &participants).await;
4487
4488 let elector = L::default();
4494 let relay = Arc::new(mocks::relay::Relay::new());
4495 let mut honest_reporters = Vec::new();
4496 for (idx, validator) in participants.iter().enumerate() {
4497 let (pending, recovered, resolver) = registrations
4498 .remove(validator)
4499 .expect("validator should be registered");
4500 let participant_type = get_type(idx);
4501 if matches!(participant_type, ParticipantType::Byzantine) {
4502 let cfg = mocks::nullify_only::Config {
4504 scheme: schemes[idx].clone(),
4505 };
4506 let engine: mocks::nullify_only::NullifyOnly<_, _, Sha256> =
4507 mocks::nullify_only::NullifyOnly::new(
4508 context.with_label(&format!("byzantine_{}", *validator)),
4509 cfg,
4510 );
4511 engine.start(pending);
4512 drop(recovered);
4514 drop(resolver);
4515 } else {
4516 let reporter_config = mocks::reporter::Config {
4518 participants: participants.clone().try_into().unwrap(),
4519 scheme: schemes[idx].clone(),
4520 elector: elector.clone(),
4521 };
4522 let reporter = mocks::reporter::Reporter::new(
4523 context.with_label(&format!("reporter_{}", *validator)),
4524 reporter_config,
4525 );
4526 honest_reporters.push(reporter.clone());
4527
4528 let application_cfg = mocks::application::Config {
4529 hasher: Sha256::default(),
4530 relay: relay.clone(),
4531 me: validator.clone(),
4532 propose_latency: (10.0, 5.0),
4533 verify_latency: (10.0, 5.0),
4534 certify_latency: (10.0, 5.0),
4535 should_certify: mocks::application::Certifier::Sometimes,
4536 };
4537 let (actor, application) = mocks::application::Application::new(
4538 context.with_label(&format!("application_{}", *validator)),
4539 application_cfg,
4540 );
4541 actor.start();
4542 let blocker = oracle.control(validator.clone());
4543 let cfg = config::Config {
4544 scheme: schemes[idx].clone(),
4545 elector: elector.clone(),
4546 blocker,
4547 automaton: application.clone(),
4548 relay: application.clone(),
4549 reporter: reporter.clone(),
4550 strategy: Sequential,
4551 partition: validator.to_string(),
4552 mailbox_size: 1024,
4553 epoch: Epoch::new(333),
4554 leader_timeout: Duration::from_secs(10),
4555 notarization_timeout: Duration::from_secs(10),
4556 nullify_retry: Duration::from_secs(10),
4557 fetch_timeout: Duration::from_secs(1),
4558 activity_timeout,
4559 skip_timeout,
4560 fetch_concurrent: 4,
4561 replay_buffer: NZUsize!(1024 * 1024),
4562 write_buffer: NZUsize!(1024 * 1024),
4563 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4564 };
4565 let engine =
4566 Engine::new(context.with_label(&format!("engine_{}", *validator)), cfg);
4567 engine.start(pending, recovered, resolver);
4568 }
4569 }
4570
4571 let build_finalization = |proposal: &Proposal<D>| -> TFinalization<_, D> {
4575 let votes: Vec<_> = (0..=quorum)
4576 .map(|i| TFinalize::sign(&schemes[i], proposal.clone()).unwrap())
4577 .collect();
4578 TFinalization::from_finalizes(&schemes[0], &votes, &Sequential)
4579 .expect("finalization quorum")
4580 };
4581 let build_notarization = |proposal: &Proposal<D>| -> TNotarization<_, D> {
4583 let votes: Vec<_> = (0..=quorum)
4584 .map(|i| TNotarize::sign(&schemes[i], proposal.clone()).unwrap())
4585 .collect();
4586 TNotarization::from_notarizes(&schemes[0], &votes, &Sequential)
4587 .expect("notarization quorum")
4588 };
4589 let build_nullification = |round: Round| -> TNullification<_> {
4590 let votes: Vec<_> = (0..=quorum)
4591 .map(|i| TNullify::sign::<D>(&schemes[i], round).unwrap())
4592 .collect();
4593 TNullification::from_nullifies(&schemes[0], &votes, &Sequential)
4594 .expect("nullification quorum")
4595 };
4596 let f_view = 1;
4598 let round_f = Round::new(Epoch::new(333), View::new(f_view));
4599 let payload_b0 = Sha256::hash(b"B_F");
4600 let proposal_b0 = Proposal::new(round_f, View::new(f_view - 1), payload_b0);
4601 let payload_b1a = Sha256::hash(b"B_G1");
4602 let proposal_b1a = Proposal::new(
4603 Round::new(Epoch::new(333), View::new(f_view + 1)),
4604 View::new(f_view),
4605 payload_b1a,
4606 );
4607 let payload_b1b = Sha256::hash(b"B_G2");
4608 let proposal_b1b = Proposal::new(
4609 Round::new(Epoch::new(333), View::new(f_view + 2)),
4610 View::new(f_view),
4611 payload_b1b,
4612 );
4613
4614 let b0_notarization = build_notarization(&proposal_b0);
4616 let b0_finalization = build_finalization(&proposal_b0);
4617 let b1a_notarization = build_notarization(&proposal_b1a);
4619 let b1b_notarization = build_notarization(&proposal_b1b);
4620 let null_a = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 1)));
4622 let null_b = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 2)));
4623
4624 let injector_pk = PrivateKey::from_seed(1_000_000).public_key();
4626 let (mut injector_sender, _inj_certificate_receiver) = oracle
4627 .control(injector_pk.clone())
4628 .register(1, TEST_QUOTA)
4629 .await
4630 .unwrap();
4631
4632 let link = Link {
4634 latency: Duration::from_millis(10),
4635 jitter: Duration::from_millis(0),
4636 success_rate: 1.0,
4637 };
4638 for p in participants.iter() {
4639 oracle
4640 .add_link(injector_pk.clone(), p.clone(), link.clone())
4641 .await
4642 .unwrap();
4643 }
4644
4645 let notarization_msg = Certificate::<_, D>::Notarization(b1b_notarization);
4653 let nullification_msg = Certificate::<_, D>::Nullification(null_b.clone());
4654 for (i, participant) in participants.iter().enumerate() {
4655 let recipient = Recipients::One(participant.clone());
4656 let msg = match get_type(i) {
4657 ParticipantType::Group2 => notarization_msg.encode(),
4658 _ => nullification_msg.encode(),
4659 };
4660 injector_sender.send(recipient, msg, true).await.unwrap();
4661 }
4662 let notarization_msg = Certificate::<_, D>::Notarization(b1a_notarization);
4664 let nullification_msg = Certificate::<_, D>::Nullification(null_a.clone());
4665 for (i, participant) in participants.iter().enumerate() {
4666 let recipient = Recipients::One(participant.clone());
4667 let msg = match get_type(i) {
4668 ParticipantType::Group1 => notarization_msg.encode(),
4669 _ => nullification_msg.encode(),
4670 };
4671 injector_sender.send(recipient, msg, true).await.unwrap();
4672 }
4673 let msg = Certificate::<_, D>::Notarization(b0_notarization).encode();
4675 injector_sender
4676 .send(Recipients::All, msg, true)
4677 .await
4678 .unwrap();
4679 let msg = Certificate::<_, D>::Finalization(b0_finalization).encode();
4680 injector_sender
4681 .send(Recipients::All, msg, true)
4682 .await
4683 .unwrap();
4684
4685 debug!("waiting for certificates to propagate");
4688 context.sleep(Duration::from_secs(5)).await;
4689 debug!("certificates propagated");
4690
4691 let view = View::new(f_view);
4696 for reporter in honest_reporters.iter() {
4697 let finalizations = reporter.finalizations.lock().unwrap();
4698 assert!(finalizations.contains_key(&view));
4699 }
4700
4701 let view = View::new(f_view + 1);
4705 for (i, reporter) in honest_reporters.iter().enumerate() {
4706 let finalizations = reporter.finalizations.lock().unwrap();
4707 assert!(!finalizations.contains_key(&view));
4708 let nullifications = reporter.nullifications.lock().unwrap();
4709 let notarizations = reporter.notarizations.lock().unwrap();
4710 match get_type(i) {
4711 ParticipantType::Group1 => {
4712 assert!(notarizations.contains_key(&view));
4713 assert!(!nullifications.contains_key(&view));
4714 }
4715 _ => {
4716 assert!(nullifications.contains_key(&view));
4717 assert!(!notarizations.contains_key(&view));
4718 }
4719 }
4720 }
4721
4722 let view = View::new(f_view + 2);
4726 for (i, reporter) in honest_reporters.iter().enumerate() {
4727 let finalizations = reporter.finalizations.lock().unwrap();
4728 assert!(!finalizations.contains_key(&view));
4729 let nullifications = reporter.nullifications.lock().unwrap();
4730 let notarizations = reporter.notarizations.lock().unwrap();
4731 match get_type(i) {
4732 ParticipantType::Group2 => {
4733 assert!(notarizations.contains_key(&view));
4734 assert!(!nullifications.contains_key(&view));
4735 }
4736 _ => {
4737 assert!(nullifications.contains_key(&view));
4738 assert!(!notarizations.contains_key(&view));
4739 }
4740 }
4741 }
4742
4743 let next_view = View::new(f_view + 3);
4745 for (i, reporter) in honest_reporters.iter().enumerate() {
4746 let nullifies = reporter.nullifies.lock().unwrap();
4747 assert!(!nullifies.contains_key(&next_view), "reporter {i}");
4748 }
4749
4750 link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
4754
4755 {
4757 let target = View::new(f_view + 3);
4758 let mut finalizers = Vec::new();
4759 for reporter in honest_reporters.iter_mut() {
4760 let (mut latest, mut monitor) = reporter.subscribe().await;
4761 finalizers.push(context.with_label("resume_finalizer").spawn(
4762 move |_| async move {
4763 while latest < target {
4764 latest = monitor.next().await.expect("event missing");
4765 }
4766 },
4767 ));
4768 }
4769 join_all(finalizers).await;
4770 }
4771
4772 for reporter in honest_reporters.iter() {
4774 {
4775 let faults = reporter.faults.lock().unwrap();
4776 assert!(faults.is_empty());
4777 }
4778 {
4779 let invalid = reporter.invalid.lock().unwrap();
4780 assert_eq!(*invalid, 0);
4781 }
4782 }
4783 let blocked = oracle.blocked().await.unwrap();
4784 assert!(blocked.is_empty());
4785 });
4786 }
4787
4788 #[test_traced]
4789 fn test_split_views_no_lockup() {
4790 split_views_no_lockup::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
4791 split_views_no_lockup::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
4792 split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4793 split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
4794 split_views_no_lockup::<_, _, RoundRobin>(ed25519::fixture);
4795 split_views_no_lockup::<_, _, RoundRobin>(secp256r1::fixture);
4796 }
4797
4798 fn tle<V, L>()
4799 where
4800 V: Variant,
4801 L: Elector<bls12381_threshold::Scheme<PublicKey, V>>,
4802 {
4803 let n = 4;
4805 let namespace = b"consensus".to_vec();
4806 let activity_timeout = ViewDelta::new(100);
4807 let skip_timeout = ViewDelta::new(50);
4808 let executor = deterministic::Runner::timed(Duration::from_secs(30));
4809 executor.start(|mut context| async move {
4810 let (network, mut oracle) = Network::new(
4812 context.with_label("network"),
4813 Config {
4814 max_size: 1024 * 1024,
4815 disconnect_on_block: false,
4816 tracked_peer_sets: None,
4817 },
4818 );
4819
4820 network.start();
4822
4823 let Fixture {
4825 participants,
4826 schemes,
4827 ..
4828 } = bls12381_threshold::fixture::<V, _>(&mut context, &namespace, n);
4829 let mut registrations = register_validators(&mut oracle, &participants).await;
4830
4831 let link = Link {
4833 latency: Duration::from_millis(10),
4834 jitter: Duration::from_millis(5),
4835 success_rate: 1.0,
4836 };
4837 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4838
4839 let elector = L::default();
4841 let relay = Arc::new(mocks::relay::Relay::new());
4842 let mut reporters = Vec::new();
4843 let mut engine_handlers = Vec::new();
4844 let monitor_reporter = Arc::new(Mutex::new(None));
4845 for (idx, validator) in participants.iter().enumerate() {
4846 let context = context.with_label(&format!("validator_{}", *validator));
4848
4849 let reporter_config = mocks::reporter::Config {
4851 participants: participants.clone().try_into().unwrap(),
4852 scheme: schemes[idx].clone(),
4853 elector: elector.clone(),
4854 };
4855 let reporter =
4856 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4857 reporters.push(reporter.clone());
4858 if idx == 0 {
4859 *monitor_reporter.lock().unwrap() = Some(reporter.clone());
4860 }
4861
4862 let application_cfg = mocks::application::Config {
4864 hasher: Sha256::default(),
4865 relay: relay.clone(),
4866 me: validator.clone(),
4867 propose_latency: (10.0, 5.0),
4868 verify_latency: (10.0, 5.0),
4869 certify_latency: (10.0, 5.0),
4870 should_certify: mocks::application::Certifier::Sometimes,
4871 };
4872 let (actor, application) = mocks::application::Application::new(
4873 context.with_label("application"),
4874 application_cfg,
4875 );
4876 actor.start();
4877 let blocker = oracle.control(validator.clone());
4878 let cfg = config::Config {
4879 scheme: schemes[idx].clone(),
4880 elector: elector.clone(),
4881 blocker,
4882 automaton: application.clone(),
4883 relay: application.clone(),
4884 reporter: reporter.clone(),
4885 strategy: Sequential,
4886 partition: validator.to_string(),
4887 mailbox_size: 1024,
4888 epoch: Epoch::new(333),
4889 leader_timeout: Duration::from_millis(100),
4890 notarization_timeout: Duration::from_millis(200),
4891 nullify_retry: Duration::from_millis(500),
4892 fetch_timeout: Duration::from_millis(100),
4893 activity_timeout,
4894 skip_timeout,
4895 fetch_concurrent: 4,
4896 replay_buffer: NZUsize!(1024 * 1024),
4897 write_buffer: NZUsize!(1024 * 1024),
4898 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4899 };
4900 let engine = Engine::new(context.with_label("engine"), cfg);
4901
4902 let (pending, recovered, resolver) = registrations
4904 .remove(validator)
4905 .expect("validator should be registered");
4906 engine_handlers.push(engine.start(pending, recovered, resolver));
4907 }
4908
4909 let target = Round::new(Epoch::new(333), View::new(10)); let message = b"Secret message for future view10"; let ciphertext = schemes[0].encrypt(&mut context, target, *message);
4915
4916 let reporter = monitor_reporter.lock().unwrap().clone().unwrap();
4918 loop {
4919 context.sleep(Duration::from_millis(100)).await;
4921 let notarizations = reporter.notarizations.lock().unwrap();
4922 let Some(notarization) = notarizations.get(&target.view()) else {
4923 continue;
4924 };
4925
4926 let seed = notarization.seed();
4928 let decrypted = seed
4929 .decrypt(&ciphertext)
4930 .expect("Decryption should succeed with valid seed signature");
4931 assert_eq!(
4932 message,
4933 decrypted.as_ref(),
4934 "Decrypted message should match original message"
4935 );
4936 break;
4937 }
4938 });
4939 }
4940
4941 #[test_traced]
4942 fn test_tle() {
4943 tle::<MinPk, Random>();
4944 tle::<MinSig, Random>();
4945 }
4946
4947 fn hailstorm<S, F, L>(
4948 seed: u64,
4949 shutdowns: usize,
4950 interval: ViewDelta,
4951 mut fixture: F,
4952 ) -> String
4953 where
4954 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4955 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4956 L: Elector<S>,
4957 {
4958 let n = 5;
4960 let activity_timeout = ViewDelta::new(10);
4961 let skip_timeout = ViewDelta::new(5);
4962 let namespace = b"consensus".to_vec();
4963 let cfg = deterministic::Config::new().with_seed(seed);
4964 let executor = deterministic::Runner::new(cfg);
4965 executor.start(|mut context| async move {
4966 let (network, mut oracle) = Network::new(
4968 context.with_label("network"),
4969 Config {
4970 max_size: 1024 * 1024,
4971 disconnect_on_block: true,
4972 tracked_peer_sets: None,
4973 },
4974 );
4975
4976 network.start();
4978
4979 let Fixture {
4981 participants,
4982 schemes,
4983 ..
4984 } = fixture(&mut context, &namespace, n);
4985 let mut registrations = register_validators(&mut oracle, &participants).await;
4986
4987 let link = Link {
4989 latency: Duration::from_millis(10),
4990 jitter: Duration::from_millis(1),
4991 success_rate: 1.0,
4992 };
4993 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4994
4995 let elector = L::default();
4997 let relay = Arc::new(mocks::relay::Relay::new());
4998 let mut reporters = BTreeMap::new();
4999 let mut engine_handlers = BTreeMap::new();
5000 for (idx, validator) in participants.iter().enumerate() {
5001 let context = context.with_label(&format!("validator_{}", *validator));
5003
5004 let reporter_config = mocks::reporter::Config {
5006 participants: participants.clone().try_into().unwrap(),
5007 scheme: schemes[idx].clone(),
5008 elector: elector.clone(),
5009 };
5010 let reporter =
5011 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5012 reporters.insert(idx, reporter.clone());
5013 let application_cfg = mocks::application::Config {
5014 hasher: Sha256::default(),
5015 relay: relay.clone(),
5016 me: validator.clone(),
5017 propose_latency: (10.0, 5.0),
5018 verify_latency: (10.0, 5.0),
5019 certify_latency: (10.0, 5.0),
5020 should_certify: mocks::application::Certifier::Sometimes,
5021 };
5022 let (actor, application) = mocks::application::Application::new(
5023 context.with_label("application"),
5024 application_cfg,
5025 );
5026 actor.start();
5027 let blocker = oracle.control(validator.clone());
5028 let cfg = config::Config {
5029 scheme: schemes[idx].clone(),
5030 elector: elector.clone(),
5031 blocker,
5032 automaton: application.clone(),
5033 relay: application.clone(),
5034 reporter: reporter.clone(),
5035 strategy: Sequential,
5036 partition: validator.to_string(),
5037 mailbox_size: 1024,
5038 epoch: Epoch::new(333),
5039 leader_timeout: Duration::from_secs(1),
5040 notarization_timeout: Duration::from_secs(2),
5041 nullify_retry: Duration::from_secs(10),
5042 fetch_timeout: Duration::from_secs(1),
5043 activity_timeout,
5044 skip_timeout,
5045 fetch_concurrent: 4,
5046 replay_buffer: NZUsize!(1024 * 1024),
5047 write_buffer: NZUsize!(1024 * 1024),
5048 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5049 };
5050 let engine = Engine::new(context.with_label("engine"), cfg);
5051
5052 let (pending, recovered, resolver) = registrations
5054 .remove(validator)
5055 .expect("validator should be registered");
5056 engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5057 }
5058
5059 let mut target = View::zero();
5061 for i in 0..shutdowns {
5062 target = target.saturating_add(interval);
5064
5065 let mut finalizers = Vec::new();
5067 for (_, reporter) in reporters.iter_mut() {
5068 let (mut latest, mut monitor) = reporter.subscribe().await;
5069 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5070 while latest < target {
5071 latest = monitor.next().await.expect("event missing");
5072 }
5073 }));
5074 }
5075 join_all(finalizers).await;
5076 target = target.saturating_add(interval);
5077
5078 let idx = context.gen_range(0..engine_handlers.len());
5080 let validator = &participants[idx];
5081 let handle = engine_handlers.remove(&idx).unwrap();
5082 handle.abort();
5083 let _ = handle.await;
5084 let selected_reporter = reporters.remove(&idx).unwrap();
5085 info!(idx, ?validator, "shutdown validator");
5086
5087 let mut finalizers = Vec::new();
5089 for (_, reporter) in reporters.iter_mut() {
5090 let (mut latest, mut monitor) = reporter.subscribe().await;
5091 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5092 while latest < target {
5093 latest = monitor.next().await.expect("event missing");
5094 }
5095 }));
5096 }
5097 join_all(finalizers).await;
5098 target = target.saturating_add(interval);
5099
5100 info!(idx, ?validator, "restarting validator");
5102 let context =
5103 context.with_label(&format!("validator_{}_restarted_{}", *validator, i));
5104
5105 let (pending, recovered, resolver) =
5107 register_validator(&mut oracle, validator.clone()).await;
5108 let application_cfg = mocks::application::Config {
5109 hasher: Sha256::default(),
5110 relay: relay.clone(),
5111 me: validator.clone(),
5112 propose_latency: (10.0, 5.0),
5113 verify_latency: (10.0, 5.0),
5114 certify_latency: (10.0, 5.0),
5115 should_certify: mocks::application::Certifier::Sometimes,
5116 };
5117 let (actor, application) = mocks::application::Application::new(
5118 context.with_label("application"),
5119 application_cfg,
5120 );
5121 actor.start();
5122 reporters.insert(idx, selected_reporter.clone());
5123 let blocker = oracle.control(validator.clone());
5124 let cfg = config::Config {
5125 scheme: schemes[idx].clone(),
5126 elector: elector.clone(),
5127 blocker,
5128 automaton: application.clone(),
5129 relay: application.clone(),
5130 reporter: selected_reporter,
5131 strategy: Sequential,
5132 partition: validator.to_string(),
5133 mailbox_size: 1024,
5134 epoch: Epoch::new(333),
5135 leader_timeout: Duration::from_secs(1),
5136 notarization_timeout: Duration::from_secs(2),
5137 nullify_retry: Duration::from_secs(10),
5138 fetch_timeout: Duration::from_secs(1),
5139 activity_timeout,
5140 skip_timeout,
5141 fetch_concurrent: 4,
5142 replay_buffer: NZUsize!(1024 * 1024),
5143 write_buffer: NZUsize!(1024 * 1024),
5144 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5145 };
5146 let engine = Engine::new(context.with_label("engine"), cfg);
5147 engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5148
5149 let mut finalizers = Vec::new();
5151 for (_, reporter) in reporters.iter_mut() {
5152 let (mut latest, mut monitor) = reporter.subscribe().await;
5153 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5154 while latest < target {
5155 latest = monitor.next().await.expect("event missing");
5156 }
5157 }));
5158 }
5159 join_all(finalizers).await;
5160 info!(idx, ?validator, "validator recovered");
5161 }
5162
5163 let latest_complete = target.saturating_sub(activity_timeout);
5165 for (_, reporter) in reporters.iter() {
5166 {
5168 let faults = reporter.faults.lock().unwrap();
5169 assert!(faults.is_empty());
5170 }
5171
5172 {
5174 let invalid = reporter.invalid.lock().unwrap();
5175 assert_eq!(*invalid, 0);
5176 }
5177
5178 let mut notarized = HashMap::new();
5180 let mut finalized = HashMap::new();
5181 {
5182 let notarizes = reporter.notarizes.lock().unwrap();
5183 for view in View::range(View::new(1), latest_complete) {
5184 let Some(payloads) = notarizes.get(&view) else {
5186 continue;
5187 };
5188 if payloads.len() > 1 {
5189 panic!("view: {view}");
5190 }
5191 let (digest, _) = payloads.iter().next().unwrap();
5192 notarized.insert(view, *digest);
5193 }
5194 }
5195 {
5196 let notarizations = reporter.notarizations.lock().unwrap();
5197 for view in View::range(View::new(1), latest_complete) {
5198 let Some(notarization) = notarizations.get(&view) else {
5200 continue;
5201 };
5202 let Some(digest) = notarized.get(&view) else {
5203 continue;
5204 };
5205 assert_eq!(¬arization.proposal.payload, digest);
5206 }
5207 }
5208 {
5209 let finalizes = reporter.finalizes.lock().unwrap();
5210 for view in View::range(View::new(1), latest_complete) {
5211 let Some(payloads) = finalizes.get(&view) else {
5213 continue;
5214 };
5215 if payloads.len() > 1 {
5216 panic!("view: {view}");
5217 }
5218 let (digest, _) = payloads.iter().next().unwrap();
5219 finalized.insert(view, *digest);
5220
5221 if view > latest_complete {
5223 continue;
5224 }
5225
5226 let nullifies = reporter.nullifies.lock().unwrap();
5228 let Some(nullifies) = nullifies.get(&view) else {
5229 continue;
5230 };
5231 for (_, finalizers) in payloads.iter() {
5232 for finalizer in finalizers.iter() {
5233 if nullifies.contains(finalizer) {
5234 panic!("should not nullify and finalize at same view");
5235 }
5236 }
5237 }
5238 }
5239 }
5240 {
5241 let finalizations = reporter.finalizations.lock().unwrap();
5242 for view in View::range(View::new(1), latest_complete) {
5243 let Some(finalization) = finalizations.get(&view) else {
5245 continue;
5246 };
5247 let Some(digest) = finalized.get(&view) else {
5248 continue;
5249 };
5250 assert_eq!(&finalization.proposal.payload, digest);
5251 }
5252 }
5253 }
5254
5255 let blocked = oracle.blocked().await.unwrap();
5257 assert!(blocked.is_empty());
5258
5259 context.auditor().state()
5261 })
5262 }
5263
5264 #[test_group("slow")]
5265 #[test_traced]
5266 fn test_hailstorm_bls12381_threshold_min_pk() {
5267 assert_eq!(
5268 hailstorm::<_, _, Random>(
5269 0,
5270 10,
5271 ViewDelta::new(15),
5272 bls12381_threshold::fixture::<MinPk, _>
5273 ),
5274 hailstorm::<_, _, Random>(
5275 0,
5276 10,
5277 ViewDelta::new(15),
5278 bls12381_threshold::fixture::<MinPk, _>
5279 ),
5280 );
5281 }
5282
5283 #[test_group("slow")]
5284 #[test_traced]
5285 fn test_hailstorm_bls12381_threshold_min_sig() {
5286 assert_eq!(
5287 hailstorm::<_, _, Random>(
5288 0,
5289 10,
5290 ViewDelta::new(15),
5291 bls12381_threshold::fixture::<MinSig, _>
5292 ),
5293 hailstorm::<_, _, Random>(
5294 0,
5295 10,
5296 ViewDelta::new(15),
5297 bls12381_threshold::fixture::<MinSig, _>
5298 ),
5299 );
5300 }
5301
5302 #[test_group("slow")]
5303 #[test_traced]
5304 fn test_hailstorm_bls12381_multisig_min_pk() {
5305 assert_eq!(
5306 hailstorm::<_, _, RoundRobin>(
5307 0,
5308 10,
5309 ViewDelta::new(15),
5310 bls12381_multisig::fixture::<MinPk, _>
5311 ),
5312 hailstorm::<_, _, RoundRobin>(
5313 0,
5314 10,
5315 ViewDelta::new(15),
5316 bls12381_multisig::fixture::<MinPk, _>
5317 ),
5318 );
5319 }
5320
5321 #[test_group("slow")]
5322 #[test_traced]
5323 fn test_hailstorm_bls12381_multisig_min_sig() {
5324 assert_eq!(
5325 hailstorm::<_, _, RoundRobin>(
5326 0,
5327 10,
5328 ViewDelta::new(15),
5329 bls12381_multisig::fixture::<MinSig, _>
5330 ),
5331 hailstorm::<_, _, RoundRobin>(
5332 0,
5333 10,
5334 ViewDelta::new(15),
5335 bls12381_multisig::fixture::<MinSig, _>
5336 ),
5337 );
5338 }
5339
5340 #[test_group("slow")]
5341 #[test_traced]
5342 fn test_hailstorm_ed25519() {
5343 assert_eq!(
5344 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture),
5345 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture)
5346 );
5347 }
5348
5349 #[test_group("slow")]
5350 #[test_traced]
5351 fn test_hailstorm_secp256r1() {
5352 assert_eq!(
5353 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture),
5354 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture)
5355 );
5356 }
5357
5358 fn twins<S, F, L>(seed: u64, n: u32, strategy: Strategy, link: Link, mut fixture: F)
5360 where
5361 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5362 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5363 L: Elector<S>,
5364 {
5365 let faults = N3f1::max_faults(n);
5366 let required_containers = View::new(100);
5367 let activity_timeout = ViewDelta::new(10);
5368 let skip_timeout = ViewDelta::new(5);
5369 let namespace = b"consensus".to_vec();
5370 let cfg = deterministic::Config::new()
5371 .with_seed(seed)
5372 .with_timeout(Some(Duration::from_secs(900)));
5373 let executor = deterministic::Runner::new(cfg);
5374 executor.start(|mut context| async move {
5375 let (network, mut oracle) = Network::new(
5376 context.with_label("network"),
5377 Config {
5378 max_size: 1024 * 1024,
5379 disconnect_on_block: false,
5380 tracked_peer_sets: None,
5381 },
5382 );
5383 network.start();
5384
5385 let Fixture {
5386 participants,
5387 schemes,
5388 ..
5389 } = fixture(&mut context, &namespace, n);
5390 let participants: Arc<[_]> = participants.into();
5391 let mut registrations = register_validators(&mut oracle, &participants).await;
5392 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5393
5394 let elector = L::default();
5397 let relay = Arc::new(mocks::relay::Relay::new());
5398 let mut reporters = Vec::new();
5399 let mut engine_handlers = Vec::new();
5400
5401 for (idx, validator) in participants.iter().enumerate().take(faults as usize) {
5403 let (
5404 (vote_sender, vote_receiver),
5405 (certificate_sender, certificate_receiver),
5406 (resolver_sender, resolver_receiver),
5407 ) = registrations
5408 .remove(validator)
5409 .expect("validator should be registered");
5410
5411 let make_vote_forwarder = || {
5413 let participants = participants.clone();
5414 move |origin: SplitOrigin, _: &Recipients<_>, message: &Bytes| {
5415 let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5416 let (primary, secondary) =
5417 strategy.partitions(msg.view(), participants.as_ref());
5418 match origin {
5419 SplitOrigin::Primary => Some(Recipients::Some(primary)),
5420 SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5421 }
5422 }
5423 };
5424 let make_certificate_forwarder = || {
5426 let codec = schemes[idx].certificate_codec_config();
5427 let participants = participants.clone();
5428 move |origin: SplitOrigin, _: &Recipients<_>, message: &Bytes| {
5429 let msg: Certificate<S, D> =
5430 Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5431 let (primary, secondary) =
5432 strategy.partitions(msg.view(), participants.as_ref());
5433 match origin {
5434 SplitOrigin::Primary => Some(Recipients::Some(primary)),
5435 SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5436 }
5437 }
5438 };
5439 let make_drop_forwarder =
5440 || move |_: SplitOrigin, _: &Recipients<_>, _: &Bytes| None;
5441
5442 let make_vote_router = || {
5444 let participants = participants.clone();
5445 move |(sender, message): &(_, Bytes)| {
5446 let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5447 strategy.route(msg.view(), sender, participants.as_ref())
5448 }
5449 };
5450 let make_certificate_router = || {
5452 let codec = schemes[idx].certificate_codec_config();
5453 let participants = participants.clone();
5454 move |(sender, message): &(_, Bytes)| {
5455 let msg: Certificate<S, D> =
5456 Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5457 strategy.route(msg.view(), sender, participants.as_ref())
5458 }
5459 };
5460 let make_drop_router = || move |(_, _): &(_, _)| SplitTarget::None;
5461
5462 let (vote_sender_primary, vote_sender_secondary) =
5464 vote_sender.split_with(make_vote_forwarder());
5465 let (vote_receiver_primary, vote_receiver_secondary) = vote_receiver.split_with(
5466 context.with_label(&format!("pending_split_{idx}")),
5467 make_vote_router(),
5468 );
5469 let (certificate_sender_primary, certificate_sender_secondary) =
5470 certificate_sender.split_with(make_certificate_forwarder());
5471 let (certificate_receiver_primary, certificate_receiver_secondary) =
5472 certificate_receiver.split_with(
5473 context.with_label(&format!("recovered_split_{idx}")),
5474 make_certificate_router(),
5475 );
5476
5477 let (resolver_sender_primary, resolver_sender_secondary) =
5479 resolver_sender.split_with(make_drop_forwarder());
5480 let (resolver_receiver_primary, resolver_receiver_secondary) = resolver_receiver
5481 .split_with(
5482 context.with_label(&format!("resolver_split_{idx}")),
5483 make_drop_router(),
5484 );
5485
5486 for (twin_label, pending, recovered, resolver) in [
5487 (
5488 "primary",
5489 (vote_sender_primary, vote_receiver_primary),
5490 (certificate_sender_primary, certificate_receiver_primary),
5491 (resolver_sender_primary, resolver_receiver_primary),
5492 ),
5493 (
5494 "secondary",
5495 (vote_sender_secondary, vote_receiver_secondary),
5496 (certificate_sender_secondary, certificate_receiver_secondary),
5497 (resolver_sender_secondary, resolver_receiver_secondary),
5498 ),
5499 ] {
5500 let label = format!("twin_{idx}_{twin_label}");
5501 let context = context.with_label(&label);
5502
5503 let reporter_config = mocks::reporter::Config {
5504 participants: participants.as_ref().try_into().unwrap(),
5505 scheme: schemes[idx].clone(),
5506 elector: elector.clone(),
5507 };
5508 let reporter = mocks::reporter::Reporter::new(
5509 context.with_label("reporter"),
5510 reporter_config,
5511 );
5512 reporters.push(reporter.clone());
5513
5514 let application_cfg = mocks::application::Config {
5515 hasher: Sha256::default(),
5516 relay: relay.clone(),
5517 me: validator.clone(),
5518 propose_latency: (10.0, 5.0),
5519 verify_latency: (10.0, 5.0),
5520 certify_latency: (10.0, 5.0),
5521 should_certify: mocks::application::Certifier::Sometimes,
5522 };
5523 let (actor, application) = mocks::application::Application::new(
5524 context.with_label("application"),
5525 application_cfg,
5526 );
5527 actor.start();
5528
5529 let blocker = oracle.control(validator.clone());
5530 let cfg = config::Config {
5531 scheme: schemes[idx].clone(),
5532 elector: elector.clone(),
5533 blocker,
5534 automaton: application.clone(),
5535 relay: application.clone(),
5536 reporter: reporter.clone(),
5537 strategy: Sequential,
5538 partition: label,
5539 mailbox_size: 1024,
5540 epoch: Epoch::new(333),
5541 leader_timeout: Duration::from_secs(1),
5542 notarization_timeout: Duration::from_secs(2),
5543 nullify_retry: Duration::from_secs(10),
5544 fetch_timeout: Duration::from_secs(1),
5545 activity_timeout,
5546 skip_timeout,
5547 fetch_concurrent: 4,
5548 replay_buffer: NZUsize!(1024 * 1024),
5549 write_buffer: NZUsize!(1024 * 1024),
5550 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5551 };
5552 let engine = Engine::new(context.with_label("engine"), cfg);
5553 engine_handlers.push(engine.start(pending, recovered, resolver));
5554 }
5555 }
5556
5557 for (idx, validator) in participants.iter().enumerate().skip(faults as usize) {
5559 let label = format!("honest_{idx}");
5560 let context = context.with_label(&label);
5561
5562 let reporter_config = mocks::reporter::Config {
5563 participants: participants.as_ref().try_into().unwrap(),
5564 scheme: schemes[idx].clone(),
5565 elector: elector.clone(),
5566 };
5567 let reporter =
5568 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5569 reporters.push(reporter.clone());
5570
5571 let application_cfg = mocks::application::Config {
5572 hasher: Sha256::default(),
5573 relay: relay.clone(),
5574 me: validator.clone(),
5575 propose_latency: (10.0, 5.0),
5576 verify_latency: (10.0, 5.0),
5577 certify_latency: (10.0, 5.0),
5578 should_certify: mocks::application::Certifier::Sometimes,
5579 };
5580 let (actor, application) = mocks::application::Application::new(
5581 context.with_label("application"),
5582 application_cfg,
5583 );
5584 actor.start();
5585
5586 let blocker = oracle.control(validator.clone());
5587 let cfg = config::Config {
5588 scheme: schemes[idx].clone(),
5589 elector: elector.clone(),
5590 blocker,
5591 automaton: application.clone(),
5592 relay: application.clone(),
5593 reporter: reporter.clone(),
5594 strategy: Sequential,
5595 partition: label,
5596 mailbox_size: 1024,
5597 epoch: Epoch::new(333),
5598 leader_timeout: Duration::from_secs(1),
5599 notarization_timeout: Duration::from_secs(2),
5600 nullify_retry: Duration::from_secs(10),
5601 fetch_timeout: Duration::from_secs(1),
5602 activity_timeout,
5603 skip_timeout,
5604 fetch_concurrent: 4,
5605 replay_buffer: NZUsize!(1024 * 1024),
5606 write_buffer: NZUsize!(1024 * 1024),
5607 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5608 };
5609 let engine = Engine::new(context.with_label("engine"), cfg);
5610
5611 let (pending, recovered, resolver) = registrations
5612 .remove(validator)
5613 .expect("validator should be registered");
5614 engine_handlers.push(engine.start(pending, recovered, resolver));
5615 }
5616
5617 let mut finalizers = Vec::new();
5619 for reporter in reporters.iter_mut() {
5620 let (mut latest, mut monitor) = reporter.subscribe().await;
5621 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5622 while latest < required_containers {
5623 latest = monitor.next().await.expect("event missing");
5624 }
5625 }));
5626 }
5627 join_all(finalizers).await;
5628
5629 let honest_start = faults as usize * 2; let mut finalized_at_view: BTreeMap<View, D> = BTreeMap::new();
5632 for reporter in reporters.iter().skip(honest_start) {
5633 let finalizations = reporter.finalizations.lock().unwrap();
5634 for (view, finalization) in finalizations.iter() {
5635 let digest = finalization.proposal.payload;
5636 if let Some(existing) = finalized_at_view.get(view) {
5637 assert_eq!(
5638 existing, &digest,
5639 "safety violation: conflicting finalizations at view {view}"
5640 );
5641 } else {
5642 finalized_at_view.insert(*view, digest);
5643 }
5644 }
5645 }
5646
5647 for reporter in reporters.iter().skip(honest_start) {
5649 let invalid = reporter.invalid.lock().unwrap();
5650 assert_eq!(*invalid, 0, "invalid signatures detected");
5651 }
5652
5653 let twin_identities: Vec<_> = participants.iter().take(faults as usize).collect();
5655 for reporter in reporters.iter().skip(honest_start) {
5656 let faults = reporter.faults.lock().unwrap();
5657 for (faulter, _) in faults.iter() {
5658 assert!(
5659 twin_identities.contains(&faulter),
5660 "fault from non-twin participant"
5661 );
5662 }
5663 }
5664
5665 let blocked = oracle.blocked().await.unwrap();
5667 for (_, faulter) in blocked.iter() {
5668 assert!(
5669 twin_identities.contains(&faulter),
5670 "blocked connection from non-twin participant"
5671 );
5672 }
5673 });
5674 }
5675
5676 fn test_twins<S, F, L>(mut fixture: F)
5677 where
5678 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5679 F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5680 L: Elector<S>,
5681 {
5682 for strategy in [
5683 Strategy::View,
5684 Strategy::Fixed(3),
5685 Strategy::Isolate(4),
5686 Strategy::Broadcast,
5687 Strategy::Shuffle,
5688 ] {
5689 for link in [
5690 Link {
5691 latency: Duration::from_millis(10),
5692 jitter: Duration::from_millis(1),
5693 success_rate: 1.0,
5694 },
5695 Link {
5696 latency: Duration::from_millis(200),
5697 jitter: Duration::from_millis(150),
5698 success_rate: 0.75,
5699 },
5700 ] {
5701 twins::<S, _, L>(0, 5, strategy, link, |namespace, context, n| {
5702 fixture(namespace, context, n)
5703 });
5704 }
5705 }
5706 }
5707
5708 #[test_group("slow")]
5709 #[test_traced]
5710 fn test_twins_multisig_min_pk() {
5711 test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
5712 }
5713
5714 #[test_group("slow")]
5715 #[test_traced]
5716 fn test_twins_multisig_min_sig() {
5717 test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
5718 }
5719
5720 #[test_group("slow")]
5721 #[test_traced]
5722 fn test_twins_threshold_min_pk() {
5723 test_twins::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
5724 }
5725
5726 #[test_group("slow")]
5727 #[test_traced]
5728 fn test_twins_threshold_min_sig() {
5729 test_twins::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
5730 }
5731
5732 #[test_group("slow")]
5733 #[test_traced]
5734 fn test_twins_ed25519() {
5735 test_twins::<_, _, RoundRobin>(ed25519::fixture);
5736 }
5737
5738 #[test_group("slow")]
5739 #[test_traced]
5740 fn test_twins_secp256r1() {
5741 test_twins::<_, _, RoundRobin>(secp256r1::fixture);
5742 }
5743
5744 #[test_group("slow")]
5745 #[test_traced]
5746 fn test_twins_large_view() {
5747 twins::<_, _, Random>(
5748 0,
5749 10,
5750 Strategy::View,
5751 Link {
5752 latency: Duration::from_millis(200),
5753 jitter: Duration::from_millis(150),
5754 success_rate: 0.75,
5755 },
5756 bls12381_threshold::fixture::<MinPk, _>,
5757 );
5758 }
5759
5760 #[test_group("slow")]
5761 #[test_traced]
5762 fn test_twins_large_shuffle() {
5763 twins::<_, _, Random>(
5764 0,
5765 10,
5766 Strategy::Shuffle,
5767 Link {
5768 latency: Duration::from_millis(200),
5769 jitter: Duration::from_millis(150),
5770 success_rate: 0.75,
5771 },
5772 bls12381_threshold::fixture::<MinPk, _>,
5773 );
5774 }
5775}