1use types::View;
168
169pub mod types;
170
171cfg_if::cfg_if! {
172 if #[cfg(not(target_arch = "wasm32"))] {
173 mod actors;
174 mod config;
175 pub use config::Config;
176 mod engine;
177 pub use engine::Engine;
178 mod metrics;
179 }
180}
181
182#[cfg(test)]
183pub mod mocks;
184
185pub(crate) fn min_active(activity_timeout: View, last_finalized: View) -> View {
187 last_finalized.saturating_sub(activity_timeout)
188}
189
190pub(crate) fn interesting(
194 activity_timeout: View,
195 last_finalized: View,
196 current: View,
197 pending: View,
198 allow_future: bool,
199) -> bool {
200 if pending < min_active(activity_timeout, last_finalized) {
201 return false;
202 }
203 if !allow_future && pending > current + 1 {
204 return false;
205 }
206 true
207}
208
209#[cfg(test)]
210mod tests {
211 use super::*;
212 use crate::{threshold_simplex::types::seed_namespace, Monitor};
213 use commonware_cryptography::{
214 bls12381::{
215 dkg::ops,
216 primitives::{
217 poly::public,
218 variant::{MinPk, MinSig, Variant},
219 },
220 tle::{decrypt, encrypt, Block},
221 },
222 ed25519::PrivateKey,
223 PrivateKeyExt as _, PublicKey, Sha256, Signer as _,
224 };
225 use commonware_macros::{select, test_traced};
226 use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
227 use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner};
228 use commonware_utils::{quorum, NZU32};
229 use engine::Engine;
230 use futures::{future::join_all, StreamExt};
231 use governor::Quota;
232 use rand::{rngs::StdRng, Rng as _, SeedableRng as _};
233 use std::{
234 collections::{BTreeMap, HashMap},
235 sync::{Arc, Mutex},
236 time::Duration,
237 };
238 use tracing::{debug, warn};
239 use types::Activity;
240
241 async fn register_validators<P: PublicKey>(
243 oracle: &mut Oracle<P>,
244 validators: &[P],
245 ) -> HashMap<
246 P,
247 (
248 (Sender<P>, Receiver<P>),
249 (Sender<P>, Receiver<P>),
250 (Sender<P>, Receiver<P>),
251 ),
252 > {
253 let mut registrations = HashMap::new();
254 for validator in validators.iter() {
255 let (pending_sender, pending_receiver) =
256 oracle.register(validator.clone(), 0).await.unwrap();
257 let (recovered_sender, recovered_receiver) =
258 oracle.register(validator.clone(), 1).await.unwrap();
259 let (resolver_sender, resolver_receiver) =
260 oracle.register(validator.clone(), 2).await.unwrap();
261 registrations.insert(
262 validator.clone(),
263 (
264 (pending_sender, pending_receiver),
265 (recovered_sender, recovered_receiver),
266 (resolver_sender, resolver_receiver),
267 ),
268 );
269 }
270 registrations
271 }
272
273 enum Action {
275 Link(Link),
276 Update(Link), Unlink,
278 }
279
280 async fn link_validators<P: PublicKey>(
286 oracle: &mut Oracle<P>,
287 validators: &[P],
288 action: Action,
289 restrict_to: Option<fn(usize, usize, usize) -> bool>,
290 ) {
291 for (i1, v1) in validators.iter().enumerate() {
292 for (i2, v2) in validators.iter().enumerate() {
293 if v2 == v1 {
295 continue;
296 }
297
298 if let Some(f) = restrict_to {
300 if !f(validators.len(), i1, i2) {
301 continue;
302 }
303 }
304
305 match action {
307 Action::Update(_) | Action::Unlink => {
308 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
309 }
310 _ => {}
311 }
312
313 match action {
315 Action::Link(ref link) | Action::Update(ref link) => {
316 oracle
317 .add_link(v1.clone(), v2.clone(), link.clone())
318 .await
319 .unwrap();
320 }
321 _ => {}
322 }
323 }
324 }
325 }
326
327 fn all_online<V: Variant>() {
328 let n = 5;
330 let threshold = quorum(n);
331 let required_containers = 100;
332 let activity_timeout = 10;
333 let skip_timeout = 5;
334 let namespace = b"consensus".to_vec();
335 let executor = deterministic::Runner::timed(Duration::from_secs(30));
336 executor.start(|mut context| async move {
337 let (network, mut oracle) = Network::new(
339 context.with_label("network"),
340 Config {
341 max_size: 1024 * 1024,
342 },
343 );
344
345 network.start();
347
348 let mut schemes = Vec::new();
350 let mut validators = Vec::new();
351 for i in 0..n {
352 let scheme = PrivateKey::from_seed(i as u64);
353 let pk = scheme.public_key();
354 schemes.push(scheme);
355 validators.push(pk);
356 }
357 validators.sort();
358 schemes.sort_by_key(|s| s.public_key());
359 let mut registrations = register_validators(&mut oracle, &validators).await;
360
361 let link = Link {
363 latency: 10.0,
364 jitter: 1.0,
365 success_rate: 1.0,
366 };
367 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
368
369 let (polynomial, shares) =
371 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
372
373 let relay = Arc::new(mocks::relay::Relay::new());
375 let mut supervisors = Vec::new();
376 let mut engine_handlers = Vec::new();
377 for (idx, scheme) in schemes.into_iter().enumerate() {
378 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
380
381 let validator = scheme.public_key();
383 let mut participants = BTreeMap::new();
384 participants.insert(
385 0,
386 (polynomial.clone(), validators.clone(), shares[idx].clone()),
387 );
388 let supervisor_config = mocks::supervisor::Config::<_, V> {
389 namespace: namespace.clone(),
390 participants,
391 };
392 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
393 supervisors.push(supervisor.clone());
394 let application_cfg = mocks::application::Config {
395 hasher: Sha256::default(),
396 relay: relay.clone(),
397 participant: validator.clone(),
398 propose_latency: (10.0, 5.0),
399 verify_latency: (10.0, 5.0),
400 };
401 let (actor, application) = mocks::application::Application::new(
402 context.with_label("application"),
403 application_cfg,
404 );
405 actor.start();
406 let blocker = oracle.control(scheme.public_key());
407 let cfg = config::Config {
408 crypto: scheme,
409 blocker,
410 automaton: application.clone(),
411 relay: application.clone(),
412 reporter: supervisor.clone(),
413 supervisor,
414 partition: validator.to_string(),
415 compression: Some(3),
416 mailbox_size: 1024,
417 namespace: namespace.clone(),
418 leader_timeout: Duration::from_secs(1),
419 notarization_timeout: Duration::from_secs(2),
420 nullify_retry: Duration::from_secs(10),
421 fetch_timeout: Duration::from_secs(1),
422 activity_timeout,
423 skip_timeout,
424 max_fetch_count: 1,
425 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
426 fetch_concurrent: 1,
427 replay_buffer: 1024 * 1024,
428 write_buffer: 1024 * 1024,
429 };
430 let engine = Engine::new(context.with_label("engine"), cfg);
431
432 let (pending, recovered, resolver) = registrations
434 .remove(&validator)
435 .expect("validator should be registered");
436 engine_handlers.push(engine.start(pending, recovered, resolver));
437 }
438
439 let mut finalizers = Vec::new();
441 for supervisor in supervisors.iter_mut() {
442 let (mut latest, mut monitor) = supervisor.subscribe().await;
443 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
444 while latest < required_containers {
445 latest = monitor.next().await.expect("event missing");
446 }
447 }));
448 }
449 join_all(finalizers).await;
450
451 let latest_complete = required_containers - activity_timeout;
453 for supervisor in supervisors.iter() {
454 {
456 let faults = supervisor.faults.lock().unwrap();
457 assert!(faults.is_empty());
458 }
459
460 {
462 let invalid = supervisor.invalid.lock().unwrap();
463 assert_eq!(*invalid, 0);
464 }
465
466 {
468 let seeds = supervisor.seeds.lock().unwrap();
469 for view in 1..latest_complete {
470 if !seeds.contains_key(&view) {
472 panic!("view: {view}");
473 }
474 }
475 }
476
477 let mut notarized = HashMap::new();
479 let mut finalized = HashMap::new();
480 {
481 let notarizes = supervisor.notarizes.lock().unwrap();
482 for view in 1..latest_complete {
483 let Some(payloads) = notarizes.get(&view) else {
485 continue;
486 };
487 if payloads.len() > 1 {
488 panic!("view: {view}");
489 }
490 let (digest, notarizers) = payloads.iter().next().unwrap();
491 notarized.insert(view, *digest);
492
493 if notarizers.len() < threshold as usize {
494 panic!("view: {view}");
497 }
498 }
499 }
500 {
501 let notarizations = supervisor.notarizations.lock().unwrap();
502 for view in 1..latest_complete {
503 let Some(notarization) = notarizations.get(&view) else {
505 continue;
506 };
507 let Some(digest) = notarized.get(&view) else {
508 continue;
509 };
510 assert_eq!(¬arization.proposal.payload, digest);
511 }
512 }
513 {
514 let finalizes = supervisor.finalizes.lock().unwrap();
515 for view in 1..latest_complete {
516 let Some(payloads) = finalizes.get(&view) else {
518 continue;
519 };
520 if payloads.len() > 1 {
521 panic!("view: {view}");
522 }
523 let (digest, finalizers) = payloads.iter().next().unwrap();
524 finalized.insert(view, *digest);
525
526 if view > latest_complete {
528 continue;
529 }
530
531 if finalizers.len() < threshold as usize {
533 panic!("view: {view}");
536 }
537
538 let nullifies = supervisor.nullifies.lock().unwrap();
540 let Some(nullifies) = nullifies.get(&view) else {
541 continue;
542 };
543 for (_, finalizers) in payloads.iter() {
544 for finalizer in finalizers.iter() {
545 if nullifies.contains(finalizer) {
546 panic!("should not nullify and finalize at same view");
547 }
548 }
549 }
550 }
551 }
552 {
553 let finalizations = supervisor.finalizations.lock().unwrap();
554 for view in 1..latest_complete {
555 let Some(finalization) = finalizations.get(&view) else {
557 continue;
558 };
559 let Some(digest) = finalized.get(&view) else {
560 continue;
561 };
562 assert_eq!(&finalization.proposal.payload, digest);
563 }
564 }
565 }
566
567 let blocked = oracle.blocked().await.unwrap();
569 assert!(blocked.is_empty());
570 });
571 }
572
573 #[test_traced]
574 fn test_all_online() {
575 all_online::<MinPk>();
576 all_online::<MinSig>();
577 }
578
579 fn unclean_shutdown<V: Variant>() {
580 let n = 5;
582 let threshold = quorum(n);
583 let required_containers = 100;
584 let activity_timeout = 10;
585 let skip_timeout = 5;
586 let namespace = b"consensus".to_vec();
587
588 let mut rng = StdRng::seed_from_u64(0);
590 let (polynomial, shares) = ops::generate_shares::<_, V>(&mut rng, None, n, threshold);
591
592 let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
594 let supervised = Arc::new(Mutex::new(Vec::new()));
595 let mut prev_ctx = None;
596
597 loop {
598 let namespace = namespace.clone();
599 let shutdowns = shutdowns.clone();
600 let supervised = supervised.clone();
601 let polynomial = polynomial.clone();
602 let shares = shares.clone();
603
604 let f = |mut context: deterministic::Context| async move {
605 let (network, mut oracle) = Network::new(
607 context.with_label("network"),
608 Config {
609 max_size: 1024 * 1024,
610 },
611 );
612
613 network.start();
615
616 let mut schemes = Vec::new();
618 let mut validators = Vec::new();
619 for i in 0..n {
620 let scheme = PrivateKey::from_seed(i as u64);
621 let pk = scheme.public_key();
622 schemes.push(scheme);
623 validators.push(pk);
624 }
625 validators.sort();
626 schemes.sort_by_key(|s| s.public_key());
627 let mut registrations = register_validators(&mut oracle, &validators).await;
628
629 let link = Link {
631 latency: 50.0,
632 jitter: 50.0,
633 success_rate: 1.0,
634 };
635 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
636
637 let relay = Arc::new(mocks::relay::Relay::new());
639 let mut supervisors = HashMap::new();
640 let mut engine_handlers = Vec::new();
641 for (idx, scheme) in schemes.into_iter().enumerate() {
642 let context = context
644 .clone()
645 .with_label(&format!("validator-{}", scheme.public_key()));
646
647 let validator = scheme.public_key();
649 let mut participants = BTreeMap::new();
650 participants.insert(
651 0,
652 (polynomial.clone(), validators.clone(), shares[idx].clone()),
653 );
654 let supervisor_config = mocks::supervisor::Config::<_, V> {
655 namespace: namespace.clone(),
656 participants,
657 };
658 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
659 supervisors.insert(validator.clone(), supervisor.clone());
660 let application_cfg = mocks::application::Config {
661 hasher: Sha256::default(),
662 relay: relay.clone(),
663 participant: validator.clone(),
664 propose_latency: (10.0, 5.0),
665 verify_latency: (10.0, 5.0),
666 };
667 let (actor, application) = mocks::application::Application::new(
668 context.with_label("application"),
669 application_cfg,
670 );
671 actor.start();
672 let blocker = oracle.control(scheme.public_key());
673 let cfg = config::Config {
674 crypto: scheme,
675 blocker,
676 automaton: application.clone(),
677 relay: application.clone(),
678 reporter: supervisor.clone(),
679 supervisor,
680 partition: validator.to_string(),
681 compression: Some(3),
682 mailbox_size: 1024,
683 namespace: namespace.clone(),
684 leader_timeout: Duration::from_secs(1),
685 notarization_timeout: Duration::from_secs(2),
686 nullify_retry: Duration::from_secs(10),
687 fetch_timeout: Duration::from_secs(1),
688 activity_timeout,
689 skip_timeout,
690 max_fetch_count: 1,
691 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
692 fetch_concurrent: 1,
693 replay_buffer: 1024 * 1024,
694 write_buffer: 1024 * 1024,
695 };
696 let engine = Engine::new(context.with_label("engine"), cfg);
697
698 let (pending, recovered, resolver) = registrations
700 .remove(&validator)
701 .expect("validator should be registered");
702 engine_handlers.push(engine.start(pending, recovered, resolver));
703 }
704
705 let mut finalizers = Vec::new();
707 for (_, supervisor) in supervisors.iter_mut() {
708 let (mut latest, mut monitor) = supervisor.subscribe().await;
709 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
710 while latest < required_containers {
711 latest = monitor.next().await.expect("event missing");
712 }
713 }));
714 }
715
716 let wait =
718 context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
719 let result = select! {
720 _ = context.sleep(wait) => {
721 {
723 let mut shutdowns = shutdowns.lock().unwrap();
724 debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
725 *shutdowns += 1;
726 }
727 supervised.lock().unwrap().push(supervisors);
728 (false,context)
729 },
730 _ = join_all(finalizers) => {
731 let supervised = supervised.lock().unwrap();
733 for supervisors in supervised.iter() {
734 for (_, supervisor) in supervisors.iter() {
735 let faults = supervisor.faults.lock().unwrap();
736 assert!(faults.is_empty());
737 }
738 }
739 (true,context)
740 }
741 };
742
743 let blocked = oracle.blocked().await.unwrap();
745 assert!(blocked.is_empty());
746
747 result
748 };
749
750 let (complete, context) = if let Some(prev_ctx) = prev_ctx {
751 deterministic::Runner::from(prev_ctx)
752 } else {
753 deterministic::Runner::timed(Duration::from_secs(30))
754 }
755 .start(f);
756
757 if complete {
759 break;
760 }
761
762 prev_ctx = Some(context.recover());
763 }
764 }
765
766 #[test_traced]
767 fn test_unclean_shutdown() {
768 unclean_shutdown::<MinPk>();
769 unclean_shutdown::<MinSig>();
770 }
771
772 fn backfill<V: Variant>() {
773 let n = 4;
775 let threshold = quorum(n);
776 let required_containers = 100;
777 let activity_timeout = 10;
778 let skip_timeout = 5;
779 let namespace = b"consensus".to_vec();
780 let executor = deterministic::Runner::timed(Duration::from_secs(720));
781 executor.start(|mut context| async move {
782 let (network, mut oracle) = Network::new(
784 context.with_label("network"),
785 Config {
786 max_size: 1024 * 1024,
787 },
788 );
789
790 network.start();
792
793 let mut schemes = Vec::new();
795 let mut validators = Vec::new();
796 for i in 0..n {
797 let scheme = PrivateKey::from_seed(i as u64);
798 let pk = scheme.public_key();
799 schemes.push(scheme);
800 validators.push(pk);
801 }
802 validators.sort();
803 schemes.sort_by_key(|s| s.public_key());
804 let mut registrations = register_validators(&mut oracle, &validators).await;
805
806 let link = Link {
808 latency: 10.0,
809 jitter: 1.0,
810 success_rate: 1.0,
811 };
812 link_validators(
813 &mut oracle,
814 &validators,
815 Action::Link(link),
816 Some(|_, i, j| ![i, j].contains(&0usize)),
817 )
818 .await;
819
820 let (polynomial, shares) =
822 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
823
824 let relay = Arc::new(mocks::relay::Relay::new());
826 let mut supervisors = Vec::new();
827 let mut engine_handlers = Vec::new();
828 for (idx_scheme, scheme) in schemes.iter().enumerate() {
829 if idx_scheme == 0 {
831 continue;
832 }
833
834 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
836
837 let validator = scheme.public_key();
839 let mut participants = BTreeMap::new();
840 participants.insert(
841 0,
842 (
843 polynomial.clone(),
844 validators.clone(),
845 shares[idx_scheme].clone(),
846 ),
847 );
848 let supervisor_config = mocks::supervisor::Config {
849 namespace: namespace.clone(),
850 participants,
851 };
852 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
853 supervisors.push(supervisor.clone());
854 let application_cfg = mocks::application::Config {
855 hasher: Sha256::default(),
856 relay: relay.clone(),
857 participant: validator.clone(),
858 propose_latency: (10.0, 5.0),
859 verify_latency: (10.0, 5.0),
860 };
861 let (actor, application) = mocks::application::Application::new(
862 context.with_label("application"),
863 application_cfg,
864 );
865 actor.start();
866 let blocker = oracle.control(scheme.public_key());
867 let cfg = config::Config {
868 crypto: scheme.clone(),
869 blocker,
870 automaton: application.clone(),
871 relay: application.clone(),
872 reporter: supervisor.clone(),
873 supervisor,
874 partition: validator.to_string(),
875 compression: Some(3),
876 mailbox_size: 1024,
877 namespace: namespace.clone(),
878 leader_timeout: Duration::from_secs(1),
879 notarization_timeout: Duration::from_secs(2),
880 nullify_retry: Duration::from_secs(10),
881 fetch_timeout: Duration::from_secs(1),
882 activity_timeout,
883 skip_timeout,
884 max_fetch_count: 1, fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
886 fetch_concurrent: 1,
887 replay_buffer: 1024 * 1024,
888 write_buffer: 1024 * 1024,
889 };
890 let engine = Engine::new(context.with_label("engine"), cfg);
891
892 let (pending, recovered, resolver) = registrations
894 .remove(&validator)
895 .expect("validator should be registered");
896 engine_handlers.push(engine.start(pending, recovered, resolver));
897 }
898
899 let mut finalizers = Vec::new();
901 for supervisor in supervisors.iter_mut() {
902 let (mut latest, mut monitor) = supervisor.subscribe().await;
903 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
904 while latest < required_containers {
905 latest = monitor.next().await.expect("event missing");
906 }
907 }));
908 }
909 join_all(finalizers).await;
910
911 let link = Link {
913 latency: 3_000.0,
914 jitter: 0.0,
915 success_rate: 1.0,
916 };
917 link_validators(
918 &mut oracle,
919 &validators,
920 Action::Update(link.clone()),
921 Some(|_, i, j| ![i, j].contains(&0usize)),
922 )
923 .await;
924
925 context.sleep(Duration::from_secs(120)).await;
927
928 link_validators(
930 &mut oracle,
931 &validators,
932 Action::Unlink,
933 Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
934 )
935 .await;
936
937 let scheme = schemes[0].clone();
939 let validator = scheme.public_key();
940 let context = context.with_label(&format!("validator-{validator}"));
941
942 link_validators(
944 &mut oracle,
945 &validators,
946 Action::Link(link),
947 Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
948 )
949 .await;
950
951 let link = Link {
953 latency: 10.0,
954 jitter: 2.5,
955 success_rate: 1.0,
956 };
957 link_validators(
958 &mut oracle,
959 &validators,
960 Action::Update(link),
961 Some(|_, i, j| ![i, j].contains(&1usize)),
962 )
963 .await;
964
965 let mut participants = BTreeMap::new();
967 participants.insert(
968 0,
969 (polynomial.clone(), validators.clone(), shares[0].clone()),
970 );
971 let supervisor_config = mocks::supervisor::Config::<_, V> {
972 namespace: namespace.clone(),
973 participants,
974 };
975 let mut supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
976 supervisors.push(supervisor.clone());
977 let application_cfg = mocks::application::Config {
978 hasher: Sha256::default(),
979 relay: relay.clone(),
980 participant: validator.clone(),
981 propose_latency: (10.0, 5.0),
982 verify_latency: (10.0, 5.0),
983 };
984 let (actor, application) = mocks::application::Application::new(
985 context.with_label("application"),
986 application_cfg,
987 );
988 actor.start();
989 let blocker = oracle.control(scheme.public_key());
990 let cfg = config::Config {
991 crypto: scheme,
992 blocker,
993 automaton: application.clone(),
994 relay: application.clone(),
995 reporter: supervisor.clone(),
996 supervisor: supervisor.clone(),
997 partition: validator.to_string(),
998 compression: Some(3),
999 mailbox_size: 1024,
1000 namespace: namespace.clone(),
1001 leader_timeout: Duration::from_secs(1),
1002 notarization_timeout: Duration::from_secs(2),
1003 nullify_retry: Duration::from_secs(10),
1004 fetch_timeout: Duration::from_secs(1),
1005 activity_timeout,
1006 skip_timeout,
1007 max_fetch_count: 1,
1008 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1009 fetch_concurrent: 1,
1010 replay_buffer: 1024 * 1024,
1011 write_buffer: 1024 * 1024,
1012 };
1013 let engine = Engine::new(context.with_label("engine"), cfg);
1014
1015 let (pending, recovered, resolver) = registrations
1017 .remove(&validator)
1018 .expect("validator should be registered");
1019 engine_handlers.push(engine.start(pending, recovered, resolver));
1020
1021 let (mut latest, mut monitor) = supervisor.subscribe().await;
1023 while latest < required_containers {
1024 latest = monitor.next().await.expect("event missing");
1025 }
1026
1027 let blocked = oracle.blocked().await.unwrap();
1029 assert!(blocked.is_empty());
1030 });
1031 }
1032
1033 #[test_traced]
1034 fn test_backfill() {
1035 backfill::<MinPk>();
1036 backfill::<MinSig>();
1037 }
1038
1039 fn one_offline<V: Variant>() {
1040 let n = 5;
1042 let threshold = quorum(n);
1043 let required_containers = 100;
1044 let activity_timeout = 10;
1045 let skip_timeout = 5;
1046 let max_exceptions = 10;
1047 let namespace = b"consensus".to_vec();
1048 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1049 executor.start(|mut context| async move {
1050 let (network, mut oracle) = Network::new(
1052 context.with_label("network"),
1053 Config {
1054 max_size: 1024 * 1024,
1055 },
1056 );
1057
1058 network.start();
1060
1061 let mut schemes = Vec::new();
1063 let mut validators = Vec::new();
1064 for i in 0..n {
1065 let scheme = PrivateKey::from_seed(i as u64);
1066 let pk = scheme.public_key();
1067 schemes.push(scheme);
1068 validators.push(pk);
1069 }
1070 validators.sort();
1071 schemes.sort_by_key(|s| s.public_key());
1072 let mut registrations = register_validators(&mut oracle, &validators).await;
1073
1074 let link = Link {
1076 latency: 10.0,
1077 jitter: 1.0,
1078 success_rate: 1.0,
1079 };
1080 link_validators(
1081 &mut oracle,
1082 &validators,
1083 Action::Link(link),
1084 Some(|_, i, j| ![i, j].contains(&0usize)),
1085 )
1086 .await;
1087
1088 let (polynomial, shares) =
1090 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1091
1092 let relay = Arc::new(mocks::relay::Relay::new());
1094 let mut supervisors = Vec::new();
1095 let mut engine_handlers = Vec::new();
1096 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1097 if idx_scheme == 0 {
1099 continue;
1100 }
1101
1102 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1104
1105 let validator = scheme.public_key();
1107 let mut participants = BTreeMap::new();
1108 participants.insert(
1109 0,
1110 (
1111 polynomial.clone(),
1112 validators.clone(),
1113 shares[idx_scheme].clone(),
1114 ),
1115 );
1116 let supervisor_config = mocks::supervisor::Config::<_, V> {
1117 namespace: namespace.clone(),
1118 participants,
1119 };
1120 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1121 supervisors.push(supervisor.clone());
1122 let application_cfg = mocks::application::Config {
1123 hasher: Sha256::default(),
1124 relay: relay.clone(),
1125 participant: validator.clone(),
1126 propose_latency: (10.0, 5.0),
1127 verify_latency: (10.0, 5.0),
1128 };
1129 let (actor, application) = mocks::application::Application::new(
1130 context.with_label("application"),
1131 application_cfg,
1132 );
1133 actor.start();
1134 let blocker = oracle.control(scheme.public_key());
1135 let cfg = config::Config {
1136 crypto: scheme,
1137 blocker,
1138 automaton: application.clone(),
1139 relay: application.clone(),
1140 reporter: supervisor.clone(),
1141 supervisor,
1142 partition: validator.to_string(),
1143 compression: Some(3),
1144 mailbox_size: 1024,
1145 namespace: namespace.clone(),
1146 leader_timeout: Duration::from_secs(1),
1147 notarization_timeout: Duration::from_secs(2),
1148 nullify_retry: Duration::from_secs(10),
1149 fetch_timeout: Duration::from_secs(1),
1150 activity_timeout,
1151 skip_timeout,
1152 max_fetch_count: 1,
1153 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1154 fetch_concurrent: 1,
1155 replay_buffer: 1024 * 1024,
1156 write_buffer: 1024 * 1024,
1157 };
1158 let engine = Engine::new(context.with_label("engine"), cfg);
1159
1160 let (pending, recovered, resolver) = registrations
1162 .remove(&validator)
1163 .expect("validator should be registered");
1164 engine_handlers.push(engine.start(pending, recovered, resolver));
1165 }
1166
1167 let mut finalizers = Vec::new();
1169 for supervisor in supervisors.iter_mut() {
1170 let (mut latest, mut monitor) = supervisor.subscribe().await;
1171 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1172 while latest < required_containers {
1173 latest = monitor.next().await.expect("event missing");
1174 }
1175 }));
1176 }
1177 join_all(finalizers).await;
1178
1179 let exceptions = 0;
1181 let offline = &validators[0];
1182 for supervisor in supervisors.iter() {
1183 {
1185 let faults = supervisor.faults.lock().unwrap();
1186 assert!(faults.is_empty());
1187 }
1188
1189 {
1191 let invalid = supervisor.invalid.lock().unwrap();
1192 assert_eq!(*invalid, 0);
1193 }
1194
1195 let mut exceptions = 0;
1197 {
1198 let notarizes = supervisor.notarizes.lock().unwrap();
1199 for (view, payloads) in notarizes.iter() {
1200 for (_, participants) in payloads.iter() {
1201 if participants.contains(offline) {
1202 panic!("view: {view}");
1203 }
1204 }
1205 }
1206 }
1207 {
1208 let nullifies = supervisor.nullifies.lock().unwrap();
1209 for (view, participants) in nullifies.iter() {
1210 if participants.contains(offline) {
1211 panic!("view: {view}");
1212 }
1213 }
1214 }
1215 {
1216 let finalizes = supervisor.finalizes.lock().unwrap();
1217 for (view, payloads) in finalizes.iter() {
1218 for (_, finalizers) in payloads.iter() {
1219 if finalizers.contains(offline) {
1220 panic!("view: {view}");
1221 }
1222 }
1223 }
1224 }
1225
1226 let mut offline_views = Vec::new();
1228 {
1229 let leaders = supervisor.leaders.lock().unwrap();
1230 for (view, leader) in leaders.iter() {
1231 if leader == offline {
1232 offline_views.push(*view);
1233 }
1234 }
1235 }
1236 assert!(!offline_views.is_empty());
1237
1238 {
1240 let nullifies = supervisor.nullifies.lock().unwrap();
1241 for view in offline_views.iter() {
1242 let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1243 if nullifies < threshold as usize {
1244 warn!("missing expected view nullifies: {}", view);
1245 exceptions += 1;
1246 }
1247 }
1248 }
1249 {
1250 let nullifications = supervisor.nullifications.lock().unwrap();
1251 for view in offline_views.iter() {
1252 if !nullifications.contains_key(view) {
1253 warn!("missing expected view nullifies: {}", view);
1254 exceptions += 1;
1255 }
1256 }
1257 }
1258
1259 assert!(exceptions <= max_exceptions);
1261 }
1262 assert!(exceptions <= max_exceptions);
1263
1264 let blocked = oracle.blocked().await.unwrap();
1266 assert!(blocked.is_empty());
1267
1268 let encoded = context.encode();
1270 let lines = encoded.lines();
1271 let mut skipped_views = 0;
1272 let mut nodes_skipping = 0;
1273 for line in lines {
1274 if line.contains("_engine_voter_skipped_views_total") {
1275 let parts: Vec<&str> = line.split_whitespace().collect();
1276 if let Some(number_str) = parts.last() {
1277 if let Ok(number) = number_str.parse::<u64>() {
1278 if number > 0 {
1279 nodes_skipping += 1;
1280 }
1281 if number > skipped_views {
1282 skipped_views = number;
1283 }
1284 }
1285 }
1286 }
1287 }
1288 assert!(
1289 skipped_views > 0,
1290 "expected skipped views to be greater than 0"
1291 );
1292 assert_eq!(
1293 nodes_skipping,
1294 n - 1,
1295 "expected all online nodes to be skipping views"
1296 );
1297 });
1298 }
1299
1300 #[test_traced]
1301 fn test_one_offline() {
1302 one_offline::<MinPk>();
1303 one_offline::<MinSig>();
1304 }
1305
1306 fn slow_validator<V: Variant>() {
1307 let n = 5;
1309 let threshold = quorum(n);
1310 let required_containers = 50;
1311 let activity_timeout = 10;
1312 let skip_timeout = 5;
1313 let namespace = b"consensus".to_vec();
1314 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1315 executor.start(|mut context| async move {
1316 let (network, mut oracle) = Network::new(
1318 context.with_label("network"),
1319 Config {
1320 max_size: 1024 * 1024,
1321 },
1322 );
1323
1324 network.start();
1326
1327 let mut schemes = Vec::new();
1329 let mut validators = Vec::new();
1330 for i in 0..n {
1331 let scheme = PrivateKey::from_seed(i as u64);
1332 let pk = scheme.public_key();
1333 schemes.push(scheme);
1334 validators.push(pk);
1335 }
1336 validators.sort();
1337 schemes.sort_by_key(|s| s.public_key());
1338 let mut registrations = register_validators(&mut oracle, &validators).await;
1339
1340 let link = Link {
1342 latency: 10.0,
1343 jitter: 1.0,
1344 success_rate: 1.0,
1345 };
1346 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1347
1348 let (polynomial, shares) =
1350 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1351
1352 let relay = Arc::new(mocks::relay::Relay::new());
1354 let mut supervisors = Vec::new();
1355 let mut engine_handlers = Vec::new();
1356 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1357 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1359
1360 let validator = scheme.public_key();
1362 let mut participants = BTreeMap::new();
1363 participants.insert(
1364 0,
1365 (
1366 polynomial.clone(),
1367 validators.clone(),
1368 shares[idx_scheme].clone(),
1369 ),
1370 );
1371 let supervisor_config = mocks::supervisor::Config::<_, V> {
1372 namespace: namespace.clone(),
1373 participants,
1374 };
1375 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1376 supervisors.push(supervisor.clone());
1377 let application_cfg = if idx_scheme == 0 {
1378 mocks::application::Config {
1379 hasher: Sha256::default(),
1380 relay: relay.clone(),
1381 participant: validator.clone(),
1382 propose_latency: (10_000.0, 0.0),
1383 verify_latency: (10_000.0, 5.0),
1384 }
1385 } else {
1386 mocks::application::Config {
1387 hasher: Sha256::default(),
1388 relay: relay.clone(),
1389 participant: validator.clone(),
1390 propose_latency: (10.0, 5.0),
1391 verify_latency: (10.0, 5.0),
1392 }
1393 };
1394 let (actor, application) = mocks::application::Application::new(
1395 context.with_label("application"),
1396 application_cfg,
1397 );
1398 actor.start();
1399 let blocker = oracle.control(scheme.public_key());
1400 let cfg = config::Config {
1401 crypto: scheme,
1402 blocker,
1403 automaton: application.clone(),
1404 relay: application.clone(),
1405 reporter: supervisor.clone(),
1406 supervisor,
1407 partition: validator.to_string(),
1408 compression: Some(3),
1409 mailbox_size: 1024,
1410 namespace: namespace.clone(),
1411 leader_timeout: Duration::from_secs(1),
1412 notarization_timeout: Duration::from_secs(2),
1413 nullify_retry: Duration::from_secs(10),
1414 fetch_timeout: Duration::from_secs(1),
1415 activity_timeout,
1416 skip_timeout,
1417 max_fetch_count: 1,
1418 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1419 fetch_concurrent: 1,
1420 replay_buffer: 1024 * 1024,
1421 write_buffer: 1024 * 1024,
1422 };
1423 let engine = Engine::new(context.with_label("engine"), cfg);
1424
1425 let (pending, recovered, resolver) = registrations
1427 .remove(&validator)
1428 .expect("validator should be registered");
1429 engine_handlers.push(engine.start(pending, recovered, resolver));
1430 }
1431
1432 let mut finalizers = Vec::new();
1434 for supervisor in supervisors.iter_mut() {
1435 let (mut latest, mut monitor) = supervisor.subscribe().await;
1436 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1437 while latest < required_containers {
1438 latest = monitor.next().await.expect("event missing");
1439 }
1440 }));
1441 }
1442 join_all(finalizers).await;
1443
1444 let slow = &validators[0];
1446 for supervisor in supervisors.iter() {
1447 {
1449 let faults = supervisor.faults.lock().unwrap();
1450 assert!(faults.is_empty());
1451 }
1452
1453 {
1455 let invalid = supervisor.invalid.lock().unwrap();
1456 assert_eq!(*invalid, 0);
1457 }
1458
1459 {
1461 let notarizes = supervisor.notarizes.lock().unwrap();
1462 for (view, payloads) in notarizes.iter() {
1463 for (_, participants) in payloads.iter() {
1464 if participants.contains(slow) {
1465 panic!("view: {view}");
1466 }
1467 }
1468 }
1469 }
1470 {
1471 let finalizes = supervisor.finalizes.lock().unwrap();
1472 for (view, payloads) in finalizes.iter() {
1473 for (_, finalizers) in payloads.iter() {
1474 if finalizers.contains(slow) {
1475 panic!("view: {view}");
1476 }
1477 }
1478 }
1479 }
1480 }
1481
1482 let blocked = oracle.blocked().await.unwrap();
1484 assert!(blocked.is_empty());
1485 });
1486 }
1487
1488 #[test_traced]
1489 fn test_slow_validator() {
1490 slow_validator::<MinPk>();
1491 slow_validator::<MinSig>();
1492 }
1493
1494 fn all_recovery<V: Variant>() {
1495 let n = 5;
1497 let threshold = quorum(n);
1498 let required_containers = 100;
1499 let activity_timeout = 10;
1500 let skip_timeout = 2;
1501 let namespace = b"consensus".to_vec();
1502 let executor = deterministic::Runner::timed(Duration::from_secs(180));
1503 executor.start(|mut context| async move {
1504 let (network, mut oracle) = Network::new(
1506 context.with_label("network"),
1507 Config {
1508 max_size: 1024 * 1024,
1509 },
1510 );
1511
1512 network.start();
1514
1515 let mut schemes = Vec::new();
1517 let mut validators = Vec::new();
1518 for i in 0..n {
1519 let scheme = PrivateKey::from_seed(i as u64);
1520 let pk = scheme.public_key();
1521 schemes.push(scheme);
1522 validators.push(pk);
1523 }
1524 validators.sort();
1525 schemes.sort_by_key(|s| s.public_key());
1526 let mut registrations = register_validators(&mut oracle, &validators).await;
1527
1528 let link = Link {
1530 latency: 3_000.0,
1531 jitter: 0.0,
1532 success_rate: 1.0,
1533 };
1534 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1535
1536 let (polynomial, shares) =
1538 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1539
1540 let relay = Arc::new(mocks::relay::Relay::new());
1542 let mut supervisors = Vec::new();
1543 let mut engine_handlers = Vec::new();
1544 for (idx, scheme) in schemes.iter().enumerate() {
1545 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1547
1548 let validator = scheme.public_key();
1550 let mut participants = BTreeMap::new();
1551 participants.insert(
1552 0,
1553 (polynomial.clone(), validators.clone(), shares[idx].clone()),
1554 );
1555 let supervisor_config = mocks::supervisor::Config::<_, V> {
1556 namespace: namespace.clone(),
1557 participants,
1558 };
1559 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1560 supervisors.push(supervisor.clone());
1561 let application_cfg = mocks::application::Config {
1562 hasher: Sha256::default(),
1563 relay: relay.clone(),
1564 participant: validator.clone(),
1565 propose_latency: (10.0, 5.0),
1566 verify_latency: (10.0, 5.0),
1567 };
1568 let (actor, application) = mocks::application::Application::new(
1569 context.with_label("application"),
1570 application_cfg,
1571 );
1572 actor.start();
1573 let blocker = oracle.control(scheme.public_key());
1574 let cfg = config::Config {
1575 crypto: scheme.clone(),
1576 blocker,
1577 automaton: application.clone(),
1578 relay: application.clone(),
1579 reporter: supervisor.clone(),
1580 supervisor,
1581 partition: validator.to_string(),
1582 compression: Some(3),
1583 mailbox_size: 1024,
1584 namespace: namespace.clone(),
1585 leader_timeout: Duration::from_secs(1),
1586 notarization_timeout: Duration::from_secs(2),
1587 nullify_retry: Duration::from_secs(10),
1588 fetch_timeout: Duration::from_secs(1),
1589 activity_timeout,
1590 skip_timeout,
1591 max_fetch_count: 1,
1592 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1593 fetch_concurrent: 1,
1594 replay_buffer: 1024 * 1024,
1595 write_buffer: 1024 * 1024,
1596 };
1597 let engine = Engine::new(context.with_label("engine"), cfg);
1598
1599 let (pending, recovered, resolver) = registrations
1601 .remove(&validator)
1602 .expect("validator should be registered");
1603 engine_handlers.push(engine.start(pending, recovered, resolver));
1604 }
1605
1606 let mut finalizers = Vec::new();
1608 for supervisor in supervisors.iter_mut() {
1609 let (_, mut monitor) = supervisor.subscribe().await;
1610 finalizers.push(
1611 context
1612 .with_label("finalizer")
1613 .spawn(move |context| async move {
1614 select! {
1615 _timeout = context.sleep(Duration::from_secs(60)) => {},
1616 _done = monitor.next() => {
1617 panic!("engine should not notarize or finalize anything");
1618 }
1619 }
1620 }),
1621 );
1622 }
1623 join_all(finalizers).await;
1624
1625 link_validators(&mut oracle, &validators, Action::Unlink, None).await;
1627
1628 context.sleep(Duration::from_secs(60)).await;
1630
1631 let mut latest = 0;
1633 for supervisor in supervisors.iter() {
1634 let nullifies = supervisor.nullifies.lock().unwrap();
1635 let max = nullifies.keys().max().unwrap();
1636 if *max > latest {
1637 latest = *max;
1638 }
1639 }
1640
1641 let link = Link {
1643 latency: 10.0,
1644 jitter: 1.0,
1645 success_rate: 1.0,
1646 };
1647 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1648
1649 let mut finalizers = Vec::new();
1651 for supervisor in supervisors.iter_mut() {
1652 let (mut latest, mut monitor) = supervisor.subscribe().await;
1653 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1654 while latest < required_containers {
1655 latest = monitor.next().await.expect("event missing");
1656 }
1657 }));
1658 }
1659 join_all(finalizers).await;
1660
1661 for supervisor in supervisors.iter() {
1663 {
1665 let faults = supervisor.faults.lock().unwrap();
1666 assert!(faults.is_empty());
1667 }
1668
1669 {
1671 let invalid = supervisor.invalid.lock().unwrap();
1672 assert_eq!(*invalid, 0);
1673 }
1674
1675 {
1680 let mut found = 0;
1682 let finalizations = supervisor.finalizations.lock().unwrap();
1683 for i in latest..latest + activity_timeout {
1684 if finalizations.contains_key(&i) {
1685 found += 1;
1686 }
1687 }
1688 assert!(found >= activity_timeout - 2, "found: {found}");
1689 }
1690 }
1691
1692 let blocked = oracle.blocked().await.unwrap();
1694 assert!(blocked.is_empty());
1695 });
1696 }
1697
1698 #[test_traced]
1699 fn test_all_recovery() {
1700 all_recovery::<MinPk>();
1701 all_recovery::<MinSig>();
1702 }
1703
1704 fn partition<V: Variant>() {
1705 let n = 10;
1707 let threshold = quorum(n);
1708 let required_containers = 50;
1709 let activity_timeout = 10;
1710 let skip_timeout = 5;
1711 let namespace = b"consensus".to_vec();
1712 let executor = deterministic::Runner::timed(Duration::from_secs(900));
1713 executor.start(|mut context| async move {
1714 let (network, mut oracle) = Network::new(
1716 context.with_label("network"),
1717 Config {
1718 max_size: 1024 * 1024,
1719 },
1720 );
1721
1722 network.start();
1724
1725 let mut schemes = Vec::new();
1727 let mut validators = Vec::new();
1728 for i in 0..n {
1729 let scheme = PrivateKey::from_seed(i as u64);
1730 let pk = scheme.public_key();
1731 schemes.push(scheme);
1732 validators.push(pk);
1733 }
1734 validators.sort();
1735 schemes.sort_by_key(|s| s.public_key());
1736 let mut registrations = register_validators(&mut oracle, &validators).await;
1737
1738 let link = Link {
1740 latency: 10.0,
1741 jitter: 1.0,
1742 success_rate: 1.0,
1743 };
1744 link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;
1745
1746 let (polynomial, shares) =
1748 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1749
1750 let relay = Arc::new(mocks::relay::Relay::new());
1752 let mut supervisors = Vec::new();
1753 let mut engine_handlers = Vec::new();
1754 for (idx, scheme) in schemes.iter().enumerate() {
1755 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1757
1758 let validator = scheme.public_key();
1760 let mut participants = BTreeMap::new();
1761 participants.insert(
1762 0,
1763 (polynomial.clone(), validators.clone(), shares[idx].clone()),
1764 );
1765 let supervisor_config = mocks::supervisor::Config::<_, V> {
1766 namespace: namespace.clone(),
1767 participants,
1768 };
1769 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1770 supervisors.push(supervisor.clone());
1771 let application_cfg = mocks::application::Config {
1772 hasher: Sha256::default(),
1773 relay: relay.clone(),
1774 participant: validator.clone(),
1775 propose_latency: (10.0, 5.0),
1776 verify_latency: (10.0, 5.0),
1777 };
1778 let (actor, application) = mocks::application::Application::new(
1779 context.with_label("application"),
1780 application_cfg,
1781 );
1782 actor.start();
1783 let blocker = oracle.control(scheme.public_key());
1784 let cfg = config::Config {
1785 crypto: scheme.clone(),
1786 blocker,
1787 automaton: application.clone(),
1788 relay: application.clone(),
1789 reporter: supervisor.clone(),
1790 supervisor,
1791 partition: validator.to_string(),
1792 compression: Some(3),
1793 mailbox_size: 1024,
1794 namespace: namespace.clone(),
1795 leader_timeout: Duration::from_secs(1),
1796 notarization_timeout: Duration::from_secs(2),
1797 nullify_retry: Duration::from_secs(10),
1798 fetch_timeout: Duration::from_secs(1),
1799 activity_timeout,
1800 skip_timeout,
1801 max_fetch_count: 1,
1802 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1803 fetch_concurrent: 1,
1804 replay_buffer: 1024 * 1024,
1805 write_buffer: 1024 * 1024,
1806 };
1807 let engine = Engine::new(context.with_label("engine"), cfg);
1808
1809 let (pending, recovered, resolver) = registrations
1811 .remove(&validator)
1812 .expect("validator should be registered");
1813 engine_handlers.push(engine.start(pending, recovered, resolver));
1814 }
1815
1816 let mut finalizers = Vec::new();
1818 for supervisor in supervisors.iter_mut() {
1819 let (mut latest, mut monitor) = supervisor.subscribe().await;
1820 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1821 while latest < required_containers {
1822 latest = monitor.next().await.expect("event missing");
1823 }
1824 }));
1825 }
1826 join_all(finalizers).await;
1827
1828 fn separated(n: usize, a: usize, b: usize) -> bool {
1830 let m = n / 2;
1831 (a < m && b >= m) || (a >= m && b < m)
1832 }
1833 link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;
1834
1835 context.sleep(Duration::from_secs(10)).await;
1837
1838 let mut finalizers = Vec::new();
1840 for supervisor in supervisors.iter_mut() {
1841 let (_, mut monitor) = supervisor.subscribe().await;
1842 finalizers.push(
1843 context
1844 .with_label("finalizer")
1845 .spawn(move |context| async move {
1846 select! {
1847 _timeout = context.sleep(Duration::from_secs(60)) => {},
1848 _done = monitor.next() => {
1849 panic!("engine should not notarize or finalize anything");
1850 }
1851 }
1852 }),
1853 );
1854 }
1855 join_all(finalizers).await;
1856
1857 link_validators(
1859 &mut oracle,
1860 &validators,
1861 Action::Link(link),
1862 Some(separated),
1863 )
1864 .await;
1865
1866 let mut finalizers = Vec::new();
1868 for supervisor in supervisors.iter_mut() {
1869 let (mut latest, mut monitor) = supervisor.subscribe().await;
1870 let required = latest + required_containers;
1871 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1872 while latest < required {
1873 latest = monitor.next().await.expect("event missing");
1874 }
1875 }));
1876 }
1877 join_all(finalizers).await;
1878
1879 for supervisor in supervisors.iter() {
1881 {
1883 let faults = supervisor.faults.lock().unwrap();
1884 assert!(faults.is_empty());
1885 }
1886
1887 {
1889 let invalid = supervisor.invalid.lock().unwrap();
1890 assert_eq!(*invalid, 0);
1891 }
1892 }
1893
1894 let blocked = oracle.blocked().await.unwrap();
1896 assert!(blocked.is_empty());
1897 });
1898 }
1899
1900 #[test_traced]
1901 #[ignore]
1902 fn test_partition() {
1903 partition::<MinPk>();
1904 partition::<MinSig>();
1905 }
1906
1907 fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
1908 let n = 5;
1910 let threshold = quorum(n);
1911 let required_containers = 50;
1912 let activity_timeout = 10;
1913 let skip_timeout = 5;
1914 let namespace = b"consensus".to_vec();
1915 let cfg = deterministic::Config::new()
1916 .with_seed(seed)
1917 .with_timeout(Some(Duration::from_secs(5_000)));
1918 let executor = deterministic::Runner::new(cfg);
1919 executor.start(|mut context| async move {
1920 let (network, mut oracle) = Network::new(
1922 context.with_label("network"),
1923 Config {
1924 max_size: 1024 * 1024,
1925 },
1926 );
1927
1928 network.start();
1930
1931 let mut schemes = Vec::new();
1933 let mut validators = Vec::new();
1934 for i in 0..n {
1935 let scheme = PrivateKey::from_seed(i as u64);
1936 let pk = scheme.public_key();
1937 schemes.push(scheme);
1938 validators.push(pk);
1939 }
1940 validators.sort();
1941 schemes.sort_by_key(|s| s.public_key());
1942 let mut registrations = register_validators(&mut oracle, &validators).await;
1943
1944 let degraded_link = Link {
1946 latency: 200.0,
1947 jitter: 150.0,
1948 success_rate: 0.5,
1949 };
1950 link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;
1951
1952 let (polynomial, shares) =
1954 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1955
1956 let relay = Arc::new(mocks::relay::Relay::new());
1958 let mut supervisors = Vec::new();
1959 let mut engine_handlers = Vec::new();
1960 for (idx, scheme) in schemes.into_iter().enumerate() {
1961 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1963
1964 let validator = scheme.public_key();
1966 let mut participants = BTreeMap::new();
1967 participants.insert(
1968 0,
1969 (polynomial.clone(), validators.clone(), shares[idx].clone()),
1970 );
1971 let supervisor_config = mocks::supervisor::Config::<_, V> {
1972 namespace: namespace.clone(),
1973 participants,
1974 };
1975 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1976 supervisors.push(supervisor.clone());
1977 let application_cfg = mocks::application::Config {
1978 hasher: Sha256::default(),
1979 relay: relay.clone(),
1980 participant: validator.clone(),
1981 propose_latency: (10.0, 5.0),
1982 verify_latency: (10.0, 5.0),
1983 };
1984 let (actor, application) = mocks::application::Application::new(
1985 context.with_label("application"),
1986 application_cfg,
1987 );
1988 actor.start();
1989 let blocker = oracle.control(scheme.public_key());
1990 let cfg = config::Config {
1991 crypto: scheme,
1992 blocker,
1993 automaton: application.clone(),
1994 relay: application.clone(),
1995 reporter: supervisor.clone(),
1996 supervisor,
1997 partition: validator.to_string(),
1998 compression: Some(3),
1999 mailbox_size: 1024,
2000 namespace: namespace.clone(),
2001 leader_timeout: Duration::from_secs(1),
2002 notarization_timeout: Duration::from_secs(2),
2003 nullify_retry: Duration::from_secs(10),
2004 fetch_timeout: Duration::from_secs(1),
2005 activity_timeout,
2006 skip_timeout,
2007 max_fetch_count: 1,
2008 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2009 fetch_concurrent: 1,
2010 replay_buffer: 1024 * 1024,
2011 write_buffer: 1024 * 1024,
2012 };
2013 let engine = Engine::new(context.with_label("engine"), cfg);
2014
2015 let (pending, recovered, resolver) = registrations
2017 .remove(&validator)
2018 .expect("validator should be registered");
2019 engine_handlers.push(engine.start(pending, recovered, resolver));
2020 }
2021
2022 let mut finalizers = Vec::new();
2024 for supervisor in supervisors.iter_mut() {
2025 let (mut latest, mut monitor) = supervisor.subscribe().await;
2026 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2027 while latest < required_containers {
2028 latest = monitor.next().await.expect("event missing");
2029 }
2030 }));
2031 }
2032 join_all(finalizers).await;
2033
2034 for supervisor in supervisors.iter() {
2036 {
2038 let faults = supervisor.faults.lock().unwrap();
2039 assert!(faults.is_empty());
2040 }
2041
2042 {
2044 let invalid = supervisor.invalid.lock().unwrap();
2045 assert_eq!(*invalid, 0);
2046 }
2047 }
2048
2049 let blocked = oracle.blocked().await.unwrap();
2051 assert!(blocked.is_empty());
2052
2053 context.auditor().state()
2054 })
2055 }
2056
2057 #[test_traced]
2058 fn test_slow_and_lossy_links() {
2059 slow_and_lossy_links::<MinPk>(0);
2060 slow_and_lossy_links::<MinSig>(0);
2061 }
2062
2063 #[test_traced]
2064 #[ignore]
2065 fn test_determinism() {
2066 for seed in 1..6 {
2069 let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
2070 let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
2071 assert_eq!(pk_state_1, pk_state_2);
2072
2073 let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
2074 let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
2075 assert_eq!(sig_state_1, sig_state_2);
2076
2077 assert_ne!(pk_state_1, sig_state_1);
2079 }
2080 }
2081
2082 fn conflicter<V: Variant>(seed: u64) {
2083 let n = 4;
2085 let threshold = quorum(n);
2086 let required_containers = 50;
2087 let activity_timeout = 10;
2088 let skip_timeout = 5;
2089 let namespace = b"consensus".to_vec();
2090 let cfg = deterministic::Config::new()
2091 .with_seed(seed)
2092 .with_timeout(Some(Duration::from_secs(30)));
2093 let executor = deterministic::Runner::new(cfg);
2094 executor.start(|mut context| async move {
2095 let (network, mut oracle) = Network::new(
2097 context.with_label("network"),
2098 Config {
2099 max_size: 1024 * 1024,
2100 },
2101 );
2102
2103 network.start();
2105
2106 let mut schemes = Vec::new();
2108 let mut validators = Vec::new();
2109 for i in 0..n {
2110 let scheme = PrivateKey::from_seed(i as u64);
2111 let pk = scheme.public_key();
2112 schemes.push(scheme);
2113 validators.push(pk);
2114 }
2115 validators.sort();
2116 schemes.sort_by_key(|s| s.public_key());
2117 let mut registrations = register_validators(&mut oracle, &validators).await;
2118
2119 let link = Link {
2121 latency: 10.0,
2122 jitter: 1.0,
2123 success_rate: 1.0,
2124 };
2125 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2126
2127 let (polynomial, shares) =
2129 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2130
2131 let relay = Arc::new(mocks::relay::Relay::new());
2133 let mut supervisors = Vec::new();
2134 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2135 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2137
2138 let validator = scheme.public_key();
2140 let mut participants = BTreeMap::new();
2141 participants.insert(
2142 0,
2143 (
2144 polynomial.clone(),
2145 validators.clone(),
2146 shares[idx_scheme].clone(),
2147 ),
2148 );
2149 let supervisor_config = mocks::supervisor::Config::<_, V> {
2150 namespace: namespace.clone(),
2151 participants,
2152 };
2153 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2154 let (pending, recovered, resolver) = registrations
2155 .remove(&validator)
2156 .expect("validator should be registered");
2157 if idx_scheme == 0 {
2158 let cfg = mocks::conflicter::Config {
2159 supervisor,
2160 namespace: namespace.clone(),
2161 };
2162
2163 let engine: mocks::conflicter::Conflicter<_, V, Sha256, _> =
2164 mocks::conflicter::Conflicter::new(
2165 context.with_label("byzantine_engine"),
2166 cfg,
2167 );
2168 engine.start(pending);
2169 } else {
2170 supervisors.push(supervisor.clone());
2171 let application_cfg = mocks::application::Config {
2172 hasher: Sha256::default(),
2173 relay: relay.clone(),
2174 participant: validator.clone(),
2175 propose_latency: (10.0, 5.0),
2176 verify_latency: (10.0, 5.0),
2177 };
2178 let (actor, application) = mocks::application::Application::new(
2179 context.with_label("application"),
2180 application_cfg,
2181 );
2182 actor.start();
2183 let blocker = oracle.control(scheme.public_key());
2184 let cfg = config::Config {
2185 crypto: scheme,
2186 blocker,
2187 automaton: application.clone(),
2188 relay: application.clone(),
2189 reporter: supervisor.clone(),
2190 supervisor,
2191 partition: validator.to_string(),
2192 compression: Some(3),
2193 mailbox_size: 1024,
2194 namespace: namespace.clone(),
2195 leader_timeout: Duration::from_secs(1),
2196 notarization_timeout: Duration::from_secs(2),
2197 nullify_retry: Duration::from_secs(10),
2198 fetch_timeout: Duration::from_secs(1),
2199 activity_timeout,
2200 skip_timeout,
2201 max_fetch_count: 1,
2202 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2203 fetch_concurrent: 1,
2204 replay_buffer: 1024 * 1024,
2205 write_buffer: 1024 * 1024,
2206 };
2207 let engine = Engine::new(context.with_label("engine"), cfg);
2208 engine.start(pending, recovered, resolver);
2209 }
2210 }
2211
2212 let mut finalizers = Vec::new();
2214 for supervisor in supervisors.iter_mut() {
2215 let (mut latest, mut monitor) = supervisor.subscribe().await;
2216 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2217 while latest < required_containers {
2218 latest = monitor.next().await.expect("event missing");
2219 }
2220 }));
2221 }
2222 join_all(finalizers).await;
2223
2224 let byz = &validators[0];
2226 let mut count_conflicting = 0;
2227 for supervisor in supervisors.iter() {
2228 {
2230 let faults = supervisor.faults.lock().unwrap();
2231 assert_eq!(faults.len(), 1);
2232 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2233 for (_, faults) in faulter.iter() {
2234 for fault in faults.iter() {
2235 match fault {
2236 Activity::ConflictingNotarize(_) => {
2237 count_conflicting += 1;
2238 }
2239 Activity::ConflictingFinalize(_) => {
2240 count_conflicting += 1;
2241 }
2242 _ => panic!("unexpected fault: {fault:?}"),
2243 }
2244 }
2245 }
2246 }
2247
2248 {
2250 let invalid = supervisor.invalid.lock().unwrap();
2251 assert_eq!(*invalid, 0);
2252 }
2253 }
2254 assert!(count_conflicting > 0);
2255
2256 let blocked = oracle.blocked().await.unwrap();
2258 assert!(!blocked.is_empty());
2259 for (a, b) in blocked {
2260 assert_ne!(&a, byz);
2261 assert_eq!(&b, byz);
2262 }
2263 });
2264 }
2265
2266 #[test_traced]
2267 #[ignore]
2268 fn test_conflicter() {
2269 for seed in 0..5 {
2270 conflicter::<MinPk>(seed);
2271 conflicter::<MinSig>(seed);
2272 }
2273 }
2274
2275 fn invalid<V: Variant>(seed: u64) {
2276 let n = 4;
2278 let threshold = quorum(n);
2279 let required_containers = 50;
2280 let activity_timeout = 10;
2281 let skip_timeout = 5;
2282 let namespace = b"consensus".to_vec();
2283 let cfg = deterministic::Config::new()
2284 .with_seed(seed)
2285 .with_timeout(Some(Duration::from_secs(30)));
2286 let executor = deterministic::Runner::new(cfg);
2287 executor.start(|mut context| async move {
2288 let (network, mut oracle) = Network::new(
2290 context.with_label("network"),
2291 Config {
2292 max_size: 1024 * 1024,
2293 },
2294 );
2295
2296 network.start();
2298
2299 let mut schemes = Vec::new();
2301 let mut validators = Vec::new();
2302 for i in 0..n {
2303 let scheme = PrivateKey::from_seed(i as u64);
2304 let pk = scheme.public_key();
2305 schemes.push(scheme);
2306 validators.push(pk);
2307 }
2308 validators.sort();
2309 schemes.sort_by_key(|s| s.public_key());
2310 let mut registrations = register_validators(&mut oracle, &validators).await;
2311
2312 let link = Link {
2314 latency: 10.0,
2315 jitter: 1.0,
2316 success_rate: 1.0,
2317 };
2318 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2319
2320 let (polynomial, shares) =
2322 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2323
2324 let relay = Arc::new(mocks::relay::Relay::new());
2326 let mut supervisors = Vec::new();
2327 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2328 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2330
2331 let validator = scheme.public_key();
2333 let mut participants = BTreeMap::new();
2334 participants.insert(
2335 0,
2336 (
2337 polynomial.clone(),
2338 validators.clone(),
2339 shares[idx_scheme].clone(),
2340 ),
2341 );
2342 let supervisor_config = mocks::supervisor::Config::<_, V> {
2343 namespace: namespace.clone(),
2344 participants,
2345 };
2346 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2347 let (pending, recovered, resolver) = registrations
2348 .remove(&validator)
2349 .expect("validator should be registered");
2350 if idx_scheme == 0 {
2351 let cfg = mocks::invalid::Config {
2352 supervisor,
2353 namespace: namespace.clone(),
2354 };
2355
2356 let engine: mocks::invalid::Invalid<_, V, Sha256, _> =
2357 mocks::invalid::Invalid::new(context.with_label("byzantine_engine"), cfg);
2358 engine.start(pending);
2359 } else {
2360 supervisors.push(supervisor.clone());
2361 let application_cfg = mocks::application::Config {
2362 hasher: Sha256::default(),
2363 relay: relay.clone(),
2364 participant: validator.clone(),
2365 propose_latency: (10.0, 5.0),
2366 verify_latency: (10.0, 5.0),
2367 };
2368 let (actor, application) = mocks::application::Application::new(
2369 context.with_label("application"),
2370 application_cfg,
2371 );
2372 actor.start();
2373 let blocker = oracle.control(scheme.public_key());
2374 let cfg = config::Config {
2375 crypto: scheme,
2376 blocker,
2377 automaton: application.clone(),
2378 relay: application.clone(),
2379 reporter: supervisor.clone(),
2380 supervisor,
2381 partition: validator.to_string(),
2382 compression: Some(3),
2383 mailbox_size: 1024,
2384 namespace: namespace.clone(),
2385 leader_timeout: Duration::from_secs(1),
2386 notarization_timeout: Duration::from_secs(2),
2387 nullify_retry: Duration::from_secs(10),
2388 fetch_timeout: Duration::from_secs(1),
2389 activity_timeout,
2390 skip_timeout,
2391 max_fetch_count: 1,
2392 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2393 fetch_concurrent: 1,
2394 replay_buffer: 1024 * 1024,
2395 write_buffer: 1024 * 1024,
2396 };
2397 let engine = Engine::new(context.with_label("engine"), cfg);
2398 engine.start(pending, recovered, resolver);
2399 }
2400 }
2401
2402 let mut finalizers = Vec::new();
2404 for supervisor in supervisors.iter_mut() {
2405 let (mut latest, mut monitor) = supervisor.subscribe().await;
2406 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2407 while latest < required_containers {
2408 latest = monitor.next().await.expect("event missing");
2409 }
2410 }));
2411 }
2412 join_all(finalizers).await;
2413
2414 let mut invalid_count = 0;
2416 let byz = &validators[0];
2417 for supervisor in supervisors.iter() {
2418 {
2420 let faults = supervisor.faults.lock().unwrap();
2421 assert!(faults.is_empty());
2422 }
2423
2424 {
2426 let invalid = supervisor.invalid.lock().unwrap();
2427 if *invalid > 0 {
2428 invalid_count += 1;
2429 }
2430 }
2431 }
2432 assert_eq!(invalid_count, n - 1);
2433
2434 let blocked = oracle.blocked().await.unwrap();
2436 assert!(!blocked.is_empty());
2437 for (a, b) in blocked {
2438 assert_ne!(&a, byz);
2439 assert_eq!(&b, byz);
2440 }
2441 });
2442 }
2443
2444 #[test_traced]
2445 #[ignore]
2446 fn test_invalid() {
2447 for seed in 0..5 {
2448 invalid::<MinPk>(seed);
2449 invalid::<MinSig>(seed);
2450 }
2451 }
2452
2453 fn impersonator<V: Variant>(seed: u64) {
2454 let n = 4;
2456 let threshold = quorum(n);
2457 let required_containers = 50;
2458 let activity_timeout = 10;
2459 let skip_timeout = 5;
2460 let namespace = b"consensus".to_vec();
2461 let cfg = deterministic::Config::new()
2462 .with_seed(seed)
2463 .with_timeout(Some(Duration::from_secs(30)));
2464 let executor = deterministic::Runner::new(cfg);
2465 executor.start(|mut context| async move {
2466 let (network, mut oracle) = Network::new(
2468 context.with_label("network"),
2469 Config {
2470 max_size: 1024 * 1024,
2471 },
2472 );
2473
2474 network.start();
2476
2477 let mut schemes = Vec::new();
2479 let mut validators = Vec::new();
2480 for i in 0..n {
2481 let scheme = PrivateKey::from_seed(i as u64);
2482 let pk = scheme.public_key();
2483 schemes.push(scheme);
2484 validators.push(pk);
2485 }
2486 validators.sort();
2487 schemes.sort_by_key(|s| s.public_key());
2488 let mut registrations = register_validators(&mut oracle, &validators).await;
2489
2490 let link = Link {
2492 latency: 10.0,
2493 jitter: 1.0,
2494 success_rate: 1.0,
2495 };
2496 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2497
2498 let (polynomial, shares) =
2500 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2501
2502 let relay = Arc::new(mocks::relay::Relay::new());
2504 let mut supervisors = Vec::new();
2505 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2506 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2508
2509 let validator = scheme.public_key();
2511 let mut participants = BTreeMap::new();
2512 participants.insert(
2513 0,
2514 (
2515 polynomial.clone(),
2516 validators.clone(),
2517 shares[idx_scheme].clone(),
2518 ),
2519 );
2520 let supervisor_config = mocks::supervisor::Config::<_, V> {
2521 namespace: namespace.clone(),
2522 participants,
2523 };
2524 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2525 let (pending, recovered, resolver) = registrations
2526 .remove(&validator)
2527 .expect("validator should be registered");
2528 if idx_scheme == 0 {
2529 let cfg = mocks::impersonator::Config {
2530 supervisor,
2531 namespace: namespace.clone(),
2532 };
2533
2534 let engine: mocks::impersonator::Impersonator<_, V, Sha256, _> =
2535 mocks::impersonator::Impersonator::new(
2536 context.with_label("byzantine_engine"),
2537 cfg,
2538 );
2539 engine.start(pending);
2540 } else {
2541 supervisors.push(supervisor.clone());
2542 let application_cfg = mocks::application::Config {
2543 hasher: Sha256::default(),
2544 relay: relay.clone(),
2545 participant: validator.clone(),
2546 propose_latency: (10.0, 5.0),
2547 verify_latency: (10.0, 5.0),
2548 };
2549 let (actor, application) = mocks::application::Application::new(
2550 context.with_label("application"),
2551 application_cfg,
2552 );
2553 actor.start();
2554 let blocker = oracle.control(scheme.public_key());
2555 let cfg = config::Config {
2556 crypto: scheme,
2557 blocker,
2558 automaton: application.clone(),
2559 relay: application.clone(),
2560 reporter: supervisor.clone(),
2561 supervisor,
2562 partition: validator.to_string(),
2563 compression: Some(3),
2564 mailbox_size: 1024,
2565 namespace: namespace.clone(),
2566 leader_timeout: Duration::from_secs(1),
2567 notarization_timeout: Duration::from_secs(2),
2568 nullify_retry: Duration::from_secs(10),
2569 fetch_timeout: Duration::from_secs(1),
2570 activity_timeout,
2571 skip_timeout,
2572 max_fetch_count: 1,
2573 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2574 fetch_concurrent: 1,
2575 replay_buffer: 1024 * 1024,
2576 write_buffer: 1024 * 1024,
2577 };
2578 let engine = Engine::new(context.with_label("engine"), cfg);
2579 engine.start(pending, recovered, resolver);
2580 }
2581 }
2582
2583 let mut finalizers = Vec::new();
2585 for supervisor in supervisors.iter_mut() {
2586 let (mut latest, mut monitor) = supervisor.subscribe().await;
2587 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2588 while latest < required_containers {
2589 latest = monitor.next().await.expect("event missing");
2590 }
2591 }));
2592 }
2593 join_all(finalizers).await;
2594
2595 let byz = &validators[0];
2597 for supervisor in supervisors.iter() {
2598 {
2600 let faults = supervisor.faults.lock().unwrap();
2601 assert!(faults.is_empty());
2602 }
2603
2604 {
2606 let invalid = supervisor.invalid.lock().unwrap();
2607 assert_eq!(*invalid, 0);
2608 }
2609 }
2610
2611 let blocked = oracle.blocked().await.unwrap();
2613 assert!(!blocked.is_empty());
2614 for (a, b) in blocked {
2615 assert_ne!(&a, byz);
2616 assert_eq!(&b, byz);
2617 }
2618 });
2619 }
2620
2621 #[test_traced]
2622 #[ignore]
2623 fn test_impersonator() {
2624 for seed in 0..5 {
2625 impersonator::<MinPk>(seed);
2626 impersonator::<MinSig>(seed);
2627 }
2628 }
2629
2630 fn nuller<V: Variant>(seed: u64) {
2631 let n = 4;
2633 let threshold = quorum(n);
2634 let required_containers = 50;
2635 let activity_timeout = 10;
2636 let skip_timeout = 5;
2637 let namespace = b"consensus".to_vec();
2638 let cfg = deterministic::Config::new()
2639 .with_seed(seed)
2640 .with_timeout(Some(Duration::from_secs(30)));
2641 let executor = deterministic::Runner::new(cfg);
2642 executor.start(|mut context| async move {
2643 let (network, mut oracle) = Network::new(
2645 context.with_label("network"),
2646 Config {
2647 max_size: 1024 * 1024,
2648 },
2649 );
2650
2651 network.start();
2653
2654 let mut schemes = Vec::new();
2656 let mut validators = Vec::new();
2657 for i in 0..n {
2658 let scheme = PrivateKey::from_seed(i as u64);
2659 let pk = scheme.public_key();
2660 schemes.push(scheme);
2661 validators.push(pk);
2662 }
2663 validators.sort();
2664 schemes.sort_by_key(|s| s.public_key());
2665 let mut registrations = register_validators(&mut oracle, &validators).await;
2666
2667 let link = Link {
2669 latency: 10.0,
2670 jitter: 1.0,
2671 success_rate: 1.0,
2672 };
2673 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2674
2675 let (polynomial, shares) =
2677 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2678
2679 let relay = Arc::new(mocks::relay::Relay::new());
2681 let mut supervisors = Vec::new();
2682 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2683 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2685
2686 let validator = scheme.public_key();
2688 let mut participants = BTreeMap::new();
2689 participants.insert(
2690 0,
2691 (
2692 polynomial.clone(),
2693 validators.clone(),
2694 shares[idx_scheme].clone(),
2695 ),
2696 );
2697 let supervisor_config = mocks::supervisor::Config::<_, V> {
2698 namespace: namespace.clone(),
2699 participants,
2700 };
2701 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2702 let (pending, recovered, resolver) = registrations
2703 .remove(&validator)
2704 .expect("validator should be registered");
2705 if idx_scheme == 0 {
2706 let cfg = mocks::nuller::Config {
2707 supervisor,
2708 namespace: namespace.clone(),
2709 };
2710 let engine: mocks::nuller::Nuller<_, V, Sha256, _> =
2711 mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
2712 engine.start(pending);
2713 } else {
2714 supervisors.push(supervisor.clone());
2715 let application_cfg = mocks::application::Config {
2716 hasher: Sha256::default(),
2717 relay: relay.clone(),
2718 participant: validator.clone(),
2719 propose_latency: (10.0, 5.0),
2720 verify_latency: (10.0, 5.0),
2721 };
2722 let (actor, application) = mocks::application::Application::new(
2723 context.with_label("application"),
2724 application_cfg,
2725 );
2726 actor.start();
2727 let blocker = oracle.control(scheme.public_key());
2728 let cfg = config::Config {
2729 crypto: scheme,
2730 blocker,
2731 automaton: application.clone(),
2732 relay: application.clone(),
2733 reporter: supervisor.clone(),
2734 supervisor,
2735 partition: validator.to_string(),
2736 compression: Some(3),
2737 mailbox_size: 1024,
2738 namespace: namespace.clone(),
2739 leader_timeout: Duration::from_secs(1),
2740 notarization_timeout: Duration::from_secs(2),
2741 nullify_retry: Duration::from_secs(10),
2742 fetch_timeout: Duration::from_secs(1),
2743 activity_timeout,
2744 skip_timeout,
2745 max_fetch_count: 1,
2746 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2747 fetch_concurrent: 1,
2748 replay_buffer: 1024 * 1024,
2749 write_buffer: 1024 * 1024,
2750 };
2751 let engine = Engine::new(context.with_label("engine"), cfg);
2752 engine.start(pending, recovered, resolver);
2753 }
2754 }
2755
2756 let mut finalizers = Vec::new();
2758 for supervisor in supervisors.iter_mut() {
2759 let (mut latest, mut monitor) = supervisor.subscribe().await;
2760 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2761 while latest < required_containers {
2762 latest = monitor.next().await.expect("event missing");
2763 }
2764 }));
2765 }
2766 join_all(finalizers).await;
2767
2768 let byz = &validators[0];
2770 let mut count_nullify_and_finalize = 0;
2771 for supervisor in supervisors.iter() {
2772 {
2774 let faults = supervisor.faults.lock().unwrap();
2775 assert_eq!(faults.len(), 1);
2776 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2777 for (_, faults) in faulter.iter() {
2778 for fault in faults.iter() {
2779 match fault {
2780 Activity::NullifyFinalize(_) => {
2781 count_nullify_and_finalize += 1;
2782 }
2783 _ => panic!("unexpected fault: {fault:?}"),
2784 }
2785 }
2786 }
2787 }
2788
2789 {
2791 let invalid = supervisor.invalid.lock().unwrap();
2792 assert_eq!(*invalid, 0);
2793 }
2794 }
2795 assert!(count_nullify_and_finalize > 0);
2796
2797 let blocked = oracle.blocked().await.unwrap();
2799 assert!(!blocked.is_empty());
2800 for (a, b) in blocked {
2801 assert_ne!(&a, byz);
2802 assert_eq!(&b, byz);
2803 }
2804 });
2805 }
2806
2807 #[test_traced]
2808 #[ignore]
2809 fn test_nuller() {
2810 for seed in 0..5 {
2811 nuller::<MinPk>(seed);
2812 nuller::<MinSig>(seed);
2813 }
2814 }
2815
2816 fn outdated<V: Variant>(seed: u64) {
2817 let n = 4;
2819 let threshold = quorum(n);
2820 let required_containers = 100;
2821 let activity_timeout = 10;
2822 let skip_timeout = 5;
2823 let namespace = b"consensus".to_vec();
2824 let cfg = deterministic::Config::new()
2825 .with_seed(seed)
2826 .with_timeout(Some(Duration::from_secs(30)));
2827 let executor = deterministic::Runner::new(cfg);
2828 executor.start(|mut context| async move {
2829 let (network, mut oracle) = Network::new(
2831 context.with_label("network"),
2832 Config {
2833 max_size: 1024 * 1024,
2834 },
2835 );
2836
2837 network.start();
2839
2840 let mut schemes = Vec::new();
2842 let mut validators = Vec::new();
2843 for i in 0..n {
2844 let scheme = PrivateKey::from_seed(i as u64);
2845 let pk = scheme.public_key();
2846 schemes.push(scheme);
2847 validators.push(pk);
2848 }
2849 validators.sort();
2850 schemes.sort_by_key(|s| s.public_key());
2851 let mut registrations = register_validators(&mut oracle, &validators).await;
2852
2853 let link = Link {
2855 latency: 10.0,
2856 jitter: 1.0,
2857 success_rate: 1.0,
2858 };
2859 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2860
2861 let (polynomial, shares) =
2863 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2864
2865 let relay = Arc::new(mocks::relay::Relay::new());
2867 let mut supervisors = Vec::new();
2868 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2869 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2871
2872 let validator = scheme.public_key();
2874 let mut participants = BTreeMap::new();
2875 participants.insert(
2876 0,
2877 (
2878 polynomial.clone(),
2879 validators.clone(),
2880 shares[idx_scheme].clone(),
2881 ),
2882 );
2883 let supervisor_config = mocks::supervisor::Config::<_, V> {
2884 namespace: namespace.clone(),
2885 participants,
2886 };
2887 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2888 let (pending, recovered, resolver) = registrations
2889 .remove(&validator)
2890 .expect("validator should be registered");
2891 if idx_scheme == 0 {
2892 let cfg = mocks::outdated::Config {
2893 supervisor,
2894 namespace: namespace.clone(),
2895 view_delta: activity_timeout * 4,
2896 };
2897 let engine: mocks::outdated::Outdated<_, V, Sha256, _> =
2898 mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
2899 engine.start(pending);
2900 } else {
2901 supervisors.push(supervisor.clone());
2902 let application_cfg = mocks::application::Config {
2903 hasher: Sha256::default(),
2904 relay: relay.clone(),
2905 participant: validator.clone(),
2906 propose_latency: (10.0, 5.0),
2907 verify_latency: (10.0, 5.0),
2908 };
2909 let (actor, application) = mocks::application::Application::new(
2910 context.with_label("application"),
2911 application_cfg,
2912 );
2913 actor.start();
2914 let blocker = oracle.control(scheme.public_key());
2915 let cfg = config::Config {
2916 crypto: scheme,
2917 blocker,
2918 automaton: application.clone(),
2919 relay: application.clone(),
2920 reporter: supervisor.clone(),
2921 supervisor,
2922 partition: validator.to_string(),
2923 compression: Some(3),
2924 mailbox_size: 1024,
2925 namespace: namespace.clone(),
2926 leader_timeout: Duration::from_secs(1),
2927 notarization_timeout: Duration::from_secs(2),
2928 nullify_retry: Duration::from_secs(10),
2929 fetch_timeout: Duration::from_secs(1),
2930 activity_timeout,
2931 skip_timeout,
2932 max_fetch_count: 1,
2933 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2934 fetch_concurrent: 1,
2935 replay_buffer: 1024 * 1024,
2936 write_buffer: 1024 * 1024,
2937 };
2938 let engine = Engine::new(context.with_label("engine"), cfg);
2939 engine.start(pending, recovered, resolver);
2940 }
2941 }
2942
2943 let mut finalizers = Vec::new();
2945 for supervisor in supervisors.iter_mut() {
2946 let (mut latest, mut monitor) = supervisor.subscribe().await;
2947 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2948 while latest < required_containers {
2949 latest = monitor.next().await.expect("event missing");
2950 }
2951 }));
2952 }
2953 join_all(finalizers).await;
2954
2955 for supervisor in supervisors.iter() {
2957 {
2959 let faults = supervisor.faults.lock().unwrap();
2960 assert!(faults.is_empty());
2961 }
2962
2963 {
2965 let invalid = supervisor.invalid.lock().unwrap();
2966 assert_eq!(*invalid, 0);
2967 }
2968 }
2969
2970 let blocked = oracle.blocked().await.unwrap();
2972 assert!(blocked.is_empty());
2973 });
2974 }
2975
2976 #[test_traced]
2977 #[ignore]
2978 fn test_outdated() {
2979 for seed in 0..5 {
2980 outdated::<MinPk>(seed);
2981 outdated::<MinSig>(seed);
2982 }
2983 }
2984
2985 fn run_1k<V: Variant>() {
2986 let n = 10;
2988 let threshold = quorum(n);
2989 let required_containers = 1_000;
2990 let activity_timeout = 10;
2991 let skip_timeout = 5;
2992 let namespace = b"consensus".to_vec();
2993 let cfg = deterministic::Config::new();
2994 let executor = deterministic::Runner::new(cfg);
2995 executor.start(|mut context| async move {
2996 let (network, mut oracle) = Network::new(
2998 context.with_label("network"),
2999 Config {
3000 max_size: 1024 * 1024,
3001 },
3002 );
3003
3004 network.start();
3006
3007 let mut schemes = Vec::new();
3009 let mut validators = Vec::new();
3010 for i in 0..n {
3011 let scheme = PrivateKey::from_seed(i as u64);
3012 let pk = scheme.public_key();
3013 schemes.push(scheme);
3014 validators.push(pk);
3015 }
3016 validators.sort();
3017 schemes.sort_by_key(|s| s.public_key());
3018 let mut registrations = register_validators(&mut oracle, &validators).await;
3019
3020 let link = Link {
3022 latency: 80.0,
3023 jitter: 10.0,
3024 success_rate: 0.98,
3025 };
3026 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
3027
3028 let (polynomial, shares) =
3030 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
3031
3032 let relay = Arc::new(mocks::relay::Relay::new());
3034 let mut supervisors = Vec::new();
3035 let mut engine_handlers = Vec::new();
3036 for (idx, scheme) in schemes.into_iter().enumerate() {
3037 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
3039
3040 let validator = scheme.public_key();
3042 let mut participants = BTreeMap::new();
3043 participants.insert(
3044 0,
3045 (polynomial.clone(), validators.clone(), shares[idx].clone()),
3046 );
3047 let supervisor_config = mocks::supervisor::Config::<_, V> {
3048 namespace: namespace.clone(),
3049 participants,
3050 };
3051 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
3052 supervisors.push(supervisor.clone());
3053 let application_cfg = mocks::application::Config {
3054 hasher: Sha256::default(),
3055 relay: relay.clone(),
3056 participant: validator.clone(),
3057 propose_latency: (100.0, 50.0),
3058 verify_latency: (50.0, 40.0),
3059 };
3060 let (actor, application) = mocks::application::Application::new(
3061 context.with_label("application"),
3062 application_cfg,
3063 );
3064 actor.start();
3065 let blocker = oracle.control(scheme.public_key());
3066 let cfg = config::Config {
3067 crypto: scheme,
3068 blocker,
3069 automaton: application.clone(),
3070 relay: application.clone(),
3071 reporter: supervisor.clone(),
3072 supervisor,
3073 partition: validator.to_string(),
3074 compression: Some(3),
3075 mailbox_size: 1024,
3076 namespace: namespace.clone(),
3077 leader_timeout: Duration::from_secs(1),
3078 notarization_timeout: Duration::from_secs(2),
3079 nullify_retry: Duration::from_secs(10),
3080 fetch_timeout: Duration::from_secs(1),
3081 activity_timeout,
3082 skip_timeout,
3083 max_fetch_count: 1,
3084 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
3085 fetch_concurrent: 1,
3086 replay_buffer: 1024 * 1024,
3087 write_buffer: 1024 * 1024,
3088 };
3089 let engine = Engine::new(context.with_label("engine"), cfg);
3090
3091 let (pending, recovered, resolver) = registrations
3093 .remove(&validator)
3094 .expect("validator should be registered");
3095 engine_handlers.push(engine.start(pending, recovered, resolver));
3096 }
3097
3098 let mut finalizers = Vec::new();
3100 for supervisor in supervisors.iter_mut() {
3101 let (mut latest, mut monitor) = supervisor.subscribe().await;
3102 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3103 while latest < required_containers {
3104 latest = monitor.next().await.expect("event missing");
3105 }
3106 }));
3107 }
3108 join_all(finalizers).await;
3109
3110 for supervisor in supervisors.iter() {
3112 {
3114 let faults = supervisor.faults.lock().unwrap();
3115 assert!(faults.is_empty());
3116 }
3117
3118 {
3120 let invalid = supervisor.invalid.lock().unwrap();
3121 assert_eq!(*invalid, 0);
3122 }
3123 }
3124
3125 let blocked = oracle.blocked().await.unwrap();
3127 assert!(blocked.is_empty());
3128 })
3129 }
3130
3131 #[test_traced]
3132 #[ignore]
3133 fn test_1k() {
3134 run_1k::<MinPk>();
3135 run_1k::<MinSig>();
3136 }
3137
3138 fn tle<V: Variant>() {
3139 let n = 4;
3141 let threshold = quorum(n);
3142 let namespace = b"consensus".to_vec();
3143 let activity_timeout = 100;
3144 let skip_timeout = 50;
3145 let executor = deterministic::Runner::timed(Duration::from_secs(30));
3146 executor.start(|mut context| async move {
3147 let (network, mut oracle) = Network::new(
3149 context.with_label("network"),
3150 Config {
3151 max_size: 1024 * 1024,
3152 },
3153 );
3154
3155 network.start();
3157
3158 let mut schemes = Vec::new();
3160 let mut validators = Vec::new();
3161 for i in 0..n {
3162 let scheme = PrivateKey::from_seed(i as u64);
3163 let pk = scheme.public_key();
3164 schemes.push(scheme);
3165 validators.push(pk);
3166 }
3167 validators.sort();
3168 schemes.sort_by_key(|s| s.public_key());
3169 let mut registrations = register_validators(&mut oracle, &validators).await;
3170
3171 let link = Link {
3173 latency: 10.0,
3174 jitter: 5.0,
3175 success_rate: 1.0,
3176 };
3177 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
3178
3179 let (polynomial, shares) =
3181 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
3182 let public_key = *public::<V>(&polynomial);
3183
3184 let relay = Arc::new(mocks::relay::Relay::new());
3186 let mut supervisors = Vec::new();
3187 let mut engine_handlers = Vec::new();
3188 let monitor_supervisor = Arc::new(Mutex::new(None));
3189 for (idx, scheme) in schemes.into_iter().enumerate() {
3190 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
3192
3193 let validator = scheme.public_key();
3195 let mut participants = BTreeMap::new();
3196 participants.insert(
3197 0,
3198 (polynomial.clone(), validators.clone(), shares[idx].clone()),
3199 );
3200
3201 let supervisor_config = mocks::supervisor::Config::<_, V> {
3203 namespace: namespace.clone(),
3204 participants,
3205 };
3206 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
3207 supervisors.push(supervisor.clone());
3208 if idx == 0 {
3209 *monitor_supervisor.lock().unwrap() = Some(supervisor.clone());
3210 }
3211
3212 let application_cfg = mocks::application::Config {
3214 hasher: Sha256::default(),
3215 relay: relay.clone(),
3216 participant: validator.clone(),
3217 propose_latency: (10.0, 5.0),
3218 verify_latency: (10.0, 5.0),
3219 };
3220 let (actor, application) = mocks::application::Application::new(
3221 context.with_label("application"),
3222 application_cfg,
3223 );
3224 actor.start();
3225 let blocker = oracle.control(scheme.public_key());
3226 let cfg = config::Config {
3227 crypto: scheme,
3228 blocker,
3229 automaton: application.clone(),
3230 relay: application.clone(),
3231 reporter: supervisor.clone(),
3232 supervisor,
3233 partition: validator.to_string(),
3234 compression: Some(3),
3235 mailbox_size: 1024,
3236 namespace: namespace.clone(),
3237 leader_timeout: Duration::from_millis(100),
3238 notarization_timeout: Duration::from_millis(200),
3239 nullify_retry: Duration::from_millis(500),
3240 fetch_timeout: Duration::from_millis(100),
3241 activity_timeout,
3242 skip_timeout,
3243 max_fetch_count: 1,
3244 fetch_rate_per_peer: Quota::per_second(NZU32!(10)),
3245 fetch_concurrent: 1,
3246 replay_buffer: 1024 * 1024,
3247 write_buffer: 1024 * 1024,
3248 };
3249 let engine = Engine::new(context.with_label("engine"), cfg);
3250
3251 let (pending, recovered, resolver) = registrations
3253 .remove(&validator)
3254 .expect("validator should be registered");
3255 engine_handlers.push(engine.start(pending, recovered, resolver));
3256 }
3257
3258 let target = 10u64; let target_bytes = target.to_be_bytes();
3261 let message_content = b"Secret message for future view10"; let message = Block::new(*message_content);
3263
3264 let seed_namespace = seed_namespace(&namespace);
3266 let ciphertext = encrypt::<_, V>(
3267 &mut context,
3268 public_key,
3269 (Some(&seed_namespace), &target_bytes),
3270 &message,
3271 );
3272
3273 let supervisor = monitor_supervisor.lock().unwrap().clone().unwrap();
3275 loop {
3276 context.sleep(Duration::from_millis(100)).await;
3278 let notarizations = supervisor.notarizations.lock().unwrap();
3279 let Some(notarization) = notarizations.get(&target) else {
3280 continue;
3281 };
3282
3283 let seed_signature = notarization.seed_signature;
3285 let decrypted = decrypt::<V>(&seed_signature, &ciphertext)
3286 .expect("Decryption should succeed with valid seed signature");
3287 assert_eq!(
3288 message.as_ref(),
3289 decrypted.as_ref(),
3290 "Decrypted message should match original message"
3291 );
3292 break;
3293 }
3294 });
3295 }
3296
3297 #[test_traced]
3298 fn test_tle() {
3299 tle::<MinPk>();
3300 tle::<MinSig>();
3301 }
3302}