1use types::View;
168
169pub mod types;
170
171cfg_if::cfg_if! {
172 if #[cfg(not(target_arch = "wasm32"))] {
173 mod actors;
174 mod config;
175 pub use config::Config;
176 mod engine;
177 pub use engine::Engine;
178 mod metrics;
179 }
180}
181
182#[cfg(test)]
183pub mod mocks;
184
185pub(crate) fn min_active(activity_timeout: View, last_finalized: View) -> View {
187 last_finalized.saturating_sub(activity_timeout)
188}
189
190pub(crate) fn interesting(
194 activity_timeout: View,
195 last_finalized: View,
196 current: View,
197 pending: View,
198 allow_future: bool,
199) -> bool {
200 if pending < min_active(activity_timeout, last_finalized) {
201 return false;
202 }
203 if !allow_future && pending > current + 1 {
204 return false;
205 }
206 true
207}
208
209#[cfg(test)]
210mod tests {
211 use super::*;
212 use crate::{threshold_simplex::types::seed_namespace, Monitor};
213 use commonware_cryptography::{
214 bls12381::{
215 dkg::ops,
216 primitives::{
217 poly::public,
218 variant::{MinPk, MinSig, Variant},
219 },
220 tle::{decrypt, encrypt, Block},
221 },
222 ed25519::PrivateKey,
223 PrivateKeyExt as _, PublicKey, Sha256, Signer as _,
224 };
225 use commonware_macros::{select, test_traced};
226 use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
227 use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner, Spawner};
228 use commonware_utils::{quorum, NZUsize, NZU32};
229 use engine::Engine;
230 use futures::{future::join_all, StreamExt};
231 use governor::Quota;
232 use rand::{rngs::StdRng, Rng as _, SeedableRng as _};
233 use std::{
234 collections::{BTreeMap, HashMap},
235 num::NonZeroUsize,
236 sync::{Arc, Mutex},
237 time::Duration,
238 };
239 use tracing::{debug, warn};
240 use types::Activity;
241
242 const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
243 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
244
245 async fn register_validators<P: PublicKey>(
247 oracle: &mut Oracle<P>,
248 validators: &[P],
249 ) -> HashMap<
250 P,
251 (
252 (Sender<P>, Receiver<P>),
253 (Sender<P>, Receiver<P>),
254 (Sender<P>, Receiver<P>),
255 ),
256 > {
257 let mut registrations = HashMap::new();
258 for validator in validators.iter() {
259 let (pending_sender, pending_receiver) =
260 oracle.register(validator.clone(), 0).await.unwrap();
261 let (recovered_sender, recovered_receiver) =
262 oracle.register(validator.clone(), 1).await.unwrap();
263 let (resolver_sender, resolver_receiver) =
264 oracle.register(validator.clone(), 2).await.unwrap();
265 registrations.insert(
266 validator.clone(),
267 (
268 (pending_sender, pending_receiver),
269 (recovered_sender, recovered_receiver),
270 (resolver_sender, resolver_receiver),
271 ),
272 );
273 }
274 registrations
275 }
276
277 enum Action {
279 Link(Link),
280 Update(Link), Unlink,
282 }
283
284 async fn link_validators<P: PublicKey>(
290 oracle: &mut Oracle<P>,
291 validators: &[P],
292 action: Action,
293 restrict_to: Option<fn(usize, usize, usize) -> bool>,
294 ) {
295 for (i1, v1) in validators.iter().enumerate() {
296 for (i2, v2) in validators.iter().enumerate() {
297 if v2 == v1 {
299 continue;
300 }
301
302 if let Some(f) = restrict_to {
304 if !f(validators.len(), i1, i2) {
305 continue;
306 }
307 }
308
309 match action {
311 Action::Update(_) | Action::Unlink => {
312 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
313 }
314 _ => {}
315 }
316
317 match action {
319 Action::Link(ref link) | Action::Update(ref link) => {
320 oracle
321 .add_link(v1.clone(), v2.clone(), link.clone())
322 .await
323 .unwrap();
324 }
325 _ => {}
326 }
327 }
328 }
329 }
330
331 fn all_online<V: Variant>() {
332 let n = 5;
334 let threshold = quorum(n);
335 let required_containers = 100;
336 let activity_timeout = 10;
337 let skip_timeout = 5;
338 let namespace = b"consensus".to_vec();
339 let executor = deterministic::Runner::timed(Duration::from_secs(30));
340 executor.start(|mut context| async move {
341 let (network, mut oracle) = Network::new(
343 context.with_label("network"),
344 Config {
345 max_size: 1024 * 1024,
346 },
347 );
348
349 network.start();
351
352 let mut schemes = Vec::new();
354 let mut validators = Vec::new();
355 for i in 0..n {
356 let scheme = PrivateKey::from_seed(i as u64);
357 let pk = scheme.public_key();
358 schemes.push(scheme);
359 validators.push(pk);
360 }
361 validators.sort();
362 schemes.sort_by_key(|s| s.public_key());
363 let mut registrations = register_validators(&mut oracle, &validators).await;
364
365 let link = Link {
367 latency: Duration::from_millis(10),
368 jitter: Duration::from_millis(1),
369 success_rate: 1.0,
370 };
371 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
372
373 let (polynomial, shares) =
375 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
376
377 let relay = Arc::new(mocks::relay::Relay::new());
379 let mut supervisors = Vec::new();
380 let mut engine_handlers = Vec::new();
381 for (idx, scheme) in schemes.into_iter().enumerate() {
382 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
384
385 let validator = scheme.public_key();
387 let mut participants = BTreeMap::new();
388 participants.insert(
389 0,
390 (
391 polynomial.clone(),
392 validators.clone(),
393 Some(shares[idx].clone()),
394 ),
395 );
396 let supervisor_config = mocks::supervisor::Config::<_, V> {
397 namespace: namespace.clone(),
398 participants,
399 };
400 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
401 supervisors.push(supervisor.clone());
402 let application_cfg = mocks::application::Config {
403 hasher: Sha256::default(),
404 relay: relay.clone(),
405 participant: validator.clone(),
406 propose_latency: (10.0, 5.0),
407 verify_latency: (10.0, 5.0),
408 };
409 let (actor, application) = mocks::application::Application::new(
410 context.with_label("application"),
411 application_cfg,
412 );
413 actor.start();
414 let blocker = oracle.control(scheme.public_key());
415 let cfg = config::Config {
416 crypto: scheme,
417 blocker,
418 automaton: application.clone(),
419 relay: application.clone(),
420 reporter: supervisor.clone(),
421 supervisor,
422 partition: validator.to_string(),
423 mailbox_size: 1024,
424 namespace: namespace.clone(),
425 leader_timeout: Duration::from_secs(1),
426 notarization_timeout: Duration::from_secs(2),
427 nullify_retry: Duration::from_secs(10),
428 fetch_timeout: Duration::from_secs(1),
429 activity_timeout,
430 skip_timeout,
431 max_fetch_count: 1,
432 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
433 fetch_concurrent: 1,
434 replay_buffer: NZUsize!(1024 * 1024),
435 write_buffer: NZUsize!(1024 * 1024),
436 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
437 };
438 let engine = Engine::new(context.with_label("engine"), cfg);
439
440 let (pending, recovered, resolver) = registrations
442 .remove(&validator)
443 .expect("validator should be registered");
444 engine_handlers.push(engine.start(pending, recovered, resolver));
445 }
446
447 let mut finalizers = Vec::new();
449 for supervisor in supervisors.iter_mut() {
450 let (mut latest, mut monitor) = supervisor.subscribe().await;
451 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
452 while latest < required_containers {
453 latest = monitor.next().await.expect("event missing");
454 }
455 }));
456 }
457 join_all(finalizers).await;
458
459 let latest_complete = required_containers - activity_timeout;
461 for supervisor in supervisors.iter() {
462 {
464 let faults = supervisor.faults.lock().unwrap();
465 assert!(faults.is_empty());
466 }
467
468 {
470 let invalid = supervisor.invalid.lock().unwrap();
471 assert_eq!(*invalid, 0);
472 }
473
474 {
476 let seeds = supervisor.seeds.lock().unwrap();
477 for view in 1..latest_complete {
478 if !seeds.contains_key(&view) {
480 panic!("view: {view}");
481 }
482 }
483 }
484
485 let mut notarized = HashMap::new();
487 let mut finalized = HashMap::new();
488 {
489 let notarizes = supervisor.notarizes.lock().unwrap();
490 for view in 1..latest_complete {
491 let Some(payloads) = notarizes.get(&view) else {
493 continue;
494 };
495 if payloads.len() > 1 {
496 panic!("view: {view}");
497 }
498 let (digest, notarizers) = payloads.iter().next().unwrap();
499 notarized.insert(view, *digest);
500
501 if notarizers.len() < threshold as usize {
502 panic!("view: {view}");
505 }
506 }
507 }
508 {
509 let notarizations = supervisor.notarizations.lock().unwrap();
510 for view in 1..latest_complete {
511 let Some(notarization) = notarizations.get(&view) else {
513 continue;
514 };
515 let Some(digest) = notarized.get(&view) else {
516 continue;
517 };
518 assert_eq!(¬arization.proposal.payload, digest);
519 }
520 }
521 {
522 let finalizes = supervisor.finalizes.lock().unwrap();
523 for view in 1..latest_complete {
524 let Some(payloads) = finalizes.get(&view) else {
526 continue;
527 };
528 if payloads.len() > 1 {
529 panic!("view: {view}");
530 }
531 let (digest, finalizers) = payloads.iter().next().unwrap();
532 finalized.insert(view, *digest);
533
534 if view > latest_complete {
536 continue;
537 }
538
539 if finalizers.len() < threshold as usize {
541 panic!("view: {view}");
544 }
545
546 let nullifies = supervisor.nullifies.lock().unwrap();
548 let Some(nullifies) = nullifies.get(&view) else {
549 continue;
550 };
551 for (_, finalizers) in payloads.iter() {
552 for finalizer in finalizers.iter() {
553 if nullifies.contains(finalizer) {
554 panic!("should not nullify and finalize at same view");
555 }
556 }
557 }
558 }
559 }
560 {
561 let finalizations = supervisor.finalizations.lock().unwrap();
562 for view in 1..latest_complete {
563 let Some(finalization) = finalizations.get(&view) else {
565 continue;
566 };
567 let Some(digest) = finalized.get(&view) else {
568 continue;
569 };
570 assert_eq!(&finalization.proposal.payload, digest);
571 }
572 }
573 }
574
575 let blocked = oracle.blocked().await.unwrap();
577 assert!(blocked.is_empty());
578 });
579 }
580
581 #[test_traced]
582 fn test_all_online() {
583 all_online::<MinPk>();
584 all_online::<MinSig>();
585 }
586
587 fn observer<V: Variant>() {
588 let n_active = 5;
590 let threshold = quorum(n_active);
591 let required_containers = 100;
592 let activity_timeout = 10;
593 let skip_timeout = 5;
594 let namespace = b"consensus".to_vec();
595 let executor = deterministic::Runner::timed(Duration::from_secs(30));
596 executor.start(|mut context| async move {
597 let (network, mut oracle) = Network::new(
599 context.with_label("network"),
600 Config {
601 max_size: 1024 * 1024,
602 },
603 );
604
605 network.start();
607
608 let mut schemes = Vec::new();
610 let mut validators = Vec::new();
611 for i in 0..n_active {
612 let scheme = PrivateKey::from_seed(i as u64);
613 let pk = scheme.public_key();
614 schemes.push(scheme);
615 validators.push(pk);
616 }
617 schemes.sort_by_key(|s| s.public_key());
618 validators.sort();
619
620 let scheme_observer = PrivateKey::from_seed(n_active as u64);
622 let pk_observer = scheme_observer.public_key();
623 schemes.push(scheme_observer);
624
625 let mut all_validators = validators.clone();
627 all_validators.push(pk_observer.clone());
628 all_validators.sort();
629 let mut registrations = register_validators(&mut oracle, &all_validators).await;
630
631 let link = Link {
633 latency: Duration::from_millis(10),
634 jitter: Duration::from_millis(1),
635 success_rate: 1.0,
636 };
637 link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
638
639 let (polynomial, shares) =
641 ops::generate_shares::<_, V>(&mut context, None, n_active, threshold);
642
643 let relay = Arc::new(mocks::relay::Relay::new());
645 let mut supervisors = Vec::new();
646
647 for (idx, scheme) in schemes.into_iter().enumerate() {
648 let is_observer = scheme.public_key() == pk_observer;
649
650 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
652
653 let validator = scheme.public_key();
655 let mut participants = BTreeMap::new();
656 let share = if is_observer {
657 None
658 } else {
659 Some(shares[idx].clone())
660 };
661 participants.insert(0, (polynomial.clone(), validators.clone(), share));
662 let supervisor_config = mocks::supervisor::Config::<_, V> {
663 namespace: namespace.clone(),
664 participants,
665 };
666 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
667 supervisors.push(supervisor.clone());
668 let application_cfg = mocks::application::Config {
669 hasher: Sha256::default(),
670 relay: relay.clone(),
671 participant: validator.clone(),
672 propose_latency: (10.0, 5.0),
673 verify_latency: (10.0, 5.0),
674 };
675 let (actor, application) = mocks::application::Application::new(
676 context.with_label("application"),
677 application_cfg,
678 );
679 actor.start();
680 let blocker = oracle.control(validator.clone());
681 let cfg = config::Config {
682 crypto: scheme,
683 blocker,
684 automaton: application.clone(),
685 relay: application.clone(),
686 reporter: supervisor.clone(),
687 supervisor,
688 partition: validator.to_string(),
689 mailbox_size: 1024,
690 namespace: namespace.clone(),
691 leader_timeout: Duration::from_secs(1),
692 notarization_timeout: Duration::from_secs(2),
693 nullify_retry: Duration::from_secs(10),
694 fetch_timeout: Duration::from_secs(1),
695 activity_timeout,
696 skip_timeout,
697 max_fetch_count: 1,
698 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
699 fetch_concurrent: 1,
700 replay_buffer: NZUsize!(1024 * 1024),
701 write_buffer: NZUsize!(1024 * 1024),
702 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
703 };
704 let engine = Engine::new(context.with_label("engine"), cfg);
705
706 let (pending, recovered, resolver) = registrations
708 .remove(&validator)
709 .expect("validator should be registered");
710 engine.start(pending, recovered, resolver);
711 }
712
713 let mut finalizers = Vec::new();
715 for supervisor in supervisors.iter_mut() {
716 let (mut latest, mut monitor) = supervisor.subscribe().await;
717 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
718 while latest < required_containers {
719 latest = monitor.next().await.expect("event missing");
720 }
721 }));
722 }
723 join_all(finalizers).await;
724
725 for supervisor in supervisors.iter() {
727 {
729 let faults = supervisor.faults.lock().unwrap();
730 assert!(faults.is_empty());
731 }
732 {
733 let invalid = supervisor.invalid.lock().unwrap();
734 assert_eq!(*invalid, 0);
735 }
736
737 let blocked = oracle.blocked().await.unwrap();
739 assert!(blocked.is_empty());
740 }
741 });
742 }
743
744 #[test_traced]
745 fn test_observer() {
746 observer::<MinPk>();
747 observer::<MinSig>();
748 }
749
750 fn unclean_shutdown<V: Variant>() {
751 let n = 5;
753 let threshold = quorum(n);
754 let required_containers = 100;
755 let activity_timeout = 10;
756 let skip_timeout = 5;
757 let namespace = b"consensus".to_vec();
758
759 let mut rng = StdRng::seed_from_u64(0);
761 let (polynomial, shares) = ops::generate_shares::<_, V>(&mut rng, None, n, threshold);
762
763 let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
765 let supervised = Arc::new(Mutex::new(Vec::new()));
766 let mut prev_ctx = None;
767
768 loop {
769 let namespace = namespace.clone();
770 let shutdowns = shutdowns.clone();
771 let supervised = supervised.clone();
772 let polynomial = polynomial.clone();
773 let shares = shares.clone();
774
775 let f = |mut context: deterministic::Context| async move {
776 let (network, mut oracle) = Network::new(
778 context.with_label("network"),
779 Config {
780 max_size: 1024 * 1024,
781 },
782 );
783
784 network.start();
786
787 let mut schemes = Vec::new();
789 let mut validators = Vec::new();
790 for i in 0..n {
791 let scheme = PrivateKey::from_seed(i as u64);
792 let pk = scheme.public_key();
793 schemes.push(scheme);
794 validators.push(pk);
795 }
796 validators.sort();
797 schemes.sort_by_key(|s| s.public_key());
798 let mut registrations = register_validators(&mut oracle, &validators).await;
799
800 let link = Link {
802 latency: Duration::from_millis(50),
803 jitter: Duration::from_millis(50),
804 success_rate: 1.0,
805 };
806 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
807
808 let relay = Arc::new(mocks::relay::Relay::new());
810 let mut supervisors = HashMap::new();
811 let mut engine_handlers = Vec::new();
812 for (idx, scheme) in schemes.into_iter().enumerate() {
813 let context = context
815 .clone()
816 .with_label(&format!("validator-{}", scheme.public_key()));
817
818 let validator = scheme.public_key();
820 let mut participants = BTreeMap::new();
821 participants.insert(
822 0,
823 (
824 polynomial.clone(),
825 validators.clone(),
826 Some(shares[idx].clone()),
827 ),
828 );
829 let supervisor_config = mocks::supervisor::Config::<_, V> {
830 namespace: namespace.clone(),
831 participants,
832 };
833 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
834 supervisors.insert(validator.clone(), supervisor.clone());
835 let application_cfg = mocks::application::Config {
836 hasher: Sha256::default(),
837 relay: relay.clone(),
838 participant: validator.clone(),
839 propose_latency: (10.0, 5.0),
840 verify_latency: (10.0, 5.0),
841 };
842 let (actor, application) = mocks::application::Application::new(
843 context.with_label("application"),
844 application_cfg,
845 );
846 actor.start();
847 let blocker = oracle.control(scheme.public_key());
848 let cfg = config::Config {
849 crypto: scheme,
850 blocker,
851 automaton: application.clone(),
852 relay: application.clone(),
853 reporter: supervisor.clone(),
854 supervisor,
855 partition: validator.to_string(),
856 mailbox_size: 1024,
857 namespace: namespace.clone(),
858 leader_timeout: Duration::from_secs(1),
859 notarization_timeout: Duration::from_secs(2),
860 nullify_retry: Duration::from_secs(10),
861 fetch_timeout: Duration::from_secs(1),
862 activity_timeout,
863 skip_timeout,
864 max_fetch_count: 1,
865 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
866 fetch_concurrent: 1,
867 replay_buffer: NZUsize!(1024 * 1024),
868 write_buffer: NZUsize!(1024 * 1024),
869 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
870 };
871 let engine = Engine::new(context.with_label("engine"), cfg);
872
873 let (pending, recovered, resolver) = registrations
875 .remove(&validator)
876 .expect("validator should be registered");
877 engine_handlers.push(engine.start(pending, recovered, resolver));
878 }
879
880 let mut finalizers = Vec::new();
882 for (_, supervisor) in supervisors.iter_mut() {
883 let (mut latest, mut monitor) = supervisor.subscribe().await;
884 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
885 while latest < required_containers {
886 latest = monitor.next().await.expect("event missing");
887 }
888 }));
889 }
890
891 let wait =
893 context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
894 let result = select! {
895 _ = context.sleep(wait) => {
896 {
898 let mut shutdowns = shutdowns.lock().unwrap();
899 debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
900 *shutdowns += 1;
901 }
902 supervised.lock().unwrap().push(supervisors);
903 (false,context)
904 },
905 _ = join_all(finalizers) => {
906 let supervised = supervised.lock().unwrap();
908 for supervisors in supervised.iter() {
909 for (_, supervisor) in supervisors.iter() {
910 let faults = supervisor.faults.lock().unwrap();
911 assert!(faults.is_empty());
912 }
913 }
914 (true,context)
915 }
916 };
917
918 let blocked = oracle.blocked().await.unwrap();
920 assert!(blocked.is_empty());
921
922 result
923 };
924
925 let (complete, context) = if let Some(prev_ctx) = prev_ctx {
926 deterministic::Runner::from(prev_ctx)
927 } else {
928 deterministic::Runner::timed(Duration::from_secs(30))
929 }
930 .start(f);
931
932 if complete {
934 break;
935 }
936
937 prev_ctx = Some(context.recover());
938 }
939 }
940
941 #[test_traced]
942 fn test_unclean_shutdown() {
943 unclean_shutdown::<MinPk>();
944 unclean_shutdown::<MinSig>();
945 }
946
947 fn backfill<V: Variant>() {
948 let n = 4;
950 let threshold = quorum(n);
951 let required_containers = 100;
952 let activity_timeout = 10;
953 let skip_timeout = 5;
954 let namespace = b"consensus".to_vec();
955 let executor = deterministic::Runner::timed(Duration::from_secs(720));
956 executor.start(|mut context| async move {
957 let (network, mut oracle) = Network::new(
959 context.with_label("network"),
960 Config {
961 max_size: 1024 * 1024,
962 },
963 );
964
965 network.start();
967
968 let mut schemes = Vec::new();
970 let mut validators = Vec::new();
971 for i in 0..n {
972 let scheme = PrivateKey::from_seed(i as u64);
973 let pk = scheme.public_key();
974 schemes.push(scheme);
975 validators.push(pk);
976 }
977 validators.sort();
978 schemes.sort_by_key(|s| s.public_key());
979 let mut registrations = register_validators(&mut oracle, &validators).await;
980
981 let link = Link {
983 latency: Duration::from_millis(10),
984 jitter: Duration::from_millis(1),
985 success_rate: 1.0,
986 };
987 link_validators(
988 &mut oracle,
989 &validators,
990 Action::Link(link),
991 Some(|_, i, j| ![i, j].contains(&0usize)),
992 )
993 .await;
994
995 let (polynomial, shares) =
997 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
998
999 let relay = Arc::new(mocks::relay::Relay::new());
1001 let mut supervisors = Vec::new();
1002 let mut engine_handlers = Vec::new();
1003 for (idx_scheme, scheme) in schemes.iter().enumerate() {
1004 if idx_scheme == 0 {
1006 continue;
1007 }
1008
1009 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1011
1012 let validator = scheme.public_key();
1014 let mut participants = BTreeMap::new();
1015 participants.insert(
1016 0,
1017 (
1018 polynomial.clone(),
1019 validators.clone(),
1020 Some(shares[idx_scheme].clone()),
1021 ),
1022 );
1023 let supervisor_config = mocks::supervisor::Config {
1024 namespace: namespace.clone(),
1025 participants,
1026 };
1027 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1028 supervisors.push(supervisor.clone());
1029 let application_cfg = mocks::application::Config {
1030 hasher: Sha256::default(),
1031 relay: relay.clone(),
1032 participant: validator.clone(),
1033 propose_latency: (10.0, 5.0),
1034 verify_latency: (10.0, 5.0),
1035 };
1036 let (actor, application) = mocks::application::Application::new(
1037 context.with_label("application"),
1038 application_cfg,
1039 );
1040 actor.start();
1041 let blocker = oracle.control(scheme.public_key());
1042 let cfg = config::Config {
1043 crypto: scheme.clone(),
1044 blocker,
1045 automaton: application.clone(),
1046 relay: application.clone(),
1047 reporter: supervisor.clone(),
1048 supervisor,
1049 partition: validator.to_string(),
1050 mailbox_size: 1024,
1051 namespace: namespace.clone(),
1052 leader_timeout: Duration::from_secs(1),
1053 notarization_timeout: Duration::from_secs(2),
1054 nullify_retry: Duration::from_secs(10),
1055 fetch_timeout: Duration::from_secs(1),
1056 activity_timeout,
1057 skip_timeout,
1058 max_fetch_count: 1, fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1060 fetch_concurrent: 1,
1061 replay_buffer: NZUsize!(1024 * 1024),
1062 write_buffer: NZUsize!(1024 * 1024),
1063 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1064 };
1065 let engine = Engine::new(context.with_label("engine"), cfg);
1066
1067 let (pending, recovered, resolver) = registrations
1069 .remove(&validator)
1070 .expect("validator should be registered");
1071 engine_handlers.push(engine.start(pending, recovered, resolver));
1072 }
1073
1074 let mut finalizers = Vec::new();
1076 for supervisor in supervisors.iter_mut() {
1077 let (mut latest, mut monitor) = supervisor.subscribe().await;
1078 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1079 while latest < required_containers {
1080 latest = monitor.next().await.expect("event missing");
1081 }
1082 }));
1083 }
1084 join_all(finalizers).await;
1085
1086 let link = Link {
1088 latency: Duration::from_secs(3),
1089 jitter: Duration::from_millis(0),
1090 success_rate: 1.0,
1091 };
1092 link_validators(
1093 &mut oracle,
1094 &validators,
1095 Action::Update(link.clone()),
1096 Some(|_, i, j| ![i, j].contains(&0usize)),
1097 )
1098 .await;
1099
1100 context.sleep(Duration::from_secs(120)).await;
1102
1103 link_validators(
1105 &mut oracle,
1106 &validators,
1107 Action::Unlink,
1108 Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
1109 )
1110 .await;
1111
1112 let scheme = schemes[0].clone();
1114 let validator = scheme.public_key();
1115 let context = context.with_label(&format!("validator-{validator}"));
1116
1117 link_validators(
1119 &mut oracle,
1120 &validators,
1121 Action::Link(link),
1122 Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
1123 )
1124 .await;
1125
1126 let link = Link {
1128 latency: Duration::from_millis(10),
1129 jitter: Duration::from_millis(3),
1130 success_rate: 1.0,
1131 };
1132 link_validators(
1133 &mut oracle,
1134 &validators,
1135 Action::Update(link),
1136 Some(|_, i, j| ![i, j].contains(&1usize)),
1137 )
1138 .await;
1139
1140 let mut participants = BTreeMap::new();
1142 participants.insert(
1143 0,
1144 (
1145 polynomial.clone(),
1146 validators.clone(),
1147 Some(shares[0].clone()),
1148 ),
1149 );
1150 let supervisor_config = mocks::supervisor::Config::<_, V> {
1151 namespace: namespace.clone(),
1152 participants,
1153 };
1154 let mut supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1155 supervisors.push(supervisor.clone());
1156 let application_cfg = mocks::application::Config {
1157 hasher: Sha256::default(),
1158 relay: relay.clone(),
1159 participant: validator.clone(),
1160 propose_latency: (10.0, 5.0),
1161 verify_latency: (10.0, 5.0),
1162 };
1163 let (actor, application) = mocks::application::Application::new(
1164 context.with_label("application"),
1165 application_cfg,
1166 );
1167 actor.start();
1168 let blocker = oracle.control(scheme.public_key());
1169 let cfg = config::Config {
1170 crypto: scheme,
1171 blocker,
1172 automaton: application.clone(),
1173 relay: application.clone(),
1174 reporter: supervisor.clone(),
1175 supervisor: supervisor.clone(),
1176 partition: validator.to_string(),
1177 mailbox_size: 1024,
1178 namespace: namespace.clone(),
1179 leader_timeout: Duration::from_secs(1),
1180 notarization_timeout: Duration::from_secs(2),
1181 nullify_retry: Duration::from_secs(10),
1182 fetch_timeout: Duration::from_secs(1),
1183 activity_timeout,
1184 skip_timeout,
1185 max_fetch_count: 1,
1186 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1187 fetch_concurrent: 1,
1188 replay_buffer: NZUsize!(1024 * 1024),
1189 write_buffer: NZUsize!(1024 * 1024),
1190 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1191 };
1192 let engine = Engine::new(context.with_label("engine"), cfg);
1193
1194 let (pending, recovered, resolver) = registrations
1196 .remove(&validator)
1197 .expect("validator should be registered");
1198 engine_handlers.push(engine.start(pending, recovered, resolver));
1199
1200 let (mut latest, mut monitor) = supervisor.subscribe().await;
1202 while latest < required_containers {
1203 latest = monitor.next().await.expect("event missing");
1204 }
1205
1206 let blocked = oracle.blocked().await.unwrap();
1208 assert!(blocked.is_empty());
1209 });
1210 }
1211
1212 #[test_traced]
1213 fn test_backfill() {
1214 backfill::<MinPk>();
1215 backfill::<MinSig>();
1216 }
1217
1218 fn one_offline<V: Variant>() {
1219 let n = 5;
1221 let threshold = quorum(n);
1222 let required_containers = 100;
1223 let activity_timeout = 10;
1224 let skip_timeout = 5;
1225 let max_exceptions = 10;
1226 let namespace = b"consensus".to_vec();
1227 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1228 executor.start(|mut context| async move {
1229 let (network, mut oracle) = Network::new(
1231 context.with_label("network"),
1232 Config {
1233 max_size: 1024 * 1024,
1234 },
1235 );
1236
1237 network.start();
1239
1240 let mut schemes = Vec::new();
1242 let mut validators = Vec::new();
1243 for i in 0..n {
1244 let scheme = PrivateKey::from_seed(i as u64);
1245 let pk = scheme.public_key();
1246 schemes.push(scheme);
1247 validators.push(pk);
1248 }
1249 validators.sort();
1250 schemes.sort_by_key(|s| s.public_key());
1251 let mut registrations = register_validators(&mut oracle, &validators).await;
1252
1253 let link = Link {
1255 latency: Duration::from_millis(10),
1256 jitter: Duration::from_millis(1),
1257 success_rate: 1.0,
1258 };
1259 link_validators(
1260 &mut oracle,
1261 &validators,
1262 Action::Link(link),
1263 Some(|_, i, j| ![i, j].contains(&0usize)),
1264 )
1265 .await;
1266
1267 let (polynomial, shares) =
1269 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1270
1271 let relay = Arc::new(mocks::relay::Relay::new());
1273 let mut supervisors = Vec::new();
1274 let mut engine_handlers = Vec::new();
1275 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1276 if idx_scheme == 0 {
1278 continue;
1279 }
1280
1281 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1283
1284 let validator = scheme.public_key();
1286 let mut participants = BTreeMap::new();
1287 participants.insert(
1288 0,
1289 (
1290 polynomial.clone(),
1291 validators.clone(),
1292 Some(shares[idx_scheme].clone()),
1293 ),
1294 );
1295 let supervisor_config = mocks::supervisor::Config::<_, V> {
1296 namespace: namespace.clone(),
1297 participants,
1298 };
1299 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1300 supervisors.push(supervisor.clone());
1301 let application_cfg = mocks::application::Config {
1302 hasher: Sha256::default(),
1303 relay: relay.clone(),
1304 participant: validator.clone(),
1305 propose_latency: (10.0, 5.0),
1306 verify_latency: (10.0, 5.0),
1307 };
1308 let (actor, application) = mocks::application::Application::new(
1309 context.with_label("application"),
1310 application_cfg,
1311 );
1312 actor.start();
1313 let blocker = oracle.control(scheme.public_key());
1314 let cfg = config::Config {
1315 crypto: scheme,
1316 blocker,
1317 automaton: application.clone(),
1318 relay: application.clone(),
1319 reporter: supervisor.clone(),
1320 supervisor,
1321 partition: validator.to_string(),
1322 mailbox_size: 1024,
1323 namespace: namespace.clone(),
1324 leader_timeout: Duration::from_secs(1),
1325 notarization_timeout: Duration::from_secs(2),
1326 nullify_retry: Duration::from_secs(10),
1327 fetch_timeout: Duration::from_secs(1),
1328 activity_timeout,
1329 skip_timeout,
1330 max_fetch_count: 1,
1331 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1332 fetch_concurrent: 1,
1333 replay_buffer: NZUsize!(1024 * 1024),
1334 write_buffer: NZUsize!(1024 * 1024),
1335 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1336 };
1337 let engine = Engine::new(context.with_label("engine"), cfg);
1338
1339 let (pending, recovered, resolver) = registrations
1341 .remove(&validator)
1342 .expect("validator should be registered");
1343 engine_handlers.push(engine.start(pending, recovered, resolver));
1344 }
1345
1346 let mut finalizers = Vec::new();
1348 for supervisor in supervisors.iter_mut() {
1349 let (mut latest, mut monitor) = supervisor.subscribe().await;
1350 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1351 while latest < required_containers {
1352 latest = monitor.next().await.expect("event missing");
1353 }
1354 }));
1355 }
1356 join_all(finalizers).await;
1357
1358 let exceptions = 0;
1360 let offline = &validators[0];
1361 for supervisor in supervisors.iter() {
1362 {
1364 let faults = supervisor.faults.lock().unwrap();
1365 assert!(faults.is_empty());
1366 }
1367
1368 {
1370 let invalid = supervisor.invalid.lock().unwrap();
1371 assert_eq!(*invalid, 0);
1372 }
1373
1374 let mut exceptions = 0;
1376 {
1377 let notarizes = supervisor.notarizes.lock().unwrap();
1378 for (view, payloads) in notarizes.iter() {
1379 for (_, participants) in payloads.iter() {
1380 if participants.contains(offline) {
1381 panic!("view: {view}");
1382 }
1383 }
1384 }
1385 }
1386 {
1387 let nullifies = supervisor.nullifies.lock().unwrap();
1388 for (view, participants) in nullifies.iter() {
1389 if participants.contains(offline) {
1390 panic!("view: {view}");
1391 }
1392 }
1393 }
1394 {
1395 let finalizes = supervisor.finalizes.lock().unwrap();
1396 for (view, payloads) in finalizes.iter() {
1397 for (_, finalizers) in payloads.iter() {
1398 if finalizers.contains(offline) {
1399 panic!("view: {view}");
1400 }
1401 }
1402 }
1403 }
1404
1405 let mut offline_views = Vec::new();
1407 {
1408 let leaders = supervisor.leaders.lock().unwrap();
1409 for (view, leader) in leaders.iter() {
1410 if leader == offline {
1411 offline_views.push(*view);
1412 }
1413 }
1414 }
1415 assert!(!offline_views.is_empty());
1416
1417 {
1419 let nullifies = supervisor.nullifies.lock().unwrap();
1420 for view in offline_views.iter() {
1421 let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1422 if nullifies < threshold as usize {
1423 warn!("missing expected view nullifies: {}", view);
1424 exceptions += 1;
1425 }
1426 }
1427 }
1428 {
1429 let nullifications = supervisor.nullifications.lock().unwrap();
1430 for view in offline_views.iter() {
1431 if !nullifications.contains_key(view) {
1432 warn!("missing expected view nullifies: {}", view);
1433 exceptions += 1;
1434 }
1435 }
1436 }
1437
1438 assert!(exceptions <= max_exceptions);
1440 }
1441 assert!(exceptions <= max_exceptions);
1442
1443 let blocked = oracle.blocked().await.unwrap();
1445 assert!(blocked.is_empty());
1446
1447 let encoded = context.encode();
1449 let lines = encoded.lines();
1450 let mut skipped_views = 0;
1451 let mut nodes_skipping = 0;
1452 for line in lines {
1453 if line.contains("_engine_voter_skipped_views_total") {
1454 let parts: Vec<&str> = line.split_whitespace().collect();
1455 if let Some(number_str) = parts.last() {
1456 if let Ok(number) = number_str.parse::<u64>() {
1457 if number > 0 {
1458 nodes_skipping += 1;
1459 }
1460 if number > skipped_views {
1461 skipped_views = number;
1462 }
1463 }
1464 }
1465 }
1466 }
1467 assert!(
1468 skipped_views > 0,
1469 "expected skipped views to be greater than 0"
1470 );
1471 assert_eq!(
1472 nodes_skipping,
1473 n - 1,
1474 "expected all online nodes to be skipping views"
1475 );
1476 });
1477 }
1478
1479 #[test_traced]
1480 fn test_one_offline() {
1481 one_offline::<MinPk>();
1482 one_offline::<MinSig>();
1483 }
1484
1485 fn slow_validator<V: Variant>() {
1486 let n = 5;
1488 let threshold = quorum(n);
1489 let required_containers = 50;
1490 let activity_timeout = 10;
1491 let skip_timeout = 5;
1492 let namespace = b"consensus".to_vec();
1493 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1494 executor.start(|mut context| async move {
1495 let (network, mut oracle) = Network::new(
1497 context.with_label("network"),
1498 Config {
1499 max_size: 1024 * 1024,
1500 },
1501 );
1502
1503 network.start();
1505
1506 let mut schemes = Vec::new();
1508 let mut validators = Vec::new();
1509 for i in 0..n {
1510 let scheme = PrivateKey::from_seed(i as u64);
1511 let pk = scheme.public_key();
1512 schemes.push(scheme);
1513 validators.push(pk);
1514 }
1515 validators.sort();
1516 schemes.sort_by_key(|s| s.public_key());
1517 let mut registrations = register_validators(&mut oracle, &validators).await;
1518
1519 let link = Link {
1521 latency: Duration::from_millis(10),
1522 jitter: Duration::from_millis(1),
1523 success_rate: 1.0,
1524 };
1525 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1526
1527 let (polynomial, shares) =
1529 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1530
1531 let relay = Arc::new(mocks::relay::Relay::new());
1533 let mut supervisors = Vec::new();
1534 let mut engine_handlers = Vec::new();
1535 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1536 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1538
1539 let validator = scheme.public_key();
1541 let mut participants = BTreeMap::new();
1542 participants.insert(
1543 0,
1544 (
1545 polynomial.clone(),
1546 validators.clone(),
1547 Some(shares[idx_scheme].clone()),
1548 ),
1549 );
1550 let supervisor_config = mocks::supervisor::Config::<_, V> {
1551 namespace: namespace.clone(),
1552 participants,
1553 };
1554 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1555 supervisors.push(supervisor.clone());
1556 let application_cfg = if idx_scheme == 0 {
1557 mocks::application::Config {
1558 hasher: Sha256::default(),
1559 relay: relay.clone(),
1560 participant: validator.clone(),
1561 propose_latency: (10_000.0, 0.0),
1562 verify_latency: (10_000.0, 5.0),
1563 }
1564 } else {
1565 mocks::application::Config {
1566 hasher: Sha256::default(),
1567 relay: relay.clone(),
1568 participant: validator.clone(),
1569 propose_latency: (10.0, 5.0),
1570 verify_latency: (10.0, 5.0),
1571 }
1572 };
1573 let (actor, application) = mocks::application::Application::new(
1574 context.with_label("application"),
1575 application_cfg,
1576 );
1577 actor.start();
1578 let blocker = oracle.control(scheme.public_key());
1579 let cfg = config::Config {
1580 crypto: scheme,
1581 blocker,
1582 automaton: application.clone(),
1583 relay: application.clone(),
1584 reporter: supervisor.clone(),
1585 supervisor,
1586 partition: validator.to_string(),
1587 mailbox_size: 1024,
1588 namespace: namespace.clone(),
1589 leader_timeout: Duration::from_secs(1),
1590 notarization_timeout: Duration::from_secs(2),
1591 nullify_retry: Duration::from_secs(10),
1592 fetch_timeout: Duration::from_secs(1),
1593 activity_timeout,
1594 skip_timeout,
1595 max_fetch_count: 1,
1596 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1597 fetch_concurrent: 1,
1598 replay_buffer: NZUsize!(1024 * 1024),
1599 write_buffer: NZUsize!(1024 * 1024),
1600 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1601 };
1602 let engine = Engine::new(context.with_label("engine"), cfg);
1603
1604 let (pending, recovered, resolver) = registrations
1606 .remove(&validator)
1607 .expect("validator should be registered");
1608 engine_handlers.push(engine.start(pending, recovered, resolver));
1609 }
1610
1611 let mut finalizers = Vec::new();
1613 for supervisor in supervisors.iter_mut() {
1614 let (mut latest, mut monitor) = supervisor.subscribe().await;
1615 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1616 while latest < required_containers {
1617 latest = monitor.next().await.expect("event missing");
1618 }
1619 }));
1620 }
1621 join_all(finalizers).await;
1622
1623 let slow = &validators[0];
1625 for supervisor in supervisors.iter() {
1626 {
1628 let faults = supervisor.faults.lock().unwrap();
1629 assert!(faults.is_empty());
1630 }
1631
1632 {
1634 let invalid = supervisor.invalid.lock().unwrap();
1635 assert_eq!(*invalid, 0);
1636 }
1637
1638 {
1640 let notarizes = supervisor.notarizes.lock().unwrap();
1641 for (view, payloads) in notarizes.iter() {
1642 for (_, participants) in payloads.iter() {
1643 if participants.contains(slow) {
1644 panic!("view: {view}");
1645 }
1646 }
1647 }
1648 }
1649 {
1650 let finalizes = supervisor.finalizes.lock().unwrap();
1651 for (view, payloads) in finalizes.iter() {
1652 for (_, finalizers) in payloads.iter() {
1653 if finalizers.contains(slow) {
1654 panic!("view: {view}");
1655 }
1656 }
1657 }
1658 }
1659 }
1660
1661 let blocked = oracle.blocked().await.unwrap();
1663 assert!(blocked.is_empty());
1664 });
1665 }
1666
1667 #[test_traced]
1668 fn test_slow_validator() {
1669 slow_validator::<MinPk>();
1670 slow_validator::<MinSig>();
1671 }
1672
1673 fn all_recovery<V: Variant>() {
1674 let n = 5;
1676 let threshold = quorum(n);
1677 let required_containers = 100;
1678 let activity_timeout = 10;
1679 let skip_timeout = 2;
1680 let namespace = b"consensus".to_vec();
1681 let executor = deterministic::Runner::timed(Duration::from_secs(180));
1682 executor.start(|mut context| async move {
1683 let (network, mut oracle) = Network::new(
1685 context.with_label("network"),
1686 Config {
1687 max_size: 1024 * 1024,
1688 },
1689 );
1690
1691 network.start();
1693
1694 let mut schemes = Vec::new();
1696 let mut validators = Vec::new();
1697 for i in 0..n {
1698 let scheme = PrivateKey::from_seed(i as u64);
1699 let pk = scheme.public_key();
1700 schemes.push(scheme);
1701 validators.push(pk);
1702 }
1703 validators.sort();
1704 schemes.sort_by_key(|s| s.public_key());
1705 let mut registrations = register_validators(&mut oracle, &validators).await;
1706
1707 let link = Link {
1709 latency: Duration::from_secs(3),
1710 jitter: Duration::from_millis(0),
1711 success_rate: 1.0,
1712 };
1713 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1714
1715 let (polynomial, shares) =
1717 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1718
1719 let relay = Arc::new(mocks::relay::Relay::new());
1721 let mut supervisors = Vec::new();
1722 let mut engine_handlers = Vec::new();
1723 for (idx, scheme) in schemes.iter().enumerate() {
1724 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1726
1727 let validator = scheme.public_key();
1729 let mut participants = BTreeMap::new();
1730 participants.insert(
1731 0,
1732 (
1733 polynomial.clone(),
1734 validators.clone(),
1735 Some(shares[idx].clone()),
1736 ),
1737 );
1738 let supervisor_config = mocks::supervisor::Config::<_, V> {
1739 namespace: namespace.clone(),
1740 participants,
1741 };
1742 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1743 supervisors.push(supervisor.clone());
1744 let application_cfg = mocks::application::Config {
1745 hasher: Sha256::default(),
1746 relay: relay.clone(),
1747 participant: validator.clone(),
1748 propose_latency: (10.0, 5.0),
1749 verify_latency: (10.0, 5.0),
1750 };
1751 let (actor, application) = mocks::application::Application::new(
1752 context.with_label("application"),
1753 application_cfg,
1754 );
1755 actor.start();
1756 let blocker = oracle.control(scheme.public_key());
1757 let cfg = config::Config {
1758 crypto: scheme.clone(),
1759 blocker,
1760 automaton: application.clone(),
1761 relay: application.clone(),
1762 reporter: supervisor.clone(),
1763 supervisor,
1764 partition: validator.to_string(),
1765 mailbox_size: 1024,
1766 namespace: namespace.clone(),
1767 leader_timeout: Duration::from_secs(1),
1768 notarization_timeout: Duration::from_secs(2),
1769 nullify_retry: Duration::from_secs(10),
1770 fetch_timeout: Duration::from_secs(1),
1771 activity_timeout,
1772 skip_timeout,
1773 max_fetch_count: 1,
1774 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1775 fetch_concurrent: 1,
1776 replay_buffer: NZUsize!(1024 * 1024),
1777 write_buffer: NZUsize!(1024 * 1024),
1778 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1779 };
1780 let engine = Engine::new(context.with_label("engine"), cfg);
1781
1782 let (pending, recovered, resolver) = registrations
1784 .remove(&validator)
1785 .expect("validator should be registered");
1786 engine_handlers.push(engine.start(pending, recovered, resolver));
1787 }
1788
1789 let mut finalizers = Vec::new();
1791 for supervisor in supervisors.iter_mut() {
1792 let (_, mut monitor) = supervisor.subscribe().await;
1793 finalizers.push(
1794 context
1795 .with_label("finalizer")
1796 .spawn(move |context| async move {
1797 select! {
1798 _timeout = context.sleep(Duration::from_secs(60)) => {},
1799 _done = monitor.next() => {
1800 panic!("engine should not notarize or finalize anything");
1801 }
1802 }
1803 }),
1804 );
1805 }
1806 join_all(finalizers).await;
1807
1808 link_validators(&mut oracle, &validators, Action::Unlink, None).await;
1810
1811 context.sleep(Duration::from_secs(60)).await;
1813
1814 let mut latest = 0;
1816 for supervisor in supervisors.iter() {
1817 let nullifies = supervisor.nullifies.lock().unwrap();
1818 let max = nullifies.keys().max().unwrap();
1819 if *max > latest {
1820 latest = *max;
1821 }
1822 }
1823
1824 let link = Link {
1826 latency: Duration::from_millis(10),
1827 jitter: Duration::from_millis(1),
1828 success_rate: 1.0,
1829 };
1830 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1831
1832 let mut finalizers = Vec::new();
1834 for supervisor in supervisors.iter_mut() {
1835 let (mut latest, mut monitor) = supervisor.subscribe().await;
1836 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1837 while latest < required_containers {
1838 latest = monitor.next().await.expect("event missing");
1839 }
1840 }));
1841 }
1842 join_all(finalizers).await;
1843
1844 for supervisor in supervisors.iter() {
1846 {
1848 let faults = supervisor.faults.lock().unwrap();
1849 assert!(faults.is_empty());
1850 }
1851
1852 {
1854 let invalid = supervisor.invalid.lock().unwrap();
1855 assert_eq!(*invalid, 0);
1856 }
1857
1858 {
1863 let mut found = 0;
1865 let finalizations = supervisor.finalizations.lock().unwrap();
1866 for i in latest..latest + activity_timeout {
1867 if finalizations.contains_key(&i) {
1868 found += 1;
1869 }
1870 }
1871 assert!(found >= activity_timeout - 2, "found: {found}");
1872 }
1873 }
1874
1875 let blocked = oracle.blocked().await.unwrap();
1877 assert!(blocked.is_empty());
1878 });
1879 }
1880
1881 #[test_traced]
1882 fn test_all_recovery() {
1883 all_recovery::<MinPk>();
1884 all_recovery::<MinSig>();
1885 }
1886
1887 fn partition<V: Variant>() {
1888 let n = 10;
1890 let threshold = quorum(n);
1891 let required_containers = 50;
1892 let activity_timeout = 10;
1893 let skip_timeout = 5;
1894 let namespace = b"consensus".to_vec();
1895 let executor = deterministic::Runner::timed(Duration::from_secs(900));
1896 executor.start(|mut context| async move {
1897 let (network, mut oracle) = Network::new(
1899 context.with_label("network"),
1900 Config {
1901 max_size: 1024 * 1024,
1902 },
1903 );
1904
1905 network.start();
1907
1908 let mut schemes = Vec::new();
1910 let mut validators = Vec::new();
1911 for i in 0..n {
1912 let scheme = PrivateKey::from_seed(i as u64);
1913 let pk = scheme.public_key();
1914 schemes.push(scheme);
1915 validators.push(pk);
1916 }
1917 validators.sort();
1918 schemes.sort_by_key(|s| s.public_key());
1919 let mut registrations = register_validators(&mut oracle, &validators).await;
1920
1921 let link = Link {
1923 latency: Duration::from_millis(10),
1924 jitter: Duration::from_millis(1),
1925 success_rate: 1.0,
1926 };
1927 link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;
1928
1929 let (polynomial, shares) =
1931 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
1932
1933 let relay = Arc::new(mocks::relay::Relay::new());
1935 let mut supervisors = Vec::new();
1936 let mut engine_handlers = Vec::new();
1937 for (idx, scheme) in schemes.iter().enumerate() {
1938 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1940
1941 let validator = scheme.public_key();
1943 let mut participants = BTreeMap::new();
1944 participants.insert(
1945 0,
1946 (
1947 polynomial.clone(),
1948 validators.clone(),
1949 Some(shares[idx].clone()),
1950 ),
1951 );
1952 let supervisor_config = mocks::supervisor::Config::<_, V> {
1953 namespace: namespace.clone(),
1954 participants,
1955 };
1956 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1957 supervisors.push(supervisor.clone());
1958 let application_cfg = mocks::application::Config {
1959 hasher: Sha256::default(),
1960 relay: relay.clone(),
1961 participant: validator.clone(),
1962 propose_latency: (10.0, 5.0),
1963 verify_latency: (10.0, 5.0),
1964 };
1965 let (actor, application) = mocks::application::Application::new(
1966 context.with_label("application"),
1967 application_cfg,
1968 );
1969 actor.start();
1970 let blocker = oracle.control(scheme.public_key());
1971 let cfg = config::Config {
1972 crypto: scheme.clone(),
1973 blocker,
1974 automaton: application.clone(),
1975 relay: application.clone(),
1976 reporter: supervisor.clone(),
1977 supervisor,
1978 partition: validator.to_string(),
1979 mailbox_size: 1024,
1980 namespace: namespace.clone(),
1981 leader_timeout: Duration::from_secs(1),
1982 notarization_timeout: Duration::from_secs(2),
1983 nullify_retry: Duration::from_secs(10),
1984 fetch_timeout: Duration::from_secs(1),
1985 activity_timeout,
1986 skip_timeout,
1987 max_fetch_count: 1,
1988 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1989 fetch_concurrent: 1,
1990 replay_buffer: NZUsize!(1024 * 1024),
1991 write_buffer: NZUsize!(1024 * 1024),
1992 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1993 };
1994 let engine = Engine::new(context.with_label("engine"), cfg);
1995
1996 let (pending, recovered, resolver) = registrations
1998 .remove(&validator)
1999 .expect("validator should be registered");
2000 engine_handlers.push(engine.start(pending, recovered, resolver));
2001 }
2002
2003 let mut finalizers = Vec::new();
2005 for supervisor in supervisors.iter_mut() {
2006 let (mut latest, mut monitor) = supervisor.subscribe().await;
2007 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2008 while latest < required_containers {
2009 latest = monitor.next().await.expect("event missing");
2010 }
2011 }));
2012 }
2013 join_all(finalizers).await;
2014
2015 fn separated(n: usize, a: usize, b: usize) -> bool {
2017 let m = n / 2;
2018 (a < m && b >= m) || (a >= m && b < m)
2019 }
2020 link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;
2021
2022 context.sleep(Duration::from_secs(10)).await;
2024
2025 let mut finalizers = Vec::new();
2027 for supervisor in supervisors.iter_mut() {
2028 let (_, mut monitor) = supervisor.subscribe().await;
2029 finalizers.push(
2030 context
2031 .with_label("finalizer")
2032 .spawn(move |context| async move {
2033 select! {
2034 _timeout = context.sleep(Duration::from_secs(60)) => {},
2035 _done = monitor.next() => {
2036 panic!("engine should not notarize or finalize anything");
2037 }
2038 }
2039 }),
2040 );
2041 }
2042 join_all(finalizers).await;
2043
2044 link_validators(
2046 &mut oracle,
2047 &validators,
2048 Action::Link(link),
2049 Some(separated),
2050 )
2051 .await;
2052
2053 let mut finalizers = Vec::new();
2055 for supervisor in supervisors.iter_mut() {
2056 let (mut latest, mut monitor) = supervisor.subscribe().await;
2057 let required = latest + required_containers;
2058 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2059 while latest < required {
2060 latest = monitor.next().await.expect("event missing");
2061 }
2062 }));
2063 }
2064 join_all(finalizers).await;
2065
2066 for supervisor in supervisors.iter() {
2068 {
2070 let faults = supervisor.faults.lock().unwrap();
2071 assert!(faults.is_empty());
2072 }
2073
2074 {
2076 let invalid = supervisor.invalid.lock().unwrap();
2077 assert_eq!(*invalid, 0);
2078 }
2079 }
2080
2081 let blocked = oracle.blocked().await.unwrap();
2083 assert!(blocked.is_empty());
2084 });
2085 }
2086
2087 #[test_traced]
2088 #[ignore]
2089 fn test_partition() {
2090 partition::<MinPk>();
2091 partition::<MinSig>();
2092 }
2093
2094 fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
2095 let n = 5;
2097 let threshold = quorum(n);
2098 let required_containers = 50;
2099 let activity_timeout = 10;
2100 let skip_timeout = 5;
2101 let namespace = b"consensus".to_vec();
2102 let cfg = deterministic::Config::new()
2103 .with_seed(seed)
2104 .with_timeout(Some(Duration::from_secs(5_000)));
2105 let executor = deterministic::Runner::new(cfg);
2106 executor.start(|mut context| async move {
2107 let (network, mut oracle) = Network::new(
2109 context.with_label("network"),
2110 Config {
2111 max_size: 1024 * 1024,
2112 },
2113 );
2114
2115 network.start();
2117
2118 let mut schemes = Vec::new();
2120 let mut validators = Vec::new();
2121 for i in 0..n {
2122 let scheme = PrivateKey::from_seed(i as u64);
2123 let pk = scheme.public_key();
2124 schemes.push(scheme);
2125 validators.push(pk);
2126 }
2127 validators.sort();
2128 schemes.sort_by_key(|s| s.public_key());
2129 let mut registrations = register_validators(&mut oracle, &validators).await;
2130
2131 let degraded_link = Link {
2133 latency: Duration::from_millis(200),
2134 jitter: Duration::from_millis(150),
2135 success_rate: 0.5,
2136 };
2137 link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;
2138
2139 let (polynomial, shares) =
2141 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2142
2143 let relay = Arc::new(mocks::relay::Relay::new());
2145 let mut supervisors = Vec::new();
2146 let mut engine_handlers = Vec::new();
2147 for (idx, scheme) in schemes.into_iter().enumerate() {
2148 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2150
2151 let validator = scheme.public_key();
2153 let mut participants = BTreeMap::new();
2154 participants.insert(
2155 0,
2156 (
2157 polynomial.clone(),
2158 validators.clone(),
2159 Some(shares[idx].clone()),
2160 ),
2161 );
2162 let supervisor_config = mocks::supervisor::Config::<_, V> {
2163 namespace: namespace.clone(),
2164 participants,
2165 };
2166 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2167 supervisors.push(supervisor.clone());
2168 let application_cfg = mocks::application::Config {
2169 hasher: Sha256::default(),
2170 relay: relay.clone(),
2171 participant: validator.clone(),
2172 propose_latency: (10.0, 5.0),
2173 verify_latency: (10.0, 5.0),
2174 };
2175 let (actor, application) = mocks::application::Application::new(
2176 context.with_label("application"),
2177 application_cfg,
2178 );
2179 actor.start();
2180 let blocker = oracle.control(scheme.public_key());
2181 let cfg = config::Config {
2182 crypto: scheme,
2183 blocker,
2184 automaton: application.clone(),
2185 relay: application.clone(),
2186 reporter: supervisor.clone(),
2187 supervisor,
2188 partition: validator.to_string(),
2189 mailbox_size: 1024,
2190 namespace: namespace.clone(),
2191 leader_timeout: Duration::from_secs(1),
2192 notarization_timeout: Duration::from_secs(2),
2193 nullify_retry: Duration::from_secs(10),
2194 fetch_timeout: Duration::from_secs(1),
2195 activity_timeout,
2196 skip_timeout,
2197 max_fetch_count: 1,
2198 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2199 fetch_concurrent: 1,
2200 replay_buffer: NZUsize!(1024 * 1024),
2201 write_buffer: NZUsize!(1024 * 1024),
2202 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2203 };
2204 let engine = Engine::new(context.with_label("engine"), cfg);
2205
2206 let (pending, recovered, resolver) = registrations
2208 .remove(&validator)
2209 .expect("validator should be registered");
2210 engine_handlers.push(engine.start(pending, recovered, resolver));
2211 }
2212
2213 let mut finalizers = Vec::new();
2215 for supervisor in supervisors.iter_mut() {
2216 let (mut latest, mut monitor) = supervisor.subscribe().await;
2217 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2218 while latest < required_containers {
2219 latest = monitor.next().await.expect("event missing");
2220 }
2221 }));
2222 }
2223 join_all(finalizers).await;
2224
2225 for supervisor in supervisors.iter() {
2227 {
2229 let faults = supervisor.faults.lock().unwrap();
2230 assert!(faults.is_empty());
2231 }
2232
2233 {
2235 let invalid = supervisor.invalid.lock().unwrap();
2236 assert_eq!(*invalid, 0);
2237 }
2238 }
2239
2240 let blocked = oracle.blocked().await.unwrap();
2242 assert!(blocked.is_empty());
2243
2244 context.auditor().state()
2245 })
2246 }
2247
2248 #[test_traced]
2249 fn test_slow_and_lossy_links() {
2250 slow_and_lossy_links::<MinPk>(0);
2251 slow_and_lossy_links::<MinSig>(0);
2252 }
2253
2254 #[test_traced]
2255 #[ignore]
2256 fn test_determinism() {
2257 for seed in 1..6 {
2260 let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
2261 let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
2262 assert_eq!(pk_state_1, pk_state_2);
2263
2264 let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
2265 let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
2266 assert_eq!(sig_state_1, sig_state_2);
2267
2268 assert_ne!(pk_state_1, sig_state_1);
2270 }
2271 }
2272
2273 fn conflicter<V: Variant>(seed: u64) {
2274 let n = 4;
2276 let threshold = quorum(n);
2277 let required_containers = 50;
2278 let activity_timeout = 10;
2279 let skip_timeout = 5;
2280 let namespace = b"consensus".to_vec();
2281 let cfg = deterministic::Config::new()
2282 .with_seed(seed)
2283 .with_timeout(Some(Duration::from_secs(30)));
2284 let executor = deterministic::Runner::new(cfg);
2285 executor.start(|mut context| async move {
2286 let (network, mut oracle) = Network::new(
2288 context.with_label("network"),
2289 Config {
2290 max_size: 1024 * 1024,
2291 },
2292 );
2293
2294 network.start();
2296
2297 let mut schemes = Vec::new();
2299 let mut validators = Vec::new();
2300 for i in 0..n {
2301 let scheme = PrivateKey::from_seed(i as u64);
2302 let pk = scheme.public_key();
2303 schemes.push(scheme);
2304 validators.push(pk);
2305 }
2306 validators.sort();
2307 schemes.sort_by_key(|s| s.public_key());
2308 let mut registrations = register_validators(&mut oracle, &validators).await;
2309
2310 let link = Link {
2312 latency: Duration::from_millis(10),
2313 jitter: Duration::from_millis(1),
2314 success_rate: 1.0,
2315 };
2316 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2317
2318 let (polynomial, shares) =
2320 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2321
2322 let relay = Arc::new(mocks::relay::Relay::new());
2324 let mut supervisors = Vec::new();
2325 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2326 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2328
2329 let validator = scheme.public_key();
2331 let mut participants = BTreeMap::new();
2332 participants.insert(
2333 0,
2334 (
2335 polynomial.clone(),
2336 validators.clone(),
2337 Some(shares[idx_scheme].clone()),
2338 ),
2339 );
2340 let supervisor_config = mocks::supervisor::Config::<_, V> {
2341 namespace: namespace.clone(),
2342 participants,
2343 };
2344 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2345 let (pending, recovered, resolver) = registrations
2346 .remove(&validator)
2347 .expect("validator should be registered");
2348 if idx_scheme == 0 {
2349 let cfg = mocks::conflicter::Config {
2350 supervisor,
2351 namespace: namespace.clone(),
2352 };
2353
2354 let engine: mocks::conflicter::Conflicter<_, V, Sha256, _> =
2355 mocks::conflicter::Conflicter::new(
2356 context.with_label("byzantine_engine"),
2357 cfg,
2358 );
2359 engine.start(pending);
2360 } else {
2361 supervisors.push(supervisor.clone());
2362 let application_cfg = mocks::application::Config {
2363 hasher: Sha256::default(),
2364 relay: relay.clone(),
2365 participant: validator.clone(),
2366 propose_latency: (10.0, 5.0),
2367 verify_latency: (10.0, 5.0),
2368 };
2369 let (actor, application) = mocks::application::Application::new(
2370 context.with_label("application"),
2371 application_cfg,
2372 );
2373 actor.start();
2374 let blocker = oracle.control(scheme.public_key());
2375 let cfg = config::Config {
2376 crypto: scheme,
2377 blocker,
2378 automaton: application.clone(),
2379 relay: application.clone(),
2380 reporter: supervisor.clone(),
2381 supervisor,
2382 partition: validator.to_string(),
2383 mailbox_size: 1024,
2384 namespace: namespace.clone(),
2385 leader_timeout: Duration::from_secs(1),
2386 notarization_timeout: Duration::from_secs(2),
2387 nullify_retry: Duration::from_secs(10),
2388 fetch_timeout: Duration::from_secs(1),
2389 activity_timeout,
2390 skip_timeout,
2391 max_fetch_count: 1,
2392 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2393 fetch_concurrent: 1,
2394 replay_buffer: NZUsize!(1024 * 1024),
2395 write_buffer: NZUsize!(1024 * 1024),
2396 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2397 };
2398 let engine = Engine::new(context.with_label("engine"), cfg);
2399 engine.start(pending, recovered, resolver);
2400 }
2401 }
2402
2403 let mut finalizers = Vec::new();
2405 for supervisor in supervisors.iter_mut() {
2406 let (mut latest, mut monitor) = supervisor.subscribe().await;
2407 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2408 while latest < required_containers {
2409 latest = monitor.next().await.expect("event missing");
2410 }
2411 }));
2412 }
2413 join_all(finalizers).await;
2414
2415 let byz = &validators[0];
2417 let mut count_conflicting = 0;
2418 for supervisor in supervisors.iter() {
2419 {
2421 let faults = supervisor.faults.lock().unwrap();
2422 assert_eq!(faults.len(), 1);
2423 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2424 for (_, faults) in faulter.iter() {
2425 for fault in faults.iter() {
2426 match fault {
2427 Activity::ConflictingNotarize(_) => {
2428 count_conflicting += 1;
2429 }
2430 Activity::ConflictingFinalize(_) => {
2431 count_conflicting += 1;
2432 }
2433 _ => panic!("unexpected fault: {fault:?}"),
2434 }
2435 }
2436 }
2437 }
2438
2439 {
2441 let invalid = supervisor.invalid.lock().unwrap();
2442 assert_eq!(*invalid, 0);
2443 }
2444 }
2445 assert!(count_conflicting > 0);
2446
2447 let blocked = oracle.blocked().await.unwrap();
2449 assert!(!blocked.is_empty());
2450 for (a, b) in blocked {
2451 assert_ne!(&a, byz);
2452 assert_eq!(&b, byz);
2453 }
2454 });
2455 }
2456
2457 #[test_traced]
2458 #[ignore]
2459 fn test_conflicter() {
2460 for seed in 0..5 {
2461 conflicter::<MinPk>(seed);
2462 conflicter::<MinSig>(seed);
2463 }
2464 }
2465
2466 fn invalid<V: Variant>(seed: u64) {
2467 let n = 4;
2469 let threshold = quorum(n);
2470 let required_containers = 50;
2471 let activity_timeout = 10;
2472 let skip_timeout = 5;
2473 let namespace = b"consensus".to_vec();
2474 let cfg = deterministic::Config::new()
2475 .with_seed(seed)
2476 .with_timeout(Some(Duration::from_secs(30)));
2477 let executor = deterministic::Runner::new(cfg);
2478 executor.start(|mut context| async move {
2479 let (network, mut oracle) = Network::new(
2481 context.with_label("network"),
2482 Config {
2483 max_size: 1024 * 1024,
2484 },
2485 );
2486
2487 network.start();
2489
2490 let mut schemes = Vec::new();
2492 let mut validators = Vec::new();
2493 for i in 0..n {
2494 let scheme = PrivateKey::from_seed(i as u64);
2495 let pk = scheme.public_key();
2496 schemes.push(scheme);
2497 validators.push(pk);
2498 }
2499 validators.sort();
2500 schemes.sort_by_key(|s| s.public_key());
2501 let mut registrations = register_validators(&mut oracle, &validators).await;
2502
2503 let link = Link {
2505 latency: Duration::from_millis(10),
2506 jitter: Duration::from_millis(1),
2507 success_rate: 1.0,
2508 };
2509 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2510
2511 let (polynomial, shares) =
2513 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2514
2515 let relay = Arc::new(mocks::relay::Relay::new());
2517 let mut supervisors = Vec::new();
2518 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2519 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2521
2522 let validator = scheme.public_key();
2524 let mut participants = BTreeMap::new();
2525 participants.insert(
2526 0,
2527 (
2528 polynomial.clone(),
2529 validators.clone(),
2530 Some(shares[idx_scheme].clone()),
2531 ),
2532 );
2533 let supervisor_config = mocks::supervisor::Config::<_, V> {
2534 namespace: namespace.clone(),
2535 participants,
2536 };
2537 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2538 let (pending, recovered, resolver) = registrations
2539 .remove(&validator)
2540 .expect("validator should be registered");
2541 if idx_scheme == 0 {
2542 let cfg = mocks::invalid::Config {
2543 supervisor,
2544 namespace: namespace.clone(),
2545 };
2546
2547 let engine: mocks::invalid::Invalid<_, V, Sha256, _> =
2548 mocks::invalid::Invalid::new(context.with_label("byzantine_engine"), cfg);
2549 engine.start(pending);
2550 } else {
2551 supervisors.push(supervisor.clone());
2552 let application_cfg = mocks::application::Config {
2553 hasher: Sha256::default(),
2554 relay: relay.clone(),
2555 participant: validator.clone(),
2556 propose_latency: (10.0, 5.0),
2557 verify_latency: (10.0, 5.0),
2558 };
2559 let (actor, application) = mocks::application::Application::new(
2560 context.with_label("application"),
2561 application_cfg,
2562 );
2563 actor.start();
2564 let blocker = oracle.control(scheme.public_key());
2565 let cfg = config::Config {
2566 crypto: scheme,
2567 blocker,
2568 automaton: application.clone(),
2569 relay: application.clone(),
2570 reporter: supervisor.clone(),
2571 supervisor,
2572 partition: validator.to_string(),
2573 mailbox_size: 1024,
2574 namespace: namespace.clone(),
2575 leader_timeout: Duration::from_secs(1),
2576 notarization_timeout: Duration::from_secs(2),
2577 nullify_retry: Duration::from_secs(10),
2578 fetch_timeout: Duration::from_secs(1),
2579 activity_timeout,
2580 skip_timeout,
2581 max_fetch_count: 1,
2582 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2583 fetch_concurrent: 1,
2584 replay_buffer: NZUsize!(1024 * 1024),
2585 write_buffer: NZUsize!(1024 * 1024),
2586 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2587 };
2588 let engine = Engine::new(context.with_label("engine"), cfg);
2589 engine.start(pending, recovered, resolver);
2590 }
2591 }
2592
2593 let mut finalizers = Vec::new();
2595 for supervisor in supervisors.iter_mut() {
2596 let (mut latest, mut monitor) = supervisor.subscribe().await;
2597 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2598 while latest < required_containers {
2599 latest = monitor.next().await.expect("event missing");
2600 }
2601 }));
2602 }
2603 join_all(finalizers).await;
2604
2605 let mut invalid_count = 0;
2607 let byz = &validators[0];
2608 for supervisor in supervisors.iter() {
2609 {
2611 let faults = supervisor.faults.lock().unwrap();
2612 assert!(faults.is_empty());
2613 }
2614
2615 {
2617 let invalid = supervisor.invalid.lock().unwrap();
2618 if *invalid > 0 {
2619 invalid_count += 1;
2620 }
2621 }
2622 }
2623 assert_eq!(invalid_count, n - 1);
2624
2625 let blocked = oracle.blocked().await.unwrap();
2627 assert!(!blocked.is_empty());
2628 for (a, b) in blocked {
2629 assert_ne!(&a, byz);
2630 assert_eq!(&b, byz);
2631 }
2632 });
2633 }
2634
2635 #[test_traced]
2636 #[ignore]
2637 fn test_invalid() {
2638 for seed in 0..5 {
2639 invalid::<MinPk>(seed);
2640 invalid::<MinSig>(seed);
2641 }
2642 }
2643
2644 fn impersonator<V: Variant>(seed: u64) {
2645 let n = 4;
2647 let threshold = quorum(n);
2648 let required_containers = 50;
2649 let activity_timeout = 10;
2650 let skip_timeout = 5;
2651 let namespace = b"consensus".to_vec();
2652 let cfg = deterministic::Config::new()
2653 .with_seed(seed)
2654 .with_timeout(Some(Duration::from_secs(30)));
2655 let executor = deterministic::Runner::new(cfg);
2656 executor.start(|mut context| async move {
2657 let (network, mut oracle) = Network::new(
2659 context.with_label("network"),
2660 Config {
2661 max_size: 1024 * 1024,
2662 },
2663 );
2664
2665 network.start();
2667
2668 let mut schemes = Vec::new();
2670 let mut validators = Vec::new();
2671 for i in 0..n {
2672 let scheme = PrivateKey::from_seed(i as u64);
2673 let pk = scheme.public_key();
2674 schemes.push(scheme);
2675 validators.push(pk);
2676 }
2677 validators.sort();
2678 schemes.sort_by_key(|s| s.public_key());
2679 let mut registrations = register_validators(&mut oracle, &validators).await;
2680
2681 let link = Link {
2683 latency: Duration::from_millis(10),
2684 jitter: Duration::from_millis(1),
2685 success_rate: 1.0,
2686 };
2687 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2688
2689 let (polynomial, shares) =
2691 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2692
2693 let relay = Arc::new(mocks::relay::Relay::new());
2695 let mut supervisors = Vec::new();
2696 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2697 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2699
2700 let validator = scheme.public_key();
2702 let mut participants = BTreeMap::new();
2703 participants.insert(
2704 0,
2705 (
2706 polynomial.clone(),
2707 validators.clone(),
2708 Some(shares[idx_scheme].clone()),
2709 ),
2710 );
2711 let supervisor_config = mocks::supervisor::Config::<_, V> {
2712 namespace: namespace.clone(),
2713 participants,
2714 };
2715 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2716 let (pending, recovered, resolver) = registrations
2717 .remove(&validator)
2718 .expect("validator should be registered");
2719 if idx_scheme == 0 {
2720 let cfg = mocks::impersonator::Config {
2721 supervisor,
2722 namespace: namespace.clone(),
2723 };
2724
2725 let engine: mocks::impersonator::Impersonator<_, V, Sha256, _> =
2726 mocks::impersonator::Impersonator::new(
2727 context.with_label("byzantine_engine"),
2728 cfg,
2729 );
2730 engine.start(pending);
2731 } else {
2732 supervisors.push(supervisor.clone());
2733 let application_cfg = mocks::application::Config {
2734 hasher: Sha256::default(),
2735 relay: relay.clone(),
2736 participant: validator.clone(),
2737 propose_latency: (10.0, 5.0),
2738 verify_latency: (10.0, 5.0),
2739 };
2740 let (actor, application) = mocks::application::Application::new(
2741 context.with_label("application"),
2742 application_cfg,
2743 );
2744 actor.start();
2745 let blocker = oracle.control(scheme.public_key());
2746 let cfg = config::Config {
2747 crypto: scheme,
2748 blocker,
2749 automaton: application.clone(),
2750 relay: application.clone(),
2751 reporter: supervisor.clone(),
2752 supervisor,
2753 partition: validator.to_string(),
2754 mailbox_size: 1024,
2755 namespace: namespace.clone(),
2756 leader_timeout: Duration::from_secs(1),
2757 notarization_timeout: Duration::from_secs(2),
2758 nullify_retry: Duration::from_secs(10),
2759 fetch_timeout: Duration::from_secs(1),
2760 activity_timeout,
2761 skip_timeout,
2762 max_fetch_count: 1,
2763 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2764 fetch_concurrent: 1,
2765 replay_buffer: NZUsize!(1024 * 1024),
2766 write_buffer: NZUsize!(1024 * 1024),
2767 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2768 };
2769 let engine = Engine::new(context.with_label("engine"), cfg);
2770 engine.start(pending, recovered, resolver);
2771 }
2772 }
2773
2774 let mut finalizers = Vec::new();
2776 for supervisor in supervisors.iter_mut() {
2777 let (mut latest, mut monitor) = supervisor.subscribe().await;
2778 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2779 while latest < required_containers {
2780 latest = monitor.next().await.expect("event missing");
2781 }
2782 }));
2783 }
2784 join_all(finalizers).await;
2785
2786 let byz = &validators[0];
2788 for supervisor in supervisors.iter() {
2789 {
2791 let faults = supervisor.faults.lock().unwrap();
2792 assert!(faults.is_empty());
2793 }
2794
2795 {
2797 let invalid = supervisor.invalid.lock().unwrap();
2798 assert_eq!(*invalid, 0);
2799 }
2800 }
2801
2802 let blocked = oracle.blocked().await.unwrap();
2804 assert!(!blocked.is_empty());
2805 for (a, b) in blocked {
2806 assert_ne!(&a, byz);
2807 assert_eq!(&b, byz);
2808 }
2809 });
2810 }
2811
2812 #[test_traced]
2813 #[ignore]
2814 fn test_impersonator() {
2815 for seed in 0..5 {
2816 impersonator::<MinPk>(seed);
2817 impersonator::<MinSig>(seed);
2818 }
2819 }
2820
2821 fn nuller<V: Variant>(seed: u64) {
2822 let n = 4;
2824 let threshold = quorum(n);
2825 let required_containers = 50;
2826 let activity_timeout = 10;
2827 let skip_timeout = 5;
2828 let namespace = b"consensus".to_vec();
2829 let cfg = deterministic::Config::new()
2830 .with_seed(seed)
2831 .with_timeout(Some(Duration::from_secs(30)));
2832 let executor = deterministic::Runner::new(cfg);
2833 executor.start(|mut context| async move {
2834 let (network, mut oracle) = Network::new(
2836 context.with_label("network"),
2837 Config {
2838 max_size: 1024 * 1024,
2839 },
2840 );
2841
2842 network.start();
2844
2845 let mut schemes = Vec::new();
2847 let mut validators = Vec::new();
2848 for i in 0..n {
2849 let scheme = PrivateKey::from_seed(i as u64);
2850 let pk = scheme.public_key();
2851 schemes.push(scheme);
2852 validators.push(pk);
2853 }
2854 validators.sort();
2855 schemes.sort_by_key(|s| s.public_key());
2856 let mut registrations = register_validators(&mut oracle, &validators).await;
2857
2858 let link = Link {
2860 latency: Duration::from_millis(10),
2861 jitter: Duration::from_millis(1),
2862 success_rate: 1.0,
2863 };
2864 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2865
2866 let (polynomial, shares) =
2868 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
2869
2870 let relay = Arc::new(mocks::relay::Relay::new());
2872 let mut supervisors = Vec::new();
2873 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2874 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2876
2877 let validator = scheme.public_key();
2879 let mut participants = BTreeMap::new();
2880 participants.insert(
2881 0,
2882 (
2883 polynomial.clone(),
2884 validators.clone(),
2885 Some(shares[idx_scheme].clone()),
2886 ),
2887 );
2888 let supervisor_config = mocks::supervisor::Config::<_, V> {
2889 namespace: namespace.clone(),
2890 participants,
2891 };
2892 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2893 let (pending, recovered, resolver) = registrations
2894 .remove(&validator)
2895 .expect("validator should be registered");
2896 if idx_scheme == 0 {
2897 let cfg = mocks::nuller::Config {
2898 supervisor,
2899 namespace: namespace.clone(),
2900 };
2901 let engine: mocks::nuller::Nuller<_, V, Sha256, _> =
2902 mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
2903 engine.start(pending);
2904 } else {
2905 supervisors.push(supervisor.clone());
2906 let application_cfg = mocks::application::Config {
2907 hasher: Sha256::default(),
2908 relay: relay.clone(),
2909 participant: validator.clone(),
2910 propose_latency: (10.0, 5.0),
2911 verify_latency: (10.0, 5.0),
2912 };
2913 let (actor, application) = mocks::application::Application::new(
2914 context.with_label("application"),
2915 application_cfg,
2916 );
2917 actor.start();
2918 let blocker = oracle.control(scheme.public_key());
2919 let cfg = config::Config {
2920 crypto: scheme,
2921 blocker,
2922 automaton: application.clone(),
2923 relay: application.clone(),
2924 reporter: supervisor.clone(),
2925 supervisor,
2926 partition: validator.to_string(),
2927 mailbox_size: 1024,
2928 namespace: namespace.clone(),
2929 leader_timeout: Duration::from_secs(1),
2930 notarization_timeout: Duration::from_secs(2),
2931 nullify_retry: Duration::from_secs(10),
2932 fetch_timeout: Duration::from_secs(1),
2933 activity_timeout,
2934 skip_timeout,
2935 max_fetch_count: 1,
2936 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2937 fetch_concurrent: 1,
2938 replay_buffer: NZUsize!(1024 * 1024),
2939 write_buffer: NZUsize!(1024 * 1024),
2940 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2941 };
2942 let engine = Engine::new(context.with_label("engine"), cfg);
2943 engine.start(pending, recovered, resolver);
2944 }
2945 }
2946
2947 let mut finalizers = Vec::new();
2949 for supervisor in supervisors.iter_mut() {
2950 let (mut latest, mut monitor) = supervisor.subscribe().await;
2951 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2952 while latest < required_containers {
2953 latest = monitor.next().await.expect("event missing");
2954 }
2955 }));
2956 }
2957 join_all(finalizers).await;
2958
2959 let byz = &validators[0];
2961 let mut count_nullify_and_finalize = 0;
2962 for supervisor in supervisors.iter() {
2963 {
2965 let faults = supervisor.faults.lock().unwrap();
2966 assert_eq!(faults.len(), 1);
2967 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2968 for (_, faults) in faulter.iter() {
2969 for fault in faults.iter() {
2970 match fault {
2971 Activity::NullifyFinalize(_) => {
2972 count_nullify_and_finalize += 1;
2973 }
2974 _ => panic!("unexpected fault: {fault:?}"),
2975 }
2976 }
2977 }
2978 }
2979
2980 {
2982 let invalid = supervisor.invalid.lock().unwrap();
2983 assert_eq!(*invalid, 0);
2984 }
2985 }
2986 assert!(count_nullify_and_finalize > 0);
2987
2988 let blocked = oracle.blocked().await.unwrap();
2990 assert!(!blocked.is_empty());
2991 for (a, b) in blocked {
2992 assert_ne!(&a, byz);
2993 assert_eq!(&b, byz);
2994 }
2995 });
2996 }
2997
2998 #[test_traced]
2999 #[ignore]
3000 fn test_nuller() {
3001 for seed in 0..5 {
3002 nuller::<MinPk>(seed);
3003 nuller::<MinSig>(seed);
3004 }
3005 }
3006
3007 fn outdated<V: Variant>(seed: u64) {
3008 let n = 4;
3010 let threshold = quorum(n);
3011 let required_containers = 100;
3012 let activity_timeout = 10;
3013 let skip_timeout = 5;
3014 let namespace = b"consensus".to_vec();
3015 let cfg = deterministic::Config::new()
3016 .with_seed(seed)
3017 .with_timeout(Some(Duration::from_secs(30)));
3018 let executor = deterministic::Runner::new(cfg);
3019 executor.start(|mut context| async move {
3020 let (network, mut oracle) = Network::new(
3022 context.with_label("network"),
3023 Config {
3024 max_size: 1024 * 1024,
3025 },
3026 );
3027
3028 network.start();
3030
3031 let mut schemes = Vec::new();
3033 let mut validators = Vec::new();
3034 for i in 0..n {
3035 let scheme = PrivateKey::from_seed(i as u64);
3036 let pk = scheme.public_key();
3037 schemes.push(scheme);
3038 validators.push(pk);
3039 }
3040 validators.sort();
3041 schemes.sort_by_key(|s| s.public_key());
3042 let mut registrations = register_validators(&mut oracle, &validators).await;
3043
3044 let link = Link {
3046 latency: Duration::from_millis(10),
3047 jitter: Duration::from_millis(1),
3048 success_rate: 1.0,
3049 };
3050 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
3051
3052 let (polynomial, shares) =
3054 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
3055
3056 let relay = Arc::new(mocks::relay::Relay::new());
3058 let mut supervisors = Vec::new();
3059 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
3060 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
3062
3063 let validator = scheme.public_key();
3065 let mut participants = BTreeMap::new();
3066 participants.insert(
3067 0,
3068 (
3069 polynomial.clone(),
3070 validators.clone(),
3071 Some(shares[idx_scheme].clone()),
3072 ),
3073 );
3074 let supervisor_config = mocks::supervisor::Config::<_, V> {
3075 namespace: namespace.clone(),
3076 participants,
3077 };
3078 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
3079 let (pending, recovered, resolver) = registrations
3080 .remove(&validator)
3081 .expect("validator should be registered");
3082 if idx_scheme == 0 {
3083 let cfg = mocks::outdated::Config {
3084 supervisor,
3085 namespace: namespace.clone(),
3086 view_delta: activity_timeout * 4,
3087 };
3088 let engine: mocks::outdated::Outdated<_, V, Sha256, _> =
3089 mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
3090 engine.start(pending);
3091 } else {
3092 supervisors.push(supervisor.clone());
3093 let application_cfg = mocks::application::Config {
3094 hasher: Sha256::default(),
3095 relay: relay.clone(),
3096 participant: validator.clone(),
3097 propose_latency: (10.0, 5.0),
3098 verify_latency: (10.0, 5.0),
3099 };
3100 let (actor, application) = mocks::application::Application::new(
3101 context.with_label("application"),
3102 application_cfg,
3103 );
3104 actor.start();
3105 let blocker = oracle.control(scheme.public_key());
3106 let cfg = config::Config {
3107 crypto: scheme,
3108 blocker,
3109 automaton: application.clone(),
3110 relay: application.clone(),
3111 reporter: supervisor.clone(),
3112 supervisor,
3113 partition: validator.to_string(),
3114 mailbox_size: 1024,
3115 namespace: namespace.clone(),
3116 leader_timeout: Duration::from_secs(1),
3117 notarization_timeout: Duration::from_secs(2),
3118 nullify_retry: Duration::from_secs(10),
3119 fetch_timeout: Duration::from_secs(1),
3120 activity_timeout,
3121 skip_timeout,
3122 max_fetch_count: 1,
3123 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
3124 fetch_concurrent: 1,
3125 replay_buffer: NZUsize!(1024 * 1024),
3126 write_buffer: NZUsize!(1024 * 1024),
3127 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3128 };
3129 let engine = Engine::new(context.with_label("engine"), cfg);
3130 engine.start(pending, recovered, resolver);
3131 }
3132 }
3133
3134 let mut finalizers = Vec::new();
3136 for supervisor in supervisors.iter_mut() {
3137 let (mut latest, mut monitor) = supervisor.subscribe().await;
3138 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3139 while latest < required_containers {
3140 latest = monitor.next().await.expect("event missing");
3141 }
3142 }));
3143 }
3144 join_all(finalizers).await;
3145
3146 for supervisor in supervisors.iter() {
3148 {
3150 let faults = supervisor.faults.lock().unwrap();
3151 assert!(faults.is_empty());
3152 }
3153
3154 {
3156 let invalid = supervisor.invalid.lock().unwrap();
3157 assert_eq!(*invalid, 0);
3158 }
3159 }
3160
3161 let blocked = oracle.blocked().await.unwrap();
3163 assert!(blocked.is_empty());
3164 });
3165 }
3166
3167 #[test_traced]
3168 #[ignore]
3169 fn test_outdated() {
3170 for seed in 0..5 {
3171 outdated::<MinPk>(seed);
3172 outdated::<MinSig>(seed);
3173 }
3174 }
3175
3176 fn run_1k<V: Variant>() {
3177 let n = 10;
3179 let threshold = quorum(n);
3180 let required_containers = 1_000;
3181 let activity_timeout = 10;
3182 let skip_timeout = 5;
3183 let namespace = b"consensus".to_vec();
3184 let cfg = deterministic::Config::new();
3185 let executor = deterministic::Runner::new(cfg);
3186 executor.start(|mut context| async move {
3187 let (network, mut oracle) = Network::new(
3189 context.with_label("network"),
3190 Config {
3191 max_size: 1024 * 1024,
3192 },
3193 );
3194
3195 network.start();
3197
3198 let mut schemes = Vec::new();
3200 let mut validators = Vec::new();
3201 for i in 0..n {
3202 let scheme = PrivateKey::from_seed(i as u64);
3203 let pk = scheme.public_key();
3204 schemes.push(scheme);
3205 validators.push(pk);
3206 }
3207 validators.sort();
3208 schemes.sort_by_key(|s| s.public_key());
3209 let mut registrations = register_validators(&mut oracle, &validators).await;
3210
3211 let link = Link {
3213 latency: Duration::from_millis(80),
3214 jitter: Duration::from_millis(10),
3215 success_rate: 0.98,
3216 };
3217 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
3218
3219 let (polynomial, shares) =
3221 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
3222
3223 let relay = Arc::new(mocks::relay::Relay::new());
3225 let mut supervisors = Vec::new();
3226 let mut engine_handlers = Vec::new();
3227 for (idx, scheme) in schemes.into_iter().enumerate() {
3228 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
3230
3231 let validator = scheme.public_key();
3233 let mut participants = BTreeMap::new();
3234 participants.insert(
3235 0,
3236 (
3237 polynomial.clone(),
3238 validators.clone(),
3239 Some(shares[idx].clone()),
3240 ),
3241 );
3242 let supervisor_config = mocks::supervisor::Config::<_, V> {
3243 namespace: namespace.clone(),
3244 participants,
3245 };
3246 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
3247 supervisors.push(supervisor.clone());
3248 let application_cfg = mocks::application::Config {
3249 hasher: Sha256::default(),
3250 relay: relay.clone(),
3251 participant: validator.clone(),
3252 propose_latency: (100.0, 50.0),
3253 verify_latency: (50.0, 40.0),
3254 };
3255 let (actor, application) = mocks::application::Application::new(
3256 context.with_label("application"),
3257 application_cfg,
3258 );
3259 actor.start();
3260 let blocker = oracle.control(scheme.public_key());
3261 let cfg = config::Config {
3262 crypto: scheme,
3263 blocker,
3264 automaton: application.clone(),
3265 relay: application.clone(),
3266 reporter: supervisor.clone(),
3267 supervisor,
3268 partition: validator.to_string(),
3269 mailbox_size: 1024,
3270 namespace: namespace.clone(),
3271 leader_timeout: Duration::from_secs(1),
3272 notarization_timeout: Duration::from_secs(2),
3273 nullify_retry: Duration::from_secs(10),
3274 fetch_timeout: Duration::from_secs(1),
3275 activity_timeout,
3276 skip_timeout,
3277 max_fetch_count: 1,
3278 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
3279 fetch_concurrent: 1,
3280 replay_buffer: NZUsize!(1024 * 1024),
3281 write_buffer: NZUsize!(1024 * 1024),
3282 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3283 };
3284 let engine = Engine::new(context.with_label("engine"), cfg);
3285
3286 let (pending, recovered, resolver) = registrations
3288 .remove(&validator)
3289 .expect("validator should be registered");
3290 engine_handlers.push(engine.start(pending, recovered, resolver));
3291 }
3292
3293 let mut finalizers = Vec::new();
3295 for supervisor in supervisors.iter_mut() {
3296 let (mut latest, mut monitor) = supervisor.subscribe().await;
3297 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3298 while latest < required_containers {
3299 latest = monitor.next().await.expect("event missing");
3300 }
3301 }));
3302 }
3303 join_all(finalizers).await;
3304
3305 for supervisor in supervisors.iter() {
3307 {
3309 let faults = supervisor.faults.lock().unwrap();
3310 assert!(faults.is_empty());
3311 }
3312
3313 {
3315 let invalid = supervisor.invalid.lock().unwrap();
3316 assert_eq!(*invalid, 0);
3317 }
3318 }
3319
3320 let blocked = oracle.blocked().await.unwrap();
3322 assert!(blocked.is_empty());
3323 })
3324 }
3325
3326 #[test_traced]
3327 #[ignore]
3328 fn test_1k() {
3329 run_1k::<MinPk>();
3330 run_1k::<MinSig>();
3331 }
3332
3333 fn tle<V: Variant>() {
3334 let n = 4;
3336 let threshold = quorum(n);
3337 let namespace = b"consensus".to_vec();
3338 let activity_timeout = 100;
3339 let skip_timeout = 50;
3340 let executor = deterministic::Runner::timed(Duration::from_secs(30));
3341 executor.start(|mut context| async move {
3342 let (network, mut oracle) = Network::new(
3344 context.with_label("network"),
3345 Config {
3346 max_size: 1024 * 1024,
3347 },
3348 );
3349
3350 network.start();
3352
3353 let mut schemes = Vec::new();
3355 let mut validators = Vec::new();
3356 for i in 0..n {
3357 let scheme = PrivateKey::from_seed(i as u64);
3358 let pk = scheme.public_key();
3359 schemes.push(scheme);
3360 validators.push(pk);
3361 }
3362 validators.sort();
3363 schemes.sort_by_key(|s| s.public_key());
3364 let mut registrations = register_validators(&mut oracle, &validators).await;
3365
3366 let link = Link {
3368 latency: Duration::from_millis(10),
3369 jitter: Duration::from_millis(5),
3370 success_rate: 1.0,
3371 };
3372 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
3373
3374 let (polynomial, shares) =
3376 ops::generate_shares::<_, V>(&mut context, None, n, threshold);
3377 let public_key = *public::<V>(&polynomial);
3378
3379 let relay = Arc::new(mocks::relay::Relay::new());
3381 let mut supervisors = Vec::new();
3382 let mut engine_handlers = Vec::new();
3383 let monitor_supervisor = Arc::new(Mutex::new(None));
3384 for (idx, scheme) in schemes.into_iter().enumerate() {
3385 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
3387
3388 let validator = scheme.public_key();
3390 let mut participants = BTreeMap::new();
3391 participants.insert(
3392 0,
3393 (
3394 polynomial.clone(),
3395 validators.clone(),
3396 Some(shares[idx].clone()),
3397 ),
3398 );
3399
3400 let supervisor_config = mocks::supervisor::Config::<_, V> {
3402 namespace: namespace.clone(),
3403 participants,
3404 };
3405 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
3406 supervisors.push(supervisor.clone());
3407 if idx == 0 {
3408 *monitor_supervisor.lock().unwrap() = Some(supervisor.clone());
3409 }
3410
3411 let application_cfg = mocks::application::Config {
3413 hasher: Sha256::default(),
3414 relay: relay.clone(),
3415 participant: validator.clone(),
3416 propose_latency: (10.0, 5.0),
3417 verify_latency: (10.0, 5.0),
3418 };
3419 let (actor, application) = mocks::application::Application::new(
3420 context.with_label("application"),
3421 application_cfg,
3422 );
3423 actor.start();
3424 let blocker = oracle.control(scheme.public_key());
3425 let cfg = config::Config {
3426 crypto: scheme,
3427 blocker,
3428 automaton: application.clone(),
3429 relay: application.clone(),
3430 reporter: supervisor.clone(),
3431 supervisor,
3432 partition: validator.to_string(),
3433 mailbox_size: 1024,
3434 namespace: namespace.clone(),
3435 leader_timeout: Duration::from_millis(100),
3436 notarization_timeout: Duration::from_millis(200),
3437 nullify_retry: Duration::from_millis(500),
3438 fetch_timeout: Duration::from_millis(100),
3439 activity_timeout,
3440 skip_timeout,
3441 max_fetch_count: 1,
3442 fetch_rate_per_peer: Quota::per_second(NZU32!(10)),
3443 fetch_concurrent: 1,
3444 replay_buffer: NZUsize!(1024 * 1024),
3445 write_buffer: NZUsize!(1024 * 1024),
3446 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3447 };
3448 let engine = Engine::new(context.with_label("engine"), cfg);
3449
3450 let (pending, recovered, resolver) = registrations
3452 .remove(&validator)
3453 .expect("validator should be registered");
3454 engine_handlers.push(engine.start(pending, recovered, resolver));
3455 }
3456
3457 let target = 10u64; let target_bytes = target.to_be_bytes();
3460 let message_content = b"Secret message for future view10"; let message = Block::new(*message_content);
3462
3463 let seed_namespace = seed_namespace(&namespace);
3465 let ciphertext = encrypt::<_, V>(
3466 &mut context,
3467 public_key,
3468 (Some(&seed_namespace), &target_bytes),
3469 &message,
3470 );
3471
3472 let supervisor = monitor_supervisor.lock().unwrap().clone().unwrap();
3474 loop {
3475 context.sleep(Duration::from_millis(100)).await;
3477 let notarizations = supervisor.notarizations.lock().unwrap();
3478 let Some(notarization) = notarizations.get(&target) else {
3479 continue;
3480 };
3481
3482 let seed_signature = notarization.seed_signature;
3484 let decrypted = decrypt::<V>(&seed_signature, &ciphertext)
3485 .expect("Decryption should succeed with valid seed signature");
3486 assert_eq!(
3487 message.as_ref(),
3488 decrypted.as_ref(),
3489 "Decrypted message should match original message"
3490 );
3491 break;
3492 }
3493 });
3494 }
3495
3496 #[test_traced]
3497 fn test_tle() {
3498 tle::<MinPk>();
3499 tle::<MinSig>();
3500 }
3501}