1pub mod signing_scheme;
194pub mod types;
195
196cfg_if::cfg_if! {
197 if #[cfg(not(target_arch = "wasm32"))] {
198 mod actors;
199 mod config;
200 pub use config::Config;
201 mod engine;
202 pub use engine::Engine;
203 mod metrics;
204 }
205}
206
207#[cfg(test)]
208pub mod mocks;
209
210use crate::types::{Round, View};
211use commonware_codec::Encode;
212use signing_scheme::Scheme;
213
214pub(crate) fn min_active(activity_timeout: View, last_finalized: View) -> View {
216 last_finalized.saturating_sub(activity_timeout)
217}
218
219pub(crate) fn interesting(
223 activity_timeout: View,
224 last_finalized: View,
225 current: View,
226 pending: View,
227 allow_future: bool,
228) -> bool {
229 if pending < min_active(activity_timeout, last_finalized) {
230 return false;
231 }
232 if !allow_future && pending > current + 1 {
233 return false;
234 }
235 true
236}
237
238pub fn select_leader<S, P: Clone>(
248 participants: &[P],
249 round: Round,
250 seed: Option<S::Seed>,
251) -> (P, u32)
252where
253 S: Scheme,
254{
255 assert!(
256 !participants.is_empty(),
257 "no participants to select leader from"
258 );
259 let idx = if let Some(seed) = seed {
260 commonware_utils::modulo(seed.encode().as_ref(), participants.len() as u64) as usize
261 } else {
262 (round.epoch().wrapping_add(round.view())) as usize % participants.len()
263 };
264 let leader = participants[idx].clone();
265
266 (leader, idx as u32)
267}
268
269#[cfg(test)]
270mod tests {
271 use super::*;
272 use crate::{
273 simplex::{
274 mocks::fixtures::{bls12381_multisig, bls12381_threshold, ed25519, Fixture},
275 signing_scheme::seed_namespace,
276 },
277 types::Round,
278 Monitor,
279 };
280 use commonware_codec::Encode;
281 use commonware_cryptography::{
282 bls12381::{
283 primitives::variant::{MinPk, MinSig, Variant},
284 tle::{decrypt, encrypt, Block},
285 },
286 ed25519, PrivateKeyExt as _, PublicKey, Sha256, Signer as _,
287 };
288 use commonware_macros::{select, test_traced};
289 use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
290 use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner, Spawner};
291 use commonware_utils::{quorum, NZUsize, NZU32};
292 use engine::Engine;
293 use futures::{future::join_all, StreamExt};
294 use governor::Quota;
295 use rand::{rngs::StdRng, Rng as _, SeedableRng as _};
296 use std::{
297 collections::HashMap,
298 num::NonZeroUsize,
299 sync::{Arc, Mutex},
300 time::Duration,
301 };
302 use tracing::{debug, warn};
303 use types::Activity;
304
305 const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
306 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
307
308 async fn register_validators<P: PublicKey>(
310 oracle: &mut Oracle<P>,
311 validators: &[P],
312 ) -> HashMap<
313 P,
314 (
315 (Sender<P>, Receiver<P>),
316 (Sender<P>, Receiver<P>),
317 (Sender<P>, Receiver<P>),
318 ),
319 > {
320 let mut registrations = HashMap::new();
321 for validator in validators.iter() {
322 let mut control = oracle.control(validator.clone());
323 let (pending_sender, pending_receiver) = control.register(0).await.unwrap();
324 let (recovered_sender, recovered_receiver) = control.register(1).await.unwrap();
325 let (resolver_sender, resolver_receiver) = control.register(2).await.unwrap();
326 registrations.insert(
327 validator.clone(),
328 (
329 (pending_sender, pending_receiver),
330 (recovered_sender, recovered_receiver),
331 (resolver_sender, resolver_receiver),
332 ),
333 );
334 }
335 registrations
336 }
337
    /// Operation that `link_validators` applies to each selected ordered pair of validators.
    enum Action {
        /// Add a link with the given configuration.
        Link(Link),
        /// Remove the existing link, then add a link with the given configuration.
        Update(Link),
        /// Remove the existing link.
        Unlink,
    }
344
345 async fn link_validators<P: PublicKey>(
351 oracle: &mut Oracle<P>,
352 validators: &[P],
353 action: Action,
354 restrict_to: Option<fn(usize, usize, usize) -> bool>,
355 ) {
356 for (i1, v1) in validators.iter().enumerate() {
357 for (i2, v2) in validators.iter().enumerate() {
358 if v2 == v1 {
360 continue;
361 }
362
363 if let Some(f) = restrict_to {
365 if !f(validators.len(), i1, i2) {
366 continue;
367 }
368 }
369
370 match action {
372 Action::Update(_) | Action::Unlink => {
373 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
374 }
375 _ => {}
376 }
377
378 match action {
380 Action::Link(ref link) | Action::Update(ref link) => {
381 oracle
382 .add_link(v1.clone(), v2.clone(), link.clone())
383 .await
384 .unwrap();
385 }
386 _ => {}
387 }
388 }
389 }
390 }
391
392 fn all_online<S, F>(mut fixture: F)
393 where
394 S: Scheme<PublicKey = ed25519::PublicKey>,
395 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
396 {
397 let n = 5;
399 let quorum = quorum(n);
400 let required_containers = 100;
401 let activity_timeout = 10;
402 let skip_timeout = 5;
403 let namespace = b"consensus".to_vec();
404 let executor = deterministic::Runner::timed(Duration::from_secs(30));
405 executor.start(|mut context| async move {
406 let (network, mut oracle) = Network::new(
408 context.with_label("network"),
409 Config {
410 max_size: 1024 * 1024,
411 disconnect_on_block: true,
412 tracked_peer_sets: None,
413 },
414 );
415
416 network.start();
418
419 let Fixture {
421 participants,
422 schemes,
423 ..
424 } = fixture(&mut context, n);
425 let mut registrations = register_validators(&mut oracle, &participants).await;
426
427 let link = Link {
429 latency: Duration::from_millis(10),
430 jitter: Duration::from_millis(1),
431 success_rate: 1.0,
432 };
433 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
434
435 let relay = Arc::new(mocks::relay::Relay::new());
437 let mut reporters = Vec::new();
438 let mut engine_handlers = Vec::new();
439 for (idx, validator) in participants.iter().enumerate() {
440 let context = context.with_label(&format!("validator-{}", *validator));
442
443 let reporter_config = mocks::reporter::Config {
445 namespace: namespace.clone(),
446 participants: participants.clone().into(),
447 scheme: schemes[idx].clone(),
448 };
449 let reporter =
450 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
451 reporters.push(reporter.clone());
452 let application_cfg = mocks::application::Config {
453 hasher: Sha256::default(),
454 relay: relay.clone(),
455 me: validator.clone(),
456 propose_latency: (10.0, 5.0),
457 verify_latency: (10.0, 5.0),
458 };
459 let (actor, application) = mocks::application::Application::new(
460 context.with_label("application"),
461 application_cfg,
462 );
463 actor.start();
464 let blocker = oracle.control(validator.clone());
465 let cfg = config::Config {
466 scheme: schemes[idx].clone(),
467 blocker,
468 automaton: application.clone(),
469 relay: application.clone(),
470 reporter: reporter.clone(),
471 partition: validator.to_string(),
472 mailbox_size: 1024,
473 epoch: 333,
474 namespace: namespace.clone(),
475 leader_timeout: Duration::from_secs(1),
476 notarization_timeout: Duration::from_secs(2),
477 nullify_retry: Duration::from_secs(10),
478 fetch_timeout: Duration::from_secs(1),
479 activity_timeout,
480 skip_timeout,
481 max_fetch_count: 1,
482 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
483 fetch_concurrent: 1,
484 replay_buffer: NZUsize!(1024 * 1024),
485 write_buffer: NZUsize!(1024 * 1024),
486 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
487 };
488 let engine = Engine::new(context.with_label("engine"), cfg);
489
490 let (pending, recovered, resolver) = registrations
492 .remove(validator)
493 .expect("validator should be registered");
494 engine_handlers.push(engine.start(pending, recovered, resolver));
495 }
496
497 let mut finalizers = Vec::new();
499 for reporter in reporters.iter_mut() {
500 let (mut latest, mut monitor) = reporter.subscribe().await;
501 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
502 while latest < required_containers {
503 latest = monitor.next().await.expect("event missing");
504 }
505 }));
506 }
507 join_all(finalizers).await;
508
509 let latest_complete = required_containers - activity_timeout;
511 for reporter in reporters.iter() {
512 {
514 let faults = reporter.faults.lock().unwrap();
515 assert!(faults.is_empty());
516 }
517
518 {
520 let invalid = reporter.invalid.lock().unwrap();
521 assert_eq!(*invalid, 0);
522 }
523
524 {
526 let seeds = reporter.seeds.lock().unwrap();
527 for view in 1..latest_complete {
528 if !seeds.contains_key(&view) {
530 panic!("view: {view}");
531 }
532 }
533 }
534
535 let mut notarized = HashMap::new();
537 let mut finalized = HashMap::new();
538 {
539 let notarizes = reporter.notarizes.lock().unwrap();
540 for view in 1..latest_complete {
541 let Some(payloads) = notarizes.get(&view) else {
543 continue;
544 };
545 if payloads.len() > 1 {
546 panic!("view: {view}");
547 }
548 let (digest, notarizers) = payloads.iter().next().unwrap();
549 notarized.insert(view, *digest);
550
551 if notarizers.len() < quorum as usize {
552 panic!("view: {view}");
555 }
556 }
557 }
558 {
559 let notarizations = reporter.notarizations.lock().unwrap();
560 for view in 1..latest_complete {
561 let Some(notarization) = notarizations.get(&view) else {
563 continue;
564 };
565 let Some(digest) = notarized.get(&view) else {
566 continue;
567 };
568 assert_eq!(¬arization.proposal.payload, digest);
569 }
570 }
571 {
572 let finalizes = reporter.finalizes.lock().unwrap();
573 for view in 1..latest_complete {
574 let Some(payloads) = finalizes.get(&view) else {
576 continue;
577 };
578 if payloads.len() > 1 {
579 panic!("view: {view}");
580 }
581 let (digest, finalizers) = payloads.iter().next().unwrap();
582 finalized.insert(view, *digest);
583
584 if view > latest_complete {
586 continue;
587 }
588
589 if finalizers.len() < quorum as usize {
591 panic!("view: {view}");
594 }
595
596 let nullifies = reporter.nullifies.lock().unwrap();
598 let Some(nullifies) = nullifies.get(&view) else {
599 continue;
600 };
601 for (_, finalizers) in payloads.iter() {
602 for finalizer in finalizers.iter() {
603 if nullifies.contains(finalizer) {
604 panic!("should not nullify and finalize at same view");
605 }
606 }
607 }
608 }
609 }
610 {
611 let finalizations = reporter.finalizations.lock().unwrap();
612 for view in 1..latest_complete {
613 let Some(finalization) = finalizations.get(&view) else {
615 continue;
616 };
617 let Some(digest) = finalized.get(&view) else {
618 continue;
619 };
620 assert_eq!(&finalization.proposal.payload, digest);
621 }
622 }
623 }
624
625 let blocked = oracle.blocked().await.unwrap();
627 assert!(blocked.is_empty());
628 });
629 }
630
    /// Runs `all_online` once with each of the five fixtures listed below.
    #[test_traced]
    fn test_all_online() {
        all_online(bls12381_threshold::<MinPk, _>);
        all_online(bls12381_threshold::<MinSig, _>);
        all_online(bls12381_multisig::<MinPk, _>);
        all_online(bls12381_multisig::<MinSig, _>);
        all_online(ed25519);
    }
