1use types::View;
168
169pub mod types;
170
171cfg_if::cfg_if! {
172 if #[cfg(not(target_arch = "wasm32"))] {
173 mod actors;
174 mod config;
175 pub use config::Config;
176 mod engine;
177 pub use engine::Engine;
178 mod metrics;
179 }
180}
181
182#[cfg(test)]
183pub mod mocks;
184
185pub(crate) fn min_active(activity_timeout: View, last_finalized: View) -> View {
187 last_finalized.saturating_sub(activity_timeout)
188}
189
190pub(crate) fn interesting(
194 activity_timeout: View,
195 last_finalized: View,
196 current: View,
197 pending: View,
198 allow_future: bool,
199) -> bool {
200 if pending < min_active(activity_timeout, last_finalized) {
201 return false;
202 }
203 if !allow_future && pending > current + 1 {
204 return false;
205 }
206 true
207}
208
209#[cfg(test)]
210mod tests {
211 use super::*;
212 use crate::{threshold_simplex::types::seed_namespace, Monitor};
213 use commonware_cryptography::{
214 bls12381::{
215 dkg::ops,
216 primitives::{
217 poly::public,
218 variant::{MinPk, MinSig, Variant},
219 },
220 tle::{decrypt, encrypt, Block},
221 },
222 ed25519::PrivateKey,
223 PrivateKeyExt as _, PublicKey, Sha256, Signer as _,
224 };
225 use commonware_macros::{select, test_traced};
226 use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
227 use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner, Spawner};
228 use commonware_utils::{quorum, NZUsize, NZU32};
229 use engine::Engine;
230 use futures::{future::join_all, StreamExt};
231 use governor::Quota;
232 use rand::{rngs::StdRng, Rng as _, SeedableRng as _};
233 use std::{
234 collections::{BTreeMap, HashMap},
235 num::NonZeroUsize,
236 sync::{Arc, Mutex},
237 time::Duration,
238 };
239 use tracing::{debug, warn};
240 use types::Activity;
241
// Arguments handed to `PoolRef::new` when building each engine's `buffer_pool`.
// NOTE(review): presumably the page size in bytes and the number of cached
// pages — confirm against `PoolRef`.
const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
244
245 async fn register_validators<P: PublicKey>(
247 oracle: &mut Oracle<P>,
248 validators: &[P],
249 ) -> HashMap<
250 P,
251 (
252 (Sender<P>, Receiver<P>),
253 (Sender<P>, Receiver<P>),
254 (Sender<P>, Receiver<P>),
255 ),
256 > {
257 let mut registrations = HashMap::new();
258 for validator in validators.iter() {
259 let (pending_sender, pending_receiver) =
260 oracle.register(validator.clone(), 0).await.unwrap();
261 let (recovered_sender, recovered_receiver) =
262 oracle.register(validator.clone(), 1).await.unwrap();
263 let (resolver_sender, resolver_receiver) =
264 oracle.register(validator.clone(), 2).await.unwrap();
265 registrations.insert(
266 validator.clone(),
267 (
268 (pending_sender, pending_receiver),
269 (recovered_sender, recovered_receiver),
270 (resolver_sender, resolver_receiver),
271 ),
272 );
273 }
274 registrations
275 }
276
/// What `link_validators` should do to each ordered pair of validators.
enum Action {
    /// Add a link with the given configuration.
    Link(Link),
    /// Remove the existing link, then add one with the given configuration.
    Update(Link),
    /// Remove the existing link.
    Unlink,
}
283
284 async fn link_validators<P: PublicKey>(
290 oracle: &mut Oracle<P>,
291 validators: &[P],
292 action: Action,
293 restrict_to: Option<fn(usize, usize, usize) -> bool>,
294 ) {
295 for (i1, v1) in validators.iter().enumerate() {
296 for (i2, v2) in validators.iter().enumerate() {
297 if v2 == v1 {
299 continue;
300 }
301
302 if let Some(f) = restrict_to {
304 if !f(validators.len(), i1, i2) {
305 continue;
306 }
307 }
308
309 match action {
311 Action::Update(_) | Action::Unlink => {
312 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
313 }
314 _ => {}
315 }
316
317 match action {
319 Action::Link(ref link) | Action::Update(ref link) => {
320 oracle
321 .add_link(v1.clone(), v2.clone(), link.clone())
322 .await
323 .unwrap();
324 }
325 _ => {}
326 }
327 }
328 }
329 }
330
331 fn all_online<V: Variant>() {
332 let n = 5;
334 let threshold = quorum(n);
335 let required_containers = 100;
336 let activity_timeout = 10;
337 let skip_timeout = 5;
338 let namespace = b"consensus".to_vec();
339 let executor = deterministic::Runner::timed(Duration::from_secs(30));
340 executor.start(|mut context| async move {
341 let (network, mut oracle) = Network::new(
343 context.with_label("network"),
344 Config {
345 max_size: 1024 * 1024,
346 },
347 );
348
349 network.start();
351
352 let mut schemes = Vec::new();
354 let mut validators = Vec::new();
355 for i in 0..n {
356 let scheme = PrivateKey::from_seed(i as u64);
357 let pk = scheme.public_key();
358 schemes.push(scheme);
359 validators.push(pk);
360 }
361 validators.sort();
362 schemes.sort_by_key(|s| s.public_key());
363 let mut registrations = register_validators(&mut oracle, &validators).await;
364
365 let link = Link {
367 latency: 10.0,
368 jitter: 1.0,
369 success_rate: 1.0,
370 };
371 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
372
373 let (polynomial, shares) =
375 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
376
377 let relay = Arc::new(mocks::relay::Relay::new());
379 let mut supervisors = Vec::new();
380 let mut engine_handlers = Vec::new();
381 for (idx, scheme) in schemes.into_iter().enumerate() {
382 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
384
385 let validator = scheme.public_key();
387 let mut participants = BTreeMap::new();
388 participants.insert(
389 0,
390 (polynomial.clone(), validators.clone(), shares[idx].clone()),
391 );
392 let supervisor_config = mocks::supervisor::Config::<_, V> {
393 namespace: namespace.clone(),
394 participants,
395 };
396 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
397 supervisors.push(supervisor.clone());
398 let application_cfg = mocks::application::Config {
399 hasher: Sha256::default(),
400 relay: relay.clone(),
401 participant: validator.clone(),
402 propose_latency: (10.0, 5.0),
403 verify_latency: (10.0, 5.0),
404 };
405 let (actor, application) = mocks::application::Application::new(
406 context.with_label("application"),
407 application_cfg,
408 );
409 actor.start();
410 let blocker = oracle.control(scheme.public_key());
411 let cfg = config::Config {
412 crypto: scheme,
413 blocker,
414 automaton: application.clone(),
415 relay: application.clone(),
416 reporter: supervisor.clone(),
417 supervisor,
418 partition: validator.to_string(),
419 compression: Some(3),
420 mailbox_size: 1024,
421 namespace: namespace.clone(),
422 leader_timeout: Duration::from_secs(1),
423 notarization_timeout: Duration::from_secs(2),
424 nullify_retry: Duration::from_secs(10),
425 fetch_timeout: Duration::from_secs(1),
426 activity_timeout,
427 skip_timeout,
428 max_fetch_count: 1,
429 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
430 fetch_concurrent: 1,
431 replay_buffer: NZUsize!(1024 * 1024),
432 write_buffer: NZUsize!(1024 * 1024),
433 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
434 };
435 let engine = Engine::new(context.with_label("engine"), cfg);
436
437 let (pending, recovered, resolver) = registrations
439 .remove(&validator)
440 .expect("validator should be registered");
441 engine_handlers.push(engine.start(pending, recovered, resolver));
442 }
443
444 let mut finalizers = Vec::new();
446 for supervisor in supervisors.iter_mut() {
447 let (mut latest, mut monitor) = supervisor.subscribe().await;
448 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
449 while latest < required_containers {
450 latest = monitor.next().await.expect("event missing");
451 }
452 }));
453 }
454 join_all(finalizers).await;
455
456 let latest_complete = required_containers - activity_timeout;
458 for supervisor in supervisors.iter() {
459 {
461 let faults = supervisor.faults.lock().unwrap();
462 assert!(faults.is_empty());
463 }
464
465 {
467 let invalid = supervisor.invalid.lock().unwrap();
468 assert_eq!(*invalid, 0);
469 }
470
471 {
473 let seeds = supervisor.seeds.lock().unwrap();
474 for view in 1..latest_complete {
475 if !seeds.contains_key(&view) {
477 panic!("view: {view}");
478 }
479 }
480 }
481
482 let mut notarized = HashMap::new();
484 let mut finalized = HashMap::new();
485 {
486 let notarizes = supervisor.notarizes.lock().unwrap();
487 for view in 1..latest_complete {
488 let Some(payloads) = notarizes.get(&view) else {
490 continue;
491 };
492 if payloads.len() > 1 {
493 panic!("view: {view}");
494 }
495 let (digest, notarizers) = payloads.iter().next().unwrap();
496 notarized.insert(view, *digest);
497
498 if notarizers.len() < threshold as usize {
499 panic!("view: {view}");
502 }
503 }
504 }
505 {
506 let notarizations = supervisor.notarizations.lock().unwrap();
507 for view in 1..latest_complete {
508 let Some(notarization) = notarizations.get(&view) else {
510 continue;
511 };
512 let Some(digest) = notarized.get(&view) else {
513 continue;
514 };
515 assert_eq!(¬arization.proposal.payload, digest);
516 }
517 }
518 {
519 let finalizes = supervisor.finalizes.lock().unwrap();
520 for view in 1..latest_complete {
521 let Some(payloads) = finalizes.get(&view) else {
523 continue;
524 };
525 if payloads.len() > 1 {
526 panic!("view: {view}");
527 }
528 let (digest, finalizers) = payloads.iter().next().unwrap();
529 finalized.insert(view, *digest);
530
531 if view > latest_complete {
533 continue;
534 }
535
536 if finalizers.len() < threshold as usize {
538 panic!("view: {view}");
541 }
542
543 let nullifies = supervisor.nullifies.lock().unwrap();
545 let Some(nullifies) = nullifies.get(&view) else {
546 continue;
547 };
548 for (_, finalizers) in payloads.iter() {
549 for finalizer in finalizers.iter() {
550 if nullifies.contains(finalizer) {
551 panic!("should not nullify and finalize at same view");
552 }
553 }
554 }
555 }
556 }
557 {
558 let finalizations = supervisor.finalizations.lock().unwrap();
559 for view in 1..latest_complete {
560 let Some(finalization) = finalizations.get(&view) else {
562 continue;
563 };
564 let Some(digest) = finalized.get(&view) else {
565 continue;
566 };
567 assert_eq!(&finalization.proposal.payload, digest);
568 }
569 }
570 }
571
572 let blocked = oracle.blocked().await.unwrap();
574 assert!(blocked.is_empty());
575 });
576 }
577
/// Runs the all-online scenario once per BLS12-381 variant (`MinPk`, `MinSig`).
#[test_traced]
fn test_all_online() {
    all_online::<MinPk>();
    all_online::<MinSig>();
}
583
/// Repeatedly runs five validators on the deterministic runtime, abandoning
/// each run after a random delay in `10ms..2s` unless every supervisor's
/// monitor has already reached `required_containers`. Each new run starts from
/// `context.recover()` of the previous one.
///
/// # Panics
///
/// Panics if a supervisor from an earlier (interrupted) run recorded a fault,
/// or if the oracle reports any blocked peer at the end of a run.
fn unclean_shutdown<V: Variant>() {
    // Test parameters
    let n = 5;
    let threshold = quorum(n);
    let required_containers = 100;
    let activity_timeout = 10;
    let skip_timeout = 5;
    let namespace = b"consensus".to_vec();

    // Generate the polynomial and shares once, from a fixed seed, so every
    // restart uses the same values
    let mut rng = StdRng::seed_from_u64(0);
    let (polynomial, shares) = ops::generate_shares::<_, V>(&mut rng, None, n, threshold);

    // State shared across restarts: a restart counter, the supervisors of every
    // interrupted run, and the context to resume from
    let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
    let supervised = Arc::new(Mutex::new(Vec::new()));
    let mut prev_ctx = None;

    loop {
        // Clone everything the `async move` closure below takes ownership of
        let namespace = namespace.clone();
        let shutdowns = shutdowns.clone();
        let supervised = supervised.clone();
        let polynomial = polynomial.clone();
        let shares = shares.clone();

        // One run; resolves to `(completed, context)`
        let f = |mut context: deterministic::Context| async move {
            // Create and start the simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            network.start();

            // Generate participants; both lists are sorted by public key so
            // index `i` refers to the same participant in each
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link every pair of validators
            let link = Link {
                latency: 50.0,
                jitter: 50.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Create one engine per validator
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = HashMap::new();
            let mut engine_handlers = Vec::new();
            for (idx, scheme) in schemes.into_iter().enumerate() {
                // Per-validator context
                let context = context
                    .clone()
                    .with_label(&format!("validator-{}", scheme.public_key()));

                // Configure the mock supervisor and application
                let validator = scheme.public_key();
                let mut participants = BTreeMap::new();
                participants.insert(
                    0,
                    (polynomial.clone(), validators.clone(), shares[idx].clone()),
                );
                let supervisor_config = mocks::supervisor::Config::<_, V> {
                    namespace: namespace.clone(),
                    participants,
                };
                let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
                supervisors.insert(validator.clone(), supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();

                // Configure and start the engine; `partition` is the validator's
                // string form, so it is the same in every run
                let blocker = oracle.control(scheme.public_key());
                let cfg = config::Config {
                    crypto: scheme,
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    supervisor,
                    partition: validator.to_string(),
                    compression: Some(3),
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                let (pending, recovered, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Spawn one task per supervisor that waits for its monitor to reach
            // `required_containers`
            let mut finalizers = Vec::new();
            for (_, supervisor) in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }

            // Race a random shutdown delay against completion of all finalizers
            let wait =
                context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
            let result = select! {
                _ = context.sleep(wait) => {
                    // Delay elapsed first: count the restart and keep this run's
                    // supervisors for the final fault check
                    {
                        let mut shutdowns = shutdowns.lock().unwrap();
                        debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
                        *shutdowns += 1;
                    }
                    supervised.lock().unwrap().push(supervisors);
                    (false,context)
                },
                _ = join_all(finalizers) => {
                    // All finalizers completed: check the supervisors stored by
                    // earlier, interrupted runs (this run's are not in the list)
                    let supervised = supervised.lock().unwrap();
                    for supervisors in supervised.iter() {
                        for (_, supervisor) in supervisors.iter() {
                            let faults = supervisor.faults.lock().unwrap();
                            assert!(faults.is_empty());
                        }
                    }
                    (true,context)
                }
            };

            // No peer was blocked during this run
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());

            result
        };

        // Resume from the previous run's context if there is one, otherwise
        // start a fresh runner with a 30s limit
        let (complete, context) = if let Some(prev_ctx) = prev_ctx {
            deterministic::Runner::from(prev_ctx)
        } else {
            deterministic::Runner::timed(Duration::from_secs(30))
        }
        .start(f);

        // Stop once a run completed without being interrupted
        if complete {
            break;
        }

        // Otherwise carry the context over to the next run
        prev_ctx = Some(context.recover());
    }
}
771
/// Runs the unclean-shutdown scenario once per BLS12-381 variant (`MinPk`, `MinSig`).
#[test_traced]
fn test_unclean_shutdown() {
    unclean_shutdown::<MinPk>();
    unclean_shutdown::<MinSig>();
}
777
/// Starts three of four validators (index 0 is held back), waits for their
/// supervisors' monitors to reach `required_containers`, degrades and partly
/// unlinks the network, then starts validator 0 and waits for its monitor to
/// reach `required_containers` as well.
///
/// # Panics
///
/// Panics if a monitor ends early, or if the oracle reports any blocked peer.
fn backfill<V: Variant>() {
    // Test parameters
    let n = 4;
    let threshold = quorum(n);
    let required_containers = 100;
    let activity_timeout = 10;
    let skip_timeout = 5;
    let namespace = b"consensus".to_vec();
    let executor = deterministic::Runner::timed(Duration::from_secs(720));
    executor.start(|mut context| async move {
        // Create and start the simulated network
        let (network, mut oracle) = Network::new(
            context.with_label("network"),
            Config {
                max_size: 1024 * 1024,
            },
        );

        network.start();

        // Generate participants; both lists are sorted by public key so index
        // `i` refers to the same participant in each
        let mut schemes = Vec::new();
        let mut validators = Vec::new();
        for i in 0..n {
            let scheme = PrivateKey::from_seed(i as u64);
            let pk = scheme.public_key();
            schemes.push(scheme);
            validators.push(pk);
        }
        validators.sort();
        schemes.sort_by_key(|s| s.public_key());
        let mut registrations = register_validators(&mut oracle, &validators).await;

        // Link every pair that does not involve validator 0
        let link = Link {
            latency: 10.0,
            jitter: 1.0,
            success_rate: 1.0,
        };
        link_validators(
            &mut oracle,
            &validators,
            Action::Link(link),
            Some(|_, i, j| ![i, j].contains(&0usize)),
        )
        .await;

        // Generate the threshold polynomial and one share per validator
        let (polynomial, shares) =
            ops::generate_shares::<_, V>(&mut context, None, n, threshold);

        // Create engines for every validator except index 0
        let relay = Arc::new(mocks::relay::Relay::new());
        let mut supervisors = Vec::new();
        let mut engine_handlers = Vec::new();
        for (idx_scheme, scheme) in schemes.iter().enumerate() {
            // Validator 0 is started later
            if idx_scheme == 0 {
                continue;
            }

            // Per-validator context
            let context = context.with_label(&format!("validator-{}", scheme.public_key()));

            // Configure the mock supervisor and application
            let validator = scheme.public_key();
            let mut participants = BTreeMap::new();
            participants.insert(
                0,
                (
                    polynomial.clone(),
                    validators.clone(),
                    shares[idx_scheme].clone(),
                ),
            );
            let supervisor_config = mocks::supervisor::Config {
                namespace: namespace.clone(),
                participants,
            };
            let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
            supervisors.push(supervisor.clone());
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                participant: validator.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
            };
            let (actor, application) = mocks::application::Application::new(
                context.with_label("application"),
                application_cfg,
            );
            actor.start();

            // Configure and start the engine
            let blocker = oracle.control(scheme.public_key());
            let cfg = config::Config {
                crypto: scheme.clone(),
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: supervisor.clone(),
                supervisor,
                partition: validator.to_string(),
                compression: Some(3),
                mailbox_size: 1024,
                namespace: namespace.clone(),
                leader_timeout: Duration::from_secs(1),
                notarization_timeout: Duration::from_secs(2),
                nullify_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                max_fetch_count: 1,
                fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                fetch_concurrent: 1,
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            };
            let engine = Engine::new(context.with_label("engine"), cfg);

            let (pending, recovered, resolver) = registrations
                .remove(&validator)
                .expect("validator should be registered");
            engine_handlers.push(engine.start(pending, recovered, resolver));
        }

        // Wait until every started supervisor's monitor reaches `required_containers`
        let mut finalizers = Vec::new();
        for supervisor in supervisors.iter_mut() {
            let (mut latest, mut monitor) = supervisor.subscribe().await;
            finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                while latest < required_containers {
                    latest = monitor.next().await.expect("event missing");
                }
            }));
        }
        join_all(finalizers).await;

        // Replace the links between the started validators with slow ones
        // (3,000 latency, no jitter); pairs involving validator 0 are untouched
        let link = Link {
            latency: 3_000.0,
            jitter: 0.0,
            success_rate: 1.0,
        };
        link_validators(
            &mut oracle,
            &validators,
            Action::Update(link.clone()),
            Some(|_, i, j| ![i, j].contains(&0usize)),
        )
        .await;

        // Let the network run in the degraded state for 120s
        context.sleep(Duration::from_secs(120)).await;

        // Remove every link that involves validator 1 (and not validator 0)
        link_validators(
            &mut oracle,
            &validators,
            Action::Unlink,
            Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
        )
        .await;

        // Prepare validator 0
        let scheme = schemes[0].clone();
        let validator = scheme.public_key();
        let context = context.with_label(&format!("validator-{validator}"));

        // Link validator 0 to every validator except validator 1, reusing the
        // slow link configuration
        link_validators(
            &mut oracle,
            &validators,
            Action::Link(link),
            Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
        )
        .await;

        // Replace every link that does not involve validator 1 with a fast one
        let link = Link {
            latency: 10.0,
            jitter: 2.5,
            success_rate: 1.0,
        };
        link_validators(
            &mut oracle,
            &validators,
            Action::Update(link),
            Some(|_, i, j| ![i, j].contains(&1usize)),
        )
        .await;

        // Configure the mock supervisor and application for validator 0
        let mut participants = BTreeMap::new();
        participants.insert(
            0,
            (polynomial.clone(), validators.clone(), shares[0].clone()),
        );
        let supervisor_config = mocks::supervisor::Config::<_, V> {
            namespace: namespace.clone(),
            participants,
        };
        let mut supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
        supervisors.push(supervisor.clone());
        let application_cfg = mocks::application::Config {
            hasher: Sha256::default(),
            relay: relay.clone(),
            participant: validator.clone(),
            propose_latency: (10.0, 5.0),
            verify_latency: (10.0, 5.0),
        };
        let (actor, application) = mocks::application::Application::new(
            context.with_label("application"),
            application_cfg,
        );
        actor.start();

        // Configure and start the engine for validator 0
        let blocker = oracle.control(scheme.public_key());
        let cfg = config::Config {
            crypto: scheme,
            blocker,
            automaton: application.clone(),
            relay: application.clone(),
            reporter: supervisor.clone(),
            supervisor: supervisor.clone(),
            partition: validator.to_string(),
            compression: Some(3),
            mailbox_size: 1024,
            namespace: namespace.clone(),
            leader_timeout: Duration::from_secs(1),
            notarization_timeout: Duration::from_secs(2),
            nullify_retry: Duration::from_secs(10),
            fetch_timeout: Duration::from_secs(1),
            activity_timeout,
            skip_timeout,
            max_fetch_count: 1,
            fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
            fetch_concurrent: 1,
            replay_buffer: NZUsize!(1024 * 1024),
            write_buffer: NZUsize!(1024 * 1024),
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        };
        let engine = Engine::new(context.with_label("engine"), cfg);

        let (pending, recovered, resolver) = registrations
            .remove(&validator)
            .expect("validator should be registered");
        engine_handlers.push(engine.start(pending, recovered, resolver));

        // Wait until validator 0's monitor reaches `required_containers`
        let (mut latest, mut monitor) = supervisor.subscribe().await;
        while latest < required_containers {
            latest = monitor.next().await.expect("event missing");
        }

        // No peer was blocked
        let blocked = oracle.blocked().await.unwrap();
        assert!(blocked.is_empty());
    });
}
1040
/// Runs the backfill scenario once per BLS12-381 variant (`MinPk`, `MinSig`).
#[test_traced]
fn test_backfill() {
    backfill::<MinPk>();
    backfill::<MinSig>();
}
1046
1047 fn one_offline<V: Variant>() {
1048 let n = 5;
1050 let threshold = quorum(n);
1051 let required_containers = 100;
1052 let activity_timeout = 10;
1053 let skip_timeout = 5;
1054 let max_exceptions = 10;
1055 let namespace = b"consensus".to_vec();
1056 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1057 executor.start(|mut context| async move {
1058 let (network, mut oracle) = Network::new(
1060 context.with_label("network"),
1061 Config {
1062 max_size: 1024 * 1024,
1063 },
1064 );
1065
1066 network.start();
1068
1069 let mut schemes = Vec::new();
1071 let mut validators = Vec::new();
1072 for i in 0..n {
1073 let scheme = PrivateKey::from_seed(i as u64);
1074 let pk = scheme.public_key();
1075 schemes.push(scheme);
1076 validators.push(pk);
1077 }
1078 validators.sort();
1079 schemes.sort_by_key(|s| s.public_key());
1080 let mut registrations = register_validators(&mut oracle, &validators).await;
1081
1082 let link = Link {
1084 latency: 10.0,
1085 jitter: 1.0,
1086 success_rate: 1.0,
1087 };
1088 link_validators(
1089 &mut oracle,
1090 &validators,
1091 Action::Link(link),
1092 Some(|_, i, j| ![i, j].contains(&0usize)),
1093 )
1094 .await;
1095
1096 let (polynomial, shares) =
1098 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1099
1100 let relay = Arc::new(mocks::relay::Relay::new());
1102 let mut supervisors = Vec::new();
1103 let mut engine_handlers = Vec::new();
1104 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1105 if idx_scheme == 0 {
1107 continue;
1108 }
1109
1110 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1112
1113 let validator = scheme.public_key();
1115 let mut participants = BTreeMap::new();
1116 participants.insert(
1117 0,
1118 (
1119 polynomial.clone(),
1120 validators.clone(),
1121 shares[idx_scheme].clone(),
1122 ),
1123 );
1124 let supervisor_config = mocks::supervisor::Config::<_, V> {
1125 namespace: namespace.clone(),
1126 participants,
1127 };
1128 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1129 supervisors.push(supervisor.clone());
1130 let application_cfg = mocks::application::Config {
1131 hasher: Sha256::default(),
1132 relay: relay.clone(),
1133 participant: validator.clone(),
1134 propose_latency: (10.0, 5.0),
1135 verify_latency: (10.0, 5.0),
1136 };
1137 let (actor, application) = mocks::application::Application::new(
1138 context.with_label("application"),
1139 application_cfg,
1140 );
1141 actor.start();
1142 let blocker = oracle.control(scheme.public_key());
1143 let cfg = config::Config {
1144 crypto: scheme,
1145 blocker,
1146 automaton: application.clone(),
1147 relay: application.clone(),
1148 reporter: supervisor.clone(),
1149 supervisor,
1150 partition: validator.to_string(),
1151 compression: Some(3),
1152 mailbox_size: 1024,
1153 namespace: namespace.clone(),
1154 leader_timeout: Duration::from_secs(1),
1155 notarization_timeout: Duration::from_secs(2),
1156 nullify_retry: Duration::from_secs(10),
1157 fetch_timeout: Duration::from_secs(1),
1158 activity_timeout,
1159 skip_timeout,
1160 max_fetch_count: 1,
1161 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1162 fetch_concurrent: 1,
1163 replay_buffer: NZUsize!(1024 * 1024),
1164 write_buffer: NZUsize!(1024 * 1024),
1165 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1166 };
1167 let engine = Engine::new(context.with_label("engine"), cfg);
1168
1169 let (pending, recovered, resolver) = registrations
1171 .remove(&validator)
1172 .expect("validator should be registered");
1173 engine_handlers.push(engine.start(pending, recovered, resolver));
1174 }
1175
1176 let mut finalizers = Vec::new();
1178 for supervisor in supervisors.iter_mut() {
1179 let (mut latest, mut monitor) = supervisor.subscribe().await;
1180 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1181 while latest < required_containers {
1182 latest = monitor.next().await.expect("event missing");
1183 }
1184 }));
1185 }
1186 join_all(finalizers).await;
1187
1188 let exceptions = 0;
1190 let offline = &validators[0];
1191 for supervisor in supervisors.iter() {
1192 {
1194 let faults = supervisor.faults.lock().unwrap();
1195 assert!(faults.is_empty());
1196 }
1197
1198 {
1200 let invalid = supervisor.invalid.lock().unwrap();
1201 assert_eq!(*invalid, 0);
1202 }
1203
1204 let mut exceptions = 0;
1206 {
1207 let notarizes = supervisor.notarizes.lock().unwrap();
1208 for (view, payloads) in notarizes.iter() {
1209 for (_, participants) in payloads.iter() {
1210 if participants.contains(offline) {
1211 panic!("view: {view}");
1212 }
1213 }
1214 }
1215 }
1216 {
1217 let nullifies = supervisor.nullifies.lock().unwrap();
1218 for (view, participants) in nullifies.iter() {
1219 if participants.contains(offline) {
1220 panic!("view: {view}");
1221 }
1222 }
1223 }
1224 {
1225 let finalizes = supervisor.finalizes.lock().unwrap();
1226 for (view, payloads) in finalizes.iter() {
1227 for (_, finalizers) in payloads.iter() {
1228 if finalizers.contains(offline) {
1229 panic!("view: {view}");
1230 }
1231 }
1232 }
1233 }
1234
1235 let mut offline_views = Vec::new();
1237 {
1238 let leaders = supervisor.leaders.lock().unwrap();
1239 for (view, leader) in leaders.iter() {
1240 if leader == offline {
1241 offline_views.push(*view);
1242 }
1243 }
1244 }
1245 assert!(!offline_views.is_empty());
1246
1247 {
1249 let nullifies = supervisor.nullifies.lock().unwrap();
1250 for view in offline_views.iter() {
1251 let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1252 if nullifies < threshold as usize {
1253 warn!("missing expected view nullifies: {}", view);
1254 exceptions += 1;
1255 }
1256 }
1257 }
1258 {
1259 let nullifications = supervisor.nullifications.lock().unwrap();
1260 for view in offline_views.iter() {
1261 if !nullifications.contains_key(view) {
1262 warn!("missing expected view nullifies: {}", view);
1263 exceptions += 1;
1264 }
1265 }
1266 }
1267
1268 assert!(exceptions <= max_exceptions);
1270 }
1271 assert!(exceptions <= max_exceptions);
1272
1273 let blocked = oracle.blocked().await.unwrap();
1275 assert!(blocked.is_empty());
1276
1277 let encoded = context.encode();
1279 let lines = encoded.lines();
1280 let mut skipped_views = 0;
1281 let mut nodes_skipping = 0;
1282 for line in lines {
1283 if line.contains("_engine_voter_skipped_views_total") {
1284 let parts: Vec<&str> = line.split_whitespace().collect();
1285 if let Some(number_str) = parts.last() {
1286 if let Ok(number) = number_str.parse::<u64>() {
1287 if number > 0 {
1288 nodes_skipping += 1;
1289 }
1290 if number > skipped_views {
1291 skipped_views = number;
1292 }
1293 }
1294 }
1295 }
1296 }
1297 assert!(
1298 skipped_views > 0,
1299 "expected skipped views to be greater than 0"
1300 );
1301 assert_eq!(
1302 nodes_skipping,
1303 n - 1,
1304 "expected all online nodes to be skipping views"
1305 );
1306 });
1307 }
1308
/// Runs the one-offline scenario once per BLS12-381 variant (`MinPk`, `MinSig`).
#[test_traced]
fn test_one_offline() {
    one_offline::<MinPk>();
    one_offline::<MinSig>();
}
1314
1315 fn slow_validator<V: Variant>() {
1316 let n = 5;
1318 let threshold = quorum(n);
1319 let required_containers = 50;
1320 let activity_timeout = 10;
1321 let skip_timeout = 5;
1322 let namespace = b"consensus".to_vec();
1323 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1324 executor.start(|mut context| async move {
1325 let (network, mut oracle) = Network::new(
1327 context.with_label("network"),
1328 Config {
1329 max_size: 1024 * 1024,
1330 },
1331 );
1332
1333 network.start();
1335
1336 let mut schemes = Vec::new();
1338 let mut validators = Vec::new();
1339 for i in 0..n {
1340 let scheme = PrivateKey::from_seed(i as u64);
1341 let pk = scheme.public_key();
1342 schemes.push(scheme);
1343 validators.push(pk);
1344 }
1345 validators.sort();
1346 schemes.sort_by_key(|s| s.public_key());
1347 let mut registrations = register_validators(&mut oracle, &validators).await;
1348
1349 let link = Link {
1351 latency: 10.0,
1352 jitter: 1.0,
1353 success_rate: 1.0,
1354 };
1355 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1356
1357 let (polynomial, shares) =
1359 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1360
1361 let relay = Arc::new(mocks::relay::Relay::new());
1363 let mut supervisors = Vec::new();
1364 let mut engine_handlers = Vec::new();
1365 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1366 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1368
1369 let validator = scheme.public_key();
1371 let mut participants = BTreeMap::new();
1372 participants.insert(
1373 0,
1374 (
1375 polynomial.clone(),
1376 validators.clone(),
1377 shares[idx_scheme].clone(),
1378 ),
1379 );
1380 let supervisor_config = mocks::supervisor::Config::<_, V> {
1381 namespace: namespace.clone(),
1382 participants,
1383 };
1384 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1385 supervisors.push(supervisor.clone());
1386 let application_cfg = if idx_scheme == 0 {
1387 mocks::application::Config {
1388 hasher: Sha256::default(),
1389 relay: relay.clone(),
1390 participant: validator.clone(),
1391 propose_latency: (10_000.0, 0.0),
1392 verify_latency: (10_000.0, 5.0),
1393 }
1394 } else {
1395 mocks::application::Config {
1396 hasher: Sha256::default(),
1397 relay: relay.clone(),
1398 participant: validator.clone(),
1399 propose_latency: (10.0, 5.0),
1400 verify_latency: (10.0, 5.0),
1401 }
1402 };
1403 let (actor, application) = mocks::application::Application::new(
1404 context.with_label("application"),
1405 application_cfg,
1406 );
1407 actor.start();
1408 let blocker = oracle.control(scheme.public_key());
1409 let cfg = config::Config {
1410 crypto: scheme,
1411 blocker,
1412 automaton: application.clone(),
1413 relay: application.clone(),
1414 reporter: supervisor.clone(),
1415 supervisor,
1416 partition: validator.to_string(),
1417 compression: Some(3),
1418 mailbox_size: 1024,
1419 namespace: namespace.clone(),
1420 leader_timeout: Duration::from_secs(1),
1421 notarization_timeout: Duration::from_secs(2),
1422 nullify_retry: Duration::from_secs(10),
1423 fetch_timeout: Duration::from_secs(1),
1424 activity_timeout,
1425 skip_timeout,
1426 max_fetch_count: 1,
1427 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1428 fetch_concurrent: 1,
1429 replay_buffer: NZUsize!(1024 * 1024),
1430 write_buffer: NZUsize!(1024 * 1024),
1431 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1432 };
1433 let engine = Engine::new(context.with_label("engine"), cfg);
1434
1435 let (pending, recovered, resolver) = registrations
1437 .remove(&validator)
1438 .expect("validator should be registered");
1439 engine_handlers.push(engine.start(pending, recovered, resolver));
1440 }
1441
1442 let mut finalizers = Vec::new();
1444 for supervisor in supervisors.iter_mut() {
1445 let (mut latest, mut monitor) = supervisor.subscribe().await;
1446 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1447 while latest < required_containers {
1448 latest = monitor.next().await.expect("event missing");
1449 }
1450 }));
1451 }
1452 join_all(finalizers).await;
1453
1454 let slow = &validators[0];
1456 for supervisor in supervisors.iter() {
1457 {
1459 let faults = supervisor.faults.lock().unwrap();
1460 assert!(faults.is_empty());
1461 }
1462
1463 {
1465 let invalid = supervisor.invalid.lock().unwrap();
1466 assert_eq!(*invalid, 0);
1467 }
1468
1469 {
1471 let notarizes = supervisor.notarizes.lock().unwrap();
1472 for (view, payloads) in notarizes.iter() {
1473 for (_, participants) in payloads.iter() {
1474 if participants.contains(slow) {
1475 panic!("view: {view}");
1476 }
1477 }
1478 }
1479 }
1480 {
1481 let finalizes = supervisor.finalizes.lock().unwrap();
1482 for (view, payloads) in finalizes.iter() {
1483 for (_, finalizers) in payloads.iter() {
1484 if finalizers.contains(slow) {
1485 panic!("view: {view}");
1486 }
1487 }
1488 }
1489 }
1490 }
1491
1492 let blocked = oracle.blocked().await.unwrap();
1494 assert!(blocked.is_empty());
1495 });
1496 }
1497
1498 #[test_traced]
1499 fn test_slow_validator() {
1500 slow_validator::<MinPk>();
1501 slow_validator::<MinSig>();
1502 }
1503
// Scenario: all validators start on links with a very high latency, so nothing is expected to
// be reported by any supervisor monitor for the first 60 seconds. The links are then removed,
// restored with a low latency, and every validator must reach `required_containers`.
// Finally, most of the views right after the last nullified view must have finalizations.
//
// `V` selects the BLS12-381 variant used for the threshold shares.
fn all_recovery<V: Variant>() {
    // Scenario parameters
    let n = 5;
    let threshold = quorum(n);
    let required_containers = 100;
    let activity_timeout = 10;
    let skip_timeout = 2;
    let namespace = b"consensus".to_vec();
    // NOTE(review): `timed` presumably bounds the run to 180 seconds of runtime — confirm.
    let executor = deterministic::Runner::timed(Duration::from_secs(180));
    executor.start(|mut context| async move {
        // Create the simulated network and its oracle
        let (network, mut oracle) = Network::new(
            context.with_label("network"),
            Config {
                max_size: 1024 * 1024,
            },
        );

        // Start the network
        network.start();

        // Derive one key pair per validator from a fixed seed, then sort both lists by public
        // key so that index `i` in `schemes` corresponds to index `i` in `validators`
        let mut schemes = Vec::new();
        let mut validators = Vec::new();
        for i in 0..n {
            let scheme = PrivateKey::from_seed(i as u64);
            let pk = scheme.public_key();
            schemes.push(scheme);
            validators.push(pk);
        }
        validators.sort();
        schemes.sort_by_key(|s| s.public_key());
        let mut registrations = register_validators(&mut oracle, &validators).await;

        // Link all validators with a very slow link.
        // NOTE(review): latency is presumably in milliseconds (3s), which would exceed the
        // leader/notarization timeouts configured below — confirm against `Link`.
        let link = Link {
            latency: 3_000.0,
            jitter: 0.0,
            success_rate: 1.0,
        };
        link_validators(&mut oracle, &validators, Action::Link(link), None).await;

        // Generate the shared polynomial and one share per validator
        let (polynomial, shares) =
            ops::generate_shares::<_, V>(&mut context, None, n, threshold);

        // Create one engine (with its own supervisor and mock application) per validator
        let relay = Arc::new(mocks::relay::Relay::new());
        let mut supervisors = Vec::new();
        let mut engine_handlers = Vec::new();
        for (idx, scheme) in schemes.iter().enumerate() {
            // Per-validator context label
            let context = context.with_label(&format!("validator-{}", scheme.public_key()));

            // Configure the supervisor with the polynomial, validator set, and this
            // validator's share
            let validator = scheme.public_key();
            let mut participants = BTreeMap::new();
            participants.insert(
                0,
                (polynomial.clone(), validators.clone(), shares[idx].clone()),
            );
            let supervisor_config = mocks::supervisor::Config::<_, V> {
                namespace: namespace.clone(),
                participants,
            };
            let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
            supervisors.push(supervisor.clone());
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                participant: validator.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
            };
            let (actor, application) = mocks::application::Application::new(
                context.with_label("application"),
                application_cfg,
            );
            actor.start();
            let blocker = oracle.control(scheme.public_key());
            let cfg = config::Config {
                crypto: scheme.clone(),
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: supervisor.clone(),
                supervisor,
                partition: validator.to_string(),
                compression: Some(3),
                mailbox_size: 1024,
                namespace: namespace.clone(),
                leader_timeout: Duration::from_secs(1),
                notarization_timeout: Duration::from_secs(2),
                nullify_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                max_fetch_count: 1,
                fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                fetch_concurrent: 1,
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            };
            let engine = Engine::new(context.with_label("engine"), cfg);

            // Start the engine on the channels registered for this validator
            let (pending, recovered, resolver) = registrations
                .remove(&validator)
                .expect("validator should be registered");
            engine_handlers.push(engine.start(pending, recovered, resolver));
        }

        // Phase 1: for 60 seconds, no supervisor monitor may yield anything. Each task races
        // a sleep against the monitor and panics if the monitor wins.
        let mut finalizers = Vec::new();
        for supervisor in supervisors.iter_mut() {
            let (_, mut monitor) = supervisor.subscribe().await;
            finalizers.push(
                context
                    .with_label("finalizer")
                    .spawn(move |context| async move {
                        select! {
                            _timeout = context.sleep(Duration::from_secs(60)) => {},
                            _done = monitor.next() => {
                                panic!("engine should not notarize or finalize anything");
                            }
                        }
                    }),
            );
        }
        join_all(finalizers).await;

        // Phase 2: remove the links between all validators
        link_validators(&mut oracle, &validators, Action::Unlink, None).await;

        // Let another 60 seconds pass while disconnected
        context.sleep(Duration::from_secs(60)).await;

        // Record the highest key present in any supervisor's `nullifies` map.
        // NOTE(review): keys are presumably views — confirm against the mock supervisor.
        // The `unwrap` panics if a supervisor recorded no nullifies at all.
        let mut latest = 0;
        for supervisor in supervisors.iter() {
            let nullifies = supervisor.nullifies.lock().unwrap();
            let max = nullifies.keys().max().unwrap();
            if *max > latest {
                latest = *max;
            }
        }

        // Phase 3: restore connectivity with a fast link
        let link = Link {
            latency: 10.0,
            jitter: 1.0,
            success_rate: 1.0,
        };
        link_validators(&mut oracle, &validators, Action::Link(link), None).await;

        // Wait until every supervisor's monitor reports at least `required_containers`
        let mut finalizers = Vec::new();
        for supervisor in supervisors.iter_mut() {
            let (mut latest, mut monitor) = supervisor.subscribe().await;
            finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                while latest < required_containers {
                    latest = monitor.next().await.expect("event missing");
                }
            }));
        }
        join_all(finalizers).await;

        // Check what each supervisor recorded
        for supervisor in supervisors.iter() {
            // No faults were reported
            {
                let faults = supervisor.faults.lock().unwrap();
                assert!(faults.is_empty());
            }

            // No invalid items were counted
            {
                let invalid = supervisor.invalid.lock().unwrap();
                assert_eq!(*invalid, 0);
            }

            // Recovery must be quick: of the `activity_timeout` keys starting at `latest`,
            // at most 2 may be missing from `finalizations`
            {
                let mut found = 0;
                let finalizations = supervisor.finalizations.lock().unwrap();
                for i in latest..latest + activity_timeout {
                    if finalizations.contains_key(&i) {
                        found += 1;
                    }
                }
                assert!(found >= activity_timeout - 2, "found: {found}");
            }
        }

        // Nobody should have been blocked
        let blocked = oracle.blocked().await.unwrap();
        assert!(blocked.is_empty());
    });
}
1708
1709 #[test_traced]
1710 fn test_all_recovery() {
1711 all_recovery::<MinPk>();
1712 all_recovery::<MinSig>();
1713 }
1714
// Scenario: ten validators make progress on a healthy network, are then split into two
// halves, and are finally reconnected. While split, no supervisor monitor may yield anything
// for 60 seconds; after reconnecting, every validator must advance by another
// `required_containers`.
//
// `V` selects the BLS12-381 variant used for the threshold shares.
fn partition<V: Variant>() {
    // Scenario parameters
    let n = 10;
    let threshold = quorum(n);
    let required_containers = 50;
    let activity_timeout = 10;
    let skip_timeout = 5;
    let namespace = b"consensus".to_vec();
    // NOTE(review): `timed` presumably bounds the run to 900 seconds of runtime — confirm.
    let executor = deterministic::Runner::timed(Duration::from_secs(900));
    executor.start(|mut context| async move {
        // Create the simulated network and its oracle
        let (network, mut oracle) = Network::new(
            context.with_label("network"),
            Config {
                max_size: 1024 * 1024,
            },
        );

        // Start the network
        network.start();

        // Derive one key pair per validator from a fixed seed, then sort both lists by public
        // key so that index `i` in `schemes` corresponds to index `i` in `validators`
        let mut schemes = Vec::new();
        let mut validators = Vec::new();
        for i in 0..n {
            let scheme = PrivateKey::from_seed(i as u64);
            let pk = scheme.public_key();
            schemes.push(scheme);
            validators.push(pk);
        }
        validators.sort();
        schemes.sort_by_key(|s| s.public_key());
        let mut registrations = register_validators(&mut oracle, &validators).await;

        // Link all validators with a fast, reliable link (cloned because the same link is
        // reused when healing the partition)
        let link = Link {
            latency: 10.0,
            jitter: 1.0,
            success_rate: 1.0,
        };
        link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;

        // Generate the shared polynomial and one share per validator
        let (polynomial, shares) =
            ops::generate_shares::<_, V>(&mut context, None, n, threshold);

        // Create one engine (with its own supervisor and mock application) per validator
        let relay = Arc::new(mocks::relay::Relay::new());
        let mut supervisors = Vec::new();
        let mut engine_handlers = Vec::new();
        for (idx, scheme) in schemes.iter().enumerate() {
            // Per-validator context label
            let context = context.with_label(&format!("validator-{}", scheme.public_key()));

            // Configure the supervisor with the polynomial, validator set, and this
            // validator's share
            let validator = scheme.public_key();
            let mut participants = BTreeMap::new();
            participants.insert(
                0,
                (polynomial.clone(), validators.clone(), shares[idx].clone()),
            );
            let supervisor_config = mocks::supervisor::Config::<_, V> {
                namespace: namespace.clone(),
                participants,
            };
            let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
            supervisors.push(supervisor.clone());
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                participant: validator.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
            };
            let (actor, application) = mocks::application::Application::new(
                context.with_label("application"),
                application_cfg,
            );
            actor.start();
            let blocker = oracle.control(scheme.public_key());
            let cfg = config::Config {
                crypto: scheme.clone(),
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: supervisor.clone(),
                supervisor,
                partition: validator.to_string(),
                compression: Some(3),
                mailbox_size: 1024,
                namespace: namespace.clone(),
                leader_timeout: Duration::from_secs(1),
                notarization_timeout: Duration::from_secs(2),
                nullify_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                max_fetch_count: 1,
                fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                fetch_concurrent: 1,
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            };
            let engine = Engine::new(context.with_label("engine"), cfg);

            // Start the engine on the channels registered for this validator
            let (pending, recovered, resolver) = registrations
                .remove(&validator)
                .expect("validator should be registered");
            engine_handlers.push(engine.start(pending, recovered, resolver));
        }

        // Phase 1: wait until every supervisor's monitor reports at least
        // `required_containers` on the healthy network
        let mut finalizers = Vec::new();
        for supervisor in supervisors.iter_mut() {
            let (mut latest, mut monitor) = supervisor.subscribe().await;
            finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                while latest < required_containers {
                    latest = monitor.next().await.expect("event missing");
                }
            }));
        }
        join_all(finalizers).await;

        // Phase 2: cut the network in half. `separated` is true exactly when indices `a` and
        // `b` fall on opposite sides of `n / 2`.
        // NOTE(review): `link_validators` presumably applies the action only to pairs for
        // which the filter returns true — confirm against its definition.
        fn separated(n: usize, a: usize, b: usize) -> bool {
            let m = n / 2;
            (a < m && b >= m) || (a >= m && b < m)
        }
        link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;

        // Give the engines 10 seconds before starting to watch for unexpected progress
        context.sleep(Duration::from_secs(10)).await;

        // For the next 60 seconds no supervisor monitor may yield anything. Each task races
        // a sleep against the monitor and panics if the monitor wins.
        let mut finalizers = Vec::new();
        for supervisor in supervisors.iter_mut() {
            let (_, mut monitor) = supervisor.subscribe().await;
            finalizers.push(
                context
                    .with_label("finalizer")
                    .spawn(move |context| async move {
                        select! {
                            _timeout = context.sleep(Duration::from_secs(60)) => {},
                            _done = monitor.next() => {
                                panic!("engine should not notarize or finalize anything");
                            }
                        }
                    }),
            );
        }
        join_all(finalizers).await;

        // Phase 3: heal the partition by re-linking the separated pairs
        link_validators(
            &mut oracle,
            &validators,
            Action::Link(link),
            Some(separated),
        )
        .await;

        // Every supervisor must advance by `required_containers` beyond the value it
        // reported at subscribe time
        let mut finalizers = Vec::new();
        for supervisor in supervisors.iter_mut() {
            let (mut latest, mut monitor) = supervisor.subscribe().await;
            let required = latest + required_containers;
            finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                while latest < required {
                    latest = monitor.next().await.expect("event missing");
                }
            }));
        }
        join_all(finalizers).await;

        // Check what each supervisor recorded
        for supervisor in supervisors.iter() {
            // No faults were reported
            {
                let faults = supervisor.faults.lock().unwrap();
                assert!(faults.is_empty());
            }

            // No invalid items were counted
            {
                let invalid = supervisor.invalid.lock().unwrap();
                assert_eq!(*invalid, 0);
            }
        }

        // Nobody should have been blocked
        let blocked = oracle.blocked().await.unwrap();
        assert!(blocked.is_empty());
    });
}
1911
1912 #[test_traced]
1913 #[ignore]
1914 fn test_partition() {
1915 partition::<MinPk>();
1916 partition::<MinSig>();
1917 }
1918
// Scenario: five validators must reach `required_containers` over links with high latency,
// high jitter, and a 50% success rate, without recording any fault, invalid item, or block.
//
// `V` selects the BLS12-381 variant used for the threshold shares and `seed` seeds the
// deterministic runtime. Returns the runtime auditor's state at the end of the run, which
// `test_determinism` compares across runs that use the same seed.
fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
    // Scenario parameters
    let n = 5;
    let threshold = quorum(n);
    let required_containers = 50;
    let activity_timeout = 10;
    let skip_timeout = 5;
    let namespace = b"consensus".to_vec();
    // Seeded deterministic runtime with a 5,000 second timeout
    let cfg = deterministic::Config::new()
        .with_seed(seed)
        .with_timeout(Some(Duration::from_secs(5_000)));
    let executor = deterministic::Runner::new(cfg);
    executor.start(|mut context| async move {
        // Create the simulated network and its oracle
        let (network, mut oracle) = Network::new(
            context.with_label("network"),
            Config {
                max_size: 1024 * 1024,
            },
        );

        // Start the network
        network.start();

        // Derive one key pair per validator from a fixed seed, then sort both lists by public
        // key so that index `i` in `schemes` corresponds to index `i` in `validators`
        let mut schemes = Vec::new();
        let mut validators = Vec::new();
        for i in 0..n {
            let scheme = PrivateKey::from_seed(i as u64);
            let pk = scheme.public_key();
            schemes.push(scheme);
            validators.push(pk);
        }
        validators.sort();
        schemes.sort_by_key(|s| s.public_key());
        let mut registrations = register_validators(&mut oracle, &validators).await;

        // Link all validators with a slow link that only succeeds half of the time
        let degraded_link = Link {
            latency: 200.0,
            jitter: 150.0,
            success_rate: 0.5,
        };
        link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;

        // Generate the shared polynomial and one share per validator
        let (polynomial, shares) =
            ops::generate_shares::<_, V>(&mut context, None, n, threshold);

        // Create one engine (with its own supervisor and mock application) per validator
        let relay = Arc::new(mocks::relay::Relay::new());
        let mut supervisors = Vec::new();
        let mut engine_handlers = Vec::new();
        for (idx, scheme) in schemes.into_iter().enumerate() {
            // Per-validator context label
            let context = context.with_label(&format!("validator-{}", scheme.public_key()));

            // Configure the supervisor with the polynomial, validator set, and this
            // validator's share
            let validator = scheme.public_key();
            let mut participants = BTreeMap::new();
            participants.insert(
                0,
                (polynomial.clone(), validators.clone(), shares[idx].clone()),
            );
            let supervisor_config = mocks::supervisor::Config::<_, V> {
                namespace: namespace.clone(),
                participants,
            };
            let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
            supervisors.push(supervisor.clone());
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                participant: validator.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
            };
            let (actor, application) = mocks::application::Application::new(
                context.with_label("application"),
                application_cfg,
            );
            actor.start();
            let blocker = oracle.control(scheme.public_key());
            let cfg = config::Config {
                crypto: scheme,
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: supervisor.clone(),
                supervisor,
                partition: validator.to_string(),
                compression: Some(3),
                mailbox_size: 1024,
                namespace: namespace.clone(),
                leader_timeout: Duration::from_secs(1),
                notarization_timeout: Duration::from_secs(2),
                nullify_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                max_fetch_count: 1,
                fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                fetch_concurrent: 1,
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            };
            let engine = Engine::new(context.with_label("engine"), cfg);

            // Start the engine on the channels registered for this validator
            let (pending, recovered, resolver) = registrations
                .remove(&validator)
                .expect("validator should be registered");
            engine_handlers.push(engine.start(pending, recovered, resolver));
        }

        // Wait until every supervisor's monitor reports at least `required_containers`
        let mut finalizers = Vec::new();
        for supervisor in supervisors.iter_mut() {
            let (mut latest, mut monitor) = supervisor.subscribe().await;
            finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                while latest < required_containers {
                    latest = monitor.next().await.expect("event missing");
                }
            }));
        }
        join_all(finalizers).await;

        // Check what each supervisor recorded
        for supervisor in supervisors.iter() {
            // No faults were reported
            {
                let faults = supervisor.faults.lock().unwrap();
                assert!(faults.is_empty());
            }

            // No invalid items were counted
            {
                let invalid = supervisor.invalid.lock().unwrap();
                assert_eq!(*invalid, 0);
            }
        }

        // Nobody should have been blocked
        let blocked = oracle.blocked().await.unwrap();
        assert!(blocked.is_empty());

        // Hand the auditor state back to the caller
        context.auditor().state()
    })
}
2069
2070 #[test_traced]
2071 fn test_slow_and_lossy_links() {
2072 slow_and_lossy_links::<MinPk>(0);
2073 slow_and_lossy_links::<MinSig>(0);
2074 }
2075
2076 #[test_traced]
2077 #[ignore]
2078 fn test_determinism() {
2079 for seed in 1..6 {
2082 let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
2083 let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
2084 assert_eq!(pk_state_1, pk_state_2);
2085
2086 let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
2087 let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
2088 assert_eq!(sig_state_1, sig_state_2);
2089
2090 assert_ne!(pk_state_1, sig_state_1);
2092 }
2093 }
2094
// Scenario: four validators, where the one at sorted index 0 runs the `Conflicter` mock
// instead of a real engine. The remaining validators must still reach `required_containers`,
// must attribute faults only to that validator (and only conflicting notarize/finalize
// faults), and must be the only ones reporting blocks, all of them against that validator.
//
// `V` selects the BLS12-381 variant used for the threshold shares and `seed` seeds the
// deterministic runtime.
fn conflicter<V: Variant>(seed: u64) {
    // Scenario parameters
    let n = 4;
    let threshold = quorum(n);
    let required_containers = 50;
    let activity_timeout = 10;
    let skip_timeout = 5;
    let namespace = b"consensus".to_vec();
    // Seeded deterministic runtime with a 30 second timeout
    let cfg = deterministic::Config::new()
        .with_seed(seed)
        .with_timeout(Some(Duration::from_secs(30)));
    let executor = deterministic::Runner::new(cfg);
    executor.start(|mut context| async move {
        // Create the simulated network and its oracle
        let (network, mut oracle) = Network::new(
            context.with_label("network"),
            Config {
                max_size: 1024 * 1024,
            },
        );

        // Start the network
        network.start();

        // Derive one key pair per validator from a fixed seed, then sort both lists by public
        // key so that index `i` in `schemes` corresponds to index `i` in `validators`
        let mut schemes = Vec::new();
        let mut validators = Vec::new();
        for i in 0..n {
            let scheme = PrivateKey::from_seed(i as u64);
            let pk = scheme.public_key();
            schemes.push(scheme);
            validators.push(pk);
        }
        validators.sort();
        schemes.sort_by_key(|s| s.public_key());
        let mut registrations = register_validators(&mut oracle, &validators).await;

        // Link all validators with a fast, reliable link
        let link = Link {
            latency: 10.0,
            jitter: 1.0,
            success_rate: 1.0,
        };
        link_validators(&mut oracle, &validators, Action::Link(link), None).await;

        // Generate the shared polynomial and one share per validator
        let (polynomial, shares) =
            ops::generate_shares::<_, V>(&mut context, None, n, threshold);

        // Create the participants: a byzantine mock at index 0, real engines elsewhere
        let relay = Arc::new(mocks::relay::Relay::new());
        let mut supervisors = Vec::new();
        for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
            // Per-validator context label
            let context = context.with_label(&format!("validator-{}", scheme.public_key()));

            // Configure the supervisor with the polynomial, validator set, and this
            // validator's share
            let validator = scheme.public_key();
            let mut participants = BTreeMap::new();
            participants.insert(
                0,
                (
                    polynomial.clone(),
                    validators.clone(),
                    shares[idx_scheme].clone(),
                ),
            );
            let supervisor_config = mocks::supervisor::Config::<_, V> {
                namespace: namespace.clone(),
                participants,
            };
            let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
            let (pending, recovered, resolver) = registrations
                .remove(&validator)
                .expect("validator should be registered");
            if idx_scheme == 0 {
                // Byzantine participant: only the `pending` channel is handed to the mock,
                // and its supervisor is not added to the list checked below
                let cfg = mocks::conflicter::Config {
                    supervisor,
                    namespace: namespace.clone(),
                };

                let engine: mocks::conflicter::Conflicter<_, V, Sha256, _> =
                    mocks::conflicter::Conflicter::new(
                        context.with_label("byzantine_engine"),
                        cfg,
                    );
                engine.start(pending);
            } else {
                // Honest participant: real engine with a mock application
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(scheme.public_key());
                let cfg = config::Config {
                    crypto: scheme,
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    supervisor,
                    partition: validator.to_string(),
                    compression: Some(3),
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);
                engine.start(pending, recovered, resolver);
            }
        }

        // Wait until every honest supervisor's monitor reports at least
        // `required_containers`
        let mut finalizers = Vec::new();
        for supervisor in supervisors.iter_mut() {
            let (mut latest, mut monitor) = supervisor.subscribe().await;
            finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                while latest < required_containers {
                    latest = monitor.next().await.expect("event missing");
                }
            }));
        }
        join_all(finalizers).await;

        // Check what each honest supervisor recorded
        let byz = &validators[0];
        let mut count_conflicting = 0;
        for supervisor in supervisors.iter() {
            // Exactly one faulty party is recorded, it is the byzantine validator, and every
            // fault is a conflicting notarize or a conflicting finalize
            {
                let faults = supervisor.faults.lock().unwrap();
                assert_eq!(faults.len(), 1);
                let faulter = faults.get(byz).expect("byzantine party is not faulter");
                for (_, faults) in faulter.iter() {
                    for fault in faults.iter() {
                        match fault {
                            Activity::ConflictingNotarize(_) => {
                                count_conflicting += 1;
                            }
                            Activity::ConflictingFinalize(_) => {
                                count_conflicting += 1;
                            }
                            _ => panic!("unexpected fault: {fault:?}"),
                        }
                    }
                }
            }

            // No invalid items were counted
            {
                let invalid = supervisor.invalid.lock().unwrap();
                assert_eq!(*invalid, 0);
            }
        }
        // At least one conflicting fault was observed overall
        assert!(count_conflicting > 0);

        // Some blocks were recorded; in every (a, b) pair the second entry is the byzantine
        // validator and the first entry is not
        let blocked = oracle.blocked().await.unwrap();
        assert!(!blocked.is_empty());
        for (a, b) in blocked {
            assert_ne!(&a, byz);
            assert_eq!(&b, byz);
        }
    });
}
2279
2280 #[test_traced]
2281 #[ignore]
2282 fn test_conflicter() {
2283 for seed in 0..5 {
2284 conflicter::<MinPk>(seed);
2285 conflicter::<MinSig>(seed);
2286 }
2287 }
2288
// Scenario: four validators, where the one at sorted index 0 runs the `Invalid` mock instead
// of a real engine. The remaining validators must still reach `required_containers`, must
// record no faults, must each count at least one invalid item, and must be the only ones
// reporting blocks, all of them against that validator.
//
// `V` selects the BLS12-381 variant used for the threshold shares and `seed` seeds the
// deterministic runtime.
fn invalid<V: Variant>(seed: u64) {
    // Scenario parameters
    let n = 4;
    let threshold = quorum(n);
    let required_containers = 50;
    let activity_timeout = 10;
    let skip_timeout = 5;
    let namespace = b"consensus".to_vec();
    // Seeded deterministic runtime with a 30 second timeout
    let cfg = deterministic::Config::new()
        .with_seed(seed)
        .with_timeout(Some(Duration::from_secs(30)));
    let executor = deterministic::Runner::new(cfg);
    executor.start(|mut context| async move {
        // Create the simulated network and its oracle
        let (network, mut oracle) = Network::new(
            context.with_label("network"),
            Config {
                max_size: 1024 * 1024,
            },
        );

        // Start the network
        network.start();

        // Derive one key pair per validator from a fixed seed, then sort both lists by public
        // key so that index `i` in `schemes` corresponds to index `i` in `validators`
        let mut schemes = Vec::new();
        let mut validators = Vec::new();
        for i in 0..n {
            let scheme = PrivateKey::from_seed(i as u64);
            let pk = scheme.public_key();
            schemes.push(scheme);
            validators.push(pk);
        }
        validators.sort();
        schemes.sort_by_key(|s| s.public_key());
        let mut registrations = register_validators(&mut oracle, &validators).await;

        // Link all validators with a fast, reliable link
        let link = Link {
            latency: 10.0,
            jitter: 1.0,
            success_rate: 1.0,
        };
        link_validators(&mut oracle, &validators, Action::Link(link), None).await;

        // Generate the shared polynomial and one share per validator
        let (polynomial, shares) =
            ops::generate_shares::<_, V>(&mut context, None, n, threshold);

        // Create the participants: a byzantine mock at index 0, real engines elsewhere
        let relay = Arc::new(mocks::relay::Relay::new());
        let mut supervisors = Vec::new();
        for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
            // Per-validator context label
            let context = context.with_label(&format!("validator-{}", scheme.public_key()));

            // Configure the supervisor with the polynomial, validator set, and this
            // validator's share
            let validator = scheme.public_key();
            let mut participants = BTreeMap::new();
            participants.insert(
                0,
                (
                    polynomial.clone(),
                    validators.clone(),
                    shares[idx_scheme].clone(),
                ),
            );
            let supervisor_config = mocks::supervisor::Config::<_, V> {
                namespace: namespace.clone(),
                participants,
            };
            let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
            let (pending, recovered, resolver) = registrations
                .remove(&validator)
                .expect("validator should be registered");
            if idx_scheme == 0 {
                // Byzantine participant: only the `pending` channel is handed to the mock,
                // and its supervisor is not added to the list checked below
                let cfg = mocks::invalid::Config {
                    supervisor,
                    namespace: namespace.clone(),
                };

                let engine: mocks::invalid::Invalid<_, V, Sha256, _> =
                    mocks::invalid::Invalid::new(context.with_label("byzantine_engine"), cfg);
                engine.start(pending);
            } else {
                // Honest participant: real engine with a mock application
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(scheme.public_key());
                let cfg = config::Config {
                    crypto: scheme,
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    supervisor,
                    partition: validator.to_string(),
                    compression: Some(3),
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);
                engine.start(pending, recovered, resolver);
            }
        }

        // Wait until every honest supervisor's monitor reports at least
        // `required_containers`
        let mut finalizers = Vec::new();
        for supervisor in supervisors.iter_mut() {
            let (mut latest, mut monitor) = supervisor.subscribe().await;
            finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                while latest < required_containers {
                    latest = monitor.next().await.expect("event missing");
                }
            }));
        }
        join_all(finalizers).await;

        // Check what each honest supervisor recorded
        let mut invalid_count = 0;
        let byz = &validators[0];
        for supervisor in supervisors.iter() {
            // No faults were reported
            {
                let faults = supervisor.faults.lock().unwrap();
                assert!(faults.is_empty());
            }

            // Count the supervisors that saw at least one invalid item
            {
                let invalid = supervisor.invalid.lock().unwrap();
                if *invalid > 0 {
                    invalid_count += 1;
                }
            }
        }
        // Every honest supervisor must have counted at least one invalid item
        assert_eq!(invalid_count, n - 1);

        // Some blocks were recorded; in every (a, b) pair the second entry is the byzantine
        // validator and the first entry is not
        let blocked = oracle.blocked().await.unwrap();
        assert!(!blocked.is_empty());
        for (a, b) in blocked {
            assert_ne!(&a, byz);
            assert_eq!(&b, byz);
        }
    });
}
2458
2459 #[test_traced]
2460 #[ignore]
2461 fn test_invalid() {
2462 for seed in 0..5 {
2463 invalid::<MinPk>(seed);
2464 invalid::<MinSig>(seed);
2465 }
2466 }
2467
2468 fn impersonator<V: Variant>(seed: u64) {
2469 let n = 4;
2471 let threshold = quorum(n);
2472 let required_containers = 50;
2473 let activity_timeout = 10;
2474 let skip_timeout = 5;
2475 let namespace = b"consensus".to_vec();
2476 let cfg = deterministic::Config::new()
2477 .with_seed(seed)
2478 .with_timeout(Some(Duration::from_secs(30)));
2479 let executor = deterministic::Runner::new(cfg);
2480 executor.start(|mut context| async move {
2481 let (network, mut oracle) = Network::new(
2483 context.with_label("network"),
2484 Config {
2485 max_size: 1024 * 1024,
2486 },
2487 );
2488
2489 network.start();
2491
2492 let mut schemes = Vec::new();
2494 let mut validators = Vec::new();
2495 for i in 0..n {
2496 let scheme = PrivateKey::from_seed(i as u64);
2497 let pk = scheme.public_key();
2498 schemes.push(scheme);
2499 validators.push(pk);
2500 }
2501 validators.sort();
2502 schemes.sort_by_key(|s| s.public_key());
2503 let mut registrations = register_validators(&mut oracle, &validators).await;
2504
2505 let link = Link {
2507 latency: 10.0,
2508 jitter: 1.0,
2509 success_rate: 1.0,
2510 };
2511 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2512
2513 let (polynomial, shares) =
2515 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2516
2517 let relay = Arc::new(mocks::relay::Relay::new());
2519 let mut supervisors = Vec::new();
2520 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2521 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2523
2524 let validator = scheme.public_key();
2526 let mut participants = BTreeMap::new();
2527 participants.insert(
2528 0,
2529 (
2530 polynomial.clone(),
2531 validators.clone(),
2532 shares[idx_scheme].clone(),
2533 ),
2534 );
2535 let supervisor_config = mocks::supervisor::Config::<_, V> {
2536 namespace: namespace.clone(),
2537 participants,
2538 };
2539 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2540 let (pending, recovered, resolver) = registrations
2541 .remove(&validator)
2542 .expect("validator should be registered");
2543 if idx_scheme == 0 {
2544 let cfg = mocks::impersonator::Config {
2545 supervisor,
2546 namespace: namespace.clone(),
2547 };
2548
2549 let engine: mocks::impersonator::Impersonator<_, V, Sha256, _> =
2550 mocks::impersonator::Impersonator::new(
2551 context.with_label("byzantine_engine"),
2552 cfg,
2553 );
2554 engine.start(pending);
2555 } else {
2556 supervisors.push(supervisor.clone());
2557 let application_cfg = mocks::application::Config {
2558 hasher: Sha256::default(),
2559 relay: relay.clone(),
2560 participant: validator.clone(),
2561 propose_latency: (10.0, 5.0),
2562 verify_latency: (10.0, 5.0),
2563 };
2564 let (actor, application) = mocks::application::Application::new(
2565 context.with_label("application"),
2566 application_cfg,
2567 );
2568 actor.start();
2569 let blocker = oracle.control(scheme.public_key());
2570 let cfg = config::Config {
2571 crypto: scheme,
2572 blocker,
2573 automaton: application.clone(),
2574 relay: application.clone(),
2575 reporter: supervisor.clone(),
2576 supervisor,
2577 partition: validator.to_string(),
2578 compression: Some(3),
2579 mailbox_size: 1024,
2580 namespace: namespace.clone(),
2581 leader_timeout: Duration::from_secs(1),
2582 notarization_timeout: Duration::from_secs(2),
2583 nullify_retry: Duration::from_secs(10),
2584 fetch_timeout: Duration::from_secs(1),
2585 activity_timeout,
2586 skip_timeout,
2587 max_fetch_count: 1,
2588 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2589 fetch_concurrent: 1,
2590 replay_buffer: NZUsize!(1024 * 1024),
2591 write_buffer: NZUsize!(1024 * 1024),
2592 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2593 };
2594 let engine = Engine::new(context.with_label("engine"), cfg);
2595 engine.start(pending, recovered, resolver);
2596 }
2597 }
2598
2599 let mut finalizers = Vec::new();
2601 for supervisor in supervisors.iter_mut() {
2602 let (mut latest, mut monitor) = supervisor.subscribe().await;
2603 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2604 while latest < required_containers {
2605 latest = monitor.next().await.expect("event missing");
2606 }
2607 }));
2608 }
2609 join_all(finalizers).await;
2610
2611 let byz = &validators[0];
2613 for supervisor in supervisors.iter() {
2614 {
2616 let faults = supervisor.faults.lock().unwrap();
2617 assert!(faults.is_empty());
2618 }
2619
2620 {
2622 let invalid = supervisor.invalid.lock().unwrap();
2623 assert_eq!(*invalid, 0);
2624 }
2625 }
2626
2627 let blocked = oracle.blocked().await.unwrap();
2629 assert!(!blocked.is_empty());
2630 for (a, b) in blocked {
2631 assert_ne!(&a, byz);
2632 assert_eq!(&b, byz);
2633 }
2634 });
2635 }
2636
2637 #[test_traced]
2638 #[ignore]
2639 fn test_impersonator() {
2640 for seed in 0..5 {
2641 impersonator::<MinPk>(seed);
2642 impersonator::<MinSig>(seed);
2643 }
2644 }
2645
2646 fn nuller<V: Variant>(seed: u64) {
2647 let n = 4;
2649 let threshold = quorum(n);
2650 let required_containers = 50;
2651 let activity_timeout = 10;
2652 let skip_timeout = 5;
2653 let namespace = b"consensus".to_vec();
2654 let cfg = deterministic::Config::new()
2655 .with_seed(seed)
2656 .with_timeout(Some(Duration::from_secs(30)));
2657 let executor = deterministic::Runner::new(cfg);
2658 executor.start(|mut context| async move {
2659 let (network, mut oracle) = Network::new(
2661 context.with_label("network"),
2662 Config {
2663 max_size: 1024 * 1024,
2664 },
2665 );
2666
2667 network.start();
2669
2670 let mut schemes = Vec::new();
2672 let mut validators = Vec::new();
2673 for i in 0..n {
2674 let scheme = PrivateKey::from_seed(i as u64);
2675 let pk = scheme.public_key();
2676 schemes.push(scheme);
2677 validators.push(pk);
2678 }
2679 validators.sort();
2680 schemes.sort_by_key(|s| s.public_key());
2681 let mut registrations = register_validators(&mut oracle, &validators).await;
2682
2683 let link = Link {
2685 latency: 10.0,
2686 jitter: 1.0,
2687 success_rate: 1.0,
2688 };
2689 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2690
2691 let (polynomial, shares) =
2693 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2694
2695 let relay = Arc::new(mocks::relay::Relay::new());
2697 let mut supervisors = Vec::new();
2698 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2699 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2701
2702 let validator = scheme.public_key();
2704 let mut participants = BTreeMap::new();
2705 participants.insert(
2706 0,
2707 (
2708 polynomial.clone(),
2709 validators.clone(),
2710 shares[idx_scheme].clone(),
2711 ),
2712 );
2713 let supervisor_config = mocks::supervisor::Config::<_, V> {
2714 namespace: namespace.clone(),
2715 participants,
2716 };
2717 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2718 let (pending, recovered, resolver) = registrations
2719 .remove(&validator)
2720 .expect("validator should be registered");
2721 if idx_scheme == 0 {
2722 let cfg = mocks::nuller::Config {
2723 supervisor,
2724 namespace: namespace.clone(),
2725 };
2726 let engine: mocks::nuller::Nuller<_, V, Sha256, _> =
2727 mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
2728 engine.start(pending);
2729 } else {
2730 supervisors.push(supervisor.clone());
2731 let application_cfg = mocks::application::Config {
2732 hasher: Sha256::default(),
2733 relay: relay.clone(),
2734 participant: validator.clone(),
2735 propose_latency: (10.0, 5.0),
2736 verify_latency: (10.0, 5.0),
2737 };
2738 let (actor, application) = mocks::application::Application::new(
2739 context.with_label("application"),
2740 application_cfg,
2741 );
2742 actor.start();
2743 let blocker = oracle.control(scheme.public_key());
2744 let cfg = config::Config {
2745 crypto: scheme,
2746 blocker,
2747 automaton: application.clone(),
2748 relay: application.clone(),
2749 reporter: supervisor.clone(),
2750 supervisor,
2751 partition: validator.to_string(),
2752 compression: Some(3),
2753 mailbox_size: 1024,
2754 namespace: namespace.clone(),
2755 leader_timeout: Duration::from_secs(1),
2756 notarization_timeout: Duration::from_secs(2),
2757 nullify_retry: Duration::from_secs(10),
2758 fetch_timeout: Duration::from_secs(1),
2759 activity_timeout,
2760 skip_timeout,
2761 max_fetch_count: 1,
2762 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2763 fetch_concurrent: 1,
2764 replay_buffer: NZUsize!(1024 * 1024),
2765 write_buffer: NZUsize!(1024 * 1024),
2766 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2767 };
2768 let engine = Engine::new(context.with_label("engine"), cfg);
2769 engine.start(pending, recovered, resolver);
2770 }
2771 }
2772
2773 let mut finalizers = Vec::new();
2775 for supervisor in supervisors.iter_mut() {
2776 let (mut latest, mut monitor) = supervisor.subscribe().await;
2777 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2778 while latest < required_containers {
2779 latest = monitor.next().await.expect("event missing");
2780 }
2781 }));
2782 }
2783 join_all(finalizers).await;
2784
2785 let byz = &validators[0];
2787 let mut count_nullify_and_finalize = 0;
2788 for supervisor in supervisors.iter() {
2789 {
2791 let faults = supervisor.faults.lock().unwrap();
2792 assert_eq!(faults.len(), 1);
2793 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2794 for (_, faults) in faulter.iter() {
2795 for fault in faults.iter() {
2796 match fault {
2797 Activity::NullifyFinalize(_) => {
2798 count_nullify_and_finalize += 1;
2799 }
2800 _ => panic!("unexpected fault: {fault:?}"),
2801 }
2802 }
2803 }
2804 }
2805
2806 {
2808 let invalid = supervisor.invalid.lock().unwrap();
2809 assert_eq!(*invalid, 0);
2810 }
2811 }
2812 assert!(count_nullify_and_finalize > 0);
2813
2814 let blocked = oracle.blocked().await.unwrap();
2816 assert!(!blocked.is_empty());
2817 for (a, b) in blocked {
2818 assert_ne!(&a, byz);
2819 assert_eq!(&b, byz);
2820 }
2821 });
2822 }
2823
2824 #[test_traced]
2825 #[ignore]
2826 fn test_nuller() {
2827 for seed in 0..5 {
2828 nuller::<MinPk>(seed);
2829 nuller::<MinSig>(seed);
2830 }
2831 }
2832
2833 fn outdated<V: Variant>(seed: u64) {
2834 let n = 4;
2836 let threshold = quorum(n);
2837 let required_containers = 100;
2838 let activity_timeout = 10;
2839 let skip_timeout = 5;
2840 let namespace = b"consensus".to_vec();
2841 let cfg = deterministic::Config::new()
2842 .with_seed(seed)
2843 .with_timeout(Some(Duration::from_secs(30)));
2844 let executor = deterministic::Runner::new(cfg);
2845 executor.start(|mut context| async move {
2846 let (network, mut oracle) = Network::new(
2848 context.with_label("network"),
2849 Config {
2850 max_size: 1024 * 1024,
2851 },
2852 );
2853
2854 network.start();
2856
2857 let mut schemes = Vec::new();
2859 let mut validators = Vec::new();
2860 for i in 0..n {
2861 let scheme = PrivateKey::from_seed(i as u64);
2862 let pk = scheme.public_key();
2863 schemes.push(scheme);
2864 validators.push(pk);
2865 }
2866 validators.sort();
2867 schemes.sort_by_key(|s| s.public_key());
2868 let mut registrations = register_validators(&mut oracle, &validators).await;
2869
2870 let link = Link {
2872 latency: 10.0,
2873 jitter: 1.0,
2874 success_rate: 1.0,
2875 };
2876 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2877
2878 let (polynomial, shares) =
2880 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2881
2882 let relay = Arc::new(mocks::relay::Relay::new());
2884 let mut supervisors = Vec::new();
2885 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2886 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2888
2889 let validator = scheme.public_key();
2891 let mut participants = BTreeMap::new();
2892 participants.insert(
2893 0,
2894 (
2895 polynomial.clone(),
2896 validators.clone(),
2897 shares[idx_scheme].clone(),
2898 ),
2899 );
2900 let supervisor_config = mocks::supervisor::Config::<_, V> {
2901 namespace: namespace.clone(),
2902 participants,
2903 };
2904 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2905 let (pending, recovered, resolver) = registrations
2906 .remove(&validator)
2907 .expect("validator should be registered");
2908 if idx_scheme == 0 {
2909 let cfg = mocks::outdated::Config {
2910 supervisor,
2911 namespace: namespace.clone(),
2912 view_delta: activity_timeout * 4,
2913 };
2914 let engine: mocks::outdated::Outdated<_, V, Sha256, _> =
2915 mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
2916 engine.start(pending);
2917 } else {
2918 supervisors.push(supervisor.clone());
2919 let application_cfg = mocks::application::Config {
2920 hasher: Sha256::default(),
2921 relay: relay.clone(),
2922 participant: validator.clone(),
2923 propose_latency: (10.0, 5.0),
2924 verify_latency: (10.0, 5.0),
2925 };
2926 let (actor, application) = mocks::application::Application::new(
2927 context.with_label("application"),
2928 application_cfg,
2929 );
2930 actor.start();
2931 let blocker = oracle.control(scheme.public_key());
2932 let cfg = config::Config {
2933 crypto: scheme,
2934 blocker,
2935 automaton: application.clone(),
2936 relay: application.clone(),
2937 reporter: supervisor.clone(),
2938 supervisor,
2939 partition: validator.to_string(),
2940 compression: Some(3),
2941 mailbox_size: 1024,
2942 namespace: namespace.clone(),
2943 leader_timeout: Duration::from_secs(1),
2944 notarization_timeout: Duration::from_secs(2),
2945 nullify_retry: Duration::from_secs(10),
2946 fetch_timeout: Duration::from_secs(1),
2947 activity_timeout,
2948 skip_timeout,
2949 max_fetch_count: 1,
2950 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2951 fetch_concurrent: 1,
2952 replay_buffer: NZUsize!(1024 * 1024),
2953 write_buffer: NZUsize!(1024 * 1024),
2954 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2955 };
2956 let engine = Engine::new(context.with_label("engine"), cfg);
2957 engine.start(pending, recovered, resolver);
2958 }
2959 }
2960
2961 let mut finalizers = Vec::new();
2963 for supervisor in supervisors.iter_mut() {
2964 let (mut latest, mut monitor) = supervisor.subscribe().await;
2965 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2966 while latest < required_containers {
2967 latest = monitor.next().await.expect("event missing");
2968 }
2969 }));
2970 }
2971 join_all(finalizers).await;
2972
2973 for supervisor in supervisors.iter() {
2975 {
2977 let faults = supervisor.faults.lock().unwrap();
2978 assert!(faults.is_empty());
2979 }
2980
2981 {
2983 let invalid = supervisor.invalid.lock().unwrap();
2984 assert_eq!(*invalid, 0);
2985 }
2986 }
2987
2988 let blocked = oracle.blocked().await.unwrap();
2990 assert!(blocked.is_empty());
2991 });
2992 }
2993
2994 #[test_traced]
2995 #[ignore]
2996 fn test_outdated() {
2997 for seed in 0..5 {
2998 outdated::<MinPk>(seed);
2999 outdated::<MinSig>(seed);
3000 }
3001 }
3002
3003 fn run_1k<V: Variant>() {
3004 let n = 10;
3006 let threshold = quorum(n);
3007 let required_containers = 1_000;
3008 let activity_timeout = 10;
3009 let skip_timeout = 5;
3010 let namespace = b"consensus".to_vec();
3011 let cfg = deterministic::Config::new();
3012 let executor = deterministic::Runner::new(cfg);
3013 executor.start(|mut context| async move {
3014 let (network, mut oracle) = Network::new(
3016 context.with_label("network"),
3017 Config {
3018 max_size: 1024 * 1024,
3019 },
3020 );
3021
3022 network.start();
3024
3025 let mut schemes = Vec::new();
3027 let mut validators = Vec::new();
3028 for i in 0..n {
3029 let scheme = PrivateKey::from_seed(i as u64);
3030 let pk = scheme.public_key();
3031 schemes.push(scheme);
3032 validators.push(pk);
3033 }
3034 validators.sort();
3035 schemes.sort_by_key(|s| s.public_key());
3036 let mut registrations = register_validators(&mut oracle, &validators).await;
3037
3038 let link = Link {
3040 latency: 80.0,
3041 jitter: 10.0,
3042 success_rate: 0.98,
3043 };
3044 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
3045
3046 let (polynomial, shares) =
3048 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
3049
3050 let relay = Arc::new(mocks::relay::Relay::new());
3052 let mut supervisors = Vec::new();
3053 let mut engine_handlers = Vec::new();
3054 for (idx, scheme) in schemes.into_iter().enumerate() {
3055 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
3057
3058 let validator = scheme.public_key();
3060 let mut participants = BTreeMap::new();
3061 participants.insert(
3062 0,
3063 (polynomial.clone(), validators.clone(), shares[idx].clone()),
3064 );
3065 let supervisor_config = mocks::supervisor::Config::<_, V> {
3066 namespace: namespace.clone(),
3067 participants,
3068 };
3069 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
3070 supervisors.push(supervisor.clone());
3071 let application_cfg = mocks::application::Config {
3072 hasher: Sha256::default(),
3073 relay: relay.clone(),
3074 participant: validator.clone(),
3075 propose_latency: (100.0, 50.0),
3076 verify_latency: (50.0, 40.0),
3077 };
3078 let (actor, application) = mocks::application::Application::new(
3079 context.with_label("application"),
3080 application_cfg,
3081 );
3082 actor.start();
3083 let blocker = oracle.control(scheme.public_key());
3084 let cfg = config::Config {
3085 crypto: scheme,
3086 blocker,
3087 automaton: application.clone(),
3088 relay: application.clone(),
3089 reporter: supervisor.clone(),
3090 supervisor,
3091 partition: validator.to_string(),
3092 compression: Some(3),
3093 mailbox_size: 1024,
3094 namespace: namespace.clone(),
3095 leader_timeout: Duration::from_secs(1),
3096 notarization_timeout: Duration::from_secs(2),
3097 nullify_retry: Duration::from_secs(10),
3098 fetch_timeout: Duration::from_secs(1),
3099 activity_timeout,
3100 skip_timeout,
3101 max_fetch_count: 1,
3102 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
3103 fetch_concurrent: 1,
3104 replay_buffer: NZUsize!(1024 * 1024),
3105 write_buffer: NZUsize!(1024 * 1024),
3106 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3107 };
3108 let engine = Engine::new(context.with_label("engine"), cfg);
3109
3110 let (pending, recovered, resolver) = registrations
3112 .remove(&validator)
3113 .expect("validator should be registered");
3114 engine_handlers.push(engine.start(pending, recovered, resolver));
3115 }
3116
3117 let mut finalizers = Vec::new();
3119 for supervisor in supervisors.iter_mut() {
3120 let (mut latest, mut monitor) = supervisor.subscribe().await;
3121 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3122 while latest < required_containers {
3123 latest = monitor.next().await.expect("event missing");
3124 }
3125 }));
3126 }
3127 join_all(finalizers).await;
3128
3129 for supervisor in supervisors.iter() {
3131 {
3133 let faults = supervisor.faults.lock().unwrap();
3134 assert!(faults.is_empty());
3135 }
3136
3137 {
3139 let invalid = supervisor.invalid.lock().unwrap();
3140 assert_eq!(*invalid, 0);
3141 }
3142 }
3143
3144 let blocked = oracle.blocked().await.unwrap();
3146 assert!(blocked.is_empty());
3147 })
3148 }
3149
3150 #[test_traced]
3151 #[ignore]
3152 fn test_1k() {
3153 run_1k::<MinPk>();
3154 run_1k::<MinSig>();
3155 }
3156
3157 fn tle<V: Variant>() {
3158 let n = 4;
3160 let threshold = quorum(n);
3161 let namespace = b"consensus".to_vec();
3162 let activity_timeout = 100;
3163 let skip_timeout = 50;
3164 let executor = deterministic::Runner::timed(Duration::from_secs(30));
3165 executor.start(|mut context| async move {
3166 let (network, mut oracle) = Network::new(
3168 context.with_label("network"),
3169 Config {
3170 max_size: 1024 * 1024,
3171 },
3172 );
3173
3174 network.start();
3176
3177 let mut schemes = Vec::new();
3179 let mut validators = Vec::new();
3180 for i in 0..n {
3181 let scheme = PrivateKey::from_seed(i as u64);
3182 let pk = scheme.public_key();
3183 schemes.push(scheme);
3184 validators.push(pk);
3185 }
3186 validators.sort();
3187 schemes.sort_by_key(|s| s.public_key());
3188 let mut registrations = register_validators(&mut oracle, &validators).await;
3189
3190 let link = Link {
3192 latency: 10.0,
3193 jitter: 5.0,
3194 success_rate: 1.0,
3195 };
3196 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
3197
3198 let (polynomial, shares) =
3200 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
3201 let public_key = *public::<V>(&polynomial);
3202
3203 let relay = Arc::new(mocks::relay::Relay::new());
3205 let mut supervisors = Vec::new();
3206 let mut engine_handlers = Vec::new();
3207 let monitor_supervisor = Arc::new(Mutex::new(None));
3208 for (idx, scheme) in schemes.into_iter().enumerate() {
3209 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
3211
3212 let validator = scheme.public_key();
3214 let mut participants = BTreeMap::new();
3215 participants.insert(
3216 0,
3217 (polynomial.clone(), validators.clone(), shares[idx].clone()),
3218 );
3219
3220 let supervisor_config = mocks::supervisor::Config::<_, V> {
3222 namespace: namespace.clone(),
3223 participants,
3224 };
3225 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
3226 supervisors.push(supervisor.clone());
3227 if idx == 0 {
3228 *monitor_supervisor.lock().unwrap() = Some(supervisor.clone());
3229 }
3230
3231 let application_cfg = mocks::application::Config {
3233 hasher: Sha256::default(),
3234 relay: relay.clone(),
3235 participant: validator.clone(),
3236 propose_latency: (10.0, 5.0),
3237 verify_latency: (10.0, 5.0),
3238 };
3239 let (actor, application) = mocks::application::Application::new(
3240 context.with_label("application"),
3241 application_cfg,
3242 );
3243 actor.start();
3244 let blocker = oracle.control(scheme.public_key());
3245 let cfg = config::Config {
3246 crypto: scheme,
3247 blocker,
3248 automaton: application.clone(),
3249 relay: application.clone(),
3250 reporter: supervisor.clone(),
3251 supervisor,
3252 partition: validator.to_string(),
3253 compression: Some(3),
3254 mailbox_size: 1024,
3255 namespace: namespace.clone(),
3256 leader_timeout: Duration::from_millis(100),
3257 notarization_timeout: Duration::from_millis(200),
3258 nullify_retry: Duration::from_millis(500),
3259 fetch_timeout: Duration::from_millis(100),
3260 activity_timeout,
3261 skip_timeout,
3262 max_fetch_count: 1,
3263 fetch_rate_per_peer: Quota::per_second(NZU32!(10)),
3264 fetch_concurrent: 1,
3265 replay_buffer: NZUsize!(1024 * 1024),
3266 write_buffer: NZUsize!(1024 * 1024),
3267 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3268 };
3269 let engine = Engine::new(context.with_label("engine"), cfg);
3270
3271 let (pending, recovered, resolver) = registrations
3273 .remove(&validator)
3274 .expect("validator should be registered");
3275 engine_handlers.push(engine.start(pending, recovered, resolver));
3276 }
3277
3278 let target = 10u64; let target_bytes = target.to_be_bytes();
3281 let message_content = b"Secret message for future view10"; let message = Block::new(*message_content);
3283
3284 let seed_namespace = seed_namespace(&namespace);
3286 let ciphertext = encrypt::<_, V>(
3287 &mut context,
3288 public_key,
3289 (Some(&seed_namespace), &target_bytes),
3290 &message,
3291 );
3292
3293 let supervisor = monitor_supervisor.lock().unwrap().clone().unwrap();
3295 loop {
3296 context.sleep(Duration::from_millis(100)).await;
3298 let notarizations = supervisor.notarizations.lock().unwrap();
3299 let Some(notarization) = notarizations.get(&target) else {
3300 continue;
3301 };
3302
3303 let seed_signature = notarization.seed_signature;
3305 let decrypted = decrypt::<V>(&seed_signature, &ciphertext)
3306 .expect("Decryption should succeed with valid seed signature");
3307 assert_eq!(
3308 message.as_ref(),
3309 decrypted.as_ref(),
3310 "Decrypted message should match original message"
3311 );
3312 break;
3313 }
3314 });
3315 }
3316
3317 #[test_traced]
3318 fn test_tle() {
3319 tle::<MinPk>();
3320 tle::<MinSig>();
3321 }
3322}