1pub mod elector;
227pub mod scheme;
228pub mod types;
229
230cfg_if::cfg_if! {
231 if #[cfg(not(target_arch = "wasm32"))] {
232 mod actors;
233 pub mod config;
234 pub use config::Config;
235 mod engine;
236 pub use engine::Engine;
237 mod metrics;
238 }
239}
240
241#[cfg(any(test, feature = "fuzz"))]
242pub mod mocks;
243
244use crate::types::{View, ViewDelta};
245
246pub(crate) const fn min_active(activity_timeout: ViewDelta, last_finalized: View) -> View {
248 last_finalized.saturating_sub(activity_timeout)
249}
250
251pub(crate) fn interesting(
255 activity_timeout: ViewDelta,
256 last_finalized: View,
257 current: View,
258 pending: View,
259 allow_future: bool,
260) -> bool {
261 if pending < min_active(activity_timeout, last_finalized) {
262 return false;
263 }
264 if !allow_future && pending > current.next() {
265 return false;
266 }
267 true
268}
269
270#[cfg(test)]
271mod tests {
272 use super::*;
273 use crate::{
274 simplex::{
275 elector::{Config as Elector, Random, RoundRobin},
276 mocks::twins::Strategy,
277 scheme::{
278 bls12381_multisig, bls12381_threshold, bls12381_threshold::Seedable, ed25519,
279 Scheme,
280 },
281 types::{
282 Certificate, Finalization as TFinalization, Finalize as TFinalize,
283 Notarization as TNotarization, Notarize as TNotarize,
284 Nullification as TNullification, Nullify as TNullify, Proposal, Vote,
285 },
286 },
287 types::{Epoch, Round},
288 Monitor, Viewable,
289 };
290 use bytes::Bytes;
291 use commonware_codec::{Decode, DecodeExt, Encode};
292 use commonware_cryptography::{
293 bls12381::primitives::variant::{MinPk, MinSig, Variant},
294 certificate::mocks::Fixture,
295 ed25519::{PrivateKey, PublicKey},
296 sha256::{Digest as Sha256Digest, Digest as D},
297 Hasher as _, Sha256, Signer as _,
298 };
299 use commonware_macros::{select, test_group, test_traced};
300 use commonware_p2p::{
301 simulated::{Config, Link, Network, Oracle, Receiver, Sender, SplitOrigin, SplitTarget},
302 Recipients, Sender as _,
303 };
304 use commonware_runtime::{
305 buffer::PoolRef, deterministic, Clock, Metrics, Quota, Runner, Spawner,
306 };
307 use commonware_utils::{max_faults, quorum, NZUsize};
308 use engine::Engine;
309 use futures::{future::join_all, StreamExt};
310 use rand::{rngs::StdRng, Rng as _, SeedableRng as _};
311 use std::{
312 collections::{BTreeMap, HashMap},
313 num::{NonZeroU32, NonZeroUsize},
314 sync::{Arc, Mutex},
315 time::Duration,
316 };
317 use tracing::{debug, info, warn};
318 use types::Activity;
319
320 const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
321 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
322 const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
323
324 async fn register_validator(
326 oracle: &mut Oracle<PublicKey, deterministic::Context>,
327 validator: PublicKey,
328 ) -> (
329 (
330 Sender<PublicKey, deterministic::Context>,
331 Receiver<PublicKey>,
332 ),
333 (
334 Sender<PublicKey, deterministic::Context>,
335 Receiver<PublicKey>,
336 ),
337 (
338 Sender<PublicKey, deterministic::Context>,
339 Receiver<PublicKey>,
340 ),
341 ) {
342 let mut control = oracle.control(validator.clone());
343 let (vote_sender, vote_receiver) = control.register(0, TEST_QUOTA).await.unwrap();
344 let (certificate_sender, certificate_receiver) =
345 control.register(1, TEST_QUOTA).await.unwrap();
346 let (resolver_sender, resolver_receiver) = control.register(2, TEST_QUOTA).await.unwrap();
347 (
348 (vote_sender, vote_receiver),
349 (certificate_sender, certificate_receiver),
350 (resolver_sender, resolver_receiver),
351 )
352 }
353
354 async fn register_validators(
356 oracle: &mut Oracle<PublicKey, deterministic::Context>,
357 validators: &[PublicKey],
358 ) -> HashMap<
359 PublicKey,
360 (
361 (
362 Sender<PublicKey, deterministic::Context>,
363 Receiver<PublicKey>,
364 ),
365 (
366 Sender<PublicKey, deterministic::Context>,
367 Receiver<PublicKey>,
368 ),
369 (
370 Sender<PublicKey, deterministic::Context>,
371 Receiver<PublicKey>,
372 ),
373 ),
374 > {
375 let mut registrations = HashMap::new();
376 for validator in validators.iter() {
377 let registration = register_validator(oracle, validator.clone()).await;
378 registrations.insert(validator.clone(), registration);
379 }
380 registrations
381 }
382
383 enum Action {
385 Link(Link),
386 Update(Link), Unlink,
388 }
389
390 async fn link_validators(
396 oracle: &mut Oracle<PublicKey, deterministic::Context>,
397 validators: &[PublicKey],
398 action: Action,
399 restrict_to: Option<fn(usize, usize, usize) -> bool>,
400 ) {
401 for (i1, v1) in validators.iter().enumerate() {
402 for (i2, v2) in validators.iter().enumerate() {
403 if v2 == v1 {
405 continue;
406 }
407
408 if let Some(f) = restrict_to {
410 if !f(validators.len(), i1, i2) {
411 continue;
412 }
413 }
414
415 match action {
417 Action::Update(_) | Action::Unlink => {
418 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
419 }
420 _ => {}
421 }
422
423 match action {
425 Action::Link(ref link) | Action::Update(ref link) => {
426 oracle
427 .add_link(v1.clone(), v2.clone(), link.clone())
428 .await
429 .unwrap();
430 }
431 _ => {}
432 }
433 }
434 }
435 }
436
437 fn all_online<S, F, L>(mut fixture: F)
438 where
439 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
440 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
441 L: Elector<S>,
442 {
443 let n = 5;
445 let quorum = quorum(n) as usize;
446 let required_containers = View::new(100);
447 let activity_timeout = ViewDelta::new(10);
448 let skip_timeout = ViewDelta::new(5);
449 let namespace = b"consensus".to_vec();
450 let executor = deterministic::Runner::timed(Duration::from_secs(300));
451 executor.start(|mut context| async move {
452 let (network, mut oracle) = Network::new(
454 context.with_label("network"),
455 Config {
456 max_size: 1024 * 1024,
457 disconnect_on_block: true,
458 tracked_peer_sets: None,
459 },
460 );
461
462 network.start();
464
465 let Fixture {
467 participants,
468 schemes,
469 ..
470 } = fixture(&mut context, n);
471 let mut registrations = register_validators(&mut oracle, &participants).await;
472
473 let link = Link {
475 latency: Duration::from_millis(10),
476 jitter: Duration::from_millis(1),
477 success_rate: 1.0,
478 };
479 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
480
481 let elector = L::default();
483 let relay = Arc::new(mocks::relay::Relay::new());
484 let mut reporters = Vec::new();
485 let mut engine_handlers = Vec::new();
486 for (idx, validator) in participants.iter().enumerate() {
487 let context = context.with_label(&format!("validator_{}", *validator));
489
490 let reporter_config = mocks::reporter::Config {
492 namespace: namespace.clone(),
493 participants: participants.clone().try_into().unwrap(),
494 scheme: schemes[idx].clone(),
495 elector: elector.clone(),
496 };
497 let reporter =
498 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
499 reporters.push(reporter.clone());
500 let application_cfg = mocks::application::Config {
501 hasher: Sha256::default(),
502 relay: relay.clone(),
503 me: validator.clone(),
504 propose_latency: (10.0, 5.0),
505 verify_latency: (10.0, 5.0),
506 certify_latency: (10.0, 5.0),
507 should_certify: mocks::application::Certifier::Sometimes,
508 };
509 let (actor, application) = mocks::application::Application::new(
510 context.with_label("application"),
511 application_cfg,
512 );
513 actor.start();
514 let blocker = oracle.control(validator.clone());
515 let cfg = config::Config {
516 scheme: schemes[idx].clone(),
517 elector: elector.clone(),
518 blocker,
519 automaton: application.clone(),
520 relay: application.clone(),
521 reporter: reporter.clone(),
522 partition: validator.to_string(),
523 mailbox_size: 1024,
524 epoch: Epoch::new(333),
525 namespace: namespace.clone(),
526 leader_timeout: Duration::from_secs(1),
527 notarization_timeout: Duration::from_secs(2),
528 nullify_retry: Duration::from_secs(10),
529 fetch_timeout: Duration::from_secs(1),
530 activity_timeout,
531 skip_timeout,
532 fetch_concurrent: 4,
533 replay_buffer: NZUsize!(1024 * 1024),
534 write_buffer: NZUsize!(1024 * 1024),
535 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
536 };
537 let engine = Engine::new(context.with_label("engine"), cfg);
538
539 let (pending, recovered, resolver) = registrations
541 .remove(validator)
542 .expect("validator should be registered");
543 engine_handlers.push(engine.start(pending, recovered, resolver));
544 }
545
546 let mut finalizers = Vec::new();
548 for reporter in reporters.iter_mut() {
549 let (mut latest, mut monitor) = reporter.subscribe().await;
550 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
551 while latest < required_containers {
552 latest = monitor.next().await.expect("event missing");
553 }
554 }));
555 }
556 join_all(finalizers).await;
557
558 let latest_complete = required_containers.saturating_sub(activity_timeout);
560 for reporter in reporters.iter() {
561 {
563 let faults = reporter.faults.lock().unwrap();
564 assert!(faults.is_empty());
565 }
566
567 {
569 let invalid = reporter.invalid.lock().unwrap();
570 assert_eq!(*invalid, 0);
571 }
572
573 {
575 let certified = reporter.certified.lock().unwrap();
576 for view in View::range(View::new(1), latest_complete) {
577 if !certified.contains(&view) {
579 panic!("view: {view}");
580 }
581 }
582 }
583
584 let mut notarized = HashMap::new();
586 let mut finalized = HashMap::new();
587 {
588 let notarizes = reporter.notarizes.lock().unwrap();
589 for view in View::range(View::new(1), latest_complete) {
590 let Some(payloads) = notarizes.get(&view) else {
592 continue;
593 };
594 if payloads.len() > 1 {
595 panic!("view: {view}");
596 }
597 let (digest, notarizers) = payloads.iter().next().unwrap();
598 notarized.insert(view, *digest);
599
600 if notarizers.len() < quorum {
601 panic!("view: {view}");
604 }
605 }
606 }
607 {
608 let notarizations = reporter.notarizations.lock().unwrap();
609 for view in View::range(View::new(1), latest_complete) {
610 let Some(notarization) = notarizations.get(&view) else {
612 continue;
613 };
614 let Some(digest) = notarized.get(&view) else {
615 continue;
616 };
617 assert_eq!(¬arization.proposal.payload, digest);
618 }
619 }
620 {
621 let finalizes = reporter.finalizes.lock().unwrap();
622 for view in View::range(View::new(1), latest_complete) {
623 let Some(payloads) = finalizes.get(&view) else {
625 continue;
626 };
627 if payloads.len() > 1 {
628 panic!("view: {view}");
629 }
630 let (digest, finalizers) = payloads.iter().next().unwrap();
631 finalized.insert(view, *digest);
632
633 if view > latest_complete {
635 continue;
636 }
637
638 if finalizers.len() < quorum {
640 panic!("view: {view}");
643 }
644
645 let nullifies = reporter.nullifies.lock().unwrap();
647 let Some(nullifies) = nullifies.get(&view) else {
648 continue;
649 };
650 for (_, finalizers) in payloads.iter() {
651 for finalizer in finalizers.iter() {
652 if nullifies.contains(finalizer) {
653 panic!("should not nullify and finalize at same view");
654 }
655 }
656 }
657 }
658 }
659 {
660 let finalizations = reporter.finalizations.lock().unwrap();
661 for view in View::range(View::new(1), latest_complete) {
662 let Some(finalization) = finalizations.get(&view) else {
664 continue;
665 };
666 let Some(digest) = finalized.get(&view) else {
667 continue;
668 };
669 assert_eq!(&finalization.proposal.payload, digest);
670 }
671 }
672 }
673
674 let blocked = oracle.blocked().await.unwrap();
676 assert!(blocked.is_empty());
677 });
678 }
679
680 #[test_traced]
681 fn test_all_online() {
682 all_online::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
683 all_online::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
684 all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
685 all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
686 all_online::<_, _, RoundRobin>(ed25519::fixture);
687 }
688
689 fn observer<S, F, L>(mut fixture: F)
690 where
691 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
692 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
693 L: Elector<S>,
694 {
695 let n_active = 5;
697 let required_containers = View::new(100);
698 let activity_timeout = ViewDelta::new(10);
699 let skip_timeout = ViewDelta::new(5);
700 let namespace = b"consensus".to_vec();
701 let executor = deterministic::Runner::timed(Duration::from_secs(300));
702 executor.start(|mut context| async move {
703 let (network, mut oracle) = Network::new(
705 context.with_label("network"),
706 Config {
707 max_size: 1024 * 1024,
708 disconnect_on_block: true,
709 tracked_peer_sets: None,
710 },
711 );
712
713 network.start();
715
716 let Fixture {
718 participants,
719 schemes,
720 verifier,
721 ..
722 } = fixture(&mut context, n_active);
723
724 let private_key_observer = PrivateKey::from_seed(n_active as u64);
726 let public_key_observer = private_key_observer.public_key();
727
728 let mut all_validators = participants.clone();
730 all_validators.push(public_key_observer.clone());
731 all_validators.sort();
732 let mut registrations = register_validators(&mut oracle, &all_validators).await;
733
734 let link = Link {
736 latency: Duration::from_millis(10),
737 jitter: Duration::from_millis(1),
738 success_rate: 1.0,
739 };
740 link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
741
742 let elector = L::default();
744 let relay = Arc::new(mocks::relay::Relay::new());
745 let mut reporters = Vec::new();
746
747 for (idx, validator) in participants.iter().enumerate() {
748 let is_observer = *validator == public_key_observer;
749
750 let context = context.with_label(&format!("validator_{}", *validator));
752
753 let signing = if is_observer {
755 verifier.clone()
756 } else {
757 schemes[idx].clone()
758 };
759 let reporter_config = mocks::reporter::Config {
760 namespace: namespace.clone(),
761 participants: participants.clone().try_into().unwrap(),
762 scheme: signing.clone(),
763 elector: elector.clone(),
764 };
765 let reporter =
766 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
767 reporters.push(reporter.clone());
768 let application_cfg = mocks::application::Config {
769 hasher: Sha256::default(),
770 relay: relay.clone(),
771 me: validator.clone(),
772 propose_latency: (10.0, 5.0),
773 verify_latency: (10.0, 5.0),
774 certify_latency: (10.0, 5.0),
775 should_certify: mocks::application::Certifier::Sometimes,
776 };
777 let (actor, application) = mocks::application::Application::new(
778 context.with_label("application"),
779 application_cfg,
780 );
781 actor.start();
782 let blocker = oracle.control(validator.clone());
783 let cfg = config::Config {
784 scheme: signing.clone(),
785 elector: elector.clone(),
786 blocker,
787 automaton: application.clone(),
788 relay: application.clone(),
789 reporter: reporter.clone(),
790 partition: validator.to_string(),
791 mailbox_size: 1024,
792 epoch: Epoch::new(333),
793 namespace: namespace.clone(),
794 leader_timeout: Duration::from_secs(1),
795 notarization_timeout: Duration::from_secs(2),
796 nullify_retry: Duration::from_secs(10),
797 fetch_timeout: Duration::from_secs(1),
798 activity_timeout,
799 skip_timeout,
800 fetch_concurrent: 4,
801 replay_buffer: NZUsize!(1024 * 1024),
802 write_buffer: NZUsize!(1024 * 1024),
803 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
804 };
805 let engine = Engine::new(context.with_label("engine"), cfg);
806
807 let (pending, recovered, resolver) = registrations
809 .remove(validator)
810 .expect("validator should be registered");
811 engine.start(pending, recovered, resolver);
812 }
813
814 let mut finalizers = Vec::new();
816 for reporter in reporters.iter_mut() {
817 let (mut latest, mut monitor) = reporter.subscribe().await;
818 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
819 while latest < required_containers {
820 latest = monitor.next().await.expect("event missing");
821 }
822 }));
823 }
824 join_all(finalizers).await;
825
826 for reporter in reporters.iter() {
828 {
830 let faults = reporter.faults.lock().unwrap();
831 assert!(faults.is_empty());
832 }
833 {
834 let invalid = reporter.invalid.lock().unwrap();
835 assert_eq!(*invalid, 0);
836 }
837
838 let blocked = oracle.blocked().await.unwrap();
840 assert!(blocked.is_empty());
841 }
842 });
843 }
844
845 #[test_traced]
846 fn test_observer() {
847 observer::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
848 observer::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
849 observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
850 observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
851 observer::<_, _, RoundRobin>(ed25519::fixture);
852 }
853
854 fn unclean_shutdown<S, F, L>(mut fixture: F)
855 where
856 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
857 F: FnMut(&mut StdRng, u32) -> Fixture<S>,
858 L: Elector<S>,
859 {
860 let n = 5;
862 let required_containers = View::new(100);
863 let activity_timeout = ViewDelta::new(10);
864 let skip_timeout = ViewDelta::new(5);
865 let namespace = b"consensus".to_vec();
866
867 let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
869 let supervised = Arc::new(Mutex::new(Vec::new()));
870 let mut prev_checkpoint = None;
871
872 let mut rng = StdRng::seed_from_u64(0);
874 let Fixture {
875 participants,
876 schemes,
877 ..
878 } = fixture(&mut rng, n);
879
880 let relay = Arc::new(mocks::relay::Relay::<Sha256Digest, S::PublicKey>::new());
882
883 loop {
884 let rng = rng.clone();
885 let participants = participants.clone();
886 let schemes = schemes.clone();
887 let namespace = namespace.clone();
888 let shutdowns = shutdowns.clone();
889 let supervised = supervised.clone();
890 let relay = relay.clone();
891 relay.deregister_all(); let f = |mut context: deterministic::Context| async move {
894 let (network, mut oracle) = Network::new(
896 context.with_label("network"),
897 Config {
898 max_size: 1024 * 1024,
899 disconnect_on_block: true,
900 tracked_peer_sets: None,
901 },
902 );
903
904 network.start();
906
907 let mut registrations = register_validators(&mut oracle, &participants).await;
909
910 let link = Link {
912 latency: Duration::from_millis(50),
913 jitter: Duration::from_millis(50),
914 success_rate: 1.0,
915 };
916 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
917
918 let elector = L::default();
920 let relay = Arc::new(mocks::relay::Relay::new());
921 let mut reporters = HashMap::new();
922 let mut engine_handlers = Vec::new();
923 for (idx, validator) in participants.iter().enumerate() {
924 let context = context.with_label(&format!("validator_{}", *validator));
926
927 let reporter_config = mocks::reporter::Config {
929 namespace: namespace.clone(),
930 participants: participants.clone().try_into().unwrap(),
931 scheme: schemes[idx].clone(),
932 elector: elector.clone(),
933 };
934 let reporter = mocks::reporter::Reporter::new(rng.clone(), reporter_config);
935 reporters.insert(validator.clone(), reporter.clone());
936 let application_cfg = mocks::application::Config {
937 hasher: Sha256::default(),
938 relay: relay.clone(),
939 me: validator.clone(),
940 propose_latency: (10.0, 5.0),
941 verify_latency: (10.0, 5.0),
942 certify_latency: (10.0, 5.0),
943 should_certify: mocks::application::Certifier::Sometimes,
944 };
945 let (actor, application) = mocks::application::Application::new(
946 context.with_label("application"),
947 application_cfg,
948 );
949 actor.start();
950 let blocker = oracle.control(validator.clone());
951 let cfg = config::Config {
952 scheme: schemes[idx].clone(),
953 elector: elector.clone(),
954 blocker,
955 automaton: application.clone(),
956 relay: application.clone(),
957 reporter: reporter.clone(),
958 partition: validator.to_string(),
959 mailbox_size: 1024,
960 epoch: Epoch::new(333),
961 namespace: namespace.clone(),
962 leader_timeout: Duration::from_secs(1),
963 notarization_timeout: Duration::from_secs(2),
964 nullify_retry: Duration::from_secs(10),
965 fetch_timeout: Duration::from_secs(1),
966 activity_timeout,
967 skip_timeout,
968 fetch_concurrent: 4,
969 replay_buffer: NZUsize!(1024 * 1024),
970 write_buffer: NZUsize!(1024 * 1024),
971 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
972 };
973 let engine = Engine::new(context.with_label("engine"), cfg);
974
975 let (pending, recovered, resolver) = registrations
977 .remove(validator)
978 .expect("validator should be registered");
979 engine_handlers.push(engine.start(pending, recovered, resolver));
980 }
981
982 let mut finalizers = Vec::new();
984 for (_, reporter) in reporters.iter_mut() {
985 let (mut latest, mut monitor) = reporter.subscribe().await;
986 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
987 while latest < required_containers {
988 latest = monitor.next().await.expect("event missing");
989 }
990 }));
991 }
992
993 let wait =
995 context.gen_range(Duration::from_millis(100)..Duration::from_millis(2_000));
996 let result = select! {
997 _ = context.sleep(wait) => {
998 {
1000 let mut shutdowns = shutdowns.lock().unwrap();
1001 debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
1002 *shutdowns += 1;
1003 }
1004 supervised.lock().unwrap().push(reporters);
1005 false
1006 },
1007 _ = join_all(finalizers) => {
1008 let supervised = supervised.lock().unwrap();
1010 for reporters in supervised.iter() {
1011 for (_, reporter) in reporters.iter() {
1012 let faults = reporter.faults.lock().unwrap();
1013 assert!(faults.is_empty());
1014 }
1015 }
1016 true
1017 }
1018 };
1019
1020 let blocked = oracle.blocked().await.unwrap();
1022 assert!(blocked.is_empty());
1023
1024 result
1025 };
1026
1027 let (complete, checkpoint) = prev_checkpoint
1028 .map_or_else(
1029 || deterministic::Runner::timed(Duration::from_secs(180)),
1030 deterministic::Runner::from,
1031 )
1032 .start_and_recover(f);
1033
1034 if complete {
1036 break;
1037 }
1038
1039 prev_checkpoint = Some(checkpoint);
1040 }
1041 }
1042
1043 #[test_group("slow")]
1044 #[test_traced]
1045 fn test_unclean_shutdown() {
1046 unclean_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1047 unclean_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1048 unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1049 unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1050 unclean_shutdown::<_, _, RoundRobin>(ed25519::fixture);
1051 }
1052
1053 fn backfill<S, F, L>(mut fixture: F)
1054 where
1055 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1056 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1057 L: Elector<S>,
1058 {
1059 let n = 4;
1061 let required_containers = View::new(100);
1062 let activity_timeout = ViewDelta::new(10);
1063 let skip_timeout = ViewDelta::new(5);
1064 let namespace = b"consensus".to_vec();
1065 let executor = deterministic::Runner::timed(Duration::from_secs(240));
1066 executor.start(|mut context| async move {
1067 let (network, mut oracle) = Network::new(
1069 context.with_label("network"),
1070 Config {
1071 max_size: 1024 * 1024,
1072 disconnect_on_block: true,
1073 tracked_peer_sets: None,
1074 },
1075 );
1076
1077 network.start();
1079
1080 let Fixture {
1082 participants,
1083 schemes,
1084 ..
1085 } = fixture(&mut context, n);
1086 let mut registrations = register_validators(&mut oracle, &participants).await;
1087
1088 let link = Link {
1090 latency: Duration::from_millis(10),
1091 jitter: Duration::from_millis(1),
1092 success_rate: 1.0,
1093 };
1094 link_validators(
1095 &mut oracle,
1096 &participants,
1097 Action::Link(link),
1098 Some(|_, i, j| ![i, j].contains(&0usize)),
1099 )
1100 .await;
1101
1102 let elector = L::default();
1104 let relay = Arc::new(mocks::relay::Relay::new());
1105 let mut reporters = Vec::new();
1106 let mut engine_handlers = Vec::new();
1107 for (idx_scheme, validator) in participants.iter().enumerate() {
1108 if idx_scheme == 0 {
1110 continue;
1111 }
1112
1113 let context = context.with_label(&format!("validator_{}", *validator));
1115
1116 let reporter_config = mocks::reporter::Config {
1118 namespace: namespace.clone(),
1119 participants: participants.clone().try_into().unwrap(),
1120 scheme: schemes[idx_scheme].clone(),
1121 elector: elector.clone(),
1122 };
1123 let reporter =
1124 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1125 reporters.push(reporter.clone());
1126 let application_cfg = mocks::application::Config {
1127 hasher: Sha256::default(),
1128 relay: relay.clone(),
1129 me: validator.clone(),
1130 propose_latency: (10.0, 5.0),
1131 verify_latency: (10.0, 5.0),
1132 certify_latency: (10.0, 5.0),
1133 should_certify: mocks::application::Certifier::Sometimes,
1134 };
1135 let (actor, application) = mocks::application::Application::new(
1136 context.with_label("application"),
1137 application_cfg,
1138 );
1139 actor.start();
1140 let blocker = oracle.control(validator.clone());
1141 let cfg = config::Config {
1142 scheme: schemes[idx_scheme].clone(),
1143 elector: elector.clone(),
1144 blocker,
1145 automaton: application.clone(),
1146 relay: application.clone(),
1147 reporter: reporter.clone(),
1148 partition: validator.to_string(),
1149 mailbox_size: 1024,
1150 epoch: Epoch::new(333),
1151 namespace: namespace.clone(),
1152 leader_timeout: Duration::from_secs(1),
1153 notarization_timeout: Duration::from_secs(2),
1154 nullify_retry: Duration::from_secs(10),
1155 fetch_timeout: Duration::from_secs(1),
1156 activity_timeout,
1157 skip_timeout,
1158 fetch_concurrent: 4,
1159 replay_buffer: NZUsize!(1024 * 1024),
1160 write_buffer: NZUsize!(1024 * 1024),
1161 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1162 };
1163 let engine = Engine::new(context.with_label("engine"), cfg);
1164
1165 let (pending, recovered, resolver) = registrations
1167 .remove(validator)
1168 .expect("validator should be registered");
1169 engine_handlers.push(engine.start(pending, recovered, resolver));
1170 }
1171
1172 let mut finalizers = Vec::new();
1174 for reporter in reporters.iter_mut() {
1175 let (mut latest, mut monitor) = reporter.subscribe().await;
1176 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1177 while latest < required_containers {
1178 latest = monitor.next().await.expect("event missing");
1179 }
1180 }));
1181 }
1182 join_all(finalizers).await;
1183
1184 let link = Link {
1186 latency: Duration::from_secs(3),
1187 jitter: Duration::from_millis(0),
1188 success_rate: 1.0,
1189 };
1190 link_validators(
1191 &mut oracle,
1192 &participants,
1193 Action::Update(link.clone()),
1194 Some(|_, i, j| ![i, j].contains(&0usize)),
1195 )
1196 .await;
1197
1198 context.sleep(Duration::from_secs(60)).await;
1200
1201 link_validators(
1203 &mut oracle,
1204 &participants,
1205 Action::Unlink,
1206 Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
1207 )
1208 .await;
1209
1210 let me = participants[0].clone();
1212 let context = context.with_label(&format!("validator_{me}"));
1213
1214 link_validators(
1216 &mut oracle,
1217 &participants,
1218 Action::Link(link),
1219 Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
1220 )
1221 .await;
1222
1223 let link = Link {
1225 latency: Duration::from_millis(10),
1226 jitter: Duration::from_millis(3),
1227 success_rate: 1.0,
1228 };
1229 link_validators(
1230 &mut oracle,
1231 &participants,
1232 Action::Update(link),
1233 Some(|_, i, j| ![i, j].contains(&1usize)),
1234 )
1235 .await;
1236
1237 let reporter_config = mocks::reporter::Config {
1239 namespace: namespace.clone(),
1240 participants: participants.clone().try_into().unwrap(),
1241 scheme: schemes[0].clone(),
1242 elector: elector.clone(),
1243 };
1244 let mut reporter =
1245 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1246 reporters.push(reporter.clone());
1247 let application_cfg = mocks::application::Config {
1248 hasher: Sha256::default(),
1249 relay: relay.clone(),
1250 me: me.clone(),
1251 propose_latency: (10.0, 5.0),
1252 verify_latency: (10.0, 5.0),
1253 certify_latency: (10.0, 5.0),
1254 should_certify: mocks::application::Certifier::Sometimes,
1255 };
1256 let (actor, application) = mocks::application::Application::new(
1257 context.with_label("application"),
1258 application_cfg,
1259 );
1260 actor.start();
1261 let blocker = oracle.control(me.clone());
1262 let cfg = config::Config {
1263 scheme: schemes[0].clone(),
1264 elector: elector.clone(),
1265 blocker,
1266 automaton: application.clone(),
1267 relay: application.clone(),
1268 reporter: reporter.clone(),
1269 partition: me.to_string(),
1270 mailbox_size: 1024,
1271 epoch: Epoch::new(333),
1272 namespace: namespace.clone(),
1273 leader_timeout: Duration::from_secs(1),
1274 notarization_timeout: Duration::from_secs(2),
1275 nullify_retry: Duration::from_secs(10),
1276 fetch_timeout: Duration::from_secs(1),
1277 activity_timeout,
1278 skip_timeout,
1279 fetch_concurrent: 4,
1280 replay_buffer: NZUsize!(1024 * 1024),
1281 write_buffer: NZUsize!(1024 * 1024),
1282 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1283 };
1284 let engine = Engine::new(context.with_label("engine"), cfg);
1285
1286 let (pending, recovered, resolver) = registrations
1288 .remove(&me)
1289 .expect("validator should be registered");
1290 engine_handlers.push(engine.start(pending, recovered, resolver));
1291
1292 let (mut latest, mut monitor) = reporter.subscribe().await;
1294 while latest < required_containers {
1295 latest = monitor.next().await.expect("event missing");
1296 }
1297
1298 let blocked = oracle.blocked().await.unwrap();
1300 assert!(blocked.is_empty());
1301 });
1302 }
1303
1304 #[test_traced]
1305 fn test_backfill() {
1306 backfill::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1307 backfill::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1308 backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1309 backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1310 backfill::<_, _, RoundRobin>(ed25519::fixture);
1311 }
1312
1313 fn one_offline<S, F, L>(mut fixture: F)
1314 where
1315 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1316 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1317 L: Elector<S>,
1318 {
1319 let n = 5;
1321 let quorum = quorum(n) as usize;
1322 let required_containers = View::new(100);
1323 let activity_timeout = ViewDelta::new(10);
1324 let skip_timeout = ViewDelta::new(5);
1325 let max_exceptions = 10;
1326 let namespace = b"consensus".to_vec();
1327 let executor = deterministic::Runner::timed(Duration::from_secs(300));
1328 executor.start(|mut context| async move {
1329 let (network, mut oracle) = Network::new(
1331 context.with_label("network"),
1332 Config {
1333 max_size: 1024 * 1024,
1334 disconnect_on_block: true,
1335 tracked_peer_sets: None,
1336 },
1337 );
1338
1339 network.start();
1341
1342 let Fixture {
1344 participants,
1345 schemes,
1346 ..
1347 } = fixture(&mut context, n);
1348 let mut registrations = register_validators(&mut oracle, &participants).await;
1349
1350 let link = Link {
1352 latency: Duration::from_millis(10),
1353 jitter: Duration::from_millis(1),
1354 success_rate: 1.0,
1355 };
1356 link_validators(
1357 &mut oracle,
1358 &participants,
1359 Action::Link(link),
1360 Some(|_, i, j| ![i, j].contains(&0usize)),
1361 )
1362 .await;
1363
1364 let elector = L::default();
1366 let relay = Arc::new(mocks::relay::Relay::new());
1367 let mut reporters = Vec::new();
1368 let mut engine_handlers = Vec::new();
1369 for (idx_scheme, validator) in participants.iter().enumerate() {
1370 if idx_scheme == 0 {
1372 continue;
1373 }
1374
1375 let context = context.with_label(&format!("validator_{}", *validator));
1377
1378 let reporter_config = mocks::reporter::Config {
1380 namespace: namespace.clone(),
1381 participants: participants.clone().try_into().unwrap(),
1382 scheme: schemes[idx_scheme].clone(),
1383 elector: elector.clone(),
1384 };
1385 let reporter =
1386 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1387 reporters.push(reporter.clone());
1388 let application_cfg = mocks::application::Config {
1389 hasher: Sha256::default(),
1390 relay: relay.clone(),
1391 me: validator.clone(),
1392 propose_latency: (10.0, 5.0),
1393 verify_latency: (10.0, 5.0),
1394 certify_latency: (10.0, 5.0),
1395 should_certify: mocks::application::Certifier::Sometimes,
1396 };
1397 let (actor, application) = mocks::application::Application::new(
1398 context.with_label("application"),
1399 application_cfg,
1400 );
1401 actor.start();
1402 let blocker = oracle.control(validator.clone());
1403 let cfg = config::Config {
1404 scheme: schemes[idx_scheme].clone(),
1405 elector: elector.clone(),
1406 blocker,
1407 automaton: application.clone(),
1408 relay: application.clone(),
1409 reporter: reporter.clone(),
1410 partition: validator.to_string(),
1411 mailbox_size: 1024,
1412 epoch: Epoch::new(333),
1413 namespace: namespace.clone(),
1414 leader_timeout: Duration::from_secs(1),
1415 notarization_timeout: Duration::from_secs(2),
1416 nullify_retry: Duration::from_secs(10),
1417 fetch_timeout: Duration::from_secs(1),
1418 activity_timeout,
1419 skip_timeout,
1420 fetch_concurrent: 4,
1421 replay_buffer: NZUsize!(1024 * 1024),
1422 write_buffer: NZUsize!(1024 * 1024),
1423 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1424 };
1425 let engine = Engine::new(context.with_label("engine"), cfg);
1426
1427 let (pending, recovered, resolver) = registrations
1429 .remove(validator)
1430 .expect("validator should be registered");
1431 engine_handlers.push(engine.start(pending, recovered, resolver));
1432 }
1433
1434 let mut finalizers = Vec::new();
1436 for reporter in reporters.iter_mut() {
1437 let (mut latest, mut monitor) = reporter.subscribe().await;
1438 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1439 while latest < required_containers {
1440 latest = monitor.next().await.expect("event missing");
1441 }
1442 }));
1443 }
1444 join_all(finalizers).await;
1445
1446 let exceptions = 0;
1448 let offline = &participants[0];
1449 for reporter in reporters.iter() {
1450 {
1452 let faults = reporter.faults.lock().unwrap();
1453 assert!(faults.is_empty());
1454 }
1455
1456 {
1458 let invalid = reporter.invalid.lock().unwrap();
1459 assert_eq!(*invalid, 0);
1460 }
1461
1462 let mut exceptions = 0;
1464 {
1465 let notarizes = reporter.notarizes.lock().unwrap();
1466 for (view, payloads) in notarizes.iter() {
1467 for (_, participants) in payloads.iter() {
1468 if participants.contains(offline) {
1469 panic!("view: {view}");
1470 }
1471 }
1472 }
1473 }
1474 {
1475 let nullifies = reporter.nullifies.lock().unwrap();
1476 for (view, participants) in nullifies.iter() {
1477 if participants.contains(offline) {
1478 panic!("view: {view}");
1479 }
1480 }
1481 }
1482 {
1483 let finalizes = reporter.finalizes.lock().unwrap();
1484 for (view, payloads) in finalizes.iter() {
1485 for (_, finalizers) in payloads.iter() {
1486 if finalizers.contains(offline) {
1487 panic!("view: {view}");
1488 }
1489 }
1490 }
1491 }
1492
1493 let mut offline_views = Vec::new();
1495 {
1496 let leaders = reporter.leaders.lock().unwrap();
1497 for (view, leader) in leaders.iter() {
1498 if leader == offline {
1499 offline_views.push(*view);
1500 }
1501 }
1502 }
1503 assert!(!offline_views.is_empty());
1504
1505 {
1507 let nullifies = reporter.nullifies.lock().unwrap();
1508 for view in offline_views.iter() {
1509 let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1510 if nullifies < quorum {
1511 warn!("missing expected view nullifies: {}", view);
1512 exceptions += 1;
1513 }
1514 }
1515 }
1516 {
1517 let nullifications = reporter.nullifications.lock().unwrap();
1518 for view in offline_views.iter() {
1519 if !nullifications.contains_key(view) {
1520 warn!("missing expected view nullifies: {}", view);
1521 exceptions += 1;
1522 }
1523 }
1524 }
1525
1526 assert!(exceptions <= max_exceptions);
1528 }
1529 assert!(exceptions <= max_exceptions);
1530
1531 let blocked = oracle.blocked().await.unwrap();
1533 assert!(blocked.is_empty());
1534
1535 let encoded = context.encode();
1537 let lines = encoded.lines();
1538 let mut skipped_views = 0;
1539 let mut nodes_skipping = 0;
1540 for line in lines {
1541 if line.contains("_skipped_views_total") {
1542 let parts: Vec<&str> = line.split_whitespace().collect();
1543 if let Some(number_str) = parts.last() {
1544 if let Ok(number) = number_str.parse::<u64>() {
1545 if number > 0 {
1546 nodes_skipping += 1;
1547 }
1548 if number > skipped_views {
1549 skipped_views = number;
1550 }
1551 }
1552 }
1553 }
1554 }
1555 assert!(
1556 skipped_views > 0,
1557 "expected skipped views to be greater than 0"
1558 );
1559 assert_eq!(
1560 nodes_skipping,
1561 n - 1,
1562 "expected all online nodes to be skipping views"
1563 );
1564 });
1565 }
1566
1567 #[test_traced]
1568 fn test_one_offline() {
1569 one_offline::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1570 one_offline::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1571 one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1572 one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1573 one_offline::<_, _, RoundRobin>(ed25519::fixture);
1574 }
1575
1576 fn slow_validator<S, F, L>(mut fixture: F)
1577 where
1578 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1579 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1580 L: Elector<S>,
1581 {
1582 let n = 5;
1584 let required_containers = View::new(50);
1585 let activity_timeout = ViewDelta::new(10);
1586 let skip_timeout = ViewDelta::new(5);
1587 let namespace = b"consensus".to_vec();
1588 let executor = deterministic::Runner::timed(Duration::from_secs(300));
1589 executor.start(|mut context| async move {
1590 let (network, mut oracle) = Network::new(
1592 context.with_label("network"),
1593 Config {
1594 max_size: 1024 * 1024,
1595 disconnect_on_block: true,
1596 tracked_peer_sets: None,
1597 },
1598 );
1599
1600 network.start();
1602
1603 let Fixture {
1605 participants,
1606 schemes,
1607 ..
1608 } = fixture(&mut context, n);
1609 let mut registrations = register_validators(&mut oracle, &participants).await;
1610
1611 let link = Link {
1613 latency: Duration::from_millis(10),
1614 jitter: Duration::from_millis(1),
1615 success_rate: 1.0,
1616 };
1617 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1618
1619 let elector = L::default();
1621 let relay = Arc::new(mocks::relay::Relay::new());
1622 let mut reporters = Vec::new();
1623 let mut engine_handlers = Vec::new();
1624 for (idx_scheme, validator) in participants.iter().enumerate() {
1625 let context = context.with_label(&format!("validator_{}", *validator));
1627
1628 let reporter_config = mocks::reporter::Config {
1630 namespace: namespace.clone(),
1631 participants: participants.clone().try_into().unwrap(),
1632 scheme: schemes[idx_scheme].clone(),
1633 elector: elector.clone(),
1634 };
1635 let reporter =
1636 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1637 reporters.push(reporter.clone());
1638 let application_cfg = if idx_scheme == 0 {
1639 mocks::application::Config {
1640 hasher: Sha256::default(),
1641 relay: relay.clone(),
1642 me: validator.clone(),
1643 propose_latency: (10_000.0, 0.0),
1644 verify_latency: (10_000.0, 5.0),
1645 certify_latency: (10_000.0, 5.0),
1646 should_certify: mocks::application::Certifier::Sometimes,
1647 }
1648 } else {
1649 mocks::application::Config {
1650 hasher: Sha256::default(),
1651 relay: relay.clone(),
1652 me: validator.clone(),
1653 propose_latency: (10.0, 5.0),
1654 verify_latency: (10.0, 5.0),
1655 certify_latency: (10.0, 5.0),
1656 should_certify: mocks::application::Certifier::Sometimes,
1657 }
1658 };
1659 let (actor, application) = mocks::application::Application::new(
1660 context.with_label("application"),
1661 application_cfg,
1662 );
1663 actor.start();
1664 let blocker = oracle.control(validator.clone());
1665 let cfg = config::Config {
1666 scheme: schemes[idx_scheme].clone(),
1667 elector: elector.clone(),
1668 blocker,
1669 automaton: application.clone(),
1670 relay: application.clone(),
1671 reporter: reporter.clone(),
1672 partition: validator.to_string(),
1673 mailbox_size: 1024,
1674 epoch: Epoch::new(333),
1675 namespace: namespace.clone(),
1676 leader_timeout: Duration::from_secs(1),
1677 notarization_timeout: Duration::from_secs(2),
1678 nullify_retry: Duration::from_secs(10),
1679 fetch_timeout: Duration::from_secs(1),
1680 activity_timeout,
1681 skip_timeout,
1682 fetch_concurrent: 4,
1683 replay_buffer: NZUsize!(1024 * 1024),
1684 write_buffer: NZUsize!(1024 * 1024),
1685 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1686 };
1687 let engine = Engine::new(context.with_label("engine"), cfg);
1688
1689 let (pending, recovered, resolver) = registrations
1691 .remove(validator)
1692 .expect("validator should be registered");
1693 engine_handlers.push(engine.start(pending, recovered, resolver));
1694 }
1695
1696 let mut finalizers = Vec::new();
1698 for reporter in reporters.iter_mut() {
1699 let (mut latest, mut monitor) = reporter.subscribe().await;
1700 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1701 while latest < required_containers {
1702 latest = monitor.next().await.expect("event missing");
1703 }
1704 }));
1705 }
1706 join_all(finalizers).await;
1707
1708 let slow = &participants[0];
1710 for reporter in reporters.iter() {
1711 {
1713 let faults = reporter.faults.lock().unwrap();
1714 assert!(faults.is_empty());
1715 }
1716
1717 {
1719 let invalid = reporter.invalid.lock().unwrap();
1720 assert_eq!(*invalid, 0);
1721 }
1722
1723 let mut observed = false;
1725 {
1726 let notarizes = reporter.notarizes.lock().unwrap();
1727 for (_, payloads) in notarizes.iter() {
1728 for (_, participants) in payloads.iter() {
1729 if participants.contains(slow) {
1730 observed = true;
1731 break;
1732 }
1733 }
1734 }
1735 }
1736 {
1737 let finalizes = reporter.finalizes.lock().unwrap();
1738 for (_, payloads) in finalizes.iter() {
1739 for (_, finalizers) in payloads.iter() {
1740 if finalizers.contains(slow) {
1741 observed = true;
1742 break;
1743 }
1744 }
1745 }
1746 }
1747 assert!(observed);
1748 }
1749
1750 let blocked = oracle.blocked().await.unwrap();
1752 assert!(blocked.is_empty());
1753 });
1754 }
1755
1756 #[test_traced]
1757 fn test_slow_validator() {
1758 slow_validator::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1759 slow_validator::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1760 slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1761 slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1762 slow_validator::<_, _, RoundRobin>(ed25519::fixture);
1763 }
1764
1765 fn all_recovery<S, F, L>(mut fixture: F)
1766 where
1767 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1768 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1769 L: Elector<S>,
1770 {
1771 let n = 5;
1773 let required_containers = View::new(100);
1774 let activity_timeout = ViewDelta::new(10);
1775 let skip_timeout = ViewDelta::new(2);
1776 let namespace = b"consensus".to_vec();
1777 let executor = deterministic::Runner::timed(Duration::from_secs(1800));
1778 executor.start(|mut context| async move {
1779 let (network, mut oracle) = Network::new(
1781 context.with_label("network"),
1782 Config {
1783 max_size: 1024 * 1024,
1784 disconnect_on_block: false,
1785 tracked_peer_sets: None,
1786 },
1787 );
1788
1789 network.start();
1791
1792 let Fixture {
1794 participants,
1795 schemes,
1796 ..
1797 } = fixture(&mut context, n);
1798 let mut registrations = register_validators(&mut oracle, &participants).await;
1799
1800 let link = Link {
1802 latency: Duration::from_secs(3),
1803 jitter: Duration::from_millis(0),
1804 success_rate: 1.0,
1805 };
1806 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1807
1808 let elector = L::default();
1810 let relay = Arc::new(mocks::relay::Relay::new());
1811 let mut reporters = Vec::new();
1812 let mut engine_handlers = Vec::new();
1813 for (idx, validator) in participants.iter().enumerate() {
1814 let context = context.with_label(&format!("validator_{}", *validator));
1816
1817 let reporter_config = mocks::reporter::Config {
1819 namespace: namespace.clone(),
1820 participants: participants.clone().try_into().unwrap(),
1821 scheme: schemes[idx].clone(),
1822 elector: elector.clone(),
1823 };
1824 let reporter =
1825 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1826 reporters.push(reporter.clone());
1827 let application_cfg = mocks::application::Config {
1828 hasher: Sha256::default(),
1829 relay: relay.clone(),
1830 me: validator.clone(),
1831 propose_latency: (10.0, 5.0),
1832 verify_latency: (10.0, 5.0),
1833 certify_latency: (10.0, 5.0),
1834 should_certify: mocks::application::Certifier::Sometimes,
1835 };
1836 let (actor, application) = mocks::application::Application::new(
1837 context.with_label("application"),
1838 application_cfg,
1839 );
1840 actor.start();
1841 let blocker = oracle.control(validator.clone());
1842 let cfg = config::Config {
1843 scheme: schemes[idx].clone(),
1844 elector: elector.clone(),
1845 blocker,
1846 automaton: application.clone(),
1847 relay: application.clone(),
1848 reporter: reporter.clone(),
1849 partition: validator.to_string(),
1850 mailbox_size: 1024,
1851 epoch: Epoch::new(333),
1852 namespace: namespace.clone(),
1853 leader_timeout: Duration::from_secs(1),
1854 notarization_timeout: Duration::from_secs(2),
1855 nullify_retry: Duration::from_secs(10),
1856 fetch_timeout: Duration::from_secs(1),
1857 activity_timeout,
1858 skip_timeout,
1859 fetch_concurrent: 4,
1860 replay_buffer: NZUsize!(1024 * 1024),
1861 write_buffer: NZUsize!(1024 * 1024),
1862 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1863 };
1864 let engine = Engine::new(context.with_label("engine"), cfg);
1865
1866 let (pending, recovered, resolver) = registrations
1868 .remove(validator)
1869 .expect("validator should be registered");
1870 engine_handlers.push(engine.start(pending, recovered, resolver));
1871 }
1872
1873 let mut finalizers = Vec::new();
1875 for reporter in reporters.iter_mut() {
1876 let (_, mut monitor) = reporter.subscribe().await;
1877 finalizers.push(
1878 context
1879 .with_label("finalizer")
1880 .spawn(move |context| async move {
1881 select! {
1882 _timeout = context.sleep(Duration::from_secs(60)) => {},
1883 _done = monitor.next() => {
1884 panic!("engine should not notarize or finalize anything");
1885 }
1886 }
1887 }),
1888 );
1889 }
1890 join_all(finalizers).await;
1891
1892 link_validators(&mut oracle, &participants, Action::Unlink, None).await;
1894
1895 context.sleep(Duration::from_secs(60)).await;
1897
1898 let mut latest = View::zero();
1900 for reporter in reporters.iter() {
1901 let nullifies = reporter.nullifies.lock().unwrap();
1902 let max = nullifies.keys().max().unwrap();
1903 if *max > latest {
1904 latest = *max;
1905 }
1906 }
1907
1908 let link = Link {
1910 latency: Duration::from_millis(10),
1911 jitter: Duration::from_millis(1),
1912 success_rate: 1.0,
1913 };
1914 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1915
1916 let mut finalizers = Vec::new();
1918 for reporter in reporters.iter_mut() {
1919 let (mut latest, mut monitor) = reporter.subscribe().await;
1920 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1921 while latest < required_containers {
1922 latest = monitor.next().await.expect("event missing");
1923 }
1924 }));
1925 }
1926 join_all(finalizers).await;
1927
1928 for reporter in reporters.iter() {
1930 {
1932 let faults = reporter.faults.lock().unwrap();
1933 assert!(faults.is_empty());
1934 }
1935
1936 {
1938 let invalid = reporter.invalid.lock().unwrap();
1939 assert_eq!(*invalid, 0);
1940 }
1941
1942 {
1947 let mut found = 0;
1951 let notarizations = reporter.notarizations.lock().unwrap();
1952 for view in View::range(latest, latest.saturating_add(activity_timeout)) {
1953 if notarizations.contains_key(&view) {
1954 found += 1;
1955 }
1956 }
1957 assert!(
1958 found >= activity_timeout.get().saturating_sub(2),
1959 "found: {found}"
1960 );
1961 }
1962 }
1963
1964 let blocked = oracle.blocked().await.unwrap();
1966 assert!(blocked.is_empty());
1967 });
1968 }
1969
1970 #[test_traced]
1971 fn test_all_recovery() {
1972 all_recovery::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1973 all_recovery::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1974 all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1975 all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1976 all_recovery::<_, _, RoundRobin>(ed25519::fixture);
1977 }
1978
1979 fn partition<S, F, L>(mut fixture: F)
1980 where
1981 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1982 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1983 L: Elector<S>,
1984 {
1985 let n = 10;
1987 let required_containers = View::new(50);
1988 let activity_timeout = ViewDelta::new(10);
1989 let skip_timeout = ViewDelta::new(5);
1990 let namespace = b"consensus".to_vec();
1991 let executor = deterministic::Runner::timed(Duration::from_secs(900));
1992 executor.start(|mut context| async move {
1993 let (network, mut oracle) = Network::new(
1995 context.with_label("network"),
1996 Config {
1997 max_size: 1024 * 1024,
1998 disconnect_on_block: false,
1999 tracked_peer_sets: None,
2000 },
2001 );
2002
2003 network.start();
2005
2006 let Fixture {
2008 participants,
2009 schemes,
2010 ..
2011 } = fixture(&mut context, n);
2012 let mut registrations = register_validators(&mut oracle, &participants).await;
2013
2014 let link = Link {
2016 latency: Duration::from_millis(10),
2017 jitter: Duration::from_millis(1),
2018 success_rate: 1.0,
2019 };
2020 link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
2021
2022 let elector = L::default();
2024 let relay = Arc::new(mocks::relay::Relay::new());
2025 let mut reporters = Vec::new();
2026 let mut engine_handlers = Vec::new();
2027 for (idx, validator) in participants.iter().enumerate() {
2028 let context = context.with_label(&format!("validator_{}", *validator));
2030
2031 let reporter_config = mocks::reporter::Config {
2033 namespace: namespace.clone(),
2034 participants: participants.clone().try_into().unwrap(),
2035 scheme: schemes[idx].clone(),
2036 elector: elector.clone(),
2037 };
2038 let reporter =
2039 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2040 reporters.push(reporter.clone());
2041 let application_cfg = mocks::application::Config {
2042 hasher: Sha256::default(),
2043 relay: relay.clone(),
2044 me: validator.clone(),
2045 propose_latency: (10.0, 5.0),
2046 verify_latency: (10.0, 5.0),
2047 certify_latency: (10.0, 5.0),
2048 should_certify: mocks::application::Certifier::Sometimes,
2049 };
2050 let (actor, application) = mocks::application::Application::new(
2051 context.with_label("application"),
2052 application_cfg,
2053 );
2054 actor.start();
2055 let blocker = oracle.control(validator.clone());
2056 let cfg = config::Config {
2057 scheme: schemes[idx].clone(),
2058 elector: elector.clone(),
2059 blocker,
2060 automaton: application.clone(),
2061 relay: application.clone(),
2062 reporter: reporter.clone(),
2063 partition: validator.to_string(),
2064 mailbox_size: 1024,
2065 epoch: Epoch::new(333),
2066 namespace: namespace.clone(),
2067 leader_timeout: Duration::from_secs(1),
2068 notarization_timeout: Duration::from_secs(2),
2069 nullify_retry: Duration::from_secs(10),
2070 fetch_timeout: Duration::from_secs(1),
2071 activity_timeout,
2072 skip_timeout,
2073 fetch_concurrent: 4,
2074 replay_buffer: NZUsize!(1024 * 1024),
2075 write_buffer: NZUsize!(1024 * 1024),
2076 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2077 };
2078 let engine = Engine::new(context.with_label("engine"), cfg);
2079
2080 let (pending, recovered, resolver) = registrations
2082 .remove(validator)
2083 .expect("validator should be registered");
2084 engine_handlers.push(engine.start(pending, recovered, resolver));
2085 }
2086
2087 let mut finalizers = Vec::new();
2089 for reporter in reporters.iter_mut() {
2090 let (mut latest, mut monitor) = reporter.subscribe().await;
2091 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2092 while latest < required_containers {
2093 latest = monitor.next().await.expect("event missing");
2094 }
2095 }));
2096 }
2097 join_all(finalizers).await;
2098
2099 fn separated(n: usize, a: usize, b: usize) -> bool {
2101 let m = n / 2;
2102 (a < m && b >= m) || (a >= m && b < m)
2103 }
2104 link_validators(&mut oracle, &participants, Action::Unlink, Some(separated)).await;
2105
2106 context.sleep(Duration::from_secs(10)).await;
2108
2109 let mut finalizers = Vec::new();
2111 for reporter in reporters.iter_mut() {
2112 let (_, mut monitor) = reporter.subscribe().await;
2113 finalizers.push(
2114 context
2115 .with_label("finalizer")
2116 .spawn(move |context| async move {
2117 select! {
2118 _timeout = context.sleep(Duration::from_secs(60)) => {},
2119 _done = monitor.next() => {
2120 panic!("engine should not notarize or finalize anything");
2121 }
2122 }
2123 }),
2124 );
2125 }
2126 join_all(finalizers).await;
2127
2128 link_validators(
2130 &mut oracle,
2131 &participants,
2132 Action::Link(link),
2133 Some(separated),
2134 )
2135 .await;
2136
2137 let mut finalizers = Vec::new();
2139 for reporter in reporters.iter_mut() {
2140 let (mut latest, mut monitor) = reporter.subscribe().await;
2141 let required = latest.saturating_add(ViewDelta::new(required_containers.get()));
2142 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2143 while latest < required {
2144 latest = monitor.next().await.expect("event missing");
2145 }
2146 }));
2147 }
2148 join_all(finalizers).await;
2149
2150 for reporter in reporters.iter() {
2152 {
2154 let faults = reporter.faults.lock().unwrap();
2155 assert!(faults.is_empty());
2156 }
2157
2158 {
2160 let invalid = reporter.invalid.lock().unwrap();
2161 assert_eq!(*invalid, 0);
2162 }
2163 }
2164
2165 let blocked = oracle.blocked().await.unwrap();
2167 assert!(blocked.is_empty());
2168 });
2169 }
2170
2171 #[test_group("slow")]
2172 #[test_traced]
2173 fn test_partition() {
2174 partition::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
2175 partition::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
2176 partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2177 partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2178 partition::<_, _, RoundRobin>(ed25519::fixture);
2179 }
2180
2181 fn slow_and_lossy_links<S, F, L>(seed: u64, mut fixture: F) -> String
2182 where
2183 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2184 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2185 L: Elector<S>,
2186 {
2187 let n = 5;
2189 let required_containers = View::new(50);
2190 let activity_timeout = ViewDelta::new(10);
2191 let skip_timeout = ViewDelta::new(5);
2192 let namespace = b"consensus".to_vec();
2193 let cfg = deterministic::Config::new()
2194 .with_seed(seed)
2195 .with_timeout(Some(Duration::from_secs(5_000)));
2196 let executor = deterministic::Runner::new(cfg);
2197 executor.start(|mut context| async move {
2198 let (network, mut oracle) = Network::new(
2200 context.with_label("network"),
2201 Config {
2202 max_size: 1024 * 1024,
2203 disconnect_on_block: false,
2204 tracked_peer_sets: None,
2205 },
2206 );
2207
2208 network.start();
2210
2211 let Fixture {
2213 participants,
2214 schemes,
2215 ..
2216 } = fixture(&mut context, n);
2217 let mut registrations = register_validators(&mut oracle, &participants).await;
2218
2219 let degraded_link = Link {
2221 latency: Duration::from_millis(200),
2222 jitter: Duration::from_millis(150),
2223 success_rate: 0.5,
2224 };
2225 link_validators(
2226 &mut oracle,
2227 &participants,
2228 Action::Link(degraded_link),
2229 None,
2230 )
2231 .await;
2232
2233 let elector = L::default();
2235 let relay = Arc::new(mocks::relay::Relay::new());
2236 let mut reporters = Vec::new();
2237 let mut engine_handlers = Vec::new();
2238 for (idx, validator) in participants.iter().enumerate() {
2239 let context = context.with_label(&format!("validator_{}", *validator));
2241
2242 let reporter_config = mocks::reporter::Config {
2244 namespace: namespace.clone(),
2245 participants: participants.clone().try_into().unwrap(),
2246 scheme: schemes[idx].clone(),
2247 elector: elector.clone(),
2248 };
2249 let reporter =
2250 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2251 reporters.push(reporter.clone());
2252 let application_cfg = mocks::application::Config {
2253 hasher: Sha256::default(),
2254 relay: relay.clone(),
2255 me: validator.clone(),
2256 propose_latency: (10.0, 5.0),
2257 verify_latency: (10.0, 5.0),
2258 certify_latency: (10.0, 5.0),
2259 should_certify: mocks::application::Certifier::Sometimes,
2260 };
2261 let (actor, application) = mocks::application::Application::new(
2262 context.with_label("application"),
2263 application_cfg,
2264 );
2265 actor.start();
2266 let blocker = oracle.control(validator.clone());
2267 let cfg = config::Config {
2268 scheme: schemes[idx].clone(),
2269 elector: elector.clone(),
2270 blocker,
2271 automaton: application.clone(),
2272 relay: application.clone(),
2273 reporter: reporter.clone(),
2274 partition: validator.to_string(),
2275 mailbox_size: 1024,
2276 epoch: Epoch::new(333),
2277 namespace: namespace.clone(),
2278 leader_timeout: Duration::from_secs(1),
2279 notarization_timeout: Duration::from_secs(2),
2280 nullify_retry: Duration::from_secs(10),
2281 fetch_timeout: Duration::from_secs(1),
2282 activity_timeout,
2283 skip_timeout,
2284 fetch_concurrent: 4,
2285 replay_buffer: NZUsize!(1024 * 1024),
2286 write_buffer: NZUsize!(1024 * 1024),
2287 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2288 };
2289 let engine = Engine::new(context.with_label("engine"), cfg);
2290
2291 let (pending, recovered, resolver) = registrations
2293 .remove(validator)
2294 .expect("validator should be registered");
2295 engine_handlers.push(engine.start(pending, recovered, resolver));
2296 }
2297
2298 let mut finalizers = Vec::new();
2300 for reporter in reporters.iter_mut() {
2301 let (mut latest, mut monitor) = reporter.subscribe().await;
2302 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2303 while latest < required_containers {
2304 latest = monitor.next().await.expect("event missing");
2305 }
2306 }));
2307 }
2308 join_all(finalizers).await;
2309
2310 for reporter in reporters.iter() {
2312 {
2314 let faults = reporter.faults.lock().unwrap();
2315 assert!(faults.is_empty());
2316 }
2317
2318 {
2320 let invalid = reporter.invalid.lock().unwrap();
2321 assert_eq!(*invalid, 0);
2322 }
2323 }
2324
2325 let blocked = oracle.blocked().await.unwrap();
2327 assert!(blocked.is_empty());
2328
2329 context.auditor().state()
2330 })
2331 }
2332
2333 #[test_traced]
2334 fn test_slow_and_lossy_links() {
2335 slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold::fixture::<MinPk, _>);
2336 slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold::fixture::<MinSig, _>);
2337 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinPk, _>);
2338 slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinSig, _>);
2339 slow_and_lossy_links::<_, _, RoundRobin>(0, ed25519::fixture);
2340 }
2341
2342 #[test_group("slow")]
2343 #[test_traced]
2344 fn test_determinism() {
2345 for seed in 1..6 {
2348 let ts_pk_state_1 =
2349 slow_and_lossy_links::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2350 let ts_pk_state_2 =
2351 slow_and_lossy_links::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2352 assert_eq!(ts_pk_state_1, ts_pk_state_2);
2353
2354 let ts_sig_state_1 = slow_and_lossy_links::<_, _, Random>(
2355 seed,
2356 bls12381_threshold::fixture::<MinSig, _>,
2357 );
2358 let ts_sig_state_2 = slow_and_lossy_links::<_, _, Random>(
2359 seed,
2360 bls12381_threshold::fixture::<MinSig, _>,
2361 );
2362 assert_eq!(ts_sig_state_1, ts_sig_state_2);
2363
2364 let ms_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2365 seed,
2366 bls12381_multisig::fixture::<MinPk, _>,
2367 );
2368 let ms_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2369 seed,
2370 bls12381_multisig::fixture::<MinPk, _>,
2371 );
2372 assert_eq!(ms_pk_state_1, ms_pk_state_2);
2373
2374 let ms_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2375 seed,
2376 bls12381_multisig::fixture::<MinSig, _>,
2377 );
2378 let ms_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2379 seed,
2380 bls12381_multisig::fixture::<MinSig, _>,
2381 );
2382 assert_eq!(ms_sig_state_1, ms_sig_state_2);
2383
2384 let ed_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2385 let ed_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2386 assert_eq!(ed_state_1, ed_state_2);
2387
2388 let states = [
2389 ("threshold-minpk", ts_pk_state_1),
2390 ("threshold-minsig", ts_sig_state_1),
2391 ("multisig-minpk", ms_pk_state_1),
2392 ("multisig-minsig", ms_sig_state_1),
2393 ("ed25519", ed_state_1),
2394 ];
2395
2396 for pair in states.windows(2) {
2398 assert_ne!(
2399 pair[0].1, pair[1].1,
2400 "state {} equals state {}",
2401 pair[0].0, pair[1].0
2402 );
2403 }
2404 }
2405 }
2406
2407 fn conflicter<S, F, L>(seed: u64, mut fixture: F)
2408 where
2409 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2410 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2411 L: Elector<S>,
2412 {
2413 let n = 4;
2415 let required_containers = View::new(50);
2416 let activity_timeout = ViewDelta::new(10);
2417 let skip_timeout = ViewDelta::new(5);
2418 let namespace = b"consensus".to_vec();
2419 let cfg = deterministic::Config::new()
2420 .with_seed(seed)
2421 .with_timeout(Some(Duration::from_secs(30)));
2422 let executor = deterministic::Runner::new(cfg);
2423 executor.start(|mut context| async move {
2424 let (network, mut oracle) = Network::new(
2426 context.with_label("network"),
2427 Config {
2428 max_size: 1024 * 1024,
2429 disconnect_on_block: false,
2430 tracked_peer_sets: None,
2431 },
2432 );
2433
2434 network.start();
2436
2437 let Fixture {
2439 participants,
2440 schemes,
2441 ..
2442 } = fixture(&mut context, n);
2443 let mut registrations = register_validators(&mut oracle, &participants).await;
2444
2445 let link = Link {
2447 latency: Duration::from_millis(10),
2448 jitter: Duration::from_millis(1),
2449 success_rate: 1.0,
2450 };
2451 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2452
2453 let elector = L::default();
2455 let relay = Arc::new(mocks::relay::Relay::new());
2456 let mut reporters = Vec::new();
2457 for (idx_scheme, validator) in participants.iter().enumerate() {
2458 let context = context.with_label(&format!("validator_{}", *validator));
2460
2461 let reporter_config = mocks::reporter::Config {
2463 namespace: namespace.clone(),
2464 participants: participants.clone().try_into().unwrap(),
2465 scheme: schemes[idx_scheme].clone(),
2466 elector: elector.clone(),
2467 };
2468 let reporter =
2469 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2470 let (pending, recovered, resolver) = registrations
2471 .remove(validator)
2472 .expect("validator should be registered");
2473 if idx_scheme == 0 {
2474 let cfg = mocks::conflicter::Config {
2475 namespace: namespace.clone(),
2476 scheme: schemes[idx_scheme].clone(),
2477 };
2478
2479 let engine: mocks::conflicter::Conflicter<_, _, Sha256> =
2480 mocks::conflicter::Conflicter::new(
2481 context.with_label("byzantine_engine"),
2482 cfg,
2483 );
2484 engine.start(pending);
2485 } else {
2486 reporters.push(reporter.clone());
2487 let application_cfg = mocks::application::Config {
2488 hasher: Sha256::default(),
2489 relay: relay.clone(),
2490 me: validator.clone(),
2491 propose_latency: (10.0, 5.0),
2492 verify_latency: (10.0, 5.0),
2493 certify_latency: (10.0, 5.0),
2494 should_certify: mocks::application::Certifier::Sometimes,
2495 };
2496 let (actor, application) = mocks::application::Application::new(
2497 context.with_label("application"),
2498 application_cfg,
2499 );
2500 actor.start();
2501 let blocker = oracle.control(validator.clone());
2502 let cfg = config::Config {
2503 scheme: schemes[idx_scheme].clone(),
2504 elector: elector.clone(),
2505 blocker,
2506 automaton: application.clone(),
2507 relay: application.clone(),
2508 reporter: reporter.clone(),
2509 partition: validator.to_string(),
2510 mailbox_size: 1024,
2511 epoch: Epoch::new(333),
2512 namespace: namespace.clone(),
2513 leader_timeout: Duration::from_secs(1),
2514 notarization_timeout: Duration::from_secs(2),
2515 nullify_retry: Duration::from_secs(10),
2516 fetch_timeout: Duration::from_secs(1),
2517 activity_timeout,
2518 skip_timeout,
2519 fetch_concurrent: 4,
2520 replay_buffer: NZUsize!(1024 * 1024),
2521 write_buffer: NZUsize!(1024 * 1024),
2522 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2523 };
2524 let engine = Engine::new(context.with_label("engine"), cfg);
2525 engine.start(pending, recovered, resolver);
2526 }
2527 }
2528
2529 let mut finalizers = Vec::new();
2531 for reporter in reporters.iter_mut() {
2532 let (mut latest, mut monitor) = reporter.subscribe().await;
2533 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2534 while latest < required_containers {
2535 latest = monitor.next().await.expect("event missing");
2536 }
2537 }));
2538 }
2539 join_all(finalizers).await;
2540
2541 let byz = &participants[0];
2543 let mut count_conflicting = 0;
2544 for reporter in reporters.iter() {
2545 {
2547 let faults = reporter.faults.lock().unwrap();
2548 assert_eq!(faults.len(), 1);
2549 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2550 for (_, faults) in faulter.iter() {
2551 for fault in faults.iter() {
2552 match fault {
2553 Activity::ConflictingNotarize(_) => {
2554 count_conflicting += 1;
2555 }
2556 Activity::ConflictingFinalize(_) => {
2557 count_conflicting += 1;
2558 }
2559 _ => panic!("unexpected fault: {fault:?}"),
2560 }
2561 }
2562 }
2563 }
2564
2565 {
2567 let invalid = reporter.invalid.lock().unwrap();
2568 assert_eq!(*invalid, 0);
2569 }
2570 }
2571 assert!(count_conflicting > 0);
2572
2573 let blocked = oracle.blocked().await.unwrap();
2575 assert!(!blocked.is_empty());
2576 for (a, b) in blocked {
2577 assert_ne!(&a, byz);
2578 assert_eq!(&b, byz);
2579 }
2580 });
2581 }
2582
2583 #[test_group("slow")]
2584 #[test_traced]
2585 fn test_conflicter() {
2586 for seed in 0..5 {
2587 conflicter::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2588 conflicter::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
2589 conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2590 conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2591 conflicter::<_, _, RoundRobin>(seed, ed25519::fixture);
2592 }
2593 }
2594
2595 fn invalid<S, F, L>(seed: u64, mut fixture: F)
2596 where
2597 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2598 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2599 L: Elector<S>,
2600 {
2601 let n = 4;
2603 let required_containers = View::new(50);
2604 let activity_timeout = ViewDelta::new(10);
2605 let skip_timeout = ViewDelta::new(5);
2606 let namespace = b"consensus".to_vec();
2607 let cfg = deterministic::Config::new()
2608 .with_seed(seed)
2609 .with_timeout(Some(Duration::from_secs(30)));
2610 let executor = deterministic::Runner::new(cfg);
2611 executor.start(|mut context| async move {
2612 let (network, mut oracle) = Network::new(
2614 context.with_label("network"),
2615 Config {
2616 max_size: 1024 * 1024,
2617 disconnect_on_block: false,
2618 tracked_peer_sets: None,
2619 },
2620 );
2621
2622 network.start();
2624
2625 let Fixture {
2627 participants,
2628 schemes,
2629 ..
2630 } = fixture(&mut context, n);
2631 let mut registrations = register_validators(&mut oracle, &participants).await;
2632
2633 let link = Link {
2635 latency: Duration::from_millis(10),
2636 jitter: Duration::from_millis(1),
2637 success_rate: 1.0,
2638 };
2639 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2640
2641 let elector = L::default();
2643 let relay = Arc::new(mocks::relay::Relay::new());
2644 let mut reporters = Vec::new();
2645 for (idx_scheme, validator) in participants.iter().enumerate() {
2646 let context = context.with_label(&format!("validator_{}", *validator));
2648
2649 let engine_namespace = if idx_scheme == 0 {
2651 vec![]
2652 } else {
2653 namespace.clone()
2654 };
2655
2656 let reporter_config = mocks::reporter::Config {
2657 namespace: namespace.clone(), participants: participants.clone().try_into().unwrap(),
2659 scheme: schemes[idx_scheme].clone(),
2660 elector: elector.clone(),
2661 };
2662 let reporter =
2663 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2664 reporters.push(reporter.clone());
2665
2666 let application_cfg = mocks::application::Config {
2667 hasher: Sha256::default(),
2668 relay: relay.clone(),
2669 me: validator.clone(),
2670 propose_latency: (10.0, 5.0),
2671 verify_latency: (10.0, 5.0),
2672 certify_latency: (10.0, 5.0),
2673 should_certify: mocks::application::Certifier::Sometimes,
2674 };
2675 let (actor, application) = mocks::application::Application::new(
2676 context.with_label("application"),
2677 application_cfg,
2678 );
2679 actor.start();
2680 let blocker = oracle.control(validator.clone());
2681 let cfg = config::Config {
2682 scheme: schemes[idx_scheme].clone(),
2683 elector: elector.clone(),
2684 blocker,
2685 automaton: application.clone(),
2686 relay: application.clone(),
2687 reporter: reporter.clone(),
2688 partition: validator.clone().to_string(),
2689 mailbox_size: 1024,
2690 epoch: Epoch::new(333),
2691 namespace: engine_namespace,
2692 leader_timeout: Duration::from_secs(1),
2693 notarization_timeout: Duration::from_secs(2),
2694 nullify_retry: Duration::from_secs(10),
2695 fetch_timeout: Duration::from_secs(1),
2696 activity_timeout,
2697 skip_timeout,
2698 fetch_concurrent: 4,
2699 replay_buffer: NZUsize!(1024 * 1024),
2700 write_buffer: NZUsize!(1024 * 1024),
2701 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2702 };
2703 let engine = Engine::new(context.with_label("engine"), cfg);
2704 let (pending, recovered, resolver) = registrations
2705 .remove(validator)
2706 .expect("validator should be registered");
2707 engine.start(pending, recovered, resolver);
2708 }
2709
2710 let mut finalizers = Vec::new();
2712 for reporter in reporters.iter_mut().skip(1) {
2713 let (mut latest, mut monitor) = reporter.subscribe().await;
2714 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2715 while latest < required_containers {
2716 latest = monitor.next().await.expect("event missing");
2717 }
2718 }));
2719 }
2720 join_all(finalizers).await;
2721
2722 let mut invalid_count = 0;
2724 for reporter in reporters.iter().skip(1) {
2725 {
2727 let faults = reporter.faults.lock().unwrap();
2728 assert!(faults.is_empty());
2729 }
2730
2731 {
2733 let invalid = reporter.invalid.lock().unwrap();
2734 if *invalid > 0 {
2735 invalid_count += 1;
2736 }
2737 }
2738 }
2739
2740 assert_eq!(invalid_count, n - 1);
2742
2743 let blocked = oracle.blocked().await.unwrap();
2745 assert!(!blocked.is_empty());
2746 for (a, b) in blocked {
2747 if a != participants[0] {
2748 assert_eq!(b, participants[0]);
2749 }
2750 }
2751 });
2752 }
2753
2754 #[test_group("slow")]
2755 #[test_traced]
2756 fn test_invalid() {
2757 for seed in 0..5 {
2758 invalid::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2759 invalid::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
2760 invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2761 invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2762 invalid::<_, _, RoundRobin>(seed, ed25519::fixture);
2763 }
2764 }
2765
2766 fn impersonator<S, F, L>(seed: u64, mut fixture: F)
2767 where
2768 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2769 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2770 L: Elector<S>,
2771 {
2772 let n = 4;
2774 let required_containers = View::new(50);
2775 let activity_timeout = ViewDelta::new(10);
2776 let skip_timeout = ViewDelta::new(5);
2777 let namespace = b"consensus".to_vec();
2778 let cfg = deterministic::Config::new()
2779 .with_seed(seed)
2780 .with_timeout(Some(Duration::from_secs(30)));
2781 let executor = deterministic::Runner::new(cfg);
2782 executor.start(|mut context| async move {
2783 let (network, mut oracle) = Network::new(
2785 context.with_label("network"),
2786 Config {
2787 max_size: 1024 * 1024,
2788 disconnect_on_block: false,
2789 tracked_peer_sets: None,
2790 },
2791 );
2792
2793 network.start();
2795
2796 let Fixture {
2798 participants,
2799 schemes,
2800 ..
2801 } = fixture(&mut context, n);
2802 let mut registrations = register_validators(&mut oracle, &participants).await;
2803
2804 let link = Link {
2806 latency: Duration::from_millis(10),
2807 jitter: Duration::from_millis(1),
2808 success_rate: 1.0,
2809 };
2810 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2811
2812 let elector = L::default();
2814 let relay = Arc::new(mocks::relay::Relay::new());
2815 let mut reporters = Vec::new();
2816 for (idx_scheme, validator) in participants.iter().enumerate() {
2817 let context = context.with_label(&format!("validator_{}", *validator));
2819
2820 let reporter_config = mocks::reporter::Config {
2822 namespace: namespace.clone(),
2823 participants: participants.clone().try_into().unwrap(),
2824 scheme: schemes[idx_scheme].clone(),
2825 elector: elector.clone(),
2826 };
2827 let reporter =
2828 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2829 let (pending, recovered, resolver) = registrations
2830 .remove(validator)
2831 .expect("validator should be registered");
2832 if idx_scheme == 0 {
2833 let cfg = mocks::impersonator::Config {
2834 scheme: schemes[idx_scheme].clone(),
2835 namespace: namespace.clone(),
2836 };
2837
2838 let engine: mocks::impersonator::Impersonator<_, _, Sha256> =
2839 mocks::impersonator::Impersonator::new(
2840 context.with_label("byzantine_engine"),
2841 cfg,
2842 );
2843 engine.start(pending);
2844 } else {
2845 reporters.push(reporter.clone());
2846 let application_cfg = mocks::application::Config {
2847 hasher: Sha256::default(),
2848 relay: relay.clone(),
2849 me: validator.clone(),
2850 propose_latency: (10.0, 5.0),
2851 verify_latency: (10.0, 5.0),
2852 certify_latency: (10.0, 5.0),
2853 should_certify: mocks::application::Certifier::Sometimes,
2854 };
2855 let (actor, application) = mocks::application::Application::new(
2856 context.with_label("application"),
2857 application_cfg,
2858 );
2859 actor.start();
2860 let blocker = oracle.control(validator.clone());
2861 let cfg = config::Config {
2862 scheme: schemes[idx_scheme].clone(),
2863 elector: elector.clone(),
2864 blocker,
2865 automaton: application.clone(),
2866 relay: application.clone(),
2867 reporter: reporter.clone(),
2868 partition: validator.clone().to_string(),
2869 mailbox_size: 1024,
2870 epoch: Epoch::new(333),
2871 namespace: namespace.clone(),
2872 leader_timeout: Duration::from_secs(1),
2873 notarization_timeout: Duration::from_secs(2),
2874 nullify_retry: Duration::from_secs(10),
2875 fetch_timeout: Duration::from_secs(1),
2876 activity_timeout,
2877 skip_timeout,
2878 fetch_concurrent: 4,
2879 replay_buffer: NZUsize!(1024 * 1024),
2880 write_buffer: NZUsize!(1024 * 1024),
2881 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2882 };
2883 let engine = Engine::new(context.with_label("engine"), cfg);
2884 engine.start(pending, recovered, resolver);
2885 }
2886 }
2887
2888 let mut finalizers = Vec::new();
2890 for reporter in reporters.iter_mut() {
2891 let (mut latest, mut monitor) = reporter.subscribe().await;
2892 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2893 while latest < required_containers {
2894 latest = monitor.next().await.expect("event missing");
2895 }
2896 }));
2897 }
2898 join_all(finalizers).await;
2899
2900 let byz = &participants[0];
2902 for reporter in reporters.iter() {
2903 {
2905 let faults = reporter.faults.lock().unwrap();
2906 assert!(faults.is_empty());
2907 }
2908
2909 {
2911 let invalid = reporter.invalid.lock().unwrap();
2912 assert_eq!(*invalid, 0);
2913 }
2914 }
2915
2916 let blocked = oracle.blocked().await.unwrap();
2918 assert!(!blocked.is_empty());
2919 for (a, b) in blocked {
2920 assert_ne!(&a, byz);
2921 assert_eq!(&b, byz);
2922 }
2923 });
2924 }
2925
2926 #[test_group("slow")]
2927 #[test_traced]
2928 fn test_impersonator() {
2929 for seed in 0..5 {
2930 impersonator::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2931 impersonator::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
2932 impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2933 impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2934 impersonator::<_, _, RoundRobin>(seed, ed25519::fixture);
2935 }
2936 }
2937
/// Scenario: participant 0 runs the `mocks::equivocator::Equivocator` while one randomly
/// chosen honest validator is aborted and later restarted.
///
/// Seven validators are registered on a fully linked simulated network. The test proceeds
/// in three phases, each ending once every reporter except the one at index 0 has reached
/// a view threshold:
/// 1. all engines running, threshold `required_containers` (50);
/// 2. one honest engine aborted, threshold `2 * required_containers`;
/// 3. that validator restarted with the same partition name, threshold
///    `3 * required_containers`.
///
/// Returns `true` when the oracle reports at least one blocked pair. Every reported pair
/// must have participant 0 as its second element and must not have it as its first.
///
/// * `seed` - seed for the deterministic runtime.
/// * `fixture` - builds the participants and their signing schemes for `n` validators.
/// * `L` - leader elector; constructed through `L::default()`.
fn equivocator<S, F, L>(seed: u64, mut fixture: F) -> bool
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
    L: Elector<S>,
{
    let n = 7;
    let required_containers = View::new(50);
    let activity_timeout = ViewDelta::new(10);
    let skip_timeout = ViewDelta::new(5);
    let namespace = b"consensus".to_vec();
    let cfg = deterministic::Config::new()
        .with_seed(seed)
        .with_timeout(Some(Duration::from_secs(30)));
    let executor = deterministic::Runner::new(cfg);
    executor.start(|mut context| async move {
        // Create the simulated network.
        let (network, mut oracle) = Network::new(
            context.with_label("network"),
            Config {
                max_size: 1024 * 1024,
                disconnect_on_block: false,
                tracked_peer_sets: None,
            },
        );

        // Start the network.
        network.start();

        // Build participants and register each of them with the network.
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, n);
        let mut registrations = register_validators(&mut oracle, &participants).await;

        // Link all validators with a lossless, low-latency link.
        let link = Link {
            latency: Duration::from_millis(10),
            jitter: Duration::from_millis(1),
            success_rate: 1.0,
        };
        link_validators(&mut oracle, &participants, Action::Link(link), None).await;

        // Create one engine per validator. `engines` and `reporters` are both pushed once
        // per participant, so their indices line up with `participants`.
        let elector = L::default();
        let mut engines = Vec::new();
        let relay = Arc::new(mocks::relay::Relay::new());
        let mut reporters = Vec::new();
        for (idx_scheme, validator) in participants.iter().enumerate() {
            let context = context.with_label(&format!("validator_{}", *validator));

            // The reporter is kept for every validator, including the byzantine one.
            let reporter_config = mocks::reporter::Config {
                namespace: namespace.clone(),
                participants: participants.clone().try_into().unwrap(),
                scheme: schemes[idx_scheme].clone(),
                elector: elector.clone(),
            };
            let reporter =
                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
            reporters.push(reporter.clone());
            let (pending, recovered, resolver) = registrations
                .remove(validator)
                .expect("validator should be registered");
            if idx_scheme == 0 {
                // Participant 0 is the byzantine equivocator.
                let cfg = mocks::equivocator::Config {
                    namespace: namespace.clone(),
                    scheme: schemes[idx_scheme].clone(),
                    epoch: Epoch::new(333),
                    relay: relay.clone(),
                    hasher: Sha256::default(),
                    elector: elector.clone(),
                };

                let engine = mocks::equivocator::Equivocator::new(
                    context.with_label("byzantine_engine"),
                    cfg,
                );
                engines.push(engine.start(pending, recovered));
            } else {
                // Honest validator: real application and engine.
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                    certify_latency: (10.0, 5.0),
                    should_certify: mocks::application::Certifier::Sometimes,
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx_scheme].clone(),
                    elector: elector.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    partition: validator.to_string(),
                    mailbox_size: 1024,
                    epoch: Epoch::new(333),
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    fetch_concurrent: 4,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);
                engines.push(engine.start(pending, recovered, resolver));
            }
        }

        // Phase 1: wait for every reporter except index 0 (the byzantine validator's) to
        // reach the required view.
        let mut finalizers = Vec::new();
        for reporter in reporters.iter_mut().skip(1) {
            let (mut latest, mut monitor) = reporter.subscribe().await;
            finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                while latest < required_containers {
                    latest = monitor.next().await.expect("event missing");
                }
            }));
        }
        join_all(finalizers).await;

        // Abort one honest validator. The range starts at 1 so the byzantine validator at
        // index 0 is never picked.
        let idx = context.gen_range(1..engines.len());
        let validator = &participants[idx];
        let handle = engines.remove(idx);
        handle.abort();
        let _ = handle.await;
        // Drop the aborted validator's reporter so later waits do not block on it.
        reporters.remove(idx);
        info!(idx, ?validator, "aborted validator");

        // Phase 2: wait for the remaining honest reporters to reach twice the required view.
        let mut finalizers = Vec::new();
        for reporter in reporters.iter_mut().skip(1) {
            let (mut latest, mut monitor) = reporter.subscribe().await;
            finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                while latest < View::new(required_containers.get() * 2) {
                    latest = monitor.next().await.expect("event missing");
                }
            }));
        }
        join_all(finalizers).await;

        // Restart the aborted validator under a fresh context label.
        info!(idx, ?validator, "restarting validator");
        let context = context.with_label(&format!("validator_{}_restarted", *validator));

        // Build a fresh reporter and re-register the validator with the network.
        let reporter_config = mocks::reporter::Config {
            namespace: namespace.clone(),
            participants: participants.clone().try_into().unwrap(),
            scheme: schemes[idx].clone(),
            elector: elector.clone(),
        };
        let reporter =
            mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
        let (pending, recovered, resolver) =
            register_validator(&mut oracle, validator.clone()).await;
        // Appended at the end, so index 0 is still the byzantine validator's reporter.
        reporters.push(reporter.clone());
        let application_cfg = mocks::application::Config {
            hasher: Sha256::default(),
            relay: relay.clone(),
            me: validator.clone(),
            propose_latency: (10.0, 5.0),
            verify_latency: (10.0, 5.0),
            certify_latency: (10.0, 5.0),
            should_certify: mocks::application::Certifier::Sometimes,
        };
        let (actor, application) = mocks::application::Application::new(
            context.with_label("application"),
            application_cfg,
        );
        actor.start();
        let blocker = oracle.control(validator.clone());
        let cfg = config::Config {
            scheme: schemes[idx].clone(),
            elector: elector.clone(),
            blocker,
            automaton: application.clone(),
            relay: application.clone(),
            reporter: reporter.clone(),
            // Same partition name as before the abort.
            // NOTE(review): presumably this lets the engine recover its persisted state —
            // confirm against the engine's storage handling.
            partition: validator.to_string(),
            mailbox_size: 1024,
            epoch: Epoch::new(333),
            namespace: namespace.clone(),
            leader_timeout: Duration::from_secs(1),
            notarization_timeout: Duration::from_secs(2),
            nullify_retry: Duration::from_secs(10),
            fetch_timeout: Duration::from_secs(1),
            activity_timeout,
            skip_timeout,
            fetch_concurrent: 4,
            replay_buffer: NZUsize!(1024 * 1024),
            write_buffer: NZUsize!(1024 * 1024),
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        };
        let engine = Engine::new(context.with_label("engine"), cfg);
        engine.start(pending, recovered, resolver);

        // Phase 3: wait for every honest reporter, including the restarted one, to reach
        // three times the required view.
        let mut finalizers = Vec::new();
        for reporter in reporters.iter_mut().skip(1) {
            let (mut latest, mut monitor) = reporter.subscribe().await;
            finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                while latest < View::new(required_containers.get() * 3) {
                    latest = monitor.next().await.expect("event missing");
                }
            }));
        }
        join_all(finalizers).await;

        // An empty blocked list is tolerated here; the caller decides what to do with the
        // returned flag.
        // NOTE(review): pairs are presumably (blocker, blocked peer) — confirm against the
        // simulated network oracle.
        let byz = &participants[0];
        let blocked = oracle.blocked().await.unwrap();
        for (a, b) in &blocked {
            assert_ne!(a, byz);
            assert_eq!(b, byz);
        }
        !blocked.is_empty()
    })
}
3177
3178 #[test_group("slow")]
3179 #[test_traced]
3180 fn test_equivocator_bls12381_threshold_min_pk() {
3181 let detected = (0..5)
3182 .any(|seed| equivocator::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>));
3183 assert!(
3184 detected,
3185 "expected at least one seed to detect equivocation"
3186 );
3187 }
3188
3189 #[test_group("slow")]
3190 #[test_traced]
3191 fn test_equivocator_bls12381_threshold_min_sig() {
3192 let detected = (0..5).any(|seed| {
3193 equivocator::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>)
3194 });
3195 assert!(
3196 detected,
3197 "expected at least one seed to detect equivocation"
3198 );
3199 }
3200
3201 #[test_group("slow")]
3202 #[test_traced]
3203 fn test_equivocator_bls12381_multisig_min_pk() {
3204 let detected = (0..5).any(|seed| {
3205 equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>)
3206 });
3207 assert!(
3208 detected,
3209 "expected at least one seed to detect equivocation"
3210 );
3211 }
3212
3213 #[test_group("slow")]
3214 #[test_traced]
3215 fn test_equivocator_bls12381_multisig_min_sig() {
3216 let detected = (0..5).any(|seed| {
3217 equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>)
3218 });
3219 assert!(
3220 detected,
3221 "expected at least one seed to detect equivocation"
3222 );
3223 }
3224
3225 #[test_group("slow")]
3226 #[test_traced]
3227 fn test_equivocator_ed25519() {
3228 let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, ed25519::fixture));
3229 assert!(
3230 detected,
3231 "expected at least one seed to detect equivocation"
3232 );
3233 }
3234
3235 fn reconfigurer<S, F, L>(seed: u64, mut fixture: F)
3236 where
3237 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3238 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
3239 L: Elector<S>,
3240 {
3241 let n = 4;
3243 let required_containers = View::new(50);
3244 let activity_timeout = ViewDelta::new(10);
3245 let skip_timeout = ViewDelta::new(5);
3246 let namespace = b"consensus".to_vec();
3247 let cfg = deterministic::Config::new()
3248 .with_seed(seed)
3249 .with_timeout(Some(Duration::from_secs(30)));
3250 let executor = deterministic::Runner::new(cfg);
3251 executor.start(|mut context| async move {
3252 let (network, mut oracle) = Network::new(
3254 context.with_label("network"),
3255 Config {
3256 max_size: 1024 * 1024,
3257 disconnect_on_block: false,
3258 tracked_peer_sets: None,
3259 },
3260 );
3261
3262 network.start();
3264
3265 let Fixture {
3267 participants,
3268 schemes,
3269 ..
3270 } = fixture(&mut context, n);
3271 let mut registrations = register_validators(&mut oracle, &participants).await;
3272
3273 let link = Link {
3275 latency: Duration::from_millis(10),
3276 jitter: Duration::from_millis(1),
3277 success_rate: 1.0,
3278 };
3279 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3280
3281 let elector = L::default();
3283 let relay = Arc::new(mocks::relay::Relay::new());
3284 let mut reporters = Vec::new();
3285 for (idx_scheme, validator) in participants.iter().enumerate() {
3286 let context = context.with_label(&format!("validator_{}", *validator));
3288
3289 let reporter_config = mocks::reporter::Config {
3291 namespace: namespace.clone(),
3292 participants: participants.clone().try_into().unwrap(),
3293 scheme: schemes[idx_scheme].clone(),
3294 elector: elector.clone(),
3295 };
3296 let reporter =
3297 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3298 let (pending, recovered, resolver) = registrations
3299 .remove(validator)
3300 .expect("validator should be registered");
3301 if idx_scheme == 0 {
3302 let cfg = mocks::reconfigurer::Config {
3303 scheme: schemes[idx_scheme].clone(),
3304 namespace: namespace.clone(),
3305 };
3306 let engine: mocks::reconfigurer::Reconfigurer<_, _, Sha256> =
3307 mocks::reconfigurer::Reconfigurer::new(
3308 context.with_label("byzantine_engine"),
3309 cfg,
3310 );
3311 engine.start(pending);
3312 } else {
3313 reporters.push(reporter.clone());
3314 let application_cfg = mocks::application::Config {
3315 hasher: Sha256::default(),
3316 relay: relay.clone(),
3317 me: validator.clone(),
3318 propose_latency: (10.0, 5.0),
3319 verify_latency: (10.0, 5.0),
3320 certify_latency: (10.0, 5.0),
3321 should_certify: mocks::application::Certifier::Sometimes,
3322 };
3323 let (actor, application) = mocks::application::Application::new(
3324 context.with_label("application"),
3325 application_cfg,
3326 );
3327 actor.start();
3328 let blocker = oracle.control(validator.clone());
3329 let cfg = config::Config {
3330 scheme: schemes[idx_scheme].clone(),
3331 elector: elector.clone(),
3332 blocker,
3333 automaton: application.clone(),
3334 relay: application.clone(),
3335 reporter: reporter.clone(),
3336 partition: validator.to_string(),
3337 mailbox_size: 1024,
3338 epoch: Epoch::new(333),
3339 namespace: namespace.clone(),
3340 leader_timeout: Duration::from_secs(1),
3341 notarization_timeout: Duration::from_secs(2),
3342 nullify_retry: Duration::from_secs(10),
3343 fetch_timeout: Duration::from_secs(1),
3344 activity_timeout,
3345 skip_timeout,
3346 fetch_concurrent: 4,
3347 replay_buffer: NZUsize!(1024 * 1024),
3348 write_buffer: NZUsize!(1024 * 1024),
3349 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3350 };
3351 let engine = Engine::new(context.with_label("engine"), cfg);
3352 engine.start(pending, recovered, resolver);
3353 }
3354 }
3355
3356 let mut finalizers = Vec::new();
3358 for reporter in reporters.iter_mut() {
3359 let (mut latest, mut monitor) = reporter.subscribe().await;
3360 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3361 while latest < required_containers {
3362 latest = monitor.next().await.expect("event missing");
3363 }
3364 }));
3365 }
3366 join_all(finalizers).await;
3367
3368 let byz = &participants[0];
3370 for reporter in reporters.iter() {
3371 {
3373 let faults = reporter.faults.lock().unwrap();
3374 assert!(faults.is_empty());
3375 }
3376
3377 {
3379 let invalid = reporter.invalid.lock().unwrap();
3380 assert_eq!(*invalid, 0);
3381 }
3382 }
3383
3384 let blocked = oracle.blocked().await.unwrap();
3386 assert!(!blocked.is_empty());
3387 for (a, b) in blocked {
3388 assert_ne!(&a, byz);
3389 assert_eq!(&b, byz);
3390 }
3391 });
3392 }
3393
3394 #[test_group("slow")]
3395 #[test_traced]
3396 fn test_reconfigurer() {
3397 for seed in 0..5 {
3398 reconfigurer::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
3399 reconfigurer::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
3400 reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3401 reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3402 reconfigurer::<_, _, RoundRobin>(seed, ed25519::fixture);
3403 }
3404 }
3405
3406 fn nuller<S, F, L>(seed: u64, mut fixture: F)
3407 where
3408 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3409 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
3410 L: Elector<S>,
3411 {
3412 let n = 4;
3414 let required_containers = View::new(50);
3415 let activity_timeout = ViewDelta::new(10);
3416 let skip_timeout = ViewDelta::new(5);
3417 let namespace = b"consensus".to_vec();
3418 let cfg = deterministic::Config::new()
3419 .with_seed(seed)
3420 .with_timeout(Some(Duration::from_secs(30)));
3421 let executor = deterministic::Runner::new(cfg);
3422 executor.start(|mut context| async move {
3423 let (network, mut oracle) = Network::new(
3425 context.with_label("network"),
3426 Config {
3427 max_size: 1024 * 1024,
3428 disconnect_on_block: false,
3429 tracked_peer_sets: None,
3430 },
3431 );
3432
3433 network.start();
3435
3436 let Fixture {
3438 participants,
3439 schemes,
3440 ..
3441 } = fixture(&mut context, n);
3442 let mut registrations = register_validators(&mut oracle, &participants).await;
3443
3444 let link = Link {
3446 latency: Duration::from_millis(10),
3447 jitter: Duration::from_millis(1),
3448 success_rate: 1.0,
3449 };
3450 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3451
3452 let elector = L::default();
3454 let relay = Arc::new(mocks::relay::Relay::new());
3455 let mut reporters = Vec::new();
3456 for (idx_scheme, validator) in participants.iter().enumerate() {
3457 let context = context.with_label(&format!("validator_{}", *validator));
3459
3460 let reporter_config = mocks::reporter::Config {
3462 namespace: namespace.clone(),
3463 participants: participants.clone().try_into().unwrap(),
3464 scheme: schemes[idx_scheme].clone(),
3465 elector: elector.clone(),
3466 };
3467 let reporter =
3468 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3469 let (pending, recovered, resolver) = registrations
3470 .remove(validator)
3471 .expect("validator should be registered");
3472 if idx_scheme == 0 {
3473 let cfg = mocks::nuller::Config {
3474 namespace: namespace.clone(),
3475 scheme: schemes[idx_scheme].clone(),
3476 };
3477 let engine: mocks::nuller::Nuller<_, _, Sha256> =
3478 mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
3479 engine.start(pending);
3480 } else {
3481 reporters.push(reporter.clone());
3482 let application_cfg = mocks::application::Config {
3483 hasher: Sha256::default(),
3484 relay: relay.clone(),
3485 me: validator.clone(),
3486 propose_latency: (10.0, 5.0),
3487 verify_latency: (10.0, 5.0),
3488 certify_latency: (10.0, 5.0),
3489 should_certify: mocks::application::Certifier::Sometimes,
3490 };
3491 let (actor, application) = mocks::application::Application::new(
3492 context.with_label("application"),
3493 application_cfg,
3494 );
3495 actor.start();
3496 let blocker = oracle.control(validator.clone());
3497 let cfg = config::Config {
3498 scheme: schemes[idx_scheme].clone(),
3499 elector: elector.clone(),
3500 blocker,
3501 automaton: application.clone(),
3502 relay: application.clone(),
3503 reporter: reporter.clone(),
3504 partition: validator.clone().to_string(),
3505 mailbox_size: 1024,
3506 epoch: Epoch::new(333),
3507 namespace: namespace.clone(),
3508 leader_timeout: Duration::from_secs(1),
3509 notarization_timeout: Duration::from_secs(2),
3510 nullify_retry: Duration::from_secs(10),
3511 fetch_timeout: Duration::from_secs(1),
3512 activity_timeout,
3513 skip_timeout,
3514 fetch_concurrent: 4,
3515 replay_buffer: NZUsize!(1024 * 1024),
3516 write_buffer: NZUsize!(1024 * 1024),
3517 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3518 };
3519 let engine = Engine::new(context.with_label("engine"), cfg);
3520 engine.start(pending, recovered, resolver);
3521 }
3522 }
3523
3524 let mut finalizers = Vec::new();
3526 for reporter in reporters.iter_mut() {
3527 let (mut latest, mut monitor) = reporter.subscribe().await;
3528 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3529 while latest < required_containers {
3530 latest = monitor.next().await.expect("event missing");
3531 }
3532 }));
3533 }
3534 join_all(finalizers).await;
3535
3536 let byz = &participants[0];
3538 let mut count_nullify_and_finalize = 0;
3539 for reporter in reporters.iter() {
3540 {
3542 let faults = reporter.faults.lock().unwrap();
3543 assert_eq!(faults.len(), 1);
3544 let faulter = faults.get(byz).expect("byzantine party is not faulter");
3545 for (_, faults) in faulter.iter() {
3546 for fault in faults.iter() {
3547 match fault {
3548 Activity::NullifyFinalize(_) => {
3549 count_nullify_and_finalize += 1;
3550 }
3551 _ => panic!("unexpected fault: {fault:?}"),
3552 }
3553 }
3554 }
3555 }
3556
3557 {
3559 let invalid = reporter.invalid.lock().unwrap();
3560 assert_eq!(*invalid, 0);
3561 }
3562 }
3563 assert!(count_nullify_and_finalize > 0);
3564
3565 let blocked = oracle.blocked().await.unwrap();
3567 assert!(!blocked.is_empty());
3568 for (a, b) in blocked {
3569 assert_ne!(&a, byz);
3570 assert_eq!(&b, byz);
3571 }
3572 });
3573 }
3574
3575 #[test_group("slow")]
3576 #[test_traced]
3577 fn test_nuller() {
3578 for seed in 0..5 {
3579 nuller::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
3580 nuller::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
3581 nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3582 nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3583 nuller::<_, _, RoundRobin>(seed, ed25519::fixture);
3584 }
3585 }
3586
3587 fn outdated<S, F, L>(seed: u64, mut fixture: F)
3588 where
3589 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3590 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
3591 L: Elector<S>,
3592 {
3593 let n = 4;
3595 let required_containers = View::new(100);
3596 let activity_timeout = ViewDelta::new(10);
3597 let skip_timeout = ViewDelta::new(5);
3598 let namespace = b"consensus".to_vec();
3599 let cfg = deterministic::Config::new()
3600 .with_seed(seed)
3601 .with_timeout(Some(Duration::from_secs(30)));
3602 let executor = deterministic::Runner::new(cfg);
3603 executor.start(|mut context| async move {
3604 let (network, mut oracle) = Network::new(
3606 context.with_label("network"),
3607 Config {
3608 max_size: 1024 * 1024,
3609 disconnect_on_block: false,
3610 tracked_peer_sets: None,
3611 },
3612 );
3613
3614 network.start();
3616
3617 let Fixture {
3619 participants,
3620 schemes,
3621 ..
3622 } = fixture(&mut context, n);
3623 let mut registrations = register_validators(&mut oracle, &participants).await;
3624
3625 let link = Link {
3627 latency: Duration::from_millis(10),
3628 jitter: Duration::from_millis(1),
3629 success_rate: 1.0,
3630 };
3631 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3632
3633 let elector = L::default();
3635 let relay = Arc::new(mocks::relay::Relay::new());
3636 let mut reporters = Vec::new();
3637 for (idx_scheme, validator) in participants.iter().enumerate() {
3638 let context = context.with_label(&format!("validator_{}", *validator));
3640
3641 let reporter_config = mocks::reporter::Config {
3643 namespace: namespace.clone(),
3644 participants: participants.clone().try_into().unwrap(),
3645 scheme: schemes[idx_scheme].clone(),
3646 elector: elector.clone(),
3647 };
3648 let reporter =
3649 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3650 let (pending, recovered, resolver) = registrations
3651 .remove(validator)
3652 .expect("validator should be registered");
3653 if idx_scheme == 0 {
3654 let cfg = mocks::outdated::Config {
3655 scheme: schemes[idx_scheme].clone(),
3656 namespace: namespace.clone(),
3657 view_delta: ViewDelta::new(activity_timeout.get().saturating_mul(4)),
3658 };
3659 let engine: mocks::outdated::Outdated<_, _, Sha256> =
3660 mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
3661 engine.start(pending);
3662 } else {
3663 reporters.push(reporter.clone());
3664 let application_cfg = mocks::application::Config {
3665 hasher: Sha256::default(),
3666 relay: relay.clone(),
3667 me: validator.clone(),
3668 propose_latency: (10.0, 5.0),
3669 verify_latency: (10.0, 5.0),
3670 certify_latency: (10.0, 5.0),
3671 should_certify: mocks::application::Certifier::Sometimes,
3672 };
3673 let (actor, application) = mocks::application::Application::new(
3674 context.with_label("application"),
3675 application_cfg,
3676 );
3677 actor.start();
3678 let blocker = oracle.control(validator.clone());
3679 let cfg = config::Config {
3680 scheme: schemes[idx_scheme].clone(),
3681 elector: elector.clone(),
3682 blocker,
3683 automaton: application.clone(),
3684 relay: application.clone(),
3685 reporter: reporter.clone(),
3686 partition: validator.clone().to_string(),
3687 mailbox_size: 1024,
3688 epoch: Epoch::new(333),
3689 namespace: namespace.clone(),
3690 leader_timeout: Duration::from_secs(1),
3691 notarization_timeout: Duration::from_secs(2),
3692 nullify_retry: Duration::from_secs(10),
3693 fetch_timeout: Duration::from_secs(1),
3694 activity_timeout,
3695 skip_timeout,
3696 fetch_concurrent: 4,
3697 replay_buffer: NZUsize!(1024 * 1024),
3698 write_buffer: NZUsize!(1024 * 1024),
3699 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3700 };
3701 let engine = Engine::new(context.with_label("engine"), cfg);
3702 engine.start(pending, recovered, resolver);
3703 }
3704 }
3705
3706 let mut finalizers = Vec::new();
3708 for reporter in reporters.iter_mut() {
3709 let (mut latest, mut monitor) = reporter.subscribe().await;
3710 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3711 while latest < required_containers {
3712 latest = monitor.next().await.expect("event missing");
3713 }
3714 }));
3715 }
3716 join_all(finalizers).await;
3717
3718 for reporter in reporters.iter() {
3720 {
3722 let faults = reporter.faults.lock().unwrap();
3723 assert!(faults.is_empty());
3724 }
3725
3726 {
3728 let invalid = reporter.invalid.lock().unwrap();
3729 assert_eq!(*invalid, 0);
3730 }
3731 }
3732
3733 let blocked = oracle.blocked().await.unwrap();
3735 assert!(blocked.is_empty());
3736 });
3737 }
3738
3739 #[test_group("slow")]
3740 #[test_traced]
3741 fn test_outdated() {
3742 for seed in 0..5 {
3743 outdated::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
3744 outdated::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
3745 outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3746 outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3747 outdated::<_, _, RoundRobin>(seed, ed25519::fixture);
3748 }
3749 }
3750
3751 fn run_1k<S, F, L>(mut fixture: F)
3752 where
3753 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3754 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
3755 L: Elector<S>,
3756 {
3757 let n = 10;
3759 let required_containers = View::new(1_000);
3760 let activity_timeout = ViewDelta::new(10);
3761 let skip_timeout = ViewDelta::new(5);
3762 let namespace = b"consensus".to_vec();
3763 let cfg = deterministic::Config::new();
3764 let executor = deterministic::Runner::new(cfg);
3765 executor.start(|mut context| async move {
3766 let (network, mut oracle) = Network::new(
3768 context.with_label("network"),
3769 Config {
3770 max_size: 1024 * 1024,
3771 disconnect_on_block: false,
3772 tracked_peer_sets: None,
3773 },
3774 );
3775
3776 network.start();
3778
3779 let Fixture {
3781 participants,
3782 schemes,
3783 ..
3784 } = fixture(&mut context, n);
3785 let mut registrations = register_validators(&mut oracle, &participants).await;
3786
3787 let link = Link {
3789 latency: Duration::from_millis(80),
3790 jitter: Duration::from_millis(10),
3791 success_rate: 0.98,
3792 };
3793 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3794
3795 let elector = L::default();
3797 let relay = Arc::new(mocks::relay::Relay::new());
3798 let mut reporters = Vec::new();
3799 let mut engine_handlers = Vec::new();
3800 for (idx, validator) in participants.iter().enumerate() {
3801 let context = context.with_label(&format!("validator_{}", *validator));
3803
3804 let reporter_config = mocks::reporter::Config {
3806 namespace: namespace.clone(),
3807 participants: participants.clone().try_into().unwrap(),
3808 scheme: schemes[idx].clone(),
3809 elector: elector.clone(),
3810 };
3811 let reporter =
3812 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3813 reporters.push(reporter.clone());
3814 let application_cfg = mocks::application::Config {
3815 hasher: Sha256::default(),
3816 relay: relay.clone(),
3817 me: validator.clone(),
3818 propose_latency: (100.0, 50.0),
3819 verify_latency: (50.0, 40.0),
3820 certify_latency: (50.0, 40.0),
3821 should_certify: mocks::application::Certifier::Sometimes,
3822 };
3823 let (actor, application) = mocks::application::Application::new(
3824 context.with_label("application"),
3825 application_cfg,
3826 );
3827 actor.start();
3828 let blocker = oracle.control(validator.clone());
3829 let cfg = config::Config {
3830 scheme: schemes[idx].clone(),
3831 elector: elector.clone(),
3832 blocker,
3833 automaton: application.clone(),
3834 relay: application.clone(),
3835 reporter: reporter.clone(),
3836 partition: validator.to_string(),
3837 mailbox_size: 1024,
3838 epoch: Epoch::new(333),
3839 namespace: namespace.clone(),
3840 leader_timeout: Duration::from_secs(1),
3841 notarization_timeout: Duration::from_secs(2),
3842 nullify_retry: Duration::from_secs(10),
3843 fetch_timeout: Duration::from_secs(1),
3844 activity_timeout,
3845 skip_timeout,
3846 fetch_concurrent: 4,
3847 replay_buffer: NZUsize!(1024 * 1024),
3848 write_buffer: NZUsize!(1024 * 1024),
3849 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3850 };
3851 let engine = Engine::new(context.with_label("engine"), cfg);
3852
3853 let (pending, recovered, resolver) = registrations
3855 .remove(validator)
3856 .expect("validator should be registered");
3857 engine_handlers.push(engine.start(pending, recovered, resolver));
3858 }
3859
3860 let mut finalizers = Vec::new();
3862 for reporter in reporters.iter_mut() {
3863 let (mut latest, mut monitor) = reporter.subscribe().await;
3864 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3865 while latest < required_containers {
3866 latest = monitor.next().await.expect("event missing");
3867 }
3868 }));
3869 }
3870 join_all(finalizers).await;
3871
3872 for reporter in reporters.iter() {
3874 {
3876 let faults = reporter.faults.lock().unwrap();
3877 assert!(faults.is_empty());
3878 }
3879
3880 {
3882 let invalid = reporter.invalid.lock().unwrap();
3883 assert_eq!(*invalid, 0);
3884 }
3885 }
3886
3887 let blocked = oracle.blocked().await.unwrap();
3889 assert!(blocked.is_empty());
3890 })
3891 }
3892
3893 #[test_group("slow")]
3894 #[test_traced]
3895 fn test_1k_bls12381_threshold_min_pk() {
3896 run_1k::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
3897 }
3898
3899 #[test_group("slow")]
3900 #[test_traced]
3901 fn test_1k_bls12381_threshold_min_sig() {
3902 run_1k::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
3903 }
3904
3905 #[test_group("slow")]
3906 #[test_traced]
3907 fn test_1k_bls12381_multisig_min_pk() {
3908 run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
3909 }
3910
3911 #[test_group("slow")]
3912 #[test_traced]
3913 fn test_1k_bls12381_multisig_min_sig() {
3914 run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
3915 }
3916
3917 #[test_group("slow")]
3918 #[test_traced]
3919 fn test_1k_ed25519() {
3920 run_1k::<_, _, RoundRobin>(ed25519::fixture);
3921 }
3922
3923 fn engine_shutdown<S, F, L>(mut fixture: F, graceful: bool)
3924 where
3925 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3926 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
3927 L: Elector<S>,
3928 {
3929 let n = 1;
3931 let namespace = b"consensus".to_vec();
3932 let executor = deterministic::Runner::timed(Duration::from_secs(10));
3933 executor.start(|mut context| async move {
3934 let (network, mut oracle) = Network::new(
3936 context.with_label("network"),
3937 Config {
3938 max_size: 1024 * 1024,
3939 disconnect_on_block: true,
3940 tracked_peer_sets: None,
3941 },
3942 );
3943
3944 network.start();
3946
3947 let Fixture {
3949 participants,
3950 schemes,
3951 ..
3952 } = fixture(&mut context, n);
3953 let mut registrations = register_validators(&mut oracle, &participants).await;
3954
3955 let link = Link {
3957 latency: Duration::from_millis(1),
3958 jitter: Duration::from_millis(0),
3959 success_rate: 1.0,
3960 };
3961 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3962
3963 let elector = L::default();
3965 let reporter_config = mocks::reporter::Config {
3966 namespace: namespace.clone(),
3967 participants: participants.clone().try_into().unwrap(),
3968 scheme: schemes[0].clone(),
3969 elector: elector.clone(),
3970 };
3971 let reporter =
3972 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3973 let relay = Arc::new(mocks::relay::Relay::new());
3974 let application_cfg = mocks::application::Config {
3975 hasher: Sha256::default(),
3976 relay: relay.clone(),
3977 me: participants[0].clone(),
3978 propose_latency: (1.0, 0.0),
3979 verify_latency: (1.0, 0.0),
3980 certify_latency: (1.0, 0.0),
3981 should_certify: mocks::application::Certifier::Sometimes,
3982 };
3983 let (actor, application) = mocks::application::Application::new(
3984 context.with_label("application"),
3985 application_cfg,
3986 );
3987 actor.start();
3988 let blocker = oracle.control(participants[0].clone());
3989 let cfg = config::Config {
3990 scheme: schemes[0].clone(),
3991 elector: elector.clone(),
3992 blocker,
3993 automaton: application.clone(),
3994 relay: application.clone(),
3995 reporter: reporter.clone(),
3996 partition: participants[0].clone().to_string(),
3997 mailbox_size: 64,
3998 epoch: Epoch::new(333),
3999 namespace: namespace.clone(),
4000 leader_timeout: Duration::from_millis(50),
4001 notarization_timeout: Duration::from_millis(100),
4002 nullify_retry: Duration::from_millis(250),
4003 fetch_timeout: Duration::from_millis(50),
4004 activity_timeout: ViewDelta::new(4),
4005 skip_timeout: ViewDelta::new(2),
4006 fetch_concurrent: 4,
4007 replay_buffer: NZUsize!(1024 * 16),
4008 write_buffer: NZUsize!(1024 * 16),
4009 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4010 };
4011 let engine = Engine::new(context.with_label("engine"), cfg);
4012
4013 let (pending, recovered, resolver) = registrations
4015 .remove(&participants[0])
4016 .expect("validator should be registered");
4017 let handle = engine.start(pending, recovered, resolver);
4018
4019 context.sleep(Duration::from_millis(1000)).await;
4021
4022 let metrics_before = context.encode();
4024 let is_running = |name: &str| -> bool {
4025 metrics_before.lines().any(|line| {
4026 line.starts_with("runtime_tasks_running{")
4027 && line.contains(&format!("name=\"{name}\""))
4028 && line.contains("kind=\"Task\"")
4029 && line.trim_end().ends_with(" 1")
4030 })
4031 };
4032 assert!(is_running("engine"));
4033 assert!(is_running("engine_batcher"));
4034 assert!(is_running("engine_voter"));
4035 assert!(is_running("engine_resolver"));
4036
4037 context.sleep(Duration::from_millis(1000)).await;
4039 assert!(is_running("engine"));
4040
4041 let metrics_after = if graceful {
4043 let metrics_context = context.clone();
4044 let result = context.stop(0, Some(Duration::from_secs(5))).await;
4045 assert!(
4046 result.is_ok(),
4047 "graceful shutdown should complete: {result:?}"
4048 );
4049 metrics_context.encode()
4050 } else {
4051 handle.abort();
4052 let _ = handle.await; context.sleep(Duration::from_millis(1000)).await;
4056 context.encode()
4057 };
4058 let is_stopped = |name: &str| -> bool {
4059 metrics_after.lines().any(|line| {
4061 line.starts_with("runtime_tasks_running{")
4062 && line.contains(&format!("name=\"{name}\""))
4063 && line.contains("kind=\"Task\"")
4064 && line.trim_end().ends_with(" 0")
4065 })
4066 };
4067 assert!(is_stopped("engine"));
4068 assert!(is_stopped("engine_batcher"));
4069 assert!(is_stopped("engine_voter"));
4070 assert!(is_stopped("engine_resolver"));
4071 });
4072 }
4073
4074 #[test_traced]
4075 fn test_children_shutdown_on_engine_abort() {
4076 engine_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>, false);
4077 engine_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>, false);
4078 engine_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>, false);
4079 engine_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>, false);
4080 engine_shutdown::<_, _, RoundRobin>(ed25519::fixture, false);
4081 }
4082
4083 #[test_traced]
4084 fn test_graceful_shutdown() {
4085 engine_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>, true);
4086 engine_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>, true);
4087 engine_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>, true);
4088 engine_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>, true);
4089 engine_shutdown::<_, _, RoundRobin>(ed25519::fixture, true);
4090 }
4091
4092 fn attributable_reporter_filtering<S, F, L>(mut fixture: F)
4093 where
4094 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4095 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
4096 L: Elector<S>,
4097 {
4098 let n = 3;
4099 let required_containers = View::new(10);
4100 let activity_timeout = ViewDelta::new(10);
4101 let skip_timeout = ViewDelta::new(5);
4102 let namespace = b"consensus".to_vec();
4103 let executor = deterministic::Runner::timed(Duration::from_secs(30));
4104 executor.start(|mut context| async move {
4105 let (network, mut oracle) = Network::new(
4107 context.with_label("network"),
4108 Config {
4109 max_size: 1024 * 1024,
4110 disconnect_on_block: false,
4111 tracked_peer_sets: None,
4112 },
4113 );
4114 network.start();
4115
4116 let Fixture {
4118 participants,
4119 schemes,
4120 ..
4121 } = fixture(&mut context, n);
4122 let mut registrations = register_validators(&mut oracle, &participants).await;
4123
4124 let link = Link {
4126 latency: Duration::from_millis(10),
4127 jitter: Duration::from_millis(1),
4128 success_rate: 1.0,
4129 };
4130 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4131
4132 let elector = L::default();
4134 let relay = Arc::new(mocks::relay::Relay::new());
4135 let mut reporters = Vec::new();
4136 for (idx, validator) in participants.iter().enumerate() {
4137 let context = context.with_label(&format!("validator_{}", *validator));
4138
4139 let reporter_config = mocks::reporter::Config {
4140 namespace: namespace.clone(),
4141 participants: participants.clone().try_into().unwrap(),
4142 scheme: schemes[idx].clone(),
4143 elector: elector.clone(),
4144 };
4145 let mock_reporter = mocks::reporter::Reporter::new(
4146 context.with_label("mock_reporter"),
4147 reporter_config,
4148 );
4149
4150 let attributable_reporter = scheme::reporter::AttributableReporter::new(
4152 context.with_label("rng"),
4153 schemes[idx].clone(),
4154 namespace.clone(),
4155 mock_reporter.clone(),
4156 true, );
4158 reporters.push(mock_reporter.clone());
4159
4160 let application_cfg = mocks::application::Config {
4161 hasher: Sha256::default(),
4162 relay: relay.clone(),
4163 me: validator.clone(),
4164 propose_latency: (10.0, 5.0),
4165 verify_latency: (10.0, 5.0),
4166 certify_latency: (10.0, 5.0),
4167 should_certify: mocks::application::Certifier::Sometimes,
4168 };
4169 let (actor, application) = mocks::application::Application::new(
4170 context.with_label("application"),
4171 application_cfg,
4172 );
4173 actor.start();
4174 let blocker = oracle.control(validator.clone());
4175 let cfg = config::Config {
4176 scheme: schemes[idx].clone(),
4177 elector: elector.clone(),
4178 blocker,
4179 automaton: application.clone(),
4180 relay: application.clone(),
4181 reporter: attributable_reporter,
4182 partition: validator.to_string(),
4183 mailbox_size: 1024,
4184 epoch: Epoch::new(333),
4185 namespace: namespace.clone(),
4186 leader_timeout: Duration::from_secs(1),
4187 notarization_timeout: Duration::from_secs(2),
4188 nullify_retry: Duration::from_secs(10),
4189 fetch_timeout: Duration::from_secs(1),
4190 activity_timeout,
4191 skip_timeout,
4192 fetch_concurrent: 4,
4193 replay_buffer: NZUsize!(1024 * 1024),
4194 write_buffer: NZUsize!(1024 * 1024),
4195 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4196 };
4197 let engine = Engine::new(context.with_label("engine"), cfg);
4198
4199 let (pending, recovered, resolver) = registrations
4201 .remove(validator)
4202 .expect("validator should be registered");
4203 engine.start(pending, recovered, resolver);
4204 }
4205
4206 let mut finalizers = Vec::new();
4208 for reporter in reporters.iter_mut() {
4209 let (mut latest, mut monitor) = reporter.subscribe().await;
4210 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4211 while latest < required_containers {
4212 latest = monitor.next().await.expect("event missing");
4213 }
4214 }));
4215 }
4216 join_all(finalizers).await;
4217
4218 for reporter in reporters.iter() {
4220 {
4222 let faults = reporter.faults.lock().unwrap();
4223 assert!(faults.is_empty(), "No faults should be reported");
4224 }
4225
4226 {
4228 let invalid = reporter.invalid.lock().unwrap();
4229 assert_eq!(*invalid, 0, "No invalid signatures");
4230 }
4231
4232 {
4234 let notarizations = reporter.notarizations.lock().unwrap();
4235 let finalizations = reporter.finalizations.lock().unwrap();
4236 assert!(
4237 !notarizations.is_empty() || !finalizations.is_empty(),
4238 "Certificates should be reported"
4239 );
4240 }
4241
4242 let notarizes = reporter.notarizes.lock().unwrap();
4244 let last_view = notarizes.keys().max().cloned().unwrap_or_default();
4245 for (view, payloads) in notarizes.iter() {
4246 if *view == last_view {
4247 continue; }
4249
4250 let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4251
4252 if schemes[0].is_attributable() {
4254 assert!(signers > 1, "view {view}: {signers}");
4255 } else {
4256 assert_eq!(signers, 0);
4258 }
4259 }
4260
4261 let finalizes = reporter.finalizes.lock().unwrap();
4263 for (_, payloads) in finalizes.iter() {
4264 let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4265
4266 if schemes[0].is_attributable() {
4268 assert!(signers > 1);
4269 } else {
4270 assert_eq!(signers, 0);
4272 }
4273 }
4274 }
4275
4276 let blocked = oracle.blocked().await.unwrap();
4278 assert!(blocked.is_empty());
4279 });
4280 }
4281
4282 #[test_traced]
4283 fn test_attributable_reporter_filtering() {
4284 attributable_reporter_filtering::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
4285 attributable_reporter_filtering::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
4286 attributable_reporter_filtering::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4287 attributable_reporter_filtering::<_, _, RoundRobin>(
4288 bls12381_multisig::fixture::<MinSig, _>,
4289 );
4290 attributable_reporter_filtering::<_, _, RoundRobin>(ed25519::fixture);
4291 }
4292
4293 fn split_views_no_lockup<S, F, L>(mut fixture: F)
4294 where
4295 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4296 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
4297 L: Elector<S>,
4298 {
4299 enum ParticipantType {
4312 Group1, Group2, Ignorant, Byzantine, }
4317 let get_type = |idx: usize| -> ParticipantType {
4318 match idx {
4319 0..3 => ParticipantType::Group1,
4320 3..6 => ParticipantType::Group2,
4321 6 => ParticipantType::Ignorant,
4322 7..10 => ParticipantType::Byzantine,
4323 _ => unreachable!(),
4324 }
4325 };
4326
4327 let n = 10;
4329 let quorum = quorum(n) as usize;
4330 assert_eq!(quorum, 7);
4331 let activity_timeout = ViewDelta::new(10);
4332 let skip_timeout = ViewDelta::new(5);
4333 let namespace = b"consensus".to_vec();
4334 let executor = deterministic::Runner::timed(Duration::from_secs(300));
4335 executor.start(|mut context| async move {
4336 let (network, mut oracle) = Network::new(
4338 context.with_label("network"),
4339 Config {
4340 max_size: 1024 * 1024,
4341 disconnect_on_block: false,
4342 tracked_peer_sets: None,
4343 },
4344 );
4345 network.start();
4346
4347 let Fixture {
4349 participants,
4350 schemes,
4351 ..
4352 } = fixture(&mut context, n);
4353 let mut registrations = register_validators(&mut oracle, &participants).await;
4354
4355 let elector = L::default();
4361 let relay = Arc::new(mocks::relay::Relay::new());
4362 let mut honest_reporters = Vec::new();
4363 for (idx, validator) in participants.iter().enumerate() {
4364 let (pending, recovered, resolver) = registrations
4365 .remove(validator)
4366 .expect("validator should be registered");
4367 let participant_type = get_type(idx);
4368 if matches!(participant_type, ParticipantType::Byzantine) {
4369 let cfg = mocks::nullify_only::Config {
4371 scheme: schemes[idx].clone(),
4372 namespace: namespace.clone(),
4373 };
4374 let engine: mocks::nullify_only::NullifyOnly<_, _, Sha256> =
4375 mocks::nullify_only::NullifyOnly::new(
4376 context.with_label(&format!("byzantine_{}", *validator)),
4377 cfg,
4378 );
4379 engine.start(pending);
4380 drop(recovered);
4382 drop(resolver);
4383 } else {
4384 let reporter_config = mocks::reporter::Config {
4386 namespace: namespace.clone(),
4387 participants: participants.clone().try_into().unwrap(),
4388 scheme: schemes[idx].clone(),
4389 elector: elector.clone(),
4390 };
4391 let reporter = mocks::reporter::Reporter::new(
4392 context.with_label(&format!("reporter_{}", *validator)),
4393 reporter_config,
4394 );
4395 honest_reporters.push(reporter.clone());
4396
4397 let application_cfg = mocks::application::Config {
4398 hasher: Sha256::default(),
4399 relay: relay.clone(),
4400 me: validator.clone(),
4401 propose_latency: (10.0, 5.0),
4402 verify_latency: (10.0, 5.0),
4403 certify_latency: (10.0, 5.0),
4404 should_certify: mocks::application::Certifier::Sometimes,
4405 };
4406 let (actor, application) = mocks::application::Application::new(
4407 context.with_label(&format!("application_{}", *validator)),
4408 application_cfg,
4409 );
4410 actor.start();
4411 let blocker = oracle.control(validator.clone());
4412 let cfg = config::Config {
4413 scheme: schemes[idx].clone(),
4414 elector: elector.clone(),
4415 blocker,
4416 automaton: application.clone(),
4417 relay: application.clone(),
4418 reporter: reporter.clone(),
4419 partition: validator.to_string(),
4420 mailbox_size: 1024,
4421 epoch: Epoch::new(333),
4422 namespace: namespace.clone(),
4423 leader_timeout: Duration::from_secs(10),
4424 notarization_timeout: Duration::from_secs(10),
4425 nullify_retry: Duration::from_secs(10),
4426 fetch_timeout: Duration::from_secs(1),
4427 activity_timeout,
4428 skip_timeout,
4429 fetch_concurrent: 4,
4430 replay_buffer: NZUsize!(1024 * 1024),
4431 write_buffer: NZUsize!(1024 * 1024),
4432 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4433 };
4434 let engine =
4435 Engine::new(context.with_label(&format!("engine_{}", *validator)), cfg);
4436 engine.start(pending, recovered, resolver);
4437 }
4438 }
4439
4440 let build_finalization = |proposal: &Proposal<D>| -> TFinalization<_, D> {
4444 let votes: Vec<_> = (0..=quorum)
4445 .map(|i| TFinalize::sign(&schemes[i], &namespace, proposal.clone()).unwrap())
4446 .collect();
4447 TFinalization::from_finalizes(&schemes[0], &votes).expect("finalization quorum")
4448 };
4449 let build_notarization = |proposal: &Proposal<D>| -> TNotarization<_, D> {
4451 let votes: Vec<_> = (0..=quorum)
4452 .map(|i| TNotarize::sign(&schemes[i], &namespace, proposal.clone()).unwrap())
4453 .collect();
4454 TNotarization::from_notarizes(&schemes[0], &votes).expect("notarization quorum")
4455 };
4456 let build_nullification = |round: Round| -> TNullification<_> {
4457 let votes: Vec<_> = (0..=quorum)
4458 .map(|i| TNullify::sign::<D>(&schemes[i], &namespace, round).unwrap())
4459 .collect();
4460 TNullification::from_nullifies(&schemes[0], &votes).expect("nullification quorum")
4461 };
4462 let f_view = 1;
4464 let round_f = Round::new(Epoch::new(333), View::new(f_view));
4465 let payload_b0 = Sha256::hash(b"B_F");
4466 let proposal_b0 = Proposal::new(round_f, View::new(f_view - 1), payload_b0);
4467 let payload_b1a = Sha256::hash(b"B_G1");
4468 let proposal_b1a = Proposal::new(
4469 Round::new(Epoch::new(333), View::new(f_view + 1)),
4470 View::new(f_view),
4471 payload_b1a,
4472 );
4473 let payload_b1b = Sha256::hash(b"B_G2");
4474 let proposal_b1b = Proposal::new(
4475 Round::new(Epoch::new(333), View::new(f_view + 2)),
4476 View::new(f_view),
4477 payload_b1b,
4478 );
4479
4480 let b0_notarization = build_notarization(&proposal_b0);
4482 let b0_finalization = build_finalization(&proposal_b0);
4483 let b1a_notarization = build_notarization(&proposal_b1a);
4485 let b1b_notarization = build_notarization(&proposal_b1b);
4486 let null_a = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 1)));
4488 let null_b = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 2)));
4489
4490 let injector_pk = PrivateKey::from_seed(1_000_000).public_key();
4492 let (mut injector_sender, _inj_certificate_receiver) = oracle
4493 .control(injector_pk.clone())
4494 .register(1, TEST_QUOTA)
4495 .await
4496 .unwrap();
4497
4498 let link = Link {
4500 latency: Duration::from_millis(10),
4501 jitter: Duration::from_millis(0),
4502 success_rate: 1.0,
4503 };
4504 for p in participants.iter() {
4505 oracle
4506 .add_link(injector_pk.clone(), p.clone(), link.clone())
4507 .await
4508 .unwrap();
4509 }
4510
4511 let notarization_msg = Certificate::<_, D>::Notarization(b1b_notarization);
4519 let nullification_msg = Certificate::<_, D>::Nullification(null_b.clone());
4520 for (i, participant) in participants.iter().enumerate() {
4521 let recipient = Recipients::One(participant.clone());
4522 let msg = match get_type(i) {
4523 ParticipantType::Group2 => notarization_msg.encode().into(),
4524 _ => nullification_msg.encode().into(),
4525 };
4526 injector_sender.send(recipient, msg, true).await.unwrap();
4527 }
4528 let notarization_msg = Certificate::<_, D>::Notarization(b1a_notarization);
4530 let nullification_msg = Certificate::<_, D>::Nullification(null_a.clone());
4531 for (i, participant) in participants.iter().enumerate() {
4532 let recipient = Recipients::One(participant.clone());
4533 let msg = match get_type(i) {
4534 ParticipantType::Group1 => notarization_msg.encode().into(),
4535 _ => nullification_msg.encode().into(),
4536 };
4537 injector_sender.send(recipient, msg, true).await.unwrap();
4538 }
4539 let msg = Certificate::<_, D>::Notarization(b0_notarization)
4541 .encode()
4542 .into();
4543 injector_sender
4544 .send(Recipients::All, msg, true)
4545 .await
4546 .unwrap();
4547 let msg = Certificate::<_, D>::Finalization(b0_finalization)
4548 .encode()
4549 .into();
4550 injector_sender
4551 .send(Recipients::All, msg, true)
4552 .await
4553 .unwrap();
4554
4555 debug!("waiting for certificates to propagate");
4558 context.sleep(Duration::from_secs(5)).await;
4559 debug!("certificates propagated");
4560
4561 let view = View::new(f_view);
4566 for reporter in honest_reporters.iter() {
4567 let finalizations = reporter.finalizations.lock().unwrap();
4568 assert!(finalizations.contains_key(&view));
4569 }
4570
4571 let view = View::new(f_view + 1);
4575 for (i, reporter) in honest_reporters.iter().enumerate() {
4576 let finalizations = reporter.finalizations.lock().unwrap();
4577 assert!(!finalizations.contains_key(&view));
4578 let nullifications = reporter.nullifications.lock().unwrap();
4579 let notarizations = reporter.notarizations.lock().unwrap();
4580 match get_type(i) {
4581 ParticipantType::Group1 => {
4582 assert!(notarizations.contains_key(&view));
4583 assert!(!nullifications.contains_key(&view));
4584 }
4585 _ => {
4586 assert!(nullifications.contains_key(&view));
4587 assert!(!notarizations.contains_key(&view));
4588 }
4589 }
4590 }
4591
4592 let view = View::new(f_view + 2);
4596 for (i, reporter) in honest_reporters.iter().enumerate() {
4597 let finalizations = reporter.finalizations.lock().unwrap();
4598 assert!(!finalizations.contains_key(&view));
4599 let nullifications = reporter.nullifications.lock().unwrap();
4600 let notarizations = reporter.notarizations.lock().unwrap();
4601 match get_type(i) {
4602 ParticipantType::Group2 => {
4603 assert!(notarizations.contains_key(&view));
4604 assert!(!nullifications.contains_key(&view));
4605 }
4606 _ => {
4607 assert!(nullifications.contains_key(&view));
4608 assert!(!notarizations.contains_key(&view));
4609 }
4610 }
4611 }
4612
4613 let next_view = View::new(f_view + 3);
4615 for (i, reporter) in honest_reporters.iter().enumerate() {
4616 let nullifies = reporter.nullifies.lock().unwrap();
4617 assert!(!nullifies.contains_key(&next_view), "reporter {i}");
4618 }
4619
4620 link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
4624
4625 {
4627 let target = View::new(f_view + 3);
4628 let mut finalizers = Vec::new();
4629 for reporter in honest_reporters.iter_mut() {
4630 let (mut latest, mut monitor) = reporter.subscribe().await;
4631 finalizers.push(context.with_label("resume_finalizer").spawn(
4632 move |_| async move {
4633 while latest < target {
4634 latest = monitor.next().await.expect("event missing");
4635 }
4636 },
4637 ));
4638 }
4639 join_all(finalizers).await;
4640 }
4641
4642 for reporter in honest_reporters.iter() {
4644 {
4645 let faults = reporter.faults.lock().unwrap();
4646 assert!(faults.is_empty());
4647 }
4648 {
4649 let invalid = reporter.invalid.lock().unwrap();
4650 assert_eq!(*invalid, 0);
4651 }
4652 }
4653 let blocked = oracle.blocked().await.unwrap();
4654 assert!(blocked.is_empty());
4655 });
4656 }
4657
4658 #[test_traced]
4659 fn test_split_views_no_lockup() {
4660 split_views_no_lockup::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
4661 split_views_no_lockup::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
4662 split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4663 split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
4664 split_views_no_lockup::<_, _, RoundRobin>(ed25519::fixture);
4665 }
4666
4667 fn tle<V, L>()
4668 where
4669 V: Variant,
4670 L: Elector<bls12381_threshold::Scheme<PublicKey, V>>,
4671 {
4672 let n = 4;
4674 let namespace = b"consensus".to_vec();
4675 let activity_timeout = ViewDelta::new(100);
4676 let skip_timeout = ViewDelta::new(50);
4677 let executor = deterministic::Runner::timed(Duration::from_secs(30));
4678 executor.start(|mut context| async move {
4679 let (network, mut oracle) = Network::new(
4681 context.with_label("network"),
4682 Config {
4683 max_size: 1024 * 1024,
4684 disconnect_on_block: false,
4685 tracked_peer_sets: None,
4686 },
4687 );
4688
4689 network.start();
4691
4692 let Fixture {
4694 participants,
4695 schemes,
4696 ..
4697 } = bls12381_threshold::fixture::<V, _>(&mut context, n);
4698 let mut registrations = register_validators(&mut oracle, &participants).await;
4699
4700 let link = Link {
4702 latency: Duration::from_millis(10),
4703 jitter: Duration::from_millis(5),
4704 success_rate: 1.0,
4705 };
4706 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4707
4708 let elector = L::default();
4710 let relay = Arc::new(mocks::relay::Relay::new());
4711 let mut reporters = Vec::new();
4712 let mut engine_handlers = Vec::new();
4713 let monitor_reporter = Arc::new(Mutex::new(None));
4714 for (idx, validator) in participants.iter().enumerate() {
4715 let context = context.with_label(&format!("validator_{}", *validator));
4717
4718 let reporter_config = mocks::reporter::Config {
4720 namespace: namespace.clone(),
4721 participants: participants.clone().try_into().unwrap(),
4722 scheme: schemes[idx].clone(),
4723 elector: elector.clone(),
4724 };
4725 let reporter =
4726 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4727 reporters.push(reporter.clone());
4728 if idx == 0 {
4729 *monitor_reporter.lock().unwrap() = Some(reporter.clone());
4730 }
4731
4732 let application_cfg = mocks::application::Config {
4734 hasher: Sha256::default(),
4735 relay: relay.clone(),
4736 me: validator.clone(),
4737 propose_latency: (10.0, 5.0),
4738 verify_latency: (10.0, 5.0),
4739 certify_latency: (10.0, 5.0),
4740 should_certify: mocks::application::Certifier::Sometimes,
4741 };
4742 let (actor, application) = mocks::application::Application::new(
4743 context.with_label("application"),
4744 application_cfg,
4745 );
4746 actor.start();
4747 let blocker = oracle.control(validator.clone());
4748 let cfg = config::Config {
4749 scheme: schemes[idx].clone(),
4750 elector: elector.clone(),
4751 blocker,
4752 automaton: application.clone(),
4753 relay: application.clone(),
4754 reporter: reporter.clone(),
4755 partition: validator.to_string(),
4756 mailbox_size: 1024,
4757 epoch: Epoch::new(333),
4758 namespace: namespace.clone(),
4759 leader_timeout: Duration::from_millis(100),
4760 notarization_timeout: Duration::from_millis(200),
4761 nullify_retry: Duration::from_millis(500),
4762 fetch_timeout: Duration::from_millis(100),
4763 activity_timeout,
4764 skip_timeout,
4765 fetch_concurrent: 4,
4766 replay_buffer: NZUsize!(1024 * 1024),
4767 write_buffer: NZUsize!(1024 * 1024),
4768 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4769 };
4770 let engine = Engine::new(context.with_label("engine"), cfg);
4771
4772 let (pending, recovered, resolver) = registrations
4774 .remove(validator)
4775 .expect("validator should be registered");
4776 engine_handlers.push(engine.start(pending, recovered, resolver));
4777 }
4778
4779 let target = Round::new(Epoch::new(333), View::new(10)); let message = b"Secret message for future view10"; let ciphertext = schemes[0].encrypt(&mut context, &namespace, target, *message);
4785
4786 let reporter = monitor_reporter.lock().unwrap().clone().unwrap();
4788 loop {
4789 context.sleep(Duration::from_millis(100)).await;
4791 let notarizations = reporter.notarizations.lock().unwrap();
4792 let Some(notarization) = notarizations.get(&target.view()) else {
4793 continue;
4794 };
4795
4796 let seed = notarization.seed();
4798 let decrypted = seed
4799 .decrypt(&ciphertext)
4800 .expect("Decryption should succeed with valid seed signature");
4801 assert_eq!(
4802 message,
4803 decrypted.as_ref(),
4804 "Decrypted message should match original message"
4805 );
4806 break;
4807 }
4808 });
4809 }
4810
4811 #[test_traced]
4812 fn test_tle() {
4813 tle::<MinPk, Random>();
4814 tle::<MinSig, Random>();
4815 }
4816
4817 fn hailstorm<S, F, L>(
4818 seed: u64,
4819 shutdowns: usize,
4820 interval: ViewDelta,
4821 mut fixture: F,
4822 ) -> String
4823 where
4824 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4825 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
4826 L: Elector<S>,
4827 {
4828 let n = 5;
4830 let activity_timeout = ViewDelta::new(10);
4831 let skip_timeout = ViewDelta::new(5);
4832 let namespace = b"consensus".to_vec();
4833 let cfg = deterministic::Config::new().with_seed(seed);
4834 let executor = deterministic::Runner::new(cfg);
4835 executor.start(|mut context| async move {
4836 let (network, mut oracle) = Network::new(
4838 context.with_label("network"),
4839 Config {
4840 max_size: 1024 * 1024,
4841 disconnect_on_block: true,
4842 tracked_peer_sets: None,
4843 },
4844 );
4845
4846 network.start();
4848
4849 let Fixture {
4851 participants,
4852 schemes,
4853 ..
4854 } = fixture(&mut context, n);
4855 let mut registrations = register_validators(&mut oracle, &participants).await;
4856
4857 let link = Link {
4859 latency: Duration::from_millis(10),
4860 jitter: Duration::from_millis(1),
4861 success_rate: 1.0,
4862 };
4863 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4864
4865 let elector = L::default();
4867 let relay = Arc::new(mocks::relay::Relay::new());
4868 let mut reporters = BTreeMap::new();
4869 let mut engine_handlers = BTreeMap::new();
4870 for (idx, validator) in participants.iter().enumerate() {
4871 let context = context.with_label(&format!("validator_{}", *validator));
4873
4874 let reporter_config = mocks::reporter::Config {
4876 namespace: namespace.clone(),
4877 participants: participants.clone().try_into().unwrap(),
4878 scheme: schemes[idx].clone(),
4879 elector: elector.clone(),
4880 };
4881 let reporter =
4882 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4883 reporters.insert(idx, reporter.clone());
4884 let application_cfg = mocks::application::Config {
4885 hasher: Sha256::default(),
4886 relay: relay.clone(),
4887 me: validator.clone(),
4888 propose_latency: (10.0, 5.0),
4889 verify_latency: (10.0, 5.0),
4890 certify_latency: (10.0, 5.0),
4891 should_certify: mocks::application::Certifier::Sometimes,
4892 };
4893 let (actor, application) = mocks::application::Application::new(
4894 context.with_label("application"),
4895 application_cfg,
4896 );
4897 actor.start();
4898 let blocker = oracle.control(validator.clone());
4899 let cfg = config::Config {
4900 scheme: schemes[idx].clone(),
4901 elector: elector.clone(),
4902 blocker,
4903 automaton: application.clone(),
4904 relay: application.clone(),
4905 reporter: reporter.clone(),
4906 partition: validator.to_string(),
4907 mailbox_size: 1024,
4908 epoch: Epoch::new(333),
4909 namespace: namespace.clone(),
4910 leader_timeout: Duration::from_secs(1),
4911 notarization_timeout: Duration::from_secs(2),
4912 nullify_retry: Duration::from_secs(10),
4913 fetch_timeout: Duration::from_secs(1),
4914 activity_timeout,
4915 skip_timeout,
4916 fetch_concurrent: 4,
4917 replay_buffer: NZUsize!(1024 * 1024),
4918 write_buffer: NZUsize!(1024 * 1024),
4919 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4920 };
4921 let engine = Engine::new(context.with_label("engine"), cfg);
4922
4923 let (pending, recovered, resolver) = registrations
4925 .remove(validator)
4926 .expect("validator should be registered");
4927 engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
4928 }
4929
4930 let mut target = View::zero();
4932 for i in 0..shutdowns {
4933 target = target.saturating_add(interval);
4935
4936 let mut finalizers = Vec::new();
4938 for (_, reporter) in reporters.iter_mut() {
4939 let (mut latest, mut monitor) = reporter.subscribe().await;
4940 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4941 while latest < target {
4942 latest = monitor.next().await.expect("event missing");
4943 }
4944 }));
4945 }
4946 join_all(finalizers).await;
4947 target = target.saturating_add(interval);
4948
4949 let idx = context.gen_range(0..engine_handlers.len());
4951 let validator = &participants[idx];
4952 let handle = engine_handlers.remove(&idx).unwrap();
4953 handle.abort();
4954 let _ = handle.await;
4955 let selected_reporter = reporters.remove(&idx).unwrap();
4956 info!(idx, ?validator, "shutdown validator");
4957
4958 let mut finalizers = Vec::new();
4960 for (_, reporter) in reporters.iter_mut() {
4961 let (mut latest, mut monitor) = reporter.subscribe().await;
4962 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4963 while latest < target {
4964 latest = monitor.next().await.expect("event missing");
4965 }
4966 }));
4967 }
4968 join_all(finalizers).await;
4969 target = target.saturating_add(interval);
4970
4971 info!(idx, ?validator, "restarting validator");
4973 let context =
4974 context.with_label(&format!("validator_{}_restarted_{}", *validator, i));
4975
4976 let (pending, recovered, resolver) =
4978 register_validator(&mut oracle, validator.clone()).await;
4979 let application_cfg = mocks::application::Config {
4980 hasher: Sha256::default(),
4981 relay: relay.clone(),
4982 me: validator.clone(),
4983 propose_latency: (10.0, 5.0),
4984 verify_latency: (10.0, 5.0),
4985 certify_latency: (10.0, 5.0),
4986 should_certify: mocks::application::Certifier::Sometimes,
4987 };
4988 let (actor, application) = mocks::application::Application::new(
4989 context.with_label("application"),
4990 application_cfg,
4991 );
4992 actor.start();
4993 reporters.insert(idx, selected_reporter.clone());
4994 let blocker = oracle.control(validator.clone());
4995 let cfg = config::Config {
4996 scheme: schemes[idx].clone(),
4997 elector: elector.clone(),
4998 blocker,
4999 automaton: application.clone(),
5000 relay: application.clone(),
5001 reporter: selected_reporter,
5002 partition: validator.to_string(),
5003 mailbox_size: 1024,
5004 epoch: Epoch::new(333),
5005 namespace: namespace.clone(),
5006 leader_timeout: Duration::from_secs(1),
5007 notarization_timeout: Duration::from_secs(2),
5008 nullify_retry: Duration::from_secs(10),
5009 fetch_timeout: Duration::from_secs(1),
5010 activity_timeout,
5011 skip_timeout,
5012 fetch_concurrent: 4,
5013 replay_buffer: NZUsize!(1024 * 1024),
5014 write_buffer: NZUsize!(1024 * 1024),
5015 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5016 };
5017 let engine = Engine::new(context.with_label("engine"), cfg);
5018 engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5019
5020 let mut finalizers = Vec::new();
5022 for (_, reporter) in reporters.iter_mut() {
5023 let (mut latest, mut monitor) = reporter.subscribe().await;
5024 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5025 while latest < target {
5026 latest = monitor.next().await.expect("event missing");
5027 }
5028 }));
5029 }
5030 join_all(finalizers).await;
5031 info!(idx, ?validator, "validator recovered");
5032 }
5033
5034 let latest_complete = target.saturating_sub(activity_timeout);
5036 for (_, reporter) in reporters.iter() {
5037 {
5039 let faults = reporter.faults.lock().unwrap();
5040 assert!(faults.is_empty());
5041 }
5042
5043 {
5045 let invalid = reporter.invalid.lock().unwrap();
5046 assert_eq!(*invalid, 0);
5047 }
5048
5049 let mut notarized = HashMap::new();
5051 let mut finalized = HashMap::new();
5052 {
5053 let notarizes = reporter.notarizes.lock().unwrap();
5054 for view in View::range(View::new(1), latest_complete) {
5055 let Some(payloads) = notarizes.get(&view) else {
5057 continue;
5058 };
5059 if payloads.len() > 1 {
5060 panic!("view: {view}");
5061 }
5062 let (digest, _) = payloads.iter().next().unwrap();
5063 notarized.insert(view, *digest);
5064 }
5065 }
5066 {
5067 let notarizations = reporter.notarizations.lock().unwrap();
5068 for view in View::range(View::new(1), latest_complete) {
5069 let Some(notarization) = notarizations.get(&view) else {
5071 continue;
5072 };
5073 let Some(digest) = notarized.get(&view) else {
5074 continue;
5075 };
5076 assert_eq!(¬arization.proposal.payload, digest);
5077 }
5078 }
5079 {
5080 let finalizes = reporter.finalizes.lock().unwrap();
5081 for view in View::range(View::new(1), latest_complete) {
5082 let Some(payloads) = finalizes.get(&view) else {
5084 continue;
5085 };
5086 if payloads.len() > 1 {
5087 panic!("view: {view}");
5088 }
5089 let (digest, _) = payloads.iter().next().unwrap();
5090 finalized.insert(view, *digest);
5091
5092 if view > latest_complete {
5094 continue;
5095 }
5096
5097 let nullifies = reporter.nullifies.lock().unwrap();
5099 let Some(nullifies) = nullifies.get(&view) else {
5100 continue;
5101 };
5102 for (_, finalizers) in payloads.iter() {
5103 for finalizer in finalizers.iter() {
5104 if nullifies.contains(finalizer) {
5105 panic!("should not nullify and finalize at same view");
5106 }
5107 }
5108 }
5109 }
5110 }
5111 {
5112 let finalizations = reporter.finalizations.lock().unwrap();
5113 for view in View::range(View::new(1), latest_complete) {
5114 let Some(finalization) = finalizations.get(&view) else {
5116 continue;
5117 };
5118 let Some(digest) = finalized.get(&view) else {
5119 continue;
5120 };
5121 assert_eq!(&finalization.proposal.payload, digest);
5122 }
5123 }
5124 }
5125
5126 let blocked = oracle.blocked().await.unwrap();
5128 assert!(blocked.is_empty());
5129
5130 context.auditor().state()
5132 })
5133 }
5134
5135 #[test_group("slow")]
5136 #[test_traced]
5137 fn test_hailstorm_bls12381_threshold_min_pk() {
5138 assert_eq!(
5139 hailstorm::<_, _, Random>(
5140 0,
5141 10,
5142 ViewDelta::new(15),
5143 bls12381_threshold::fixture::<MinPk, _>
5144 ),
5145 hailstorm::<_, _, Random>(
5146 0,
5147 10,
5148 ViewDelta::new(15),
5149 bls12381_threshold::fixture::<MinPk, _>
5150 ),
5151 );
5152 }
5153
5154 #[test_group("slow")]
5155 #[test_traced]
5156 fn test_hailstorm_bls12381_threshold_min_sig() {
5157 assert_eq!(
5158 hailstorm::<_, _, Random>(
5159 0,
5160 10,
5161 ViewDelta::new(15),
5162 bls12381_threshold::fixture::<MinSig, _>
5163 ),
5164 hailstorm::<_, _, Random>(
5165 0,
5166 10,
5167 ViewDelta::new(15),
5168 bls12381_threshold::fixture::<MinSig, _>
5169 ),
5170 );
5171 }
5172
5173 #[test_group("slow")]
5174 #[test_traced]
5175 fn test_hailstorm_bls12381_multisig_min_pk() {
5176 assert_eq!(
5177 hailstorm::<_, _, RoundRobin>(
5178 0,
5179 10,
5180 ViewDelta::new(15),
5181 bls12381_multisig::fixture::<MinPk, _>
5182 ),
5183 hailstorm::<_, _, RoundRobin>(
5184 0,
5185 10,
5186 ViewDelta::new(15),
5187 bls12381_multisig::fixture::<MinPk, _>
5188 ),
5189 );
5190 }
5191
5192 #[test_group("slow")]
5193 #[test_traced]
5194 fn test_hailstorm_bls12381_multisig_min_sig() {
5195 assert_eq!(
5196 hailstorm::<_, _, RoundRobin>(
5197 0,
5198 10,
5199 ViewDelta::new(15),
5200 bls12381_multisig::fixture::<MinSig, _>
5201 ),
5202 hailstorm::<_, _, RoundRobin>(
5203 0,
5204 10,
5205 ViewDelta::new(15),
5206 bls12381_multisig::fixture::<MinSig, _>
5207 ),
5208 );
5209 }
5210
5211 #[test_group("slow")]
5212 #[test_traced]
5213 fn test_hailstorm_ed25519() {
5214 assert_eq!(
5215 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture),
5216 hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture)
5217 );
5218 }
5219
5220 fn twins<S, F, L>(seed: u64, n: u32, strategy: Strategy, link: Link, mut fixture: F)
5222 where
5223 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5224 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
5225 L: Elector<S>,
5226 {
5227 let faults = max_faults(n);
5228 let required_containers = View::new(100);
5229 let activity_timeout = ViewDelta::new(10);
5230 let skip_timeout = ViewDelta::new(5);
5231 let namespace = b"consensus".to_vec();
5232 let cfg = deterministic::Config::new()
5233 .with_seed(seed)
5234 .with_timeout(Some(Duration::from_secs(900)));
5235 let executor = deterministic::Runner::new(cfg);
5236 executor.start(|mut context| async move {
5237 let (network, mut oracle) = Network::new(
5238 context.with_label("network"),
5239 Config {
5240 max_size: 1024 * 1024,
5241 disconnect_on_block: false,
5242 tracked_peer_sets: None,
5243 },
5244 );
5245 network.start();
5246
5247 let Fixture {
5248 participants,
5249 schemes,
5250 ..
5251 } = fixture(&mut context, n);
5252 let participants: Arc<[_]> = participants.into();
5253 let mut registrations = register_validators(&mut oracle, &participants).await;
5254 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5255
5256 let elector = L::default();
5259 let relay = Arc::new(mocks::relay::Relay::new());
5260 let mut reporters = Vec::new();
5261 let mut engine_handlers = Vec::new();
5262
5263 for (idx, validator) in participants.iter().enumerate().take(faults as usize) {
5265 let (
5266 (vote_sender, vote_receiver),
5267 (certificate_sender, certificate_receiver),
5268 (resolver_sender, resolver_receiver),
5269 ) = registrations
5270 .remove(validator)
5271 .expect("validator should be registered");
5272
5273 let make_vote_forwarder = || {
5275 let participants = participants.clone();
5276 move |origin: SplitOrigin, _: &Recipients<_>, message: &Bytes| {
5277 let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5278 let (primary, secondary) =
5279 strategy.partitions(msg.view(), participants.as_ref());
5280 match origin {
5281 SplitOrigin::Primary => Some(Recipients::Some(primary)),
5282 SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5283 }
5284 }
5285 };
5286 let make_certificate_forwarder = || {
5288 let codec = schemes[idx].certificate_codec_config();
5289 let participants = participants.clone();
5290 move |origin: SplitOrigin, _: &Recipients<_>, message: &Bytes| {
5291 let msg: Certificate<S, D> =
5292 Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5293 let (primary, secondary) =
5294 strategy.partitions(msg.view(), participants.as_ref());
5295 match origin {
5296 SplitOrigin::Primary => Some(Recipients::Some(primary)),
5297 SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5298 }
5299 }
5300 };
5301 let make_drop_forwarder =
5302 || move |_: SplitOrigin, _: &Recipients<_>, _: &Bytes| None;
5303
5304 let make_vote_router = || {
5306 let participants = participants.clone();
5307 move |(sender, message): &(_, Bytes)| {
5308 let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5309 strategy.route(msg.view(), sender, participants.as_ref())
5310 }
5311 };
5312 let make_certificate_router = || {
5314 let codec = schemes[idx].certificate_codec_config();
5315 let participants = participants.clone();
5316 move |(sender, message): &(_, Bytes)| {
5317 let msg: Certificate<S, D> =
5318 Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5319 strategy.route(msg.view(), sender, participants.as_ref())
5320 }
5321 };
5322 let make_drop_router = || move |(_, _): &(_, _)| SplitTarget::None;
5323
5324 let (vote_sender_primary, vote_sender_secondary) =
5326 vote_sender.split_with(make_vote_forwarder());
5327 let (vote_receiver_primary, vote_receiver_secondary) = vote_receiver.split_with(
5328 context.with_label(&format!("pending_split_{idx}")),
5329 make_vote_router(),
5330 );
5331 let (certificate_sender_primary, certificate_sender_secondary) =
5332 certificate_sender.split_with(make_certificate_forwarder());
5333 let (certificate_receiver_primary, certificate_receiver_secondary) =
5334 certificate_receiver.split_with(
5335 context.with_label(&format!("recovered_split_{idx}")),
5336 make_certificate_router(),
5337 );
5338
5339 let (resolver_sender_primary, resolver_sender_secondary) =
5341 resolver_sender.split_with(make_drop_forwarder());
5342 let (resolver_receiver_primary, resolver_receiver_secondary) = resolver_receiver
5343 .split_with(
5344 context.with_label(&format!("resolver_split_{idx}")),
5345 make_drop_router(),
5346 );
5347
5348 for (twin_label, pending, recovered, resolver) in [
5349 (
5350 "primary",
5351 (vote_sender_primary, vote_receiver_primary),
5352 (certificate_sender_primary, certificate_receiver_primary),
5353 (resolver_sender_primary, resolver_receiver_primary),
5354 ),
5355 (
5356 "secondary",
5357 (vote_sender_secondary, vote_receiver_secondary),
5358 (certificate_sender_secondary, certificate_receiver_secondary),
5359 (resolver_sender_secondary, resolver_receiver_secondary),
5360 ),
5361 ] {
5362 let label = format!("twin_{idx}_{twin_label}");
5363 let context = context.with_label(&label);
5364
5365 let reporter_config = mocks::reporter::Config {
5366 namespace: namespace.clone(),
5367 participants: participants.as_ref().try_into().unwrap(),
5368 scheme: schemes[idx].clone(),
5369 elector: elector.clone(),
5370 };
5371 let reporter = mocks::reporter::Reporter::new(
5372 context.with_label("reporter"),
5373 reporter_config,
5374 );
5375 reporters.push(reporter.clone());
5376
5377 let application_cfg = mocks::application::Config {
5378 hasher: Sha256::default(),
5379 relay: relay.clone(),
5380 me: validator.clone(),
5381 propose_latency: (10.0, 5.0),
5382 verify_latency: (10.0, 5.0),
5383 certify_latency: (10.0, 5.0),
5384 should_certify: mocks::application::Certifier::Sometimes,
5385 };
5386 let (actor, application) = mocks::application::Application::new(
5387 context.with_label("application"),
5388 application_cfg,
5389 );
5390 actor.start();
5391
5392 let blocker = oracle.control(validator.clone());
5393 let cfg = config::Config {
5394 scheme: schemes[idx].clone(),
5395 elector: elector.clone(),
5396 blocker,
5397 automaton: application.clone(),
5398 relay: application.clone(),
5399 reporter: reporter.clone(),
5400 partition: label,
5401 mailbox_size: 1024,
5402 epoch: Epoch::new(333),
5403 namespace: namespace.clone(),
5404 leader_timeout: Duration::from_secs(1),
5405 notarization_timeout: Duration::from_secs(2),
5406 nullify_retry: Duration::from_secs(10),
5407 fetch_timeout: Duration::from_secs(1),
5408 activity_timeout,
5409 skip_timeout,
5410 fetch_concurrent: 4,
5411 replay_buffer: NZUsize!(1024 * 1024),
5412 write_buffer: NZUsize!(1024 * 1024),
5413 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5414 };
5415 let engine = Engine::new(context.with_label("engine"), cfg);
5416 engine_handlers.push(engine.start(pending, recovered, resolver));
5417 }
5418 }
5419
5420 for (idx, validator) in participants.iter().enumerate().skip(faults as usize) {
5422 let label = format!("honest_{idx}");
5423 let context = context.with_label(&label);
5424
5425 let reporter_config = mocks::reporter::Config {
5426 namespace: namespace.clone(),
5427 participants: participants.as_ref().try_into().unwrap(),
5428 scheme: schemes[idx].clone(),
5429 elector: elector.clone(),
5430 };
5431 let reporter =
5432 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5433 reporters.push(reporter.clone());
5434
5435 let application_cfg = mocks::application::Config {
5436 hasher: Sha256::default(),
5437 relay: relay.clone(),
5438 me: validator.clone(),
5439 propose_latency: (10.0, 5.0),
5440 verify_latency: (10.0, 5.0),
5441 certify_latency: (10.0, 5.0),
5442 should_certify: mocks::application::Certifier::Sometimes,
5443 };
5444 let (actor, application) = mocks::application::Application::new(
5445 context.with_label("application"),
5446 application_cfg,
5447 );
5448 actor.start();
5449
5450 let blocker = oracle.control(validator.clone());
5451 let cfg = config::Config {
5452 scheme: schemes[idx].clone(),
5453 elector: elector.clone(),
5454 blocker,
5455 automaton: application.clone(),
5456 relay: application.clone(),
5457 reporter: reporter.clone(),
5458 partition: label,
5459 mailbox_size: 1024,
5460 epoch: Epoch::new(333),
5461 namespace: namespace.clone(),
5462 leader_timeout: Duration::from_secs(1),
5463 notarization_timeout: Duration::from_secs(2),
5464 nullify_retry: Duration::from_secs(10),
5465 fetch_timeout: Duration::from_secs(1),
5466 activity_timeout,
5467 skip_timeout,
5468 fetch_concurrent: 4,
5469 replay_buffer: NZUsize!(1024 * 1024),
5470 write_buffer: NZUsize!(1024 * 1024),
5471 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5472 };
5473 let engine = Engine::new(context.with_label("engine"), cfg);
5474
5475 let (pending, recovered, resolver) = registrations
5476 .remove(validator)
5477 .expect("validator should be registered");
5478 engine_handlers.push(engine.start(pending, recovered, resolver));
5479 }
5480
5481 let mut finalizers = Vec::new();
5483 for reporter in reporters.iter_mut() {
5484 let (mut latest, mut monitor) = reporter.subscribe().await;
5485 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5486 while latest < required_containers {
5487 latest = monitor.next().await.expect("event missing");
5488 }
5489 }));
5490 }
5491 join_all(finalizers).await;
5492
5493 let honest_start = faults as usize * 2; let mut finalized_at_view: BTreeMap<View, D> = BTreeMap::new();
5496 for reporter in reporters.iter().skip(honest_start) {
5497 let finalizations = reporter.finalizations.lock().unwrap();
5498 for (view, finalization) in finalizations.iter() {
5499 let digest = finalization.proposal.payload;
5500 if let Some(existing) = finalized_at_view.get(view) {
5501 assert_eq!(
5502 existing, &digest,
5503 "safety violation: conflicting finalizations at view {view}"
5504 );
5505 } else {
5506 finalized_at_view.insert(*view, digest);
5507 }
5508 }
5509 }
5510
5511 for reporter in reporters.iter().skip(honest_start) {
5513 let invalid = reporter.invalid.lock().unwrap();
5514 assert_eq!(*invalid, 0, "invalid signatures detected");
5515 }
5516
5517 let twin_identities: Vec<_> = participants.iter().take(faults as usize).collect();
5519 for reporter in reporters.iter().skip(honest_start) {
5520 let faults = reporter.faults.lock().unwrap();
5521 for (faulter, _) in faults.iter() {
5522 assert!(
5523 twin_identities.contains(&faulter),
5524 "fault from non-twin participant"
5525 );
5526 }
5527 }
5528
5529 let blocked = oracle.blocked().await.unwrap();
5531 for (_, faulter) in blocked.iter() {
5532 assert!(
5533 twin_identities.contains(&faulter),
5534 "blocked connection from non-twin participant"
5535 );
5536 }
5537 });
5538 }
5539
5540 fn test_twins<S, F, L>(mut fixture: F)
5541 where
5542 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5543 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
5544 L: Elector<S>,
5545 {
5546 for strategy in [
5547 Strategy::View,
5548 Strategy::Fixed(3),
5549 Strategy::Isolate(4),
5550 Strategy::Broadcast,
5551 Strategy::Shuffle,
5552 ] {
5553 for link in [
5554 Link {
5555 latency: Duration::from_millis(10),
5556 jitter: Duration::from_millis(1),
5557 success_rate: 1.0,
5558 },
5559 Link {
5560 latency: Duration::from_millis(200),
5561 jitter: Duration::from_millis(150),
5562 success_rate: 0.75,
5563 },
5564 ] {
5565 twins::<S, _, L>(0, 5, strategy, link, |context, n| fixture(context, n));
5566 }
5567 }
5568 }
5569
5570 #[test_group("slow")]
5571 #[test_traced]
5572 fn test_twins_multisig_min_pk() {
5573 test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
5574 }
5575
5576 #[test_group("slow")]
5577 #[test_traced]
5578 fn test_twins_multisig_min_sig() {
5579 test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
5580 }
5581
5582 #[test_group("slow")]
5583 #[test_traced]
5584 fn test_twins_threshold_min_pk() {
5585 test_twins::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
5586 }
5587
5588 #[test_group("slow")]
5589 #[test_traced]
5590 fn test_twins_threshold_min_sig() {
5591 test_twins::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
5592 }
5593
5594 #[test_group("slow")]
5595 #[test_traced]
5596 fn test_twins_ed25519() {
5597 test_twins::<_, _, RoundRobin>(ed25519::fixture);
5598 }
5599
5600 #[test_group("slow")]
5601 #[test_traced]
5602 fn test_twins_large_view() {
5603 twins::<_, _, Random>(
5604 0,
5605 10,
5606 Strategy::View,
5607 Link {
5608 latency: Duration::from_millis(200),
5609 jitter: Duration::from_millis(150),
5610 success_rate: 0.75,
5611 },
5612 bls12381_threshold::fixture::<MinPk, _>,
5613 );
5614 }
5615
5616 #[test_group("slow")]
5617 #[test_traced]
5618 fn test_twins_large_shuffle() {
5619 twins::<_, _, Random>(
5620 0,
5621 10,
5622 Strategy::Shuffle,
5623 Link {
5624 latency: Duration::from_millis(200),
5625 jitter: Duration::from_millis(150),
5626 success_rate: 0.75,
5627 },
5628 bls12381_threshold::fixture::<MinPk, _>,
5629 );
5630 }
5631}