639
640 fn observer<S, F>(mut fixture: F)
641 where
642 S: Scheme<PublicKey = ed25519::PublicKey>,
643 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
644 {
645 let n_active = 5;
647 let required_containers = 100;
648 let activity_timeout = 10;
649 let skip_timeout = 5;
650 let namespace = b"consensus".to_vec();
651 let executor = deterministic::Runner::timed(Duration::from_secs(30));
652 executor.start(|mut context| async move {
653 let (network, mut oracle) = Network::new(
655 context.with_label("network"),
656 Config {
657 max_size: 1024 * 1024,
658 disconnect_on_block: true,
659 tracked_peer_sets: None,
660 },
661 );
662
663 network.start();
665
666 let Fixture {
668 participants,
669 schemes,
670 verifier,
671 ..
672 } = fixture(&mut context, n_active);
673
674 let private_key_observer = ed25519::PrivateKey::from_seed(n_active as u64);
676 let public_key_observer = private_key_observer.public_key();
677
678 let mut all_validators = participants.clone();
680 all_validators.push(public_key_observer.clone());
681 all_validators.sort();
682 let mut registrations = register_validators(&mut oracle, &all_validators).await;
683
684 let link = Link {
686 latency: Duration::from_millis(10),
687 jitter: Duration::from_millis(1),
688 success_rate: 1.0,
689 };
690 link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
691
692 let relay = Arc::new(mocks::relay::Relay::new());
694 let mut reporters = Vec::new();
695
696 for (idx, validator) in participants.iter().enumerate() {
697 let is_observer = *validator == public_key_observer;
698
699 let context = context.with_label(&format!("validator-{}", *validator));
701
702 let signing = if is_observer {
704 verifier.clone()
705 } else {
706 schemes[idx].clone()
707 };
708 let reporter_config = mocks::reporter::Config {
709 namespace: namespace.clone(),
710 participants: participants.clone().into(),
711 scheme: signing.clone(),
712 };
713 let reporter =
714 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
715 reporters.push(reporter.clone());
716 let application_cfg = mocks::application::Config {
717 hasher: Sha256::default(),
718 relay: relay.clone(),
719 me: validator.clone(),
720 propose_latency: (10.0, 5.0),
721 verify_latency: (10.0, 5.0),
722 };
723 let (actor, application) = mocks::application::Application::new(
724 context.with_label("application"),
725 application_cfg,
726 );
727 actor.start();
728 let blocker = oracle.control(validator.clone());
729 let cfg = config::Config {
730 scheme: signing.clone(),
731 blocker,
732 automaton: application.clone(),
733 relay: application.clone(),
734 reporter: reporter.clone(),
735 partition: validator.to_string(),
736 mailbox_size: 1024,
737 epoch: 333,
738 namespace: namespace.clone(),
739 leader_timeout: Duration::from_secs(1),
740 notarization_timeout: Duration::from_secs(2),
741 nullify_retry: Duration::from_secs(10),
742 fetch_timeout: Duration::from_secs(1),
743 activity_timeout,
744 skip_timeout,
745 max_fetch_count: 1,
746 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
747 fetch_concurrent: 1,
748 replay_buffer: NZUsize!(1024 * 1024),
749 write_buffer: NZUsize!(1024 * 1024),
750 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
751 };
752 let engine = Engine::new(context.with_label("engine"), cfg);
753
754 let (pending, recovered, resolver) = registrations
756 .remove(validator)
757 .expect("validator should be registered");
758 engine.start(pending, recovered, resolver);
759 }
760
761 let mut finalizers = Vec::new();
763 for reporter in reporters.iter_mut() {
764 let (mut latest, mut monitor) = reporter.subscribe().await;
765 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
766 while latest < required_containers {
767 latest = monitor.next().await.expect("event missing");
768 }
769 }));
770 }
771 join_all(finalizers).await;
772
773 for reporter in reporters.iter() {
775 {
777 let faults = reporter.faults.lock().unwrap();
778 assert!(faults.is_empty());
779 }
780 {
781 let invalid = reporter.invalid.lock().unwrap();
782 assert_eq!(*invalid, 0);
783 }
784
785 let blocked = oracle.blocked().await.unwrap();
787 assert!(blocked.is_empty());
788 }
789 });
790 }
791
    /// Runs `observer` once with each of the five fixtures listed below.
    #[test_traced]
    fn test_observer() {
        observer(bls12381_threshold::<MinPk, _>);
        observer(bls12381_threshold::<MinSig, _>);
        observer(bls12381_multisig::<MinPk, _>);
        observer(bls12381_multisig::<MinSig, _>);
        observer(ed25519);
    }
800
    /// Repeatedly runs five validators and interrupts them after a random delay until one
    /// run lets every reporter observe `required_containers`.
    ///
    /// Each loop iteration starts a deterministic runner, either fresh (first iteration) or
    /// from the checkpoint returned by the previous iteration, and races a random sleep of
    /// 10ms to 2s against the completion of all finalizer tasks. If the sleep wins, the
    /// iteration's reporters are stored in `supervised` and the loop continues from the new
    /// checkpoint. If the finalizers win, every stored reporter is checked for faults and the
    /// loop ends. Every iteration also asserts that no peer was blocked.
    fn unclean_shutdown<S, F>(mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut StdRng, u32) -> Fixture<S>,
    {
        // Test parameters
        let n = 5;
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();

        // State shared across restarts: the number of interruptions, the reporters of every
        // interrupted run, and the checkpoint to resume from
        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
        let supervised = Arc::new(Mutex::new(Vec::new()));
        let mut prev_checkpoint = None;

        // Create the participants once (from a fixed seed) so every restart uses the same keys
        let mut rng = StdRng::seed_from_u64(0);
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut rng, n);

        loop {
            // Clone everything the run closure takes ownership of
            let rng = rng.clone();
            let participants = participants.clone();
            let schemes = schemes.clone();
            let namespace = namespace.clone();
            let shutdowns = shutdowns.clone();
            let supervised = supervised.clone();

            // One run; resolves to `true` once all reporters reached `required_containers`
            let f = |mut context: deterministic::Context| async move {
                // Create and start the simulated network
                let (network, mut oracle) = Network::new(
                    context.with_label("network"),
                    Config {
                        max_size: 1024 * 1024,
                        disconnect_on_block: true,
                        tracked_peer_sets: None,
                    },
                );

                network.start();

                // Register participants
                let mut registrations = register_validators(&mut oracle, &participants).await;

                // Link all validators
                let link = Link {
                    latency: Duration::from_millis(50),
                    jitter: Duration::from_millis(50),
                    success_rate: 1.0,
                };
                link_validators(&mut oracle, &participants, Action::Link(link), None).await;

                // Create one reporter, application and engine per validator
                let relay = Arc::new(mocks::relay::Relay::new());
                let mut reporters = HashMap::new();
                let mut engine_handlers = Vec::new();
                for (idx, validator) in participants.iter().enumerate() {
                    let context = context.with_label(&format!("validator-{}", *validator));

                    let reporter_config = mocks::reporter::Config {
                        namespace: namespace.clone(),
                        participants: participants.clone().into(),
                        scheme: schemes[idx].clone(),
                    };
                    // NOTE(review): unlike the sibling tests, the reporter is built from a
                    // clone of `rng` rather than a labeled context; confirm this is intended
                    let reporter = mocks::reporter::Reporter::new(rng.clone(), reporter_config);
                    reporters.insert(validator.clone(), reporter.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(validator.clone());
                    let cfg = config::Config {
                        scheme: schemes[idx].clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        partition: validator.to_string(),
                        mailbox_size: 1024,
                        epoch: 333,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        max_fetch_count: 1,
                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                        fetch_concurrent: 1,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    };
                    let engine = Engine::new(context.with_label("engine"), cfg);

                    // Start the engine on its registered channels
                    let (pending, recovered, resolver) = registrations
                        .remove(validator)
                        .expect("validator should be registered");
                    engine_handlers.push(engine.start(pending, recovered, resolver));
                }

                // Spawn one task per reporter that waits for `required_containers`
                let mut finalizers = Vec::new();
                for (_, reporter) in reporters.iter_mut() {
                    let (mut latest, mut monitor) = reporter.subscribe().await;
                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                        while latest < required_containers {
                            latest = monitor.next().await.expect("event missing");
                        }
                    }));
                }

                // Race a random delay against completion of all finalizer tasks
                let wait =
                    context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
                let result = select! {
                    _ = context.sleep(wait) => {
                        // The delay elapsed first: count the interruption and keep this
                        // run's reporters for the final fault check
                        {
                            let mut shutdowns = shutdowns.lock().unwrap();
                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
                            *shutdowns += 1;
                        }
                        supervised.lock().unwrap().push(reporters);
                        false
                    },
                    _ = join_all(finalizers) => {
                        // All finalizers completed: ensure none of the reporters stored by
                        // interrupted runs recorded a fault
                        let supervised = supervised.lock().unwrap();
                        for reporters in supervised.iter() {
                            for (_, reporter) in reporters.iter() {
                                let faults = reporter.faults.lock().unwrap();
                                assert!(faults.is_empty());
                            }
                        }
                        true
                    }
                };

                // Ensure no peer was blocked
                let blocked = oracle.blocked().await.unwrap();
                assert!(blocked.is_empty());

                result
            };

            // Resume from the previous checkpoint if there is one, otherwise start fresh
            let (complete, checkpoint) = if let Some(prev_checkpoint) = prev_checkpoint {
                deterministic::Runner::from(prev_checkpoint)
            } else {
                deterministic::Runner::timed(Duration::from_secs(60))
            }
            .start_and_recover(f);

            // Stop once a run completed
            if complete {
                break;
            }

            // Otherwise resume from where this run stopped
            prev_checkpoint = Some(checkpoint);
        }
    }
980
    /// Runs `unclean_shutdown` once with each of the five fixtures listed below.
    #[test_traced]
    fn test_unclean_shutdown() {
        unclean_shutdown(bls12381_threshold::<MinPk, _>);
        unclean_shutdown(bls12381_threshold::<MinSig, _>);
        unclean_shutdown(bls12381_multisig::<MinPk, _>);
        unclean_shutdown(bls12381_multisig::<MinSig, _>);
        unclean_shutdown(ed25519);
    }
989
    /// Starts three of four validators, lets them reach `required_containers`, and then
    /// brings up the fourth (index 0) to check that it also reaches `required_containers`.
    ///
    /// Sequence:
    /// 1. Link and start every validator except index 0 and wait for their reporters.
    /// 2. Replace those links with 3s-latency links and sleep for 120s.
    /// 3. Remove the links between validator 1 and the other started validators.
    /// 4. Link validator 0 to every validator except validator 1, then replace all links that
    ///    do not involve validator 1 with 10ms-latency links.
    /// 5. Start validator 0 and wait until its reporter observes `required_containers`.
    ///
    /// Finally asserts that no peer was blocked.
    fn backfill<S, F>(mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
    {
        // Test parameters
        let n = 4;
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(720));
        executor.start(|mut context| async move {
            // Create and start the simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: None,
                },
            );

            network.start();

            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators except the first
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &participants,
                Action::Link(link),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Create one reporter, application and engine per validator except the first
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx_scheme, validator) in participants.iter().enumerate() {
                // Skip the first validator (started later)
                if idx_scheme == 0 {
                    continue;
                }

                let context = context.with_label(&format!("validator-{}", *validator));

                let reporter_config = mocks::reporter::Config {
                    namespace: namespace.clone(),
                    participants: participants.clone().into(),
                    scheme: schemes[idx_scheme].clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                reporters.push(reporter.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx_scheme].clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    partition: validator.to_string(),
                    mailbox_size: 1024,
                    epoch: 333,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start the engine on its registered channels
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait for every started validator's reporter to observe `required_containers`
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Replace the links between started validators with 3s-latency links
            let link = Link {
                latency: Duration::from_secs(3),
                jitter: Duration::from_millis(0),
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &participants,
                Action::Update(link.clone()),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Let the network run with the slow links for a while
            context.sleep(Duration::from_secs(120)).await;

            // Remove the links between validator 1 and the other started validators
            link_validators(
                &mut oracle,
                &participants,
                Action::Unlink,
                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
            )
            .await;

            // Prepare the first validator
            let me = participants[0].clone();
            let context = context.with_label(&format!("validator-{me}"));

            // Link the first validator to every validator except validator 1, still using
            // the 3s-latency link
            link_validators(
                &mut oracle,
                &participants,
                Action::Link(link),
                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
            )
            .await;

            // Replace every link that does not involve validator 1 with a 10ms-latency link
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(3),
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &participants,
                Action::Update(link),
                Some(|_, i, j| ![i, j].contains(&1usize)),
            )
            .await;

            // Create the reporter, application and engine of the first validator
            let reporter_config = mocks::reporter::Config {
                namespace: namespace.clone(),
                participants: participants.clone().into(),
                scheme: schemes[0].clone(),
            };
            let mut reporter =
                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
            reporters.push(reporter.clone());
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                me: me.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
            };
            let (actor, application) = mocks::application::Application::new(
                context.with_label("application"),
                application_cfg,
            );
            actor.start();
            let blocker = oracle.control(me.clone());
            let cfg = config::Config {
                scheme: schemes[0].clone(),
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: reporter.clone(),
                partition: me.to_string(),
                mailbox_size: 1024,
                epoch: 333,
                namespace: namespace.clone(),
                leader_timeout: Duration::from_secs(1),
                notarization_timeout: Duration::from_secs(2),
                nullify_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                max_fetch_count: 1,
                fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                fetch_concurrent: 1,
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            };
            let engine = Engine::new(context.with_label("engine"), cfg);

            // Start the engine on its registered channels
            let (pending, recovered, resolver) = registrations
                .remove(&me)
                .expect("validator should be registered");
            engine_handlers.push(engine.start(pending, recovered, resolver));

            // Wait for the newly started validator's reporter to observe
            // `required_containers`
            let (mut latest, mut monitor) = reporter.subscribe().await;
            while latest < required_containers {
                latest = monitor.next().await.expect("event missing");
            }

            // Ensure no peer was blocked
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }
1234
    /// Runs `backfill` once with each of the five fixtures listed below.
    #[test_traced]
    fn test_backfill() {
        backfill(bls12381_threshold::<MinPk, _>);
        backfill(bls12381_threshold::<MinSig, _>);
        backfill(bls12381_multisig::<MinPk, _>);
        backfill(bls12381_multisig::<MinSig, _>);
        backfill(ed25519);
    }
1243
1244 fn one_offline<S, F>(mut fixture: F)
1245 where
1246 S: Scheme<PublicKey = ed25519::PublicKey>,
1247 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1248 {
1249 let n = 5;
1251 let quorum = quorum(n);
1252 let required_containers = 100;
1253 let activity_timeout = 10;
1254 let skip_timeout = 5;
1255 let max_exceptions = 10;
1256 let namespace = b"consensus".to_vec();
1257 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1258 executor.start(|mut context| async move {
1259 let (network, mut oracle) = Network::new(
1261 context.with_label("network"),
1262 Config {
1263 max_size: 1024 * 1024,
1264 disconnect_on_block: true,
1265 tracked_peer_sets: None,
1266 },
1267 );
1268
1269 network.start();
1271
1272 let Fixture {
1274 participants,
1275 schemes,
1276 ..
1277 } = fixture(&mut context, n);
1278 let mut registrations = register_validators(&mut oracle, &participants).await;
1279
1280 let link = Link {
1282 latency: Duration::from_millis(10),
1283 jitter: Duration::from_millis(1),
1284 success_rate: 1.0,
1285 };
1286 link_validators(
1287 &mut oracle,
1288 &participants,
1289 Action::Link(link),
1290 Some(|_, i, j| ![i, j].contains(&0usize)),
1291 )
1292 .await;
1293
1294 let relay = Arc::new(mocks::relay::Relay::new());
1296 let mut reporters = Vec::new();
1297 let mut engine_handlers = Vec::new();
1298 for (idx_scheme, validator) in participants.iter().enumerate() {
1299 if idx_scheme == 0 {
1301 continue;
1302 }
1303
1304 let context = context.with_label(&format!("validator-{}", *validator));
1306
1307 let reporter_config = mocks::reporter::Config {
1309 namespace: namespace.clone(),
1310 participants: participants.clone().into(),
1311 scheme: schemes[idx_scheme].clone(),
1312 };
1313 let reporter =
1314 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1315 reporters.push(reporter.clone());
1316 let application_cfg = mocks::application::Config {
1317 hasher: Sha256::default(),
1318 relay: relay.clone(),
1319 me: validator.clone(),
1320 propose_latency: (10.0, 5.0),
1321 verify_latency: (10.0, 5.0),
1322 };
1323 let (actor, application) = mocks::application::Application::new(
1324 context.with_label("application"),
1325 application_cfg,
1326 );
1327 actor.start();
1328 let blocker = oracle.control(validator.clone());
1329 let cfg = config::Config {
1330 scheme: schemes[idx_scheme].clone(),
1331 blocker,
1332 automaton: application.clone(),
1333 relay: application.clone(),
1334 reporter: reporter.clone(),
1335 partition: validator.to_string(),
1336 mailbox_size: 1024,
1337 epoch: 333,
1338 namespace: namespace.clone(),
1339 leader_timeout: Duration::from_secs(1),
1340 notarization_timeout: Duration::from_secs(2),
1341 nullify_retry: Duration::from_secs(10),
1342 fetch_timeout: Duration::from_secs(1),
1343 activity_timeout,
1344 skip_timeout,
1345 max_fetch_count: 1,
1346 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1347 fetch_concurrent: 1,
1348 replay_buffer: NZUsize!(1024 * 1024),
1349 write_buffer: NZUsize!(1024 * 1024),
1350 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1351 };
1352 let engine = Engine::new(context.with_label("engine"), cfg);
1353
1354 let (pending, recovered, resolver) = registrations
1356 .remove(validator)
1357 .expect("validator should be registered");
1358 engine_handlers.push(engine.start(pending, recovered, resolver));
1359 }
1360
1361 let mut finalizers = Vec::new();
1363 for reporter in reporters.iter_mut() {
1364 let (mut latest, mut monitor) = reporter.subscribe().await;
1365 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1366 while latest < required_containers {
1367 latest = monitor.next().await.expect("event missing");
1368 }
1369 }));
1370 }
1371 join_all(finalizers).await;
1372
1373 let exceptions = 0;
1375 let offline = &participants[0];
1376 for reporter in reporters.iter() {
1377 {
1379 let faults = reporter.faults.lock().unwrap();
1380 assert!(faults.is_empty());
1381 }
1382
1383 {
1385 let invalid = reporter.invalid.lock().unwrap();
1386 assert_eq!(*invalid, 0);
1387 }
1388
1389 let mut exceptions = 0;
1391 {
1392 let notarizes = reporter.notarizes.lock().unwrap();
1393 for (view, payloads) in notarizes.iter() {
1394 for (_, participants) in payloads.iter() {
1395 if participants.contains(offline) {
1396 panic!("view: {view}");
1397 }
1398 }
1399 }
1400 }
1401 {
1402 let nullifies = reporter.nullifies.lock().unwrap();
1403 for (view, participants) in nullifies.iter() {
1404 if participants.contains(offline) {
1405 panic!("view: {view}");
1406 }
1407 }
1408 }
1409 {
1410 let finalizes = reporter.finalizes.lock().unwrap();
1411 for (view, payloads) in finalizes.iter() {
1412 for (_, finalizers) in payloads.iter() {
1413 if finalizers.contains(offline) {
1414 panic!("view: {view}");
1415 }
1416 }
1417 }
1418 }
1419
1420 let mut offline_views = Vec::new();
1422 {
1423 let leaders = reporter.leaders.lock().unwrap();
1424 for (view, leader) in leaders.iter() {
1425 if leader == offline {
1426 offline_views.push(*view);
1427 }
1428 }
1429 }
1430 assert!(!offline_views.is_empty());
1431
1432 {
1434 let nullifies = reporter.nullifies.lock().unwrap();
1435 for view in offline_views.iter() {
1436 let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1437 if nullifies < quorum as usize {
1438 warn!("missing expected view nullifies: {}", view);
1439 exceptions += 1;
1440 }
1441 }
1442 }
1443 {
1444 let nullifications = reporter.nullifications.lock().unwrap();
1445 for view in offline_views.iter() {
1446 if !nullifications.contains_key(view) {
1447 warn!("missing expected view nullifies: {}", view);
1448 exceptions += 1;
1449 }
1450 }
1451 }
1452
1453 assert!(exceptions <= max_exceptions);
1455 }
1456 assert!(exceptions <= max_exceptions);
1457
1458 let blocked = oracle.blocked().await.unwrap();
1460 assert!(blocked.is_empty());
1461
1462 let encoded = context.encode();
1464 let lines = encoded.lines();
1465 let mut skipped_views = 0;
1466 let mut nodes_skipping = 0;
1467 for line in lines {
1468 if line.contains("_engine_voter_skipped_views_total") {
1469 let parts: Vec<&str> = line.split_whitespace().collect();
1470 if let Some(number_str) = parts.last() {
1471 if let Ok(number) = number_str.parse::<u64>() {
1472 if number > 0 {
1473 nodes_skipping += 1;
1474 }
1475 if number > skipped_views {
1476 skipped_views = number;
1477 }
1478 }
1479 }
1480 }
1481 }
1482 assert!(
1483 skipped_views > 0,
1484 "expected skipped views to be greater than 0"
1485 );
1486 assert_eq!(
1487 nodes_skipping,
1488 n - 1,
1489 "expected all online nodes to be skipping views"
1490 );
1491 });
1492 }
1493
1494 #[test_traced]
1495 fn test_one_offline() {
1496 one_offline(bls12381_threshold::<MinPk, _>);
1497 one_offline(bls12381_threshold::<MinSig, _>);
1498 one_offline(bls12381_multisig::<MinPk, _>);
1499 one_offline(bls12381_multisig::<MinSig, _>);
1500 one_offline(ed25519);
1501 }
1502
1503 fn slow_validator<S, F>(mut fixture: F)
1504 where
1505 S: Scheme<PublicKey = ed25519::PublicKey>,
1506 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1507 {
1508 let n = 5;
1510 let required_containers = 50;
1511 let activity_timeout = 10;
1512 let skip_timeout = 5;
1513 let namespace = b"consensus".to_vec();
1514 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1515 executor.start(|mut context| async move {
1516 let (network, mut oracle) = Network::new(
1518 context.with_label("network"),
1519 Config {
1520 max_size: 1024 * 1024,
1521 disconnect_on_block: true,
1522 tracked_peer_sets: None,
1523 },
1524 );
1525
1526 network.start();
1528
1529 let Fixture {
1531 participants,
1532 schemes,
1533 ..
1534 } = fixture(&mut context, n);
1535 let mut registrations = register_validators(&mut oracle, &participants).await;
1536
1537 let link = Link {
1539 latency: Duration::from_millis(10),
1540 jitter: Duration::from_millis(1),
1541 success_rate: 1.0,
1542 };
1543 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1544
1545 let relay = Arc::new(mocks::relay::Relay::new());
1547 let mut reporters = Vec::new();
1548 let mut engine_handlers = Vec::new();
1549 for (idx_scheme, validator) in participants.iter().enumerate() {
1550 let context = context.with_label(&format!("validator-{}", *validator));
1552
1553 let reporter_config = mocks::reporter::Config {
1555 namespace: namespace.clone(),
1556 participants: participants.clone().into(),
1557 scheme: schemes[idx_scheme].clone(),
1558 };
1559 let reporter =
1560 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1561 reporters.push(reporter.clone());
1562 let application_cfg = if idx_scheme == 0 {
1563 mocks::application::Config {
1564 hasher: Sha256::default(),
1565 relay: relay.clone(),
1566 me: validator.clone(),
1567 propose_latency: (10_000.0, 0.0),
1568 verify_latency: (10_000.0, 5.0),
1569 }
1570 } else {
1571 mocks::application::Config {
1572 hasher: Sha256::default(),
1573 relay: relay.clone(),
1574 me: validator.clone(),
1575 propose_latency: (10.0, 5.0),
1576 verify_latency: (10.0, 5.0),
1577 }
1578 };
1579 let (actor, application) = mocks::application::Application::new(
1580 context.with_label("application"),
1581 application_cfg,
1582 );
1583 actor.start();
1584 let blocker = oracle.control(validator.clone());
1585 let cfg = config::Config {
1586 scheme: schemes[idx_scheme].clone(),
1587 blocker,
1588 automaton: application.clone(),
1589 relay: application.clone(),
1590 reporter: reporter.clone(),
1591 partition: validator.to_string(),
1592 mailbox_size: 1024,
1593 epoch: 333,
1594 namespace: namespace.clone(),
1595 leader_timeout: Duration::from_secs(1),
1596 notarization_timeout: Duration::from_secs(2),
1597 nullify_retry: Duration::from_secs(10),
1598 fetch_timeout: Duration::from_secs(1),
1599 activity_timeout,
1600 skip_timeout,
1601 max_fetch_count: 1,
1602 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1603 fetch_concurrent: 1,
1604 replay_buffer: NZUsize!(1024 * 1024),
1605 write_buffer: NZUsize!(1024 * 1024),
1606 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1607 };
1608 let engine = Engine::new(context.with_label("engine"), cfg);
1609
1610 let (pending, recovered, resolver) = registrations
1612 .remove(validator)
1613 .expect("validator should be registered");
1614 engine_handlers.push(engine.start(pending, recovered, resolver));
1615 }
1616
1617 let mut finalizers = Vec::new();
1619 for reporter in reporters.iter_mut() {
1620 let (mut latest, mut monitor) = reporter.subscribe().await;
1621 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1622 while latest < required_containers {
1623 latest = monitor.next().await.expect("event missing");
1624 }
1625 }));
1626 }
1627 join_all(finalizers).await;
1628
1629 let slow = &participants[0];
1631 for reporter in reporters.iter() {
1632 {
1634 let faults = reporter.faults.lock().unwrap();
1635 assert!(faults.is_empty());
1636 }
1637
1638 {
1640 let invalid = reporter.invalid.lock().unwrap();
1641 assert_eq!(*invalid, 0);
1642 }
1643
1644 {
1646 let notarizes = reporter.notarizes.lock().unwrap();
1647 for (view, payloads) in notarizes.iter() {
1648 for (_, participants) in payloads.iter() {
1649 if participants.contains(slow) {
1650 panic!("view: {view}");
1651 }
1652 }
1653 }
1654 }
1655 {
1656 let finalizes = reporter.finalizes.lock().unwrap();
1657 for (view, payloads) in finalizes.iter() {
1658 for (_, finalizers) in payloads.iter() {
1659 if finalizers.contains(slow) {
1660 panic!("view: {view}");
1661 }
1662 }
1663 }
1664 }
1665 }
1666
1667 let blocked = oracle.blocked().await.unwrap();
1669 assert!(blocked.is_empty());
1670 });
1671 }
1672
1673 #[test_traced]
1674 fn test_slow_validator() {
1675 slow_validator(bls12381_threshold::<MinPk, _>);
1676 slow_validator(bls12381_threshold::<MinSig, _>);
1677 slow_validator(bls12381_multisig::<MinPk, _>);
1678 slow_validator(bls12381_multisig::<MinSig, _>);
1679 slow_validator(ed25519);
1680 }
1681
1682 fn all_recovery<S, F>(mut fixture: F)
1683 where
1684 S: Scheme<PublicKey = ed25519::PublicKey>,
1685 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1686 {
1687 let n = 5;
1689 let required_containers = 100;
1690 let activity_timeout = 10;
1691 let skip_timeout = 2;
1692 let namespace = b"consensus".to_vec();
1693 let executor = deterministic::Runner::timed(Duration::from_secs(180));
1694 executor.start(|mut context| async move {
1695 let (network, mut oracle) = Network::new(
1697 context.with_label("network"),
1698 Config {
1699 max_size: 1024 * 1024,
1700 disconnect_on_block: false,
1701 tracked_peer_sets: None,
1702 },
1703 );
1704
1705 network.start();
1707
1708 let Fixture {
1710 participants,
1711 schemes,
1712 ..
1713 } = fixture(&mut context, n);
1714 let mut registrations = register_validators(&mut oracle, &participants).await;
1715
1716 let link = Link {
1718 latency: Duration::from_secs(3),
1719 jitter: Duration::from_millis(0),
1720 success_rate: 1.0,
1721 };
1722 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1723
1724 let relay = Arc::new(mocks::relay::Relay::new());
1726 let mut reporters = Vec::new();
1727 let mut engine_handlers = Vec::new();
1728 for (idx, validator) in participants.iter().enumerate() {
1729 let context = context.with_label(&format!("validator-{}", *validator));
1731
1732 let reporter_config = mocks::reporter::Config {
1734 namespace: namespace.clone(),
1735 participants: participants.clone().into(),
1736 scheme: schemes[idx].clone(),
1737 };
1738 let reporter =
1739 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1740 reporters.push(reporter.clone());
1741 let application_cfg = mocks::application::Config {
1742 hasher: Sha256::default(),
1743 relay: relay.clone(),
1744 me: validator.clone(),
1745 propose_latency: (10.0, 5.0),
1746 verify_latency: (10.0, 5.0),
1747 };
1748 let (actor, application) = mocks::application::Application::new(
1749 context.with_label("application"),
1750 application_cfg,
1751 );
1752 actor.start();
1753 let blocker = oracle.control(validator.clone());
1754 let cfg = config::Config {
1755 scheme: schemes[idx].clone(),
1756 blocker,
1757 automaton: application.clone(),
1758 relay: application.clone(),
1759 reporter: reporter.clone(),
1760 partition: validator.to_string(),
1761 mailbox_size: 1024,
1762 epoch: 333,
1763 namespace: namespace.clone(),
1764 leader_timeout: Duration::from_secs(1),
1765 notarization_timeout: Duration::from_secs(2),
1766 nullify_retry: Duration::from_secs(10),
1767 fetch_timeout: Duration::from_secs(1),
1768 activity_timeout,
1769 skip_timeout,
1770 max_fetch_count: 1,
1771 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1772 fetch_concurrent: 1,
1773 replay_buffer: NZUsize!(1024 * 1024),
1774 write_buffer: NZUsize!(1024 * 1024),
1775 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1776 };
1777 let engine = Engine::new(context.with_label("engine"), cfg);
1778
1779 let (pending, recovered, resolver) = registrations
1781 .remove(validator)
1782 .expect("validator should be registered");
1783 engine_handlers.push(engine.start(pending, recovered, resolver));
1784 }
1785
1786 let mut finalizers = Vec::new();
1788 for reporter in reporters.iter_mut() {
1789 let (_, mut monitor) = reporter.subscribe().await;
1790 finalizers.push(
1791 context
1792 .with_label("finalizer")
1793 .spawn(move |context| async move {
1794 select! {
1795 _timeout = context.sleep(Duration::from_secs(60)) => {},
1796 _done = monitor.next() => {
1797 panic!("engine should not notarize or finalize anything");
1798 }
1799 }
1800 }),
1801 );
1802 }
1803 join_all(finalizers).await;
1804
1805 link_validators(&mut oracle, &participants, Action::Unlink, None).await;
1807
1808 context.sleep(Duration::from_secs(60)).await;
1810
1811 let mut latest = 0;
1813 for reporter in reporters.iter() {
1814 let nullifies = reporter.nullifies.lock().unwrap();
1815 let max = nullifies.keys().max().unwrap();
1816 if *max > latest {
1817 latest = *max;
1818 }
1819 }
1820
1821 let link = Link {
1823 latency: Duration::from_millis(10),
1824 jitter: Duration::from_millis(1),
1825 success_rate: 1.0,
1826 };
1827 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1828
1829 let mut finalizers = Vec::new();
1831 for reporter in reporters.iter_mut() {
1832 let (mut latest, mut monitor) = reporter.subscribe().await;
1833 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1834 while latest < required_containers {
1835 latest = monitor.next().await.expect("event missing");
1836 }
1837 }));
1838 }
1839 join_all(finalizers).await;
1840
1841 for reporter in reporters.iter() {
1843 {
1845 let faults = reporter.faults.lock().unwrap();
1846 assert!(faults.is_empty());
1847 }
1848
1849 {
1851 let invalid = reporter.invalid.lock().unwrap();
1852 assert_eq!(*invalid, 0);
1853 }
1854
1855 {
1860 let mut found = 0;
1862 let finalizations = reporter.finalizations.lock().unwrap();
1863 for i in latest..latest + activity_timeout {
1864 if finalizations.contains_key(&i) {
1865 found += 1;
1866 }
1867 }
1868 assert!(found >= activity_timeout - 2, "found: {found}");
1869 }
1870 }
1871
1872 let blocked = oracle.blocked().await.unwrap();
1874 assert!(blocked.is_empty());
1875 });
1876 }
1877
1878 #[test_traced]
1879 fn test_all_recovery() {
1880 all_recovery(bls12381_threshold::<MinPk, _>);
1881 all_recovery(bls12381_threshold::<MinSig, _>);
1882 all_recovery(bls12381_multisig::<MinPk, _>);
1883 all_recovery(bls12381_multisig::<MinSig, _>);
1884 all_recovery(ed25519);
1885 }
1886
1887 fn partition<S, F>(mut fixture: F)
1888 where
1889 S: Scheme<PublicKey = ed25519::PublicKey>,
1890 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1891 {
1892 let n = 10;
1894 let required_containers = 50;
1895 let activity_timeout = 10;
1896 let skip_timeout = 5;
1897 let namespace = b"consensus".to_vec();
1898 let executor = deterministic::Runner::timed(Duration::from_secs(900));
1899 executor.start(|mut context| async move {
1900 let (network, mut oracle) = Network::new(
1902 context.with_label("network"),
1903 Config {
1904 max_size: 1024 * 1024,
1905 disconnect_on_block: false,
1906 tracked_peer_sets: None,
1907 },
1908 );
1909
1910 network.start();
1912
1913 let Fixture {
1915 participants,
1916 schemes,
1917 ..
1918 } = fixture(&mut context, n);
1919 let mut registrations = register_validators(&mut oracle, &participants).await;
1920
1921 let link = Link {
1923 latency: Duration::from_millis(10),
1924 jitter: Duration::from_millis(1),
1925 success_rate: 1.0,
1926 };
1927 link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
1928
1929 let relay = Arc::new(mocks::relay::Relay::new());
1931 let mut reporters = Vec::new();
1932 let mut engine_handlers = Vec::new();
1933 for (idx, validator) in participants.iter().enumerate() {
1934 let context = context.with_label(&format!("validator-{}", *validator));
1936
1937 let reporter_config = mocks::reporter::Config {
1939 namespace: namespace.clone(),
1940 participants: participants.clone().into(),
1941 scheme: schemes[idx].clone(),
1942 };
1943 let reporter =
1944 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1945 reporters.push(reporter.clone());
1946 let application_cfg = mocks::application::Config {
1947 hasher: Sha256::default(),
1948 relay: relay.clone(),
1949 me: validator.clone(),
1950 propose_latency: (10.0, 5.0),
1951 verify_latency: (10.0, 5.0),
1952 };
1953 let (actor, application) = mocks::application::Application::new(
1954 context.with_label("application"),
1955 application_cfg,
1956 );
1957 actor.start();
1958 let blocker = oracle.control(validator.clone());
1959 let cfg = config::Config {
1960 scheme: schemes[idx].clone(),
1961 blocker,
1962 automaton: application.clone(),
1963 relay: application.clone(),
1964 reporter: reporter.clone(),
1965 partition: validator.to_string(),
1966 mailbox_size: 1024,
1967 epoch: 333,
1968 namespace: namespace.clone(),
1969 leader_timeout: Duration::from_secs(1),
1970 notarization_timeout: Duration::from_secs(2),
1971 nullify_retry: Duration::from_secs(10),
1972 fetch_timeout: Duration::from_secs(1),
1973 activity_timeout,
1974 skip_timeout,
1975 max_fetch_count: 1,
1976 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1977 fetch_concurrent: 1,
1978 replay_buffer: NZUsize!(1024 * 1024),
1979 write_buffer: NZUsize!(1024 * 1024),
1980 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1981 };
1982 let engine = Engine::new(context.with_label("engine"), cfg);
1983
1984 let (pending, recovered, resolver) = registrations
1986 .remove(validator)
1987 .expect("validator should be registered");
1988 engine_handlers.push(engine.start(pending, recovered, resolver));
1989 }
1990
1991 let mut finalizers = Vec::new();
1993 for reporter in reporters.iter_mut() {
1994 let (mut latest, mut monitor) = reporter.subscribe().await;
1995 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1996 while latest < required_containers {
1997 latest = monitor.next().await.expect("event missing");
1998 }
1999 }));
2000 }
2001 join_all(finalizers).await;
2002
2003 fn separated(n: usize, a: usize, b: usize) -> bool {
2005 let m = n / 2;
2006 (a < m && b >= m) || (a >= m && b < m)
2007 }
2008 link_validators(&mut oracle, &participants, Action::Unlink, Some(separated)).await;
2009
2010 context.sleep(Duration::from_secs(10)).await;
2012
2013 let mut finalizers = Vec::new();
2015 for reporter in reporters.iter_mut() {
2016 let (_, mut monitor) = reporter.subscribe().await;
2017 finalizers.push(
2018 context
2019 .with_label("finalizer")
2020 .spawn(move |context| async move {
2021 select! {
2022 _timeout = context.sleep(Duration::from_secs(60)) => {},
2023 _done = monitor.next() => {
2024 panic!("engine should not notarize or finalize anything");
2025 }
2026 }
2027 }),
2028 );
2029 }
2030 join_all(finalizers).await;
2031
2032 link_validators(
2034 &mut oracle,
2035 &participants,
2036 Action::Link(link),
2037 Some(separated),
2038 )
2039 .await;
2040
2041 let mut finalizers = Vec::new();
2043 for reporter in reporters.iter_mut() {
2044 let (mut latest, mut monitor) = reporter.subscribe().await;
2045 let required = latest + required_containers;
2046 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2047 while latest < required {
2048 latest = monitor.next().await.expect("event missing");
2049 }
2050 }));
2051 }
2052 join_all(finalizers).await;
2053
2054 for reporter in reporters.iter() {
2056 {
2058 let faults = reporter.faults.lock().unwrap();
2059 assert!(faults.is_empty());
2060 }
2061
2062 {
2064 let invalid = reporter.invalid.lock().unwrap();
2065 assert_eq!(*invalid, 0);
2066 }
2067 }
2068
2069 let blocked = oracle.blocked().await.unwrap();
2071 assert!(blocked.is_empty());
2072 });
2073 }
2074
2075 #[test_traced]
2076 #[ignore]
2077 fn test_partition() {
2078 partition(bls12381_threshold::<MinPk, _>);
2079 partition(bls12381_threshold::<MinSig, _>);
2080 partition(bls12381_multisig::<MinPk, _>);
2081 partition(bls12381_multisig::<MinSig, _>);
2082 partition(ed25519);
2083 }
2084
2085 fn slow_and_lossy_links<S, F>(seed: u64, mut fixture: F) -> String
2086 where
2087 S: Scheme<PublicKey = ed25519::PublicKey>,
2088 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2089 {
2090 let n = 5;
2092 let required_containers = 50;
2093 let activity_timeout = 10;
2094 let skip_timeout = 5;
2095 let namespace = b"consensus".to_vec();
2096 let cfg = deterministic::Config::new()
2097 .with_seed(seed)
2098 .with_timeout(Some(Duration::from_secs(5_000)));
2099 let executor = deterministic::Runner::new(cfg);
2100 executor.start(|mut context| async move {
2101 let (network, mut oracle) = Network::new(
2103 context.with_label("network"),
2104 Config {
2105 max_size: 1024 * 1024,
2106 disconnect_on_block: false,
2107 tracked_peer_sets: None,
2108 },
2109 );
2110
2111 network.start();
2113
2114 let Fixture {
2116 participants,
2117 schemes,
2118 ..
2119 } = fixture(&mut context, n);
2120 let mut registrations = register_validators(&mut oracle, &participants).await;
2121
2122 let degraded_link = Link {
2124 latency: Duration::from_millis(200),
2125 jitter: Duration::from_millis(150),
2126 success_rate: 0.5,
2127 };
2128 link_validators(
2129 &mut oracle,
2130 &participants,
2131 Action::Link(degraded_link),
2132 None,
2133 )
2134 .await;
2135
2136 let relay = Arc::new(mocks::relay::Relay::new());
2138 let mut reporters = Vec::new();
2139 let mut engine_handlers = Vec::new();
2140 for (idx, validator) in participants.iter().enumerate() {
2141 let context = context.with_label(&format!("validator-{}", *validator));
2143
2144 let reporter_config = mocks::reporter::Config {
2146 namespace: namespace.clone(),
2147 participants: participants.clone().into(),
2148 scheme: schemes[idx].clone(),
2149 };
2150 let reporter =
2151 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2152 reporters.push(reporter.clone());
2153 let application_cfg = mocks::application::Config {
2154 hasher: Sha256::default(),
2155 relay: relay.clone(),
2156 me: validator.clone(),
2157 propose_latency: (10.0, 5.0),
2158 verify_latency: (10.0, 5.0),
2159 };
2160 let (actor, application) = mocks::application::Application::new(
2161 context.with_label("application"),
2162 application_cfg,
2163 );
2164 actor.start();
2165 let blocker = oracle.control(validator.clone());
2166 let cfg = config::Config {
2167 scheme: schemes[idx].clone(),
2168 blocker,
2169 automaton: application.clone(),
2170 relay: application.clone(),
2171 reporter: reporter.clone(),
2172 partition: validator.to_string(),
2173 mailbox_size: 1024,
2174 epoch: 333,
2175 namespace: namespace.clone(),
2176 leader_timeout: Duration::from_secs(1),
2177 notarization_timeout: Duration::from_secs(2),
2178 nullify_retry: Duration::from_secs(10),
2179 fetch_timeout: Duration::from_secs(1),
2180 activity_timeout,
2181 skip_timeout,
2182 max_fetch_count: 1,
2183 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2184 fetch_concurrent: 1,
2185 replay_buffer: NZUsize!(1024 * 1024),
2186 write_buffer: NZUsize!(1024 * 1024),
2187 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2188 };
2189 let engine = Engine::new(context.with_label("engine"), cfg);
2190
2191 let (pending, recovered, resolver) = registrations
2193 .remove(validator)
2194 .expect("validator should be registered");
2195 engine_handlers.push(engine.start(pending, recovered, resolver));
2196 }
2197
2198 let mut finalizers = Vec::new();
2200 for reporter in reporters.iter_mut() {
2201 let (mut latest, mut monitor) = reporter.subscribe().await;
2202 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2203 while latest < required_containers {
2204 latest = monitor.next().await.expect("event missing");
2205 }
2206 }));
2207 }
2208 join_all(finalizers).await;
2209
2210 for reporter in reporters.iter() {
2212 {
2214 let faults = reporter.faults.lock().unwrap();
2215 assert!(faults.is_empty());
2216 }
2217
2218 {
2220 let invalid = reporter.invalid.lock().unwrap();
2221 assert_eq!(*invalid, 0);
2222 }
2223 }
2224
2225 let blocked = oracle.blocked().await.unwrap();
2227 assert!(blocked.is_empty());
2228
2229 context.auditor().state()
2230 })
2231 }
2232
2233 #[test_traced]
2234 fn test_slow_and_lossy_links() {
2235 slow_and_lossy_links(0, bls12381_threshold::<MinPk, _>);
2236 slow_and_lossy_links(0, bls12381_threshold::<MinSig, _>);
2237 slow_and_lossy_links(0, bls12381_multisig::<MinPk, _>);
2238 slow_and_lossy_links(0, bls12381_multisig::<MinSig, _>);
2239 slow_and_lossy_links(0, ed25519);
2240 }
2241
2242 #[test_traced]
2243 #[ignore]
2244 fn test_determinism() {
2245 for seed in 1..6 {
2248 let ts_pk_state_1 = slow_and_lossy_links(seed, bls12381_threshold::<MinPk, _>);
2249 let ts_pk_state_2 = slow_and_lossy_links(seed, bls12381_threshold::<MinPk, _>);
2250 assert_eq!(ts_pk_state_1, ts_pk_state_2);
2251
2252 let ts_sig_state_1 = slow_and_lossy_links(seed, bls12381_threshold::<MinSig, _>);
2253 let ts_sig_state_2 = slow_and_lossy_links(seed, bls12381_threshold::<MinSig, _>);
2254 assert_eq!(ts_sig_state_1, ts_sig_state_2);
2255
2256 let ms_pk_state_1 = slow_and_lossy_links(seed, bls12381_multisig::<MinPk, _>);
2257 let ms_pk_state_2 = slow_and_lossy_links(seed, bls12381_multisig::<MinPk, _>);
2258 assert_eq!(ms_pk_state_1, ms_pk_state_2);
2259
2260 let ms_sig_state_1 = slow_and_lossy_links(seed, bls12381_multisig::<MinSig, _>);
2261 let ms_sig_state_2 = slow_and_lossy_links(seed, bls12381_multisig::<MinSig, _>);
2262 assert_eq!(ms_sig_state_1, ms_sig_state_2);
2263
2264 let ed_state_1 = slow_and_lossy_links(seed, ed25519);
2265 let ed_state_2 = slow_and_lossy_links(seed, ed25519);
2266 assert_eq!(ed_state_1, ed_state_2);
2267
2268 let states = [
2269 ("threshold-minpk", ts_pk_state_1),
2270 ("threshold-minsig", ts_sig_state_1),
2271 ("multisig-minpk", ms_pk_state_1),
2272 ("multisig-minsig", ms_sig_state_1),
2273 ("ed25519", ed_state_1),
2274 ];
2275
2276 for pair in states.windows(2) {
2278 assert_ne!(
2279 pair[0].1, pair[1].1,
2280 "state {} equals state {}",
2281 pair[0].0, pair[0].0
2282 );
2283 }
2284 }
2285 }
2286
2287 fn conflicter<S, F>(seed: u64, mut fixture: F)
2288 where
2289 S: Scheme<PublicKey = ed25519::PublicKey>,
2290 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2291 {
2292 let n = 4;
2294 let required_containers = 50;
2295 let activity_timeout = 10;
2296 let skip_timeout = 5;
2297 let namespace = b"consensus".to_vec();
2298 let cfg = deterministic::Config::new()
2299 .with_seed(seed)
2300 .with_timeout(Some(Duration::from_secs(30)));
2301 let executor = deterministic::Runner::new(cfg);
2302 executor.start(|mut context| async move {
2303 let (network, mut oracle) = Network::new(
2305 context.with_label("network"),
2306 Config {
2307 max_size: 1024 * 1024,
2308 disconnect_on_block: false,
2309 tracked_peer_sets: None,
2310 },
2311 );
2312
2313 network.start();
2315
2316 let Fixture {
2318 participants,
2319 schemes,
2320 ..
2321 } = fixture(&mut context, n);
2322 let mut registrations = register_validators(&mut oracle, &participants).await;
2323
2324 let link = Link {
2326 latency: Duration::from_millis(10),
2327 jitter: Duration::from_millis(1),
2328 success_rate: 1.0,
2329 };
2330 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2331
2332 let relay = Arc::new(mocks::relay::Relay::new());
2334 let mut reporters = Vec::new();
2335 for (idx_scheme, validator) in participants.iter().enumerate() {
2336 let context = context.with_label(&format!("validator-{}", *validator));
2338
2339 let reporter_config = mocks::reporter::Config {
2341 namespace: namespace.clone(),
2342 participants: participants.clone().into(),
2343 scheme: schemes[idx_scheme].clone(),
2344 };
2345 let reporter =
2346 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2347 let (pending, recovered, resolver) = registrations
2348 .remove(validator)
2349 .expect("validator should be registered");
2350 if idx_scheme == 0 {
2351 let cfg = mocks::conflicter::Config {
2352 namespace: namespace.clone(),
2353 scheme: schemes[idx_scheme].clone(),
2354 };
2355
2356 let engine: mocks::conflicter::Conflicter<_, _, Sha256> =
2357 mocks::conflicter::Conflicter::new(
2358 context.with_label("byzantine_engine"),
2359 cfg,
2360 );
2361 engine.start(pending);
2362 } else {
2363 reporters.push(reporter.clone());
2364 let application_cfg = mocks::application::Config {
2365 hasher: Sha256::default(),
2366 relay: relay.clone(),
2367 me: validator.clone(),
2368 propose_latency: (10.0, 5.0),
2369 verify_latency: (10.0, 5.0),
2370 };
2371 let (actor, application) = mocks::application::Application::new(
2372 context.with_label("application"),
2373 application_cfg,
2374 );
2375 actor.start();
2376 let blocker = oracle.control(validator.clone());
2377 let cfg = config::Config {
2378 scheme: schemes[idx_scheme].clone(),
2379 blocker,
2380 automaton: application.clone(),
2381 relay: application.clone(),
2382 reporter: reporter.clone(),
2383 partition: validator.to_string(),
2384 mailbox_size: 1024,
2385 epoch: 333,
2386 namespace: namespace.clone(),
2387 leader_timeout: Duration::from_secs(1),
2388 notarization_timeout: Duration::from_secs(2),
2389 nullify_retry: Duration::from_secs(10),
2390 fetch_timeout: Duration::from_secs(1),
2391 activity_timeout,
2392 skip_timeout,
2393 max_fetch_count: 1,
2394 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2395 fetch_concurrent: 1,
2396 replay_buffer: NZUsize!(1024 * 1024),
2397 write_buffer: NZUsize!(1024 * 1024),
2398 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2399 };
2400 let engine = Engine::new(context.with_label("engine"), cfg);
2401 engine.start(pending, recovered, resolver);
2402 }
2403 }
2404
2405 let mut finalizers = Vec::new();
2407 for reporter in reporters.iter_mut() {
2408 let (mut latest, mut monitor) = reporter.subscribe().await;
2409 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2410 while latest < required_containers {
2411 latest = monitor.next().await.expect("event missing");
2412 }
2413 }));
2414 }
2415 join_all(finalizers).await;
2416
2417 let byz = &participants[0];
2419 let mut count_conflicting = 0;
2420 for reporter in reporters.iter() {
2421 {
2423 let faults = reporter.faults.lock().unwrap();
2424 assert_eq!(faults.len(), 1);
2425 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2426 for (_, faults) in faulter.iter() {
2427 for fault in faults.iter() {
2428 match fault {
2429 Activity::ConflictingNotarize(_) => {
2430 count_conflicting += 1;
2431 }
2432 Activity::ConflictingFinalize(_) => {
2433 count_conflicting += 1;
2434 }
2435 _ => panic!("unexpected fault: {fault:?}"),
2436 }
2437 }
2438 }
2439 }
2440
2441 {
2443 let invalid = reporter.invalid.lock().unwrap();
2444 assert_eq!(*invalid, 0);
2445 }
2446 }
2447 assert!(count_conflicting > 0);
2448
2449 let blocked = oracle.blocked().await.unwrap();
2451 assert!(!blocked.is_empty());
2452 for (a, b) in blocked {
2453 assert_ne!(&a, byz);
2454 assert_eq!(&b, byz);
2455 }
2456 });
2457 }
2458
2459 #[test_traced]
2460 #[ignore]
2461 fn test_conflicter() {
2462 for seed in 0..5 {
2463 conflicter(seed, bls12381_threshold::<MinPk, _>);
2464 conflicter(seed, bls12381_threshold::<MinSig, _>);
2465 conflicter(seed, bls12381_multisig::<MinPk, _>);
2466 conflicter(seed, bls12381_multisig::<MinSig, _>);
2467 conflicter(seed, ed25519);
2468 }
2469 }
2470
2471 fn invalid<S, F>(seed: u64, mut fixture: F)
2472 where
2473 S: Scheme<PublicKey = ed25519::PublicKey>,
2474 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2475 {
2476 let n = 4;
2478 let required_containers = 50;
2479 let activity_timeout = 10;
2480 let skip_timeout = 5;
2481 let namespace = b"consensus".to_vec();
2482 let cfg = deterministic::Config::new()
2483 .with_seed(seed)
2484 .with_timeout(Some(Duration::from_secs(30)));
2485 let executor = deterministic::Runner::new(cfg);
2486 executor.start(|mut context| async move {
2487 let (network, mut oracle) = Network::new(
2489 context.with_label("network"),
2490 Config {
2491 max_size: 1024 * 1024,
2492 disconnect_on_block: false,
2493 tracked_peer_sets: None,
2494 },
2495 );
2496
2497 network.start();
2499
2500 let Fixture {
2502 participants,
2503 schemes,
2504 ..
2505 } = fixture(&mut context, n);
2506 let mut registrations = register_validators(&mut oracle, &participants).await;
2507
2508 let link = Link {
2510 latency: Duration::from_millis(10),
2511 jitter: Duration::from_millis(1),
2512 success_rate: 1.0,
2513 };
2514 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2515
2516 let relay = Arc::new(mocks::relay::Relay::new());
2518 let mut reporters = Vec::new();
2519 for (idx_scheme, validator) in participants.iter().enumerate() {
2520 let context = context.with_label(&format!("validator-{}", *validator));
2522
2523 let engine_namespace = if idx_scheme == 0 {
2525 vec![]
2526 } else {
2527 namespace.clone()
2528 };
2529
2530 let reporter_config = mocks::reporter::Config {
2531 namespace: namespace.clone(), participants: participants.clone().into(),
2533 scheme: schemes[idx_scheme].clone(),
2534 };
2535 let reporter =
2536 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2537 reporters.push(reporter.clone());
2538
2539 let application_cfg = mocks::application::Config {
2540 hasher: Sha256::default(),
2541 relay: relay.clone(),
2542 me: validator.clone(),
2543 propose_latency: (10.0, 5.0),
2544 verify_latency: (10.0, 5.0),
2545 };
2546 let (actor, application) = mocks::application::Application::new(
2547 context.with_label("application"),
2548 application_cfg,
2549 );
2550 actor.start();
2551 let blocker = oracle.control(validator.clone());
2552 let cfg = config::Config {
2553 scheme: schemes[idx_scheme].clone(),
2554 blocker,
2555 automaton: application.clone(),
2556 relay: application.clone(),
2557 reporter: reporter.clone(),
2558 partition: validator.clone().to_string(),
2559 mailbox_size: 1024,
2560 epoch: 333,
2561 namespace: engine_namespace,
2562 leader_timeout: Duration::from_secs(1),
2563 notarization_timeout: Duration::from_secs(2),
2564 nullify_retry: Duration::from_secs(10),
2565 fetch_timeout: Duration::from_secs(1),
2566 activity_timeout,
2567 skip_timeout,
2568 max_fetch_count: 1,
2569 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2570 fetch_concurrent: 1,
2571 replay_buffer: NZUsize!(1024 * 1024),
2572 write_buffer: NZUsize!(1024 * 1024),
2573 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2574 };
2575 let engine = Engine::new(context.with_label("engine"), cfg);
2576 let (pending, recovered, resolver) = registrations
2577 .remove(validator)
2578 .expect("validator should be registered");
2579 engine.start(pending, recovered, resolver);
2580 }
2581
2582 let mut finalizers = Vec::new();
2584 for reporter in reporters.iter_mut().skip(1) {
2585 let (mut latest, mut monitor) = reporter.subscribe().await;
2586 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2587 while latest < required_containers {
2588 latest = monitor.next().await.expect("event missing");
2589 }
2590 }));
2591 }
2592 join_all(finalizers).await;
2593
2594 let mut invalid_count = 0;
2596 for reporter in reporters.iter().skip(1) {
2597 {
2599 let faults = reporter.faults.lock().unwrap();
2600 assert!(faults.is_empty());
2601 }
2602
2603 {
2605 let invalid = reporter.invalid.lock().unwrap();
2606 if *invalid > 0 {
2607 invalid_count += 1;
2608 }
2609 }
2610 }
2611
2612 assert_eq!(invalid_count, n - 1);
2614
2615 let blocked = oracle.blocked().await.unwrap();
2617 assert!(!blocked.is_empty());
2618 for (a, b) in blocked {
2619 if a != participants[0] {
2620 assert_eq!(b, participants[0]);
2621 }
2622 }
2623 });
2624 }
2625
2626 #[test_traced]
2627 #[ignore]
2628 fn test_invalid() {
2629 for seed in 0..5 {
2630 invalid(seed, bls12381_threshold::<MinPk, _>);
2631 invalid(seed, bls12381_threshold::<MinSig, _>);
2632 invalid(seed, bls12381_multisig::<MinPk, _>);
2633 invalid(seed, bls12381_multisig::<MinSig, _>);
2634 invalid(seed, ed25519);
2635 }
2636 }
2637
2638 fn impersonator<S, F>(seed: u64, mut fixture: F)
2639 where
2640 S: Scheme<PublicKey = ed25519::PublicKey>,
2641 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2642 {
2643 let n = 4;
2645 let required_containers = 50;
2646 let activity_timeout = 10;
2647 let skip_timeout = 5;
2648 let namespace = b"consensus".to_vec();
2649 let cfg = deterministic::Config::new()
2650 .with_seed(seed)
2651 .with_timeout(Some(Duration::from_secs(30)));
2652 let executor = deterministic::Runner::new(cfg);
2653 executor.start(|mut context| async move {
2654 let (network, mut oracle) = Network::new(
2656 context.with_label("network"),
2657 Config {
2658 max_size: 1024 * 1024,
2659 disconnect_on_block: false,
2660 tracked_peer_sets: None,
2661 },
2662 );
2663
2664 network.start();
2666
2667 let Fixture {
2669 participants,
2670 schemes,
2671 ..
2672 } = fixture(&mut context, n);
2673 let mut registrations = register_validators(&mut oracle, &participants).await;
2674
2675 let link = Link {
2677 latency: Duration::from_millis(10),
2678 jitter: Duration::from_millis(1),
2679 success_rate: 1.0,
2680 };
2681 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2682
2683 let relay = Arc::new(mocks::relay::Relay::new());
2685 let mut reporters = Vec::new();
2686 for (idx_scheme, validator) in participants.iter().enumerate() {
2687 let context = context.with_label(&format!("validator-{}", *validator));
2689
2690 let reporter_config = mocks::reporter::Config {
2692 namespace: namespace.clone(),
2693 participants: participants.clone().into(),
2694 scheme: schemes[idx_scheme].clone(),
2695 };
2696 let reporter =
2697 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2698 let (pending, recovered, resolver) = registrations
2699 .remove(validator)
2700 .expect("validator should be registered");
2701 if idx_scheme == 0 {
2702 let cfg = mocks::impersonator::Config {
2703 scheme: schemes[idx_scheme].clone(),
2704 namespace: namespace.clone(),
2705 };
2706
2707 let engine: mocks::impersonator::Impersonator<_, _, Sha256> =
2708 mocks::impersonator::Impersonator::new(
2709 context.with_label("byzantine_engine"),
2710 cfg,
2711 );
2712 engine.start(pending);
2713 } else {
2714 reporters.push(reporter.clone());
2715 let application_cfg = mocks::application::Config {
2716 hasher: Sha256::default(),
2717 relay: relay.clone(),
2718 me: validator.clone(),
2719 propose_latency: (10.0, 5.0),
2720 verify_latency: (10.0, 5.0),
2721 };
2722 let (actor, application) = mocks::application::Application::new(
2723 context.with_label("application"),
2724 application_cfg,
2725 );
2726 actor.start();
2727 let blocker = oracle.control(validator.clone());
2728 let cfg = config::Config {
2729 scheme: schemes[idx_scheme].clone(),
2730 blocker,
2731 automaton: application.clone(),
2732 relay: application.clone(),
2733 reporter: reporter.clone(),
2734 partition: validator.clone().to_string(),
2735 mailbox_size: 1024,
2736 epoch: 333,
2737 namespace: namespace.clone(),
2738 leader_timeout: Duration::from_secs(1),
2739 notarization_timeout: Duration::from_secs(2),
2740 nullify_retry: Duration::from_secs(10),
2741 fetch_timeout: Duration::from_secs(1),
2742 activity_timeout,
2743 skip_timeout,
2744 max_fetch_count: 1,
2745 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2746 fetch_concurrent: 1,
2747 replay_buffer: NZUsize!(1024 * 1024),
2748 write_buffer: NZUsize!(1024 * 1024),
2749 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2750 };
2751 let engine = Engine::new(context.with_label("engine"), cfg);
2752 engine.start(pending, recovered, resolver);
2753 }
2754 }
2755
2756 let mut finalizers = Vec::new();
2758 for reporter in reporters.iter_mut() {
2759 let (mut latest, mut monitor) = reporter.subscribe().await;
2760 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2761 while latest < required_containers {
2762 latest = monitor.next().await.expect("event missing");
2763 }
2764 }));
2765 }
2766 join_all(finalizers).await;
2767
2768 let byz = &participants[0];
2770 for reporter in reporters.iter() {
2771 {
2773 let faults = reporter.faults.lock().unwrap();
2774 assert!(faults.is_empty());
2775 }
2776
2777 {
2779 let invalid = reporter.invalid.lock().unwrap();
2780 assert_eq!(*invalid, 0);
2781 }
2782 }
2783
2784 let blocked = oracle.blocked().await.unwrap();
2786 assert!(!blocked.is_empty());
2787 for (a, b) in blocked {
2788 assert_ne!(&a, byz);
2789 assert_eq!(&b, byz);
2790 }
2791 });
2792 }
2793
2794 #[test_traced]
2795 #[ignore]
2796 fn test_impersonator() {
2797 for seed in 0..5 {
2798 impersonator(seed, bls12381_threshold::<MinPk, _>);
2799 impersonator(seed, bls12381_threshold::<MinSig, _>);
2800 impersonator(seed, bls12381_multisig::<MinPk, _>);
2801 impersonator(seed, bls12381_multisig::<MinSig, _>);
2802 impersonator(seed, ed25519);
2803 }
2804 }
2805
2806 fn reconfigurer<S, F>(seed: u64, mut fixture: F)
2807 where
2808 S: Scheme<PublicKey = ed25519::PublicKey>,
2809 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2810 {
2811 let n = 4;
2813 let required_containers = 50;
2814 let activity_timeout = 10;
2815 let skip_timeout = 5;
2816 let namespace = b"consensus".to_vec();
2817 let cfg = deterministic::Config::new()
2818 .with_seed(seed)
2819 .with_timeout(Some(Duration::from_secs(30)));
2820 let executor = deterministic::Runner::new(cfg);
2821 executor.start(|mut context| async move {
2822 let (network, mut oracle) = Network::new(
2824 context.with_label("network"),
2825 Config {
2826 max_size: 1024 * 1024,
2827 disconnect_on_block: false,
2828 tracked_peer_sets: None,
2829 },
2830 );
2831
2832 network.start();
2834
2835 let Fixture {
2837 participants,
2838 schemes,
2839 ..
2840 } = fixture(&mut context, n);
2841 let mut registrations = register_validators(&mut oracle, &participants).await;
2842
2843 let link = Link {
2845 latency: Duration::from_millis(10),
2846 jitter: Duration::from_millis(1),
2847 success_rate: 1.0,
2848 };
2849 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2850
2851 let relay = Arc::new(mocks::relay::Relay::new());
2853 let mut reporters = Vec::new();
2854 for (idx_scheme, validator) in participants.iter().enumerate() {
2855 let context = context.with_label(&format!("validator-{}", *validator));
2857
2858 let reporter_config = mocks::reporter::Config {
2860 namespace: namespace.clone(),
2861 participants: participants.clone().into(),
2862 scheme: schemes[idx_scheme].clone(),
2863 };
2864 let reporter =
2865 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2866 let (pending, recovered, resolver) = registrations
2867 .remove(validator)
2868 .expect("validator should be registered");
2869 if idx_scheme == 0 {
2870 let cfg = mocks::reconfigurer::Config {
2871 scheme: schemes[idx_scheme].clone(),
2872 namespace: namespace.clone(),
2873 };
2874 let engine: mocks::reconfigurer::Reconfigurer<_, _, Sha256> =
2875 mocks::reconfigurer::Reconfigurer::new(
2876 context.with_label("byzantine_engine"),
2877 cfg,
2878 );
2879 engine.start(pending);
2880 } else {
2881 reporters.push(reporter.clone());
2882 let application_cfg = mocks::application::Config {
2883 hasher: Sha256::default(),
2884 relay: relay.clone(),
2885 me: validator.clone(),
2886 propose_latency: (10.0, 5.0),
2887 verify_latency: (10.0, 5.0),
2888 };
2889 let (actor, application) = mocks::application::Application::new(
2890 context.with_label("application"),
2891 application_cfg,
2892 );
2893 actor.start();
2894 let blocker = oracle.control(validator.clone());
2895 let cfg = config::Config {
2896 scheme: schemes[idx_scheme].clone(),
2897 blocker,
2898 automaton: application.clone(),
2899 relay: application.clone(),
2900 reporter: reporter.clone(),
2901 partition: validator.to_string(),
2902 mailbox_size: 1024,
2903 epoch: 333,
2904 namespace: namespace.clone(),
2905 leader_timeout: Duration::from_secs(1),
2906 notarization_timeout: Duration::from_secs(2),
2907 nullify_retry: Duration::from_secs(10),
2908 fetch_timeout: Duration::from_secs(1),
2909 activity_timeout,
2910 skip_timeout,
2911 max_fetch_count: 1,
2912 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2913 fetch_concurrent: 1,
2914 replay_buffer: NZUsize!(1024 * 1024),
2915 write_buffer: NZUsize!(1024 * 1024),
2916 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2917 };
2918 let engine = Engine::new(context.with_label("engine"), cfg);
2919 engine.start(pending, recovered, resolver);
2920 }
2921 }
2922
2923 let mut finalizers = Vec::new();
2925 for reporter in reporters.iter_mut() {
2926 let (mut latest, mut monitor) = reporter.subscribe().await;
2927 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2928 while latest < required_containers {
2929 latest = monitor.next().await.expect("event missing");
2930 }
2931 }));
2932 }
2933 join_all(finalizers).await;
2934
2935 let byz = &participants[0];
2937 for reporter in reporters.iter() {
2938 {
2940 let faults = reporter.faults.lock().unwrap();
2941 assert!(faults.is_empty());
2942 }
2943
2944 {
2946 let invalid = reporter.invalid.lock().unwrap();
2947 assert_eq!(*invalid, 0);
2948 }
2949 }
2950
2951 let blocked = oracle.blocked().await.unwrap();
2953 assert!(!blocked.is_empty());
2954 for (a, b) in blocked {
2955 assert_ne!(&a, byz);
2956 assert_eq!(&b, byz);
2957 }
2958 });
2959 }
2960
2961 #[test_traced]
2962 #[ignore]
2963 fn test_reconfigurer() {
2964 for seed in 0..5 {
2965 reconfigurer(seed, bls12381_threshold::<MinPk, _>);
2966 reconfigurer(seed, bls12381_threshold::<MinSig, _>);
2967 reconfigurer(seed, bls12381_multisig::<MinPk, _>);
2968 reconfigurer(seed, bls12381_multisig::<MinSig, _>);
2969 reconfigurer(seed, ed25519);
2970 }
2971 }
2972
2973 fn nuller<S, F>(seed: u64, mut fixture: F)
2974 where
2975 S: Scheme<PublicKey = ed25519::PublicKey>,
2976 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2977 {
2978 let n = 4;
2980 let required_containers = 50;
2981 let activity_timeout = 10;
2982 let skip_timeout = 5;
2983 let namespace = b"consensus".to_vec();
2984 let cfg = deterministic::Config::new()
2985 .with_seed(seed)
2986 .with_timeout(Some(Duration::from_secs(30)));
2987 let executor = deterministic::Runner::new(cfg);
2988 executor.start(|mut context| async move {
2989 let (network, mut oracle) = Network::new(
2991 context.with_label("network"),
2992 Config {
2993 max_size: 1024 * 1024,
2994 disconnect_on_block: false,
2995 tracked_peer_sets: None,
2996 },
2997 );
2998
2999 network.start();
3001
3002 let Fixture {
3004 participants,
3005 schemes,
3006 ..
3007 } = fixture(&mut context, n);
3008 let mut registrations = register_validators(&mut oracle, &participants).await;
3009
3010 let link = Link {
3012 latency: Duration::from_millis(10),
3013 jitter: Duration::from_millis(1),
3014 success_rate: 1.0,
3015 };
3016 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3017
3018 let relay = Arc::new(mocks::relay::Relay::new());
3020 let mut reporters = Vec::new();
3021 for (idx_scheme, validator) in participants.iter().enumerate() {
3022 let context = context.with_label(&format!("validator-{}", *validator));
3024
3025 let reporter_config = mocks::reporter::Config {
3027 namespace: namespace.clone(),
3028 participants: participants.clone().into(),
3029 scheme: schemes[idx_scheme].clone(),
3030 };
3031 let reporter =
3032 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3033 let (pending, recovered, resolver) = registrations
3034 .remove(validator)
3035 .expect("validator should be registered");
3036 if idx_scheme == 0 {
3037 let cfg = mocks::nuller::Config {
3038 namespace: namespace.clone(),
3039 scheme: schemes[idx_scheme].clone(),
3040 };
3041 let engine: mocks::nuller::Nuller<_, _, Sha256> =
3042 mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
3043 engine.start(pending);
3044 } else {
3045 reporters.push(reporter.clone());
3046 let application_cfg = mocks::application::Config {
3047 hasher: Sha256::default(),
3048 relay: relay.clone(),
3049 me: validator.clone(),
3050 propose_latency: (10.0, 5.0),
3051 verify_latency: (10.0, 5.0),
3052 };
3053 let (actor, application) = mocks::application::Application::new(
3054 context.with_label("application"),
3055 application_cfg,
3056 );
3057 actor.start();
3058 let blocker = oracle.control(validator.clone());
3059 let cfg = config::Config {
3060 scheme: schemes[idx_scheme].clone(),
3061 blocker,
3062 automaton: application.clone(),
3063 relay: application.clone(),
3064 reporter: reporter.clone(),
3065 partition: validator.clone().to_string(),
3066 mailbox_size: 1024,
3067 epoch: 333,
3068 namespace: namespace.clone(),
3069 leader_timeout: Duration::from_secs(1),
3070 notarization_timeout: Duration::from_secs(2),
3071 nullify_retry: Duration::from_secs(10),
3072 fetch_timeout: Duration::from_secs(1),
3073 activity_timeout,
3074 skip_timeout,
3075 max_fetch_count: 1,
3076 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
3077 fetch_concurrent: 1,
3078 replay_buffer: NZUsize!(1024 * 1024),
3079 write_buffer: NZUsize!(1024 * 1024),
3080 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3081 };
3082 let engine = Engine::new(context.with_label("engine"), cfg);
3083 engine.start(pending, recovered, resolver);
3084 }
3085 }
3086
3087 let mut finalizers = Vec::new();
3089 for reporter in reporters.iter_mut() {
3090 let (mut latest, mut monitor) = reporter.subscribe().await;
3091 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3092 while latest < required_containers {
3093 latest = monitor.next().await.expect("event missing");
3094 }
3095 }));
3096 }
3097 join_all(finalizers).await;
3098
3099 let byz = &participants[0];
3101 let mut count_nullify_and_finalize = 0;
3102 for reporter in reporters.iter() {
3103 {
3105 let faults = reporter.faults.lock().unwrap();
3106 assert_eq!(faults.len(), 1);
3107 let faulter = faults.get(byz).expect("byzantine party is not faulter");
3108 for (_, faults) in faulter.iter() {
3109 for fault in faults.iter() {
3110 match fault {
3111 Activity::NullifyFinalize(_) => {
3112 count_nullify_and_finalize += 1;
3113 }
3114 _ => panic!("unexpected fault: {fault:?}"),
3115 }
3116 }
3117 }
3118 }
3119
3120 {
3122 let invalid = reporter.invalid.lock().unwrap();
3123 assert_eq!(*invalid, 0);
3124 }
3125 }
3126 assert!(count_nullify_and_finalize > 0);
3127
3128 let blocked = oracle.blocked().await.unwrap();
3130 assert!(!blocked.is_empty());
3131 for (a, b) in blocked {
3132 assert_ne!(&a, byz);
3133 assert_eq!(&b, byz);
3134 }
3135 });
3136 }
3137
3138 #[test_traced]
3139 #[ignore]
3140 fn test_nuller() {
3141 for seed in 0..5 {
3142 nuller(seed, bls12381_threshold::<MinPk, _>);
3143 nuller(seed, bls12381_threshold::<MinSig, _>);
3144 nuller(seed, bls12381_multisig::<MinPk, _>);
3145 nuller(seed, bls12381_multisig::<MinSig, _>);
3146 nuller(seed, ed25519);
3147 }
3148 }
3149
3150 fn outdated<S, F>(seed: u64, mut fixture: F)
3151 where
3152 S: Scheme<PublicKey = ed25519::PublicKey>,
3153 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
3154 {
3155 let n = 4;
3157 let required_containers = 100;
3158 let activity_timeout = 10;
3159 let skip_timeout = 5;
3160 let namespace = b"consensus".to_vec();
3161 let cfg = deterministic::Config::new()
3162 .with_seed(seed)
3163 .with_timeout(Some(Duration::from_secs(30)));
3164 let executor = deterministic::Runner::new(cfg);
3165 executor.start(|mut context| async move {
3166 let (network, mut oracle) = Network::new(
3168 context.with_label("network"),
3169 Config {
3170 max_size: 1024 * 1024,
3171 disconnect_on_block: false,
3172 tracked_peer_sets: None,
3173 },
3174 );
3175
3176 network.start();
3178
3179 let Fixture {
3181 participants,
3182 schemes,
3183 ..
3184 } = fixture(&mut context, n);
3185 let mut registrations = register_validators(&mut oracle, &participants).await;
3186
3187 let link = Link {
3189 latency: Duration::from_millis(10),
3190 jitter: Duration::from_millis(1),
3191 success_rate: 1.0,
3192 };
3193 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3194
3195 let relay = Arc::new(mocks::relay::Relay::new());
3197 let mut reporters = Vec::new();
3198 for (idx_scheme, validator) in participants.iter().enumerate() {
3199 let context = context.with_label(&format!("validator-{}", *validator));
3201
3202 let reporter_config = mocks::reporter::Config {
3204 namespace: namespace.clone(),
3205 participants: participants.clone().into(),
3206 scheme: schemes[idx_scheme].clone(),
3207 };
3208 let reporter =
3209 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3210 let (pending, recovered, resolver) = registrations
3211 .remove(validator)
3212 .expect("validator should be registered");
3213 if idx_scheme == 0 {
3214 let cfg = mocks::outdated::Config {
3215 scheme: schemes[idx_scheme].clone(),
3216 namespace: namespace.clone(),
3217 view_delta: activity_timeout * 4,
3218 };
3219 let engine: mocks::outdated::Outdated<_, _, Sha256> =
3220 mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
3221 engine.start(pending);
3222 } else {
3223 reporters.push(reporter.clone());
3224 let application_cfg = mocks::application::Config {
3225 hasher: Sha256::default(),
3226 relay: relay.clone(),
3227 me: validator.clone(),
3228 propose_latency: (10.0, 5.0),
3229 verify_latency: (10.0, 5.0),
3230 };
3231 let (actor, application) = mocks::application::Application::new(
3232 context.with_label("application"),
3233 application_cfg,
3234 );
3235 actor.start();
3236 let blocker = oracle.control(validator.clone());
3237 let cfg = config::Config {
3238 scheme: schemes[idx_scheme].clone(),
3239 blocker,
3240 automaton: application.clone(),
3241 relay: application.clone(),
3242 reporter: reporter.clone(),
3243 partition: validator.clone().to_string(),
3244 mailbox_size: 1024,
3245 epoch: 333,
3246 namespace: namespace.clone(),
3247 leader_timeout: Duration::from_secs(1),
3248 notarization_timeout: Duration::from_secs(2),
3249 nullify_retry: Duration::from_secs(10),
3250 fetch_timeout: Duration::from_secs(1),
3251 activity_timeout,
3252 skip_timeout,
3253 max_fetch_count: 1,
3254 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
3255 fetch_concurrent: 1,
3256 replay_buffer: NZUsize!(1024 * 1024),
3257 write_buffer: NZUsize!(1024 * 1024),
3258 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3259 };
3260 let engine = Engine::new(context.with_label("engine"), cfg);
3261 engine.start(pending, recovered, resolver);
3262 }
3263 }
3264
3265 let mut finalizers = Vec::new();
3267 for reporter in reporters.iter_mut() {
3268 let (mut latest, mut monitor) = reporter.subscribe().await;
3269 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3270 while latest < required_containers {
3271 latest = monitor.next().await.expect("event missing");
3272 }
3273 }));
3274 }
3275 join_all(finalizers).await;
3276
3277 for reporter in reporters.iter() {
3279 {
3281 let faults = reporter.faults.lock().unwrap();
3282 assert!(faults.is_empty());
3283 }
3284
3285 {
3287 let invalid = reporter.invalid.lock().unwrap();
3288 assert_eq!(*invalid, 0);
3289 }
3290 }
3291
3292 let blocked = oracle.blocked().await.unwrap();
3294 assert!(blocked.is_empty());
3295 });
3296 }
3297
3298 #[test_traced]
3299 #[ignore]
3300 fn test_outdated() {
3301 for seed in 0..5 {
3302 outdated(seed, bls12381_threshold::<MinPk, _>);
3303 outdated(seed, bls12381_threshold::<MinSig, _>);
3304 outdated(seed, bls12381_multisig::<MinPk, _>);
3305 outdated(seed, bls12381_multisig::<MinSig, _>);
3306 outdated(seed, ed25519);
3307 }
3308 }
3309
3310 fn run_1k<S, F>(mut fixture: F)
3311 where
3312 S: Scheme<PublicKey = ed25519::PublicKey>,
3313 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
3314 {
3315 let n = 10;
3317 let required_containers = 1_000;
3318 let activity_timeout = 10;
3319 let skip_timeout = 5;
3320 let namespace = b"consensus".to_vec();
3321 let cfg = deterministic::Config::new();
3322 let executor = deterministic::Runner::new(cfg);
3323 executor.start(|mut context| async move {
3324 let (network, mut oracle) = Network::new(
3326 context.with_label("network"),
3327 Config {
3328 max_size: 1024 * 1024,
3329 disconnect_on_block: false,
3330 tracked_peer_sets: None,
3331 },
3332 );
3333
3334 network.start();
3336
3337 let Fixture {
3339 participants,
3340 schemes,
3341 ..
3342 } = fixture(&mut context, n);
3343 let mut registrations = register_validators(&mut oracle, &participants).await;
3344
3345 let link = Link {
3347 latency: Duration::from_millis(80),
3348 jitter: Duration::from_millis(10),
3349 success_rate: 0.98,
3350 };
3351 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3352
3353 let relay = Arc::new(mocks::relay::Relay::new());
3355 let mut reporters = Vec::new();
3356 let mut engine_handlers = Vec::new();
3357 for (idx, validator) in participants.iter().enumerate() {
3358 let context = context.with_label(&format!("validator-{}", *validator));
3360
3361 let reporter_config = mocks::reporter::Config {
3363 namespace: namespace.clone(),
3364 participants: participants.clone().into(),
3365 scheme: schemes[idx].clone(),
3366 };
3367 let reporter =
3368 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3369 reporters.push(reporter.clone());
3370 let application_cfg = mocks::application::Config {
3371 hasher: Sha256::default(),
3372 relay: relay.clone(),
3373 me: validator.clone(),
3374 propose_latency: (100.0, 50.0),
3375 verify_latency: (50.0, 40.0),
3376 };
3377 let (actor, application) = mocks::application::Application::new(
3378 context.with_label("application"),
3379 application_cfg,
3380 );
3381 actor.start();
3382 let blocker = oracle.control(validator.clone());
3383 let cfg = config::Config {
3384 scheme: schemes[idx].clone(),
3385 blocker,
3386 automaton: application.clone(),
3387 relay: application.clone(),
3388 reporter: reporter.clone(),
3389 partition: validator.to_string(),
3390 mailbox_size: 1024,
3391 epoch: 333,
3392 namespace: namespace.clone(),
3393 leader_timeout: Duration::from_secs(1),
3394 notarization_timeout: Duration::from_secs(2),
3395 nullify_retry: Duration::from_secs(10),
3396 fetch_timeout: Duration::from_secs(1),
3397 activity_timeout,
3398 skip_timeout,
3399 max_fetch_count: 1,
3400 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
3401 fetch_concurrent: 1,
3402 replay_buffer: NZUsize!(1024 * 1024),
3403 write_buffer: NZUsize!(1024 * 1024),
3404 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3405 };
3406 let engine = Engine::new(context.with_label("engine"), cfg);
3407
3408 let (pending, recovered, resolver) = registrations
3410 .remove(validator)
3411 .expect("validator should be registered");
3412 engine_handlers.push(engine.start(pending, recovered, resolver));
3413 }
3414
3415 let mut finalizers = Vec::new();
3417 for reporter in reporters.iter_mut() {
3418 let (mut latest, mut monitor) = reporter.subscribe().await;
3419 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3420 while latest < required_containers {
3421 latest = monitor.next().await.expect("event missing");
3422 }
3423 }));
3424 }
3425 join_all(finalizers).await;
3426
3427 for reporter in reporters.iter() {
3429 {
3431 let faults = reporter.faults.lock().unwrap();
3432 assert!(faults.is_empty());
3433 }
3434
3435 {
3437 let invalid = reporter.invalid.lock().unwrap();
3438 assert_eq!(*invalid, 0);
3439 }
3440 }
3441
3442 let blocked = oracle.blocked().await.unwrap();
3444 assert!(blocked.is_empty());
3445 })
3446 }
3447
3448 #[test_traced]
3449 #[ignore]
3450 fn test_1k_bls12381_threshold_min_pk() {
3451 run_1k(bls12381_threshold::<MinPk, _>);
3452 }
3453
3454 #[test_traced]
3455 #[ignore]
3456 fn test_1k_bls12381_threshold_min_sig() {
3457 run_1k(bls12381_threshold::<MinSig, _>);
3458 }
3459
3460 #[test_traced]
3461 #[ignore]
3462 fn test_1k_bls12381_multisig_min_pk() {
3463 run_1k(bls12381_multisig::<MinPk, _>);
3464 }
3465
3466 #[test_traced]
3467 #[ignore]
3468 fn test_1k_bls12381_multisig_min_sig() {
3469 run_1k(bls12381_multisig::<MinSig, _>);
3470 }
3471
3472 #[test_traced]
3473 #[ignore]
3474 fn test_1k_ed25519() {
3475 run_1k(ed25519);
3476 }
3477
3478 fn children_shutdown_on_engine_abort<S, F>(mut fixture: F)
3479 where
3480 S: Scheme<PublicKey = ed25519::PublicKey>,
3481 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
3482 {
3483 let n = 1;
3485 let namespace = b"consensus".to_vec();
3486 let executor = deterministic::Runner::timed(Duration::from_secs(10));
3487 executor.start(|mut context| async move {
3488 let (network, mut oracle) = Network::new(
3490 context.with_label("network"),
3491 Config {
3492 max_size: 1024 * 1024,
3493 disconnect_on_block: true,
3494 tracked_peer_sets: None,
3495 },
3496 );
3497
3498 network.start();
3500
3501 let Fixture {
3503 participants,
3504 schemes,
3505 ..
3506 } = fixture(&mut context, n);
3507 let mut registrations = register_validators(&mut oracle, &participants).await;
3508
3509 let link = Link {
3511 latency: Duration::from_millis(1),
3512 jitter: Duration::from_millis(0),
3513 success_rate: 1.0,
3514 };
3515 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3516
3517 let reporter_config = mocks::reporter::Config {
3519 namespace: namespace.clone(),
3520 participants: participants.clone().into(),
3521 scheme: schemes[0].clone(),
3522 };
3523 let reporter =
3524 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3525 let relay = Arc::new(mocks::relay::Relay::new());
3526 let application_cfg = mocks::application::Config {
3527 hasher: Sha256::default(),
3528 relay: relay.clone(),
3529 me: participants[0].clone(),
3530 propose_latency: (1.0, 0.0),
3531 verify_latency: (1.0, 0.0),
3532 };
3533 let (actor, application) = mocks::application::Application::new(
3534 context.with_label("application"),
3535 application_cfg,
3536 );
3537 actor.start();
3538 let blocker = oracle.control(participants[0].clone());
3539 let cfg = config::Config {
3540 scheme: schemes[0].clone(),
3541 blocker,
3542 automaton: application.clone(),
3543 relay: application.clone(),
3544 reporter: reporter.clone(),
3545 partition: participants[0].clone().to_string(),
3546 mailbox_size: 64,
3547 epoch: 333,
3548 namespace: namespace.clone(),
3549 leader_timeout: Duration::from_millis(50),
3550 notarization_timeout: Duration::from_millis(100),
3551 nullify_retry: Duration::from_millis(250),
3552 fetch_timeout: Duration::from_millis(50),
3553 activity_timeout: 4,
3554 skip_timeout: 2,
3555 max_fetch_count: 1,
3556 fetch_rate_per_peer: Quota::per_second(NZU32!(10)),
3557 fetch_concurrent: 1,
3558 replay_buffer: NZUsize!(1024 * 16),
3559 write_buffer: NZUsize!(1024 * 16),
3560 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3561 };
3562 let engine = Engine::new(context.with_label("engine"), cfg);
3563
3564 let (pending, recovered, resolver) = registrations
3566 .remove(&participants[0])
3567 .expect("validator should be registered");
3568 let handle = engine.start(pending, recovered, resolver);
3569
3570 context.sleep(Duration::from_millis(1000)).await;
3572
3573 let metrics_before = context.encode();
3575 let is_running = |name: &str| -> bool {
3576 metrics_before.lines().any(|line| {
3577 line.starts_with("runtime_tasks_running{")
3578 && line.contains(&format!("name=\"{name}\""))
3579 && line.contains("kind=\"Task\"")
3580 && line.trim_end().ends_with(" 1")
3581 })
3582 };
3583 assert!(is_running("engine"));
3584 assert!(is_running("engine_batcher"));
3585 assert!(is_running("engine_voter"));
3586 assert!(is_running("engine_resolver"));
3587
3588 context.sleep(Duration::from_millis(1000)).await;
3590 assert!(is_running("engine"));
3591
3592 handle.abort();
3594 let _ = handle.await; context.sleep(Duration::from_millis(1000)).await;
3598
3599 let metrics_after = context.encode();
3600 let is_stopped = |name: &str| -> bool {
3601 metrics_after.lines().any(|line| {
3603 line.starts_with("runtime_tasks_running{")
3604 && line.contains(&format!("name=\"{name}\""))
3605 && line.contains("kind=\"Task\"")
3606 && line.trim_end().ends_with(" 0")
3607 })
3608 };
3609 assert!(is_stopped("engine"));
3610 assert!(is_stopped("engine_batcher"));
3611 assert!(is_stopped("engine_voter"));
3612 assert!(is_stopped("engine_resolver"));
3613 });
3614 }
3615
3616 #[test_traced]
3617 fn test_children_shutdown_on_engine_abort() {
3618 children_shutdown_on_engine_abort(bls12381_threshold::<MinPk, _>);
3619 children_shutdown_on_engine_abort(bls12381_threshold::<MinSig, _>);
3620 children_shutdown_on_engine_abort(bls12381_multisig::<MinPk, _>);
3621 children_shutdown_on_engine_abort(bls12381_multisig::<MinSig, _>);
3622 children_shutdown_on_engine_abort(ed25519);
3623 }
3624
3625 fn attributable_reporter_filtering<S, F>(mut fixture: F)
3626 where
3627 S: Scheme<PublicKey = ed25519::PublicKey>,
3628 F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
3629 {
3630 let n = 3;
3631 let required_containers = 10;
3632 let activity_timeout = 10;
3633 let skip_timeout = 5;
3634 let namespace = b"consensus".to_vec();
3635 let executor = deterministic::Runner::timed(Duration::from_secs(30));
3636 executor.start(|mut context| async move {
3637 let (network, mut oracle) = Network::new(
3639 context.with_label("network"),
3640 Config {
3641 max_size: 1024 * 1024,
3642 disconnect_on_block: false,
3643 tracked_peer_sets: None,
3644 },
3645 );
3646 network.start();
3647
3648 let Fixture {
3650 participants,
3651 schemes,
3652 ..
3653 } = fixture(&mut context, n);
3654 let mut registrations = register_validators(&mut oracle, &participants).await;
3655
3656 let link = Link {
3658 latency: Duration::from_millis(10),
3659 jitter: Duration::from_millis(1),
3660 success_rate: 1.0,
3661 };
3662 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3663
3664 let relay = Arc::new(mocks::relay::Relay::new());
3666 let mut reporters = Vec::new();
3667 for (idx, validator) in participants.iter().enumerate() {
3668 let context = context.with_label(&format!("validator-{}", *validator));
3669
3670 let reporter_config = mocks::reporter::Config {
3671 namespace: namespace.clone(),
3672 participants: participants.clone().into(),
3673 scheme: schemes[idx].clone(),
3674 };
3675 let mock_reporter = mocks::reporter::Reporter::new(
3676 context.with_label("mock_reporter"),
3677 reporter_config,
3678 );
3679
3680 let attributable_reporter = signing_scheme::reporter::AttributableReporter::new(
3682 context.with_label("rng"),
3683 schemes[idx].clone(),
3684 namespace.clone(),
3685 mock_reporter.clone(),
3686 true, );
3688 reporters.push(mock_reporter.clone());
3689
3690 let application_cfg = mocks::application::Config {
3691 hasher: Sha256::default(),
3692 relay: relay.clone(),
3693 me: validator.clone(),
3694 propose_latency: (10.0, 5.0),
3695 verify_latency: (10.0, 5.0),
3696 };
3697 let (actor, application) = mocks::application::Application::new(
3698 context.with_label("application"),
3699 application_cfg,
3700 );
3701 actor.start();
3702 let blocker = oracle.control(validator.clone());
3703 let cfg = config::Config {
3704 scheme: schemes[idx].clone(),
3705 blocker,
3706 automaton: application.clone(),
3707 relay: application.clone(),
3708 reporter: attributable_reporter,
3709 partition: validator.to_string(),
3710 mailbox_size: 1024,
3711 epoch: 333,
3712 namespace: namespace.clone(),
3713 leader_timeout: Duration::from_secs(1),
3714 notarization_timeout: Duration::from_secs(2),
3715 nullify_retry: Duration::from_secs(10),
3716 fetch_timeout: Duration::from_secs(1),
3717 activity_timeout,
3718 skip_timeout,
3719 max_fetch_count: 1,
3720 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
3721 fetch_concurrent: 1,
3722 replay_buffer: NZUsize!(1024 * 1024),
3723 write_buffer: NZUsize!(1024 * 1024),
3724 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3725 };
3726 let engine = Engine::new(context.with_label("engine"), cfg);
3727
3728 let (pending, recovered, resolver) = registrations
3730 .remove(validator)
3731 .expect("validator should be registered");
3732 engine.start(pending, recovered, resolver);
3733 }
3734
3735 let mut finalizers = Vec::new();
3737 for reporter in reporters.iter_mut() {
3738 let (mut latest, mut monitor) = reporter.subscribe().await;
3739 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3740 while latest < required_containers {
3741 latest = monitor.next().await.expect("event missing");
3742 }
3743 }));
3744 }
3745 join_all(finalizers).await;
3746
3747 for reporter in reporters.iter() {
3749 {
3751 let faults = reporter.faults.lock().unwrap();
3752 assert!(faults.is_empty(), "No faults should be reported");
3753 }
3754
3755 {
3757 let invalid = reporter.invalid.lock().unwrap();
3758 assert_eq!(*invalid, 0, "No invalid signatures");
3759 }
3760
3761 {
3763 let notarizations = reporter.notarizations.lock().unwrap();
3764 let finalizations = reporter.finalizations.lock().unwrap();
3765 assert!(
3766 !notarizations.is_empty() || !finalizations.is_empty(),
3767 "Certificates should be reported"
3768 );
3769 }
3770
3771 let notarizes = reporter.notarizes.lock().unwrap();
3773 let last_view = notarizes.keys().max().cloned().unwrap_or_default();
3774 for (view, payloads) in notarizes.iter() {
3775 if *view == last_view {
3776 continue; }
3778
3779 let signers: usize = payloads.values().map(|signers| signers.len()).sum();
3780
3781 if schemes[0].is_attributable() {
3783 assert!(signers > 1, "view {view}: {signers}");
3784 } else {
3785 assert_eq!(signers, 0);
3787 }
3788 }
3789
3790 let finalizes = reporter.finalizes.lock().unwrap();
3792 for (_, payloads) in finalizes.iter() {
3793 let signers: usize = payloads.values().map(|signers| signers.len()).sum();
3794
3795 if schemes[0].is_attributable() {
3797 assert!(signers > 1);
3798 } else {
3799 assert_eq!(signers, 0);
3801 }
3802 }
3803 }
3804
3805 let blocked = oracle.blocked().await.unwrap();
3807 assert!(blocked.is_empty());
3808 });
3809 }
3810
3811 #[test_traced]
3812 fn test_attributable_reporter_filtering() {
3813 attributable_reporter_filtering(bls12381_threshold::<MinPk, _>);
3814 attributable_reporter_filtering(bls12381_threshold::<MinSig, _>);
3815 attributable_reporter_filtering(bls12381_multisig::<MinPk, _>);
3816 attributable_reporter_filtering(bls12381_multisig::<MinSig, _>);
3817 attributable_reporter_filtering(ed25519);
3818 }
3819
3820 fn tle<V: Variant>() {
3821 let n = 4;
3823 let namespace = b"consensus".to_vec();
3824 let activity_timeout = 100;
3825 let skip_timeout = 50;
3826 let executor = deterministic::Runner::timed(Duration::from_secs(30));
3827 executor.start(|mut context| async move {
3828 let (network, mut oracle) = Network::new(
3830 context.with_label("network"),
3831 Config {
3832 max_size: 1024 * 1024,
3833 disconnect_on_block: false,
3834 tracked_peer_sets: None,
3835 },
3836 );
3837
3838 network.start();
3840
3841 let Fixture {
3843 participants,
3844 schemes,
3845 ..
3846 } = bls12381_threshold::<V, _>(&mut context, n);
3847 let mut registrations = register_validators(&mut oracle, &participants).await;
3848
3849 let link = Link {
3851 latency: Duration::from_millis(10),
3852 jitter: Duration::from_millis(5),
3853 success_rate: 1.0,
3854 };
3855 link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3856
3857 let relay = Arc::new(mocks::relay::Relay::new());
3859 let mut reporters = Vec::new();
3860 let mut engine_handlers = Vec::new();
3861 let monitor_reporter = Arc::new(Mutex::new(None));
3862 for (idx, validator) in participants.iter().enumerate() {
3863 let context = context.with_label(&format!("validator-{}", *validator));
3865
3866 let reporter_config = mocks::reporter::Config {
3868 namespace: namespace.clone(),
3869 participants: participants.clone().into(),
3870 scheme: schemes[idx].clone(),
3871 };
3872 let reporter =
3873 mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3874 reporters.push(reporter.clone());
3875 if idx == 0 {
3876 *monitor_reporter.lock().unwrap() = Some(reporter.clone());
3877 }
3878
3879 let application_cfg = mocks::application::Config {
3881 hasher: Sha256::default(),
3882 relay: relay.clone(),
3883 me: validator.clone(),
3884 propose_latency: (10.0, 5.0),
3885 verify_latency: (10.0, 5.0),
3886 };
3887 let (actor, application) = mocks::application::Application::new(
3888 context.with_label("application"),
3889 application_cfg,
3890 );
3891 actor.start();
3892 let blocker = oracle.control(validator.clone());
3893 let cfg = config::Config {
3894 scheme: schemes[idx].clone(),
3895 blocker,
3896 automaton: application.clone(),
3897 relay: application.clone(),
3898 reporter: reporter.clone(),
3899 partition: validator.to_string(),
3900 mailbox_size: 1024,
3901 epoch: 333,
3902 namespace: namespace.clone(),
3903 leader_timeout: Duration::from_millis(100),
3904 notarization_timeout: Duration::from_millis(200),
3905 nullify_retry: Duration::from_millis(500),
3906 fetch_timeout: Duration::from_millis(100),
3907 activity_timeout,
3908 skip_timeout,
3909 max_fetch_count: 1,
3910 fetch_rate_per_peer: Quota::per_second(NZU32!(10)),
3911 fetch_concurrent: 1,
3912 replay_buffer: NZUsize!(1024 * 1024),
3913 write_buffer: NZUsize!(1024 * 1024),
3914 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3915 };
3916 let engine = Engine::new(context.with_label("engine"), cfg);
3917
3918 let (pending, recovered, resolver) = registrations
3920 .remove(validator)
3921 .expect("validator should be registered");
3922 engine_handlers.push(engine.start(pending, recovered, resolver));
3923 }
3924
3925 let target = Round::new(333, 10); let message_content = b"Secret message for future view10"; let message = Block::new(*message_content);
3929
3930 let seed_namespace = seed_namespace(&namespace);
3932 let ciphertext = encrypt::<_, V>(
3933 &mut context,
3934 *schemes[0].identity(),
3935 (Some(&seed_namespace), &target.encode()),
3936 &message,
3937 );
3938
3939 let reporter = monitor_reporter.lock().unwrap().clone().unwrap();
3941 loop {
3942 context.sleep(Duration::from_millis(100)).await;
3944 let notarizations = reporter.notarizations.lock().unwrap();
3945 let Some(notarization) = notarizations.get(&target.view()) else {
3946 continue;
3947 };
3948
3949 let seed_signature = notarization.certificate.seed_signature;
3951 let decrypted = decrypt::<V>(&seed_signature, &ciphertext)
3952 .expect("Decryption should succeed with valid seed signature");
3953 assert_eq!(
3954 message.as_ref(),
3955 decrypted.as_ref(),
3956 "Decrypted message should match original message"
3957 );
3958 break;
3959 }
3960 });
3961 }
3962
3963 #[test_traced]
3964 fn test_tle() {
3965 tle::<MinPk>();
3966 tle::<MinSig>();
3967 }
3968}