1pub mod types;
118
119cfg_if::cfg_if! {
120 if #[cfg(not(target_arch = "wasm32"))] {
121 mod actors;
122 mod config;
123 pub use config::Config;
124 mod engine;
125 pub use engine::Engine;
126 mod metrics;
127 }
128}
129
130#[cfg(test)]
131pub mod mocks;
132
133#[cfg(test)]
134mod tests {
135 use super::*;
136 use crate::Monitor;
137 use commonware_cryptography::{
138 ed25519::{PrivateKey, PublicKey},
139 sha256::Digest as Sha256Digest,
140 PrivateKeyExt as _, PublicKey as CPublicKey, Sha256, Signer as _,
141 };
142 use commonware_macros::{select, test_traced};
143 use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
144 use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner, Spawner};
145 use commonware_utils::{quorum, NZUsize, NZU32};
146 use engine::Engine;
147 use futures::{future::join_all, StreamExt};
148 use governor::Quota;
149 use rand::Rng as _;
150 use std::{
151 collections::{BTreeMap, HashMap},
152 num::NonZeroUsize,
153 sync::{Arc, Mutex},
154 time::Duration,
155 };
156 use tracing::debug;
157 use types::Activity;
158
159 const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
160 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
161
162 async fn register_validators<P: CPublicKey>(
164 oracle: &mut Oracle<P>,
165 validators: &[P],
166 ) -> HashMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))> {
167 let mut registrations = HashMap::new();
168 for validator in validators.iter() {
169 let (voter_sender, voter_receiver) =
170 oracle.register(validator.clone(), 0).await.unwrap();
171 let (resolver_sender, resolver_receiver) =
172 oracle.register(validator.clone(), 1).await.unwrap();
173 registrations.insert(
174 validator.clone(),
175 (
176 (voter_sender, voter_receiver),
177 (resolver_sender, resolver_receiver),
178 ),
179 );
180 }
181 registrations
182 }
183
184 enum Action {
186 Link(Link),
187 Update(Link), Unlink,
189 }
190
191 async fn link_validators<P: CPublicKey>(
197 oracle: &mut Oracle<P>,
198 validators: &[P],
199 action: Action,
200 restrict_to: Option<fn(usize, usize, usize) -> bool>,
201 ) {
202 for (i1, v1) in validators.iter().enumerate() {
203 for (i2, v2) in validators.iter().enumerate() {
204 if v2 == v1 {
206 continue;
207 }
208
209 if let Some(f) = restrict_to {
211 if !f(validators.len(), i1, i2) {
212 continue;
213 }
214 }
215
216 match action {
218 Action::Update(_) | Action::Unlink => {
219 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
220 }
221 _ => {}
222 }
223
224 match action {
226 Action::Link(ref link) | Action::Update(ref link) => {
227 oracle
228 .add_link(v1.clone(), v2.clone(), link.clone())
229 .await
230 .unwrap();
231 }
232 _ => {}
233 }
234 }
235 }
236 }
237
238 #[test_traced]
239 fn test_all_online() {
240 let n = 5;
242 let threshold = quorum(n);
243 let max_exceptions = 10;
244 let required_containers = 100;
245 let activity_timeout = 10;
246 let skip_timeout = 5;
247 let namespace = b"consensus".to_vec();
248 let executor = deterministic::Runner::timed(Duration::from_secs(30));
249 executor.start(|context| async move {
250 let (network, mut oracle) = Network::new(
252 context.with_label("network"),
253 Config {
254 max_size: 1024 * 1024,
255 },
256 );
257
258 network.start();
260
261 let mut schemes = Vec::new();
263 let mut validators = Vec::new();
264 for i in 0..n {
265 let scheme = PrivateKey::from_seed(i as u64);
266 let pk = scheme.public_key();
267 schemes.push(scheme);
268 validators.push(pk);
269 }
270 validators.sort();
271 schemes.sort_by_key(|s| s.public_key());
272 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
273 let mut registrations = register_validators(&mut oracle, &validators).await;
274
275 let link = Link {
277 latency: Duration::from_millis(10),
278 jitter: Duration::from_millis(1),
279 success_rate: 1.0,
280 };
281 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
282
283 let relay = Arc::new(mocks::relay::Relay::new());
285 let mut supervisors = Vec::new();
286 let mut engine_handlers = Vec::new();
287 for scheme in schemes.into_iter() {
288 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
290
291 let validator = scheme.public_key();
293 let supervisor_config = mocks::supervisor::Config {
294 namespace: namespace.clone(),
295 participants: view_validators.clone(),
296 };
297 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
298 supervisor_config,
299 );
300 supervisors.push(supervisor.clone());
301 let application_cfg = mocks::application::Config {
302 hasher: Sha256::default(),
303 relay: relay.clone(),
304 participant: validator.clone(),
305 propose_latency: (10.0, 5.0),
306 verify_latency: (10.0, 5.0),
307 };
308 let (actor, application) = mocks::application::Application::new(
309 context.with_label("application"),
310 application_cfg,
311 );
312 actor.start();
313 let cfg = config::Config {
314 crypto: scheme,
315 automaton: application.clone(),
316 relay: application.clone(),
317 reporter: supervisor.clone(),
318 supervisor,
319 partition: validator.to_string(),
320 mailbox_size: 1024,
321 namespace: namespace.clone(),
322 leader_timeout: Duration::from_secs(1),
323 notarization_timeout: Duration::from_secs(2),
324 nullify_retry: Duration::from_secs(10),
325 fetch_timeout: Duration::from_secs(1),
326 activity_timeout,
327 skip_timeout,
328 max_fetch_count: 1,
329 max_participants: n as usize,
330 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
331 fetch_concurrent: 1,
332 replay_buffer: NZUsize!(1024 * 1024),
333 write_buffer: NZUsize!(1024 * 1024),
334 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
335 };
336 let engine = Engine::new(context.with_label("engine"), cfg);
337 let (voter, resolver) = registrations
338 .remove(&validator)
339 .expect("validator should be registered");
340 engine_handlers.push(engine.start(voter, resolver));
341 }
342
343 let mut finalizers = Vec::new();
345 for supervisor in supervisors.iter_mut() {
346 let (mut latest, mut monitor) = supervisor.subscribe().await;
347 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
348 while latest < required_containers {
349 latest = monitor.next().await.expect("event missing");
350 }
351 }));
352 }
353 join_all(finalizers).await;
354
355 let latest_complete = required_containers - activity_timeout;
357 for supervisor in supervisors.iter() {
358 {
360 let faults = supervisor.faults.lock().unwrap();
361 assert!(faults.is_empty());
362 }
363
364 let mut exceptions = 0;
366 let mut notarized = HashMap::new();
367 let mut finalized = HashMap::new();
368 {
369 let notarizes = supervisor.notarizes.lock().unwrap();
370 for view in 1..latest_complete {
371 let Some(payloads) = notarizes.get(&view) else {
373 exceptions += 1;
374 continue;
375 };
376 if payloads.len() > 1 {
377 panic!("view: {view}");
378 }
379 let (digest, notarizers) = payloads.iter().next().unwrap();
380 notarized.insert(view, *digest);
381
382 if notarizers.len() < threshold as usize {
383 panic!("view: {view}");
386 }
387 if notarizers.len() != n as usize {
388 exceptions += 1;
389 }
390 }
391 }
392 {
393 let notarizations = supervisor.notarizations.lock().unwrap();
394 for view in 1..latest_complete {
395 let Some(notarization) = notarizations.get(&view) else {
397 exceptions += 1;
398 continue;
399 };
400 let Some(digest) = notarized.get(&view) else {
401 exceptions += 1;
402 continue;
403 };
404 assert_eq!(¬arization.proposal.payload, digest);
405 }
406 }
407 {
408 let finalizes = supervisor.finalizes.lock().unwrap();
409 for view in 1..latest_complete {
410 let Some(payloads) = finalizes.get(&view) else {
412 exceptions += 1;
413 continue;
414 };
415 if payloads.len() > 1 {
416 panic!("view: {view}");
417 }
418 let (digest, finalizers) = payloads.iter().next().unwrap();
419 finalized.insert(view, *digest);
420
421 if view > latest_complete {
423 continue;
424 }
425
426 if finalizers.len() < threshold as usize {
428 panic!("view: {view}");
431 }
432 if finalizers.len() != n as usize {
433 exceptions += 1;
434 }
435
436 let nullifies = supervisor.nullifies.lock().unwrap();
438 let Some(nullifies) = nullifies.get(&view) else {
439 continue;
440 };
441 for (_, finalizers) in payloads.iter() {
442 for finalizer in finalizers.iter() {
443 if nullifies.contains(finalizer) {
444 panic!("should not nullify and finalize at same view");
445 }
446 }
447 }
448 }
449 }
450 {
451 let finalizations = supervisor.finalizations.lock().unwrap();
452 for view in 1..latest_complete {
453 let Some(finalization) = finalizations.get(&view) else {
455 exceptions += 1;
456 continue;
457 };
458 let Some(digest) = finalized.get(&view) else {
459 exceptions += 1;
460 continue;
461 };
462 assert_eq!(&finalization.proposal.payload, digest);
463 }
464 }
465
466 assert!(exceptions <= max_exceptions);
468 }
469 });
470 }
471
472 #[test_traced]
473 fn test_observer() {
474 let n_active = 5;
476 let required_containers = 100;
477 let activity_timeout = 10;
478 let skip_timeout = 5;
479 let namespace = b"consensus".to_vec();
480 let executor = deterministic::Runner::timed(Duration::from_secs(30));
481 executor.start(|context| async move {
482 let (network, mut oracle) = Network::new(
484 context.with_label("network"),
485 Config {
486 max_size: 1024 * 1024,
487 },
488 );
489
490 network.start();
492
493 let mut schemes = Vec::new();
495 let mut validators_active = Vec::new();
496 for i in 0..n_active {
497 let scheme = PrivateKey::from_seed(i as u64);
498 let pk = scheme.public_key();
499 schemes.push(scheme);
500 validators_active.push(pk);
501 }
502 validators_active.sort();
503
504 let scheme_observer = PrivateKey::from_seed(n_active as u64);
506 let pk_observer = scheme_observer.public_key();
507 schemes.push(scheme_observer);
508
509 let mut all_validators = validators_active.clone();
511 all_validators.push(pk_observer.clone());
512 all_validators.sort();
513 let mut registrations = register_validators(&mut oracle, &all_validators).await;
514
515 let link = Link {
517 latency: Duration::from_millis(10),
518 jitter: Duration::from_millis(1),
519 success_rate: 1.0,
520 };
521 link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
522
523 let relay = Arc::new(mocks::relay::Relay::new());
525 let mut supervisors = Vec::new();
526 let participants = BTreeMap::from_iter(vec![(0, validators_active.clone())]);
527 for scheme in schemes.into_iter() {
528 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
530
531 let supervisor_config = mocks::supervisor::Config {
533 namespace: namespace.clone(),
534 participants: participants.clone(),
535 };
536 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
537 supervisor_config,
538 );
539 supervisors.push(supervisor.clone());
540
541 let validator = scheme.public_key();
543 let application_cfg = mocks::application::Config {
544 hasher: Sha256::default(),
545 relay: relay.clone(),
546 participant: validator.clone(),
547 propose_latency: (10.0, 5.0),
548 verify_latency: (10.0, 5.0),
549 };
550 let (actor, application) = mocks::application::Application::new(
551 context.with_label("application"),
552 application_cfg,
553 );
554 actor.start();
555
556 let cfg = config::Config {
558 crypto: scheme,
559 automaton: application.clone(),
560 relay: application.clone(),
561 reporter: supervisor.clone(),
562 supervisor,
563 partition: validator.to_string(),
564 mailbox_size: 1024,
565 namespace: namespace.clone(),
566 leader_timeout: Duration::from_secs(1),
567 notarization_timeout: Duration::from_secs(2),
568 nullify_retry: Duration::from_secs(10),
569 fetch_timeout: Duration::from_secs(1),
570 activity_timeout,
571 skip_timeout,
572 max_participants: n_active as usize,
573 max_fetch_count: 1,
574 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
575 fetch_concurrent: 1,
576 replay_buffer: NZUsize!(1024 * 1024),
577 write_buffer: NZUsize!(1024 * 1024),
578 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
579 };
580 let engine = Engine::new(context.with_label("engine"), cfg);
581 let (voter, resolver) = registrations
582 .remove(&validator)
583 .expect("validator should be registered");
584 engine.start(voter, resolver);
585 }
586
587 let mut finalizers = Vec::new();
589 for supervisor in supervisors.iter_mut() {
590 let (mut latest, mut monitor) = supervisor.subscribe().await;
591 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
592 while latest < required_containers {
593 latest = monitor.next().await.expect("event missing");
594 }
595 }));
596 }
597 join_all(finalizers).await;
598 });
599 }
600
601 #[test_traced]
602 fn test_unclean_shutdown() {
603 let n = 5;
605 let required_containers = 100;
606 let activity_timeout = 10;
607 let skip_timeout = 5;
608 let namespace = b"consensus".to_vec();
609
610 let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
612 let supervised = Arc::new(Mutex::new(Vec::new()));
613 let mut prev_ctx = None;
614
615 loop {
616 let namespace = namespace.clone();
617 let shutdowns = shutdowns.clone();
618 let supervised = supervised.clone();
619
620 let f = |mut context: deterministic::Context| async move {
621 let (network, mut oracle) = Network::new(
623 context.with_label("network"),
624 Config {
625 max_size: 1024 * 1024,
626 },
627 );
628
629 network.start();
631
632 let mut schemes = Vec::new();
634 let mut validators = Vec::new();
635 for i in 0..n {
636 let scheme = PrivateKey::from_seed(i as u64);
637 let pk = scheme.public_key();
638 schemes.push(scheme);
639 validators.push(pk);
640 }
641 validators.sort();
642 schemes.sort_by_key(|s| s.public_key());
643 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
644 let mut registrations = register_validators(&mut oracle, &validators).await;
645
646 let link = Link {
648 latency: Duration::from_millis(50),
649 jitter: Duration::from_millis(50),
650 success_rate: 1.0,
651 };
652 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
653
654 let relay = Arc::new(mocks::relay::Relay::new());
656 let mut supervisors = HashMap::new();
657 let mut engine_handlers = Vec::new();
658 for scheme in schemes.into_iter() {
659 let context = context
661 .clone()
662 .with_label(&format!("validator-{}", scheme.public_key()));
663
664 let validator = scheme.public_key();
666 let supervisor_config = mocks::supervisor::Config {
667 namespace: namespace.clone(),
668 participants: view_validators.clone(),
669 };
670 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
671 supervisor_config,
672 );
673 supervisors.insert(validator.clone(), supervisor.clone());
674 let application_cfg = mocks::application::Config {
675 hasher: Sha256::default(),
676 relay: relay.clone(),
677 participant: validator.clone(),
678 propose_latency: (10.0, 5.0),
679 verify_latency: (10.0, 5.0),
680 };
681 let (actor, application) = mocks::application::Application::new(
682 context.with_label("application"),
683 application_cfg,
684 );
685 actor.start();
686 let cfg = config::Config {
687 crypto: scheme,
688 automaton: application.clone(),
689 relay: application.clone(),
690 reporter: supervisor.clone(),
691 supervisor,
692 partition: validator.to_string(),
693 mailbox_size: 1024,
694 namespace: namespace.clone(),
695 leader_timeout: Duration::from_secs(1),
696 notarization_timeout: Duration::from_secs(2),
697 nullify_retry: Duration::from_secs(10),
698 fetch_timeout: Duration::from_secs(1),
699 activity_timeout,
700 skip_timeout,
701 max_participants: n as usize,
702 max_fetch_count: 1,
703 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
704 fetch_concurrent: 1,
705 replay_buffer: NZUsize!(1024 * 1024),
706 write_buffer: NZUsize!(1024 * 1024),
707 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
708 };
709 let engine = Engine::new(context.with_label("engine"), cfg);
710 let (voter_network, resolver_network) = registrations
711 .remove(&validator)
712 .expect("validator should be registered");
713 engine_handlers.push(engine.start(voter_network, resolver_network));
714 }
715
716 let mut finalizers = Vec::new();
718 for (_, supervisor) in supervisors.iter_mut() {
719 let (mut latest, mut monitor) = supervisor.subscribe().await;
720 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
721 while latest < required_containers {
722 latest = monitor.next().await.expect("event missing");
723 }
724 }));
725 }
726
727 let wait =
729 context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
730 select! {
731 _ = context.sleep(wait) => {
732 {
734 let mut shutdowns = shutdowns.lock().unwrap();
735 debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
736 *shutdowns += 1;
737 }
738 supervised.lock().unwrap().push(supervisors);
739 (false,context)
740 },
741 _ = join_all(finalizers) => {
742 let supervised = supervised.lock().unwrap();
744 for supervisors in supervised.iter() {
745 for (_, supervisor) in supervisors.iter() {
746 let faults = supervisor.faults.lock().unwrap();
747 assert!(faults.is_empty());
748 }
749 }
750 (true,context)
751 }
752 }
753 };
754
755 let (complete, context) = if let Some(prev_ctx) = prev_ctx {
756 deterministic::Runner::from(prev_ctx)
757 } else {
758 deterministic::Runner::timed(Duration::from_secs(30))
759 }
760 .start(f);
761
762 if complete {
764 break;
765 }
766
767 prev_ctx = Some(context.recover());
768 }
769 }
770
771 #[test_traced]
772 fn test_backfill() {
773 let n = 4;
775 let required_containers = 100;
776 let activity_timeout = 10;
777 let skip_timeout = 5;
778 let namespace = b"consensus".to_vec();
779 let executor = deterministic::Runner::timed(Duration::from_secs(360));
780 executor.start(|context| async move {
781 let (network, mut oracle) = Network::new(
783 context.with_label("network"),
784 Config {
785 max_size: 1024 * 1024,
786 },
787 );
788
789 network.start();
791
792 let mut schemes = Vec::new();
794 let mut validators = Vec::new();
795 for i in 0..n {
796 let scheme = PrivateKey::from_seed(i as u64);
797 let pk = scheme.public_key();
798 schemes.push(scheme);
799 validators.push(pk);
800 }
801 validators.sort();
802 schemes.sort_by_key(|s| s.public_key());
803 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
804 let mut registrations = register_validators(&mut oracle, &validators).await;
805
806 let link = Link {
808 latency: Duration::from_millis(10),
809 jitter: Duration::from_millis(1),
810 success_rate: 1.0,
811 };
812 link_validators(
813 &mut oracle,
814 &validators,
815 Action::Link(link),
816 Some(|_, i, j| ![i, j].contains(&0usize)),
817 )
818 .await;
819
820 let relay = Arc::new(mocks::relay::Relay::new());
822 let mut supervisors = Vec::new();
823 let mut engine_handlers = Vec::new();
824 for (idx_scheme, scheme) in schemes.iter().enumerate() {
825 if idx_scheme == 0 {
827 continue;
828 }
829
830 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
832
833 let validator = scheme.public_key();
835 let supervisor_config = mocks::supervisor::Config {
836 namespace: namespace.clone(),
837 participants: view_validators.clone(),
838 };
839 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
840 supervisor_config,
841 );
842 supervisors.push(supervisor.clone());
843 let application_cfg = mocks::application::Config {
844 hasher: Sha256::default(),
845 relay: relay.clone(),
846 participant: validator.clone(),
847 propose_latency: (10.0, 5.0),
848 verify_latency: (10.0, 5.0),
849 };
850 let (actor, application) = mocks::application::Application::new(
851 context.with_label("application"),
852 application_cfg,
853 );
854 actor.start();
855 let cfg = config::Config {
856 crypto: scheme.clone(),
857 automaton: application.clone(),
858 relay: application.clone(),
859 reporter: supervisor.clone(),
860 supervisor,
861 partition: validator.to_string(),
862 mailbox_size: 1024,
863 namespace: namespace.clone(),
864 leader_timeout: Duration::from_secs(1),
865 notarization_timeout: Duration::from_secs(2),
866 nullify_retry: Duration::from_secs(10),
867 fetch_timeout: Duration::from_secs(1),
868 activity_timeout,
869 skip_timeout,
870 max_fetch_count: 1, max_participants: n as usize,
872 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
873 fetch_concurrent: 1,
874 replay_buffer: NZUsize!(1024 * 1024),
875 write_buffer: NZUsize!(1024 * 1024),
876 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
877 };
878 let (voter, resolver) = registrations
879 .remove(&validator)
880 .expect("validator should be registered");
881 let engine = Engine::new(context.with_label("engine"), cfg);
882 engine_handlers.push(engine.start(voter, resolver));
883 }
884
885 let mut finalizers = Vec::new();
887 for supervisor in supervisors.iter_mut() {
888 let (mut latest, mut monitor) = supervisor.subscribe().await;
889 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
890 while latest < required_containers {
891 latest = monitor.next().await.expect("event missing");
892 }
893 }));
894 }
895 join_all(finalizers).await;
896
897 let link = Link {
899 latency: Duration::from_secs(3),
900 jitter: Duration::from_millis(0),
901 success_rate: 1.0,
902 };
903 link_validators(
904 &mut oracle,
905 &validators,
906 Action::Update(link.clone()),
907 Some(|_, i, j| ![i, j].contains(&0usize)),
908 )
909 .await;
910
911 context.sleep(Duration::from_secs(120)).await;
913
914 link_validators(
916 &mut oracle,
917 &validators,
918 Action::Unlink,
919 Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
920 )
921 .await;
922
923 let scheme = schemes[0].clone();
925 let validator = scheme.public_key();
926
927 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
929
930 link_validators(
932 &mut oracle,
933 &validators,
934 Action::Link(link),
935 Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
936 )
937 .await;
938
939 let link = Link {
941 latency: Duration::from_millis(10),
942 jitter: Duration::from_millis(3),
943 success_rate: 1.0,
944 };
945 link_validators(
946 &mut oracle,
947 &validators,
948 Action::Update(link),
949 Some(|_, i, j| ![i, j].contains(&1usize)),
950 )
951 .await;
952
953 let supervisor_config = mocks::supervisor::Config {
955 namespace: namespace.clone(),
956 participants: view_validators.clone(),
957 };
958 let mut supervisor =
959 mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(supervisor_config);
960 supervisors.push(supervisor.clone());
961 let application_cfg = mocks::application::Config {
962 hasher: Sha256::default(),
963 relay: relay.clone(),
964 participant: validator.clone(),
965 propose_latency: (10.0, 5.0),
966 verify_latency: (10.0, 5.0),
967 };
968 let (actor, application) = mocks::application::Application::new(
969 context.with_label("application"),
970 application_cfg,
971 );
972 actor.start();
973 let cfg = config::Config {
974 crypto: scheme,
975 automaton: application.clone(),
976 relay: application.clone(),
977 reporter: supervisor.clone(),
978 supervisor: supervisor.clone(),
979 partition: validator.to_string(),
980 mailbox_size: 1024,
981 namespace: namespace.clone(),
982 leader_timeout: Duration::from_secs(1),
983 notarization_timeout: Duration::from_secs(2),
984 nullify_retry: Duration::from_secs(10),
985 fetch_timeout: Duration::from_secs(1),
986 activity_timeout,
987 skip_timeout,
988 max_fetch_count: 1,
989 max_participants: n as usize,
990 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
991 fetch_concurrent: 1,
992 replay_buffer: NZUsize!(1024 * 1024),
993 write_buffer: NZUsize!(1024 * 1024),
994 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
995 };
996 let (voter, resolver) = registrations
997 .remove(&validator)
998 .expect("validator should be registered");
999 let engine = Engine::new(context.with_label("engine"), cfg);
1000 engine_handlers.push(engine.start(voter, resolver));
1001
1002 let (mut latest, mut monitor) = supervisor.subscribe().await;
1004 while latest < required_containers {
1005 latest = monitor.next().await.expect("event missing");
1006 }
1007 });
1008 }
1009
1010 #[test_traced]
1011 fn test_one_offline() {
1012 let n = 5;
1014 let threshold = quorum(n);
1015 let required_containers = 100;
1016 let activity_timeout = 10;
1017 let skip_timeout = 5;
1018 let namespace = b"consensus".to_vec();
1019 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1020 executor.start(|context| async move {
1021 let (network, mut oracle) = Network::new(
1023 context.with_label("network"),
1024 Config {
1025 max_size: 1024 * 1024,
1026 },
1027 );
1028
1029 network.start();
1031
1032 let mut schemes = Vec::new();
1034 let mut validators = Vec::new();
1035 for i in 0..n {
1036 let scheme = PrivateKey::from_seed(i as u64);
1037 let pk = scheme.public_key();
1038 schemes.push(scheme);
1039 validators.push(pk);
1040 }
1041 validators.sort();
1042 schemes.sort_by_key(|s| s.public_key());
1043 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1044 let mut registrations = register_validators(&mut oracle, &validators).await;
1045
1046 let link = Link {
1048 latency: Duration::from_millis(10),
1049 jitter: Duration::from_millis(1),
1050 success_rate: 1.0,
1051 };
1052 link_validators(
1053 &mut oracle,
1054 &validators,
1055 Action::Link(link),
1056 Some(|_, i, j| ![i, j].contains(&0usize)),
1057 )
1058 .await;
1059
1060 let relay = Arc::new(mocks::relay::Relay::new());
1062 let mut supervisors = Vec::new();
1063 let mut engine_handlers = Vec::new();
1064 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1065 if idx_scheme == 0 {
1067 continue;
1068 }
1069
1070 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1072
1073 let validator = scheme.public_key();
1075 let supervisor_config = mocks::supervisor::Config {
1076 namespace: namespace.clone(),
1077 participants: view_validators.clone(),
1078 };
1079 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1080 supervisor_config,
1081 );
1082 supervisors.push(supervisor.clone());
1083 let application_cfg = mocks::application::Config {
1084 hasher: Sha256::default(),
1085 relay: relay.clone(),
1086 participant: validator.clone(),
1087 propose_latency: (10.0, 5.0),
1088 verify_latency: (10.0, 5.0),
1089 };
1090 let (actor, application) = mocks::application::Application::new(
1091 context.with_label("application"),
1092 application_cfg,
1093 );
1094 actor.start();
1095 let cfg = config::Config {
1096 crypto: scheme,
1097 automaton: application.clone(),
1098 relay: application.clone(),
1099 reporter: supervisor.clone(),
1100 partition: validator.to_string(),
1101 supervisor,
1102 mailbox_size: 1024,
1103 namespace: namespace.clone(),
1104 leader_timeout: Duration::from_secs(1),
1105 notarization_timeout: Duration::from_secs(2),
1106 nullify_retry: Duration::from_secs(10),
1107 fetch_timeout: Duration::from_secs(1),
1108 activity_timeout,
1109 skip_timeout,
1110 max_participants: n as usize,
1111 max_fetch_count: 1,
1112 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1113 fetch_concurrent: 1,
1114 replay_buffer: NZUsize!(1024 * 1024),
1115 write_buffer: NZUsize!(1024 * 1024),
1116 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1117 };
1118 let (voter, resolver) = registrations
1119 .remove(&validator)
1120 .expect("validator should be registered");
1121 let engine = Engine::new(context.with_label("engine"), cfg);
1122 engine_handlers.push(engine.start(voter, resolver));
1123 }
1124
1125 let mut finalizers = Vec::new();
1127 for supervisor in supervisors.iter_mut() {
1128 let (mut latest, mut monitor) = supervisor.subscribe().await;
1129 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1130 while latest < required_containers {
1131 latest = monitor.next().await.expect("event missing");
1132 }
1133 }));
1134 }
1135 join_all(finalizers).await;
1136
1137 let offline = &validators[0];
1139 for supervisor in supervisors.iter() {
1140 {
1142 let faults = supervisor.faults.lock().unwrap();
1143 assert!(faults.is_empty());
1144 }
1145
1146 {
1148 let notarizes = supervisor.notarizes.lock().unwrap();
1149 for (view, payloads) in notarizes.iter() {
1150 for (_, participants) in payloads.iter() {
1151 if participants.contains(offline) {
1152 panic!("view: {view}");
1153 }
1154 }
1155 }
1156 }
1157 {
1158 let nullifies = supervisor.nullifies.lock().unwrap();
1159 for (view, participants) in nullifies.iter() {
1160 if participants.contains(offline) {
1161 panic!("view: {view}");
1162 }
1163 }
1164 }
1165 {
1166 let finalizes = supervisor.finalizes.lock().unwrap();
1167 for (view, payloads) in finalizes.iter() {
1168 for (_, finalizers) in payloads.iter() {
1169 if finalizers.contains(offline) {
1170 panic!("view: {view}");
1171 }
1172 }
1173 }
1174 }
1175
1176 let mut offline_views = Vec::new();
1178 {
1179 let leaders = supervisor.leaders.lock().unwrap();
1180 for (view, leader) in leaders.iter() {
1181 if leader == offline {
1182 offline_views.push(*view);
1183 }
1184 }
1185 }
1186 assert!(!offline_views.is_empty());
1187
1188 {
1190 let nullifies = supervisor.nullifies.lock().unwrap();
1191 for view in offline_views.iter() {
1192 let nullifies = nullifies.get(view).unwrap();
1193 if nullifies.len() < threshold as usize {
1194 panic!("view: {view}");
1195 }
1196 }
1197 }
1198 {
1199 let nullifications = supervisor.nullifications.lock().unwrap();
1200 for view in offline_views.iter() {
1201 nullifications.get(view).unwrap();
1202 }
1203 }
1204 }
1205
1206 let encoded = context.encode();
1208 let lines = encoded.lines();
1209 let mut skipped_views = 0;
1210 let mut nodes_skipping = 0;
1211 for line in lines {
1212 if line.contains("_engine_voter_skipped_views_total") {
1213 let parts: Vec<&str> = line.split_whitespace().collect();
1214 if let Some(number_str) = parts.last() {
1215 if let Ok(number) = number_str.parse::<u64>() {
1216 if number > 0 {
1217 nodes_skipping += 1;
1218 }
1219 if number > skipped_views {
1220 skipped_views = number;
1221 }
1222 }
1223 }
1224 }
1225 }
1226 assert!(
1227 skipped_views > 0,
1228 "expected skipped views to be greater than 0"
1229 );
1230 assert_eq!(
1231 nodes_skipping,
1232 n - 1,
1233 "expected all online nodes to be skipping views"
1234 );
1235 });
1236 }
1237
1238 #[test_traced]
1239 fn test_slow_validator() {
1240 let n = 5;
1242 let required_containers = 50;
1243 let activity_timeout = 10;
1244 let skip_timeout = 5;
1245 let namespace = b"consensus".to_vec();
1246 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1247 executor.start(|context| async move {
1248 let (network, mut oracle) = Network::new(
1250 context.with_label("network"),
1251 Config {
1252 max_size: 1024 * 1024,
1253 },
1254 );
1255
1256 network.start();
1258
1259 let mut schemes = Vec::new();
1261 let mut validators = Vec::new();
1262 for i in 0..n {
1263 let scheme = PrivateKey::from_seed(i as u64);
1264 let pk = scheme.public_key();
1265 schemes.push(scheme);
1266 validators.push(pk);
1267 }
1268 validators.sort();
1269 schemes.sort_by_key(|s| s.public_key());
1270 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1271 let mut registrations = register_validators(&mut oracle, &validators).await;
1272
1273 let link = Link {
1275 latency: Duration::from_millis(10),
1276 jitter: Duration::from_millis(1),
1277 success_rate: 1.0,
1278 };
1279 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1280
1281 let relay = Arc::new(mocks::relay::Relay::new());
1283 let mut supervisors = Vec::new();
1284 let mut engine_handlers = Vec::new();
1285 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1286 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1288
1289 let validator = scheme.public_key();
1291 let supervisor_config = mocks::supervisor::Config {
1292 namespace: namespace.clone(),
1293 participants: view_validators.clone(),
1294 };
1295 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1296 supervisor_config,
1297 );
1298 supervisors.push(supervisor.clone());
1299 let application_cfg = if idx_scheme == 0 {
1300 mocks::application::Config {
1301 hasher: Sha256::default(),
1302 relay: relay.clone(),
1303 participant: validator.clone(),
1304 propose_latency: (3_000.0, 0.0),
1305 verify_latency: (3_000.0, 5.0),
1306 }
1307 } else {
1308 mocks::application::Config {
1309 hasher: Sha256::default(),
1310 relay: relay.clone(),
1311 participant: validator.clone(),
1312 propose_latency: (10.0, 5.0),
1313 verify_latency: (10.0, 5.0),
1314 }
1315 };
1316 let (actor, application) = mocks::application::Application::new(
1317 context.with_label("application"),
1318 application_cfg,
1319 );
1320 actor.start();
1321 let cfg = config::Config {
1322 crypto: scheme,
1323 automaton: application.clone(),
1324 relay: application.clone(),
1325 reporter: supervisor.clone(),
1326 partition: validator.to_string(),
1327 supervisor,
1328 mailbox_size: 1024,
1329 namespace: namespace.clone(),
1330 leader_timeout: Duration::from_secs(1),
1331 notarization_timeout: Duration::from_secs(2),
1332 nullify_retry: Duration::from_secs(10),
1333 fetch_timeout: Duration::from_secs(1),
1334 activity_timeout,
1335 skip_timeout,
1336 max_fetch_count: 1,
1337 max_participants: n as usize,
1338 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1339 fetch_concurrent: 1,
1340 replay_buffer: NZUsize!(1024 * 1024),
1341 write_buffer: NZUsize!(1024 * 1024),
1342 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1343 };
1344 let (voter, resolver) = registrations
1345 .remove(&validator)
1346 .expect("validator should be registered");
1347 let engine = Engine::new(context.with_label("engine"), cfg);
1348 engine_handlers.push(engine.start(voter, resolver));
1349 }
1350
1351 let mut finalizers = Vec::new();
1353 for supervisor in supervisors.iter_mut() {
1354 let (mut latest, mut monitor) = supervisor.subscribe().await;
1355 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1356 while latest < required_containers {
1357 latest = monitor.next().await.expect("event missing");
1358 }
1359 }));
1360 }
1361 join_all(finalizers).await;
1362
1363 let slow = &validators[0];
1365 for supervisor in supervisors.iter() {
1366 {
1368 let faults = supervisor.faults.lock().unwrap();
1369 assert!(faults.is_empty());
1370 }
1371
1372 {
1374 let notarizes = supervisor.notarizes.lock().unwrap();
1375 for (view, payloads) in notarizes.iter() {
1376 for (_, participants) in payloads.iter() {
1377 if participants.contains(slow) {
1378 panic!("view: {view}");
1379 }
1380 }
1381 }
1382 }
1383 {
1384 let nullifies = supervisor.nullifies.lock().unwrap();
1385 for (view, participants) in nullifies.iter() {
1386 if *view > 10 && participants.contains(slow) {
1388 panic!("view: {view}");
1389 }
1390 }
1391 }
1392 {
1393 let finalizes = supervisor.finalizes.lock().unwrap();
1394 for (view, payloads) in finalizes.iter() {
1395 for (_, finalizers) in payloads.iter() {
1396 if finalizers.contains(slow) {
1397 panic!("view: {view}");
1398 }
1399 }
1400 }
1401 }
1402 }
1403 });
1404 }
1405
1406 #[test_traced]
1407 fn test_all_recovery() {
1408 let n = 5;
1410 let required_containers = 100;
1411 let activity_timeout = 10;
1412 let skip_timeout = 3;
1413 let namespace = b"consensus".to_vec();
1414 let executor = deterministic::Runner::timed(Duration::from_secs(180));
1415 executor.start(|context| async move {
1416 let (network, mut oracle) = Network::new(
1418 context.with_label("network"),
1419 Config {
1420 max_size: 1024 * 1024,
1421 },
1422 );
1423
1424 network.start();
1426
1427 let mut schemes = Vec::new();
1429 let mut validators = Vec::new();
1430 for i in 0..n {
1431 let scheme = PrivateKey::from_seed(i as u64);
1432 let pk = scheme.public_key();
1433 schemes.push(scheme);
1434 validators.push(pk);
1435 }
1436 validators.sort();
1437 schemes.sort_by_key(|s| s.public_key());
1438 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1439 let mut registrations = register_validators(&mut oracle, &validators).await;
1440
1441 let link = Link {
1443 latency: Duration::from_secs(3),
1444 jitter: Duration::from_millis(0),
1445 success_rate: 1.0,
1446 };
1447 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1448
1449 let relay = Arc::new(mocks::relay::Relay::new());
1451 let mut supervisors = Vec::new();
1452 let mut engine_handlers = Vec::new();
1453 for scheme in schemes.iter() {
1454 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1456
1457 let validator = scheme.public_key();
1459 let supervisor_config = mocks::supervisor::Config {
1460 namespace: namespace.clone(),
1461 participants: view_validators.clone(),
1462 };
1463 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1464 supervisor_config,
1465 );
1466 supervisors.push(supervisor.clone());
1467 let application_cfg = mocks::application::Config {
1468 hasher: Sha256::default(),
1469 relay: relay.clone(),
1470 participant: validator.clone(),
1471 propose_latency: (10.0, 5.0),
1472 verify_latency: (10.0, 5.0),
1473 };
1474 let (actor, application) = mocks::application::Application::new(
1475 context.with_label("application"),
1476 application_cfg,
1477 );
1478 actor.start();
1479 let cfg = config::Config {
1480 crypto: scheme.clone(),
1481 automaton: application.clone(),
1482 relay: application.clone(),
1483 reporter: supervisor.clone(),
1484 partition: validator.to_string(),
1485 supervisor,
1486 mailbox_size: 1024,
1487 namespace: namespace.clone(),
1488 leader_timeout: Duration::from_secs(1),
1489 notarization_timeout: Duration::from_secs(2),
1490 nullify_retry: Duration::from_secs(10),
1491 fetch_timeout: Duration::from_secs(1),
1492 activity_timeout,
1493 skip_timeout,
1494 max_fetch_count: 1,
1495 max_participants: n as usize,
1496 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1497 fetch_concurrent: 1,
1498 replay_buffer: NZUsize!(1024 * 1024),
1499 write_buffer: NZUsize!(1024 * 1024),
1500 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1501 };
1502 let (voter, resolver) = registrations
1503 .remove(&validator)
1504 .expect("validator should be registered");
1505 let engine = Engine::new(context.with_label("engine"), cfg);
1506 engine_handlers.push(engine.start(voter, resolver));
1507 }
1508
1509 let mut finalizers = Vec::new();
1511 for supervisor in supervisors.iter_mut() {
1512 let (_, mut monitor) = supervisor.subscribe().await;
1513 finalizers.push(
1514 context
1515 .with_label("finalizer")
1516 .spawn(move |context| async move {
1517 select! {
1518 _timeout = context.sleep(Duration::from_secs(60)) => {},
1519 _done = monitor.next() => {
1520 panic!("engine should not notarize or finalize anything");
1521 }
1522 }
1523 }),
1524 );
1525 }
1526 join_all(finalizers).await;
1527
1528 link_validators(&mut oracle, &validators, Action::Unlink, None).await;
1530
1531 context.sleep(Duration::from_secs(60)).await;
1533
1534 let mut latest = 0;
1536 for supervisor in supervisors.iter() {
1537 let nullifies = supervisor.nullifies.lock().unwrap();
1538 let max = nullifies.keys().max().unwrap();
1539 if *max > latest {
1540 latest = *max;
1541 }
1542 }
1543
1544 let link = Link {
1546 latency: Duration::from_millis(10),
1547 jitter: Duration::from_millis(1),
1548 success_rate: 1.0,
1549 };
1550 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1551
1552 let mut finalizers = Vec::new();
1554 for supervisor in supervisors.iter_mut() {
1555 let (mut latest, mut monitor) = supervisor.subscribe().await;
1556 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1557 while latest < required_containers {
1558 latest = monitor.next().await.expect("event missing");
1559 }
1560 }));
1561 }
1562 join_all(finalizers).await;
1563
1564 for supervisor in supervisors.iter() {
1566 {
1568 let faults = supervisor.faults.lock().unwrap();
1569 assert!(faults.is_empty());
1570 }
1571
1572 {
1577 let mut found = 0;
1579 let finalizations = supervisor.finalizations.lock().unwrap();
1580 for i in latest..latest + activity_timeout {
1581 if finalizations.contains_key(&i) {
1582 found += 1;
1583 }
1584 }
1585 assert!(found >= activity_timeout - 2, "found: {found}");
1586 }
1587 }
1588 });
1589 }
1590
1591 #[test_traced]
1592 #[ignore]
1593 fn test_partition() {
1594 let n = 10;
1596 let required_containers = 50;
1597 let activity_timeout = 10;
1598 let skip_timeout = 5;
1599 let namespace = b"consensus".to_vec();
1600 let executor = deterministic::Runner::timed(Duration::from_secs(900));
1601 executor.start(|context| async move {
1602 let (network, mut oracle) = Network::new(
1604 context.with_label("network"),
1605 Config {
1606 max_size: 1024 * 1024,
1607 },
1608 );
1609
1610 network.start();
1612
1613 let mut schemes = Vec::new();
1615 let mut validators = Vec::new();
1616 for i in 0..n {
1617 let scheme = PrivateKey::from_seed(i as u64);
1618 let pk = scheme.public_key();
1619 schemes.push(scheme);
1620 validators.push(pk);
1621 }
1622 validators.sort();
1623 schemes.sort_by_key(|s| s.public_key());
1624 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1625 let mut registrations = register_validators(&mut oracle, &validators).await;
1626
1627 let link = Link {
1629 latency: Duration::from_millis(10),
1630 jitter: Duration::from_millis(1),
1631 success_rate: 1.0,
1632 };
1633 link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;
1634
1635 let relay = Arc::new(mocks::relay::Relay::new());
1637 let mut supervisors = Vec::new();
1638 let mut engine_handlers = Vec::new();
1639 for scheme in schemes.iter() {
1640 let validator = scheme.public_key();
1642 let supervisor_config = mocks::supervisor::Config {
1643 namespace: namespace.clone(),
1644 participants: view_validators.clone(),
1645 };
1646 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1647 supervisor_config,
1648 );
1649 supervisors.push(supervisor.clone());
1650 let application_cfg = mocks::application::Config {
1651 hasher: Sha256::default(),
1652 relay: relay.clone(),
1653 participant: validator.clone(),
1654 propose_latency: (10.0, 5.0),
1655 verify_latency: (10.0, 5.0),
1656 };
1657 let (actor, application) = mocks::application::Application::new(
1658 context.with_label("application"),
1659 application_cfg,
1660 );
1661 actor.start();
1662 let cfg = config::Config {
1663 crypto: scheme.clone(),
1664 automaton: application.clone(),
1665 relay: application.clone(),
1666 reporter: supervisor.clone(),
1667 partition: validator.to_string(),
1668 supervisor,
1669 mailbox_size: 1024,
1670 namespace: namespace.clone(),
1671 leader_timeout: Duration::from_secs(1),
1672 notarization_timeout: Duration::from_secs(2),
1673 nullify_retry: Duration::from_secs(10),
1674 fetch_timeout: Duration::from_secs(1),
1675 activity_timeout,
1676 skip_timeout,
1677 max_fetch_count: 1,
1678 max_participants: n as usize,
1679 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1680 fetch_concurrent: 1,
1681 replay_buffer: NZUsize!(1024 * 1024),
1682 write_buffer: NZUsize!(1024 * 1024),
1683 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1684 };
1685 let (voter, resolver) = registrations
1686 .remove(&validator)
1687 .expect("validator should be registered");
1688 let engine = Engine::new(context.with_label("engine"), cfg);
1689 engine_handlers.push(engine.start(voter, resolver));
1690 }
1691
1692 let mut finalizers = Vec::new();
1694 for supervisor in supervisors.iter_mut() {
1695 let (mut latest, mut monitor) = supervisor.subscribe().await;
1696 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1697 while latest < required_containers {
1698 latest = monitor.next().await.expect("event missing");
1699 }
1700 }));
1701 }
1702 join_all(finalizers).await;
1703
1704 fn separated(n: usize, a: usize, b: usize) -> bool {
1706 let m = n / 2;
1707 (a < m && b >= m) || (a >= m && b < m)
1708 }
1709 link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;
1710
1711 context.sleep(Duration::from_secs(10)).await;
1713
1714 let mut finalizers = Vec::new();
1716 for supervisor in supervisors.iter_mut() {
1717 let (_, mut monitor) = supervisor.subscribe().await;
1718 finalizers.push(
1719 context
1720 .with_label("finalizer")
1721 .spawn(move |context| async move {
1722 select! {
1723 _timeout = context.sleep(Duration::from_secs(60)) => {},
1724 _done = monitor.next() => {
1725 panic!("engine should not notarize or finalize anything");
1726 }
1727 }
1728 }),
1729 );
1730 }
1731 join_all(finalizers).await;
1732
1733 link_validators(
1735 &mut oracle,
1736 &validators,
1737 Action::Link(link),
1738 Some(separated),
1739 )
1740 .await;
1741
1742 let mut finalizers = Vec::new();
1744 for supervisor in supervisors.iter_mut() {
1745 let (mut latest, mut monitor) = supervisor.subscribe().await;
1746 let required = latest + required_containers;
1747 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1748 while latest < required {
1749 latest = monitor.next().await.expect("event missing");
1750 }
1751 }));
1752 }
1753 join_all(finalizers).await;
1754
1755 for supervisor in supervisors.iter() {
1757 {
1759 let faults = supervisor.faults.lock().unwrap();
1760 assert!(faults.is_empty());
1761 }
1762 }
1763 });
1764 }
1765
1766 fn slow_and_lossy_links(seed: u64) -> String {
1767 let n = 5;
1769 let required_containers = 50;
1770 let activity_timeout = 10;
1771 let skip_timeout = 5;
1772 let namespace = b"consensus".to_vec();
1773 let cfg = deterministic::Config::new()
1774 .with_seed(seed)
1775 .with_timeout(Some(Duration::from_secs(5_000)));
1776 let executor = deterministic::Runner::new(cfg);
1777 executor.start(|context| async move {
1778 let (network, mut oracle) = Network::new(
1780 context.with_label("network"),
1781 Config {
1782 max_size: 1024 * 1024,
1783 },
1784 );
1785
1786 network.start();
1788
1789 let mut schemes = Vec::new();
1791 let mut validators = Vec::new();
1792 for i in 0..n {
1793 let scheme = PrivateKey::from_seed(i as u64);
1794 let pk = scheme.public_key();
1795 schemes.push(scheme);
1796 validators.push(pk);
1797 }
1798 validators.sort();
1799 schemes.sort_by_key(|s| s.public_key());
1800 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1801 let mut registrations = register_validators(&mut oracle, &validators).await;
1802
1803 let degraded_link = Link {
1805 latency: Duration::from_millis(200),
1806 jitter: Duration::from_millis(150),
1807 success_rate: 0.5,
1808 };
1809 link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;
1810
1811 let relay = Arc::new(mocks::relay::Relay::new());
1813 let mut supervisors = Vec::new();
1814 let mut engine_handlers = Vec::new();
1815 for scheme in schemes.into_iter() {
1816 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1818
1819 let validator = scheme.public_key();
1821 let supervisor_config = mocks::supervisor::Config {
1822 namespace: namespace.clone(),
1823 participants: view_validators.clone(),
1824 };
1825 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1826 supervisor_config,
1827 );
1828 supervisors.push(supervisor.clone());
1829 let application_cfg = mocks::application::Config {
1830 hasher: Sha256::default(),
1831 relay: relay.clone(),
1832 participant: validator.clone(),
1833 propose_latency: (10.0, 5.0),
1834 verify_latency: (10.0, 5.0),
1835 };
1836 let (actor, application) = mocks::application::Application::new(
1837 context.with_label("application"),
1838 application_cfg,
1839 );
1840 actor.start();
1841 let cfg = config::Config {
1842 crypto: scheme,
1843 automaton: application.clone(),
1844 relay: application.clone(),
1845 reporter: supervisor.clone(),
1846 partition: validator.to_string(),
1847 supervisor,
1848 mailbox_size: 1024,
1849 namespace: namespace.clone(),
1850 leader_timeout: Duration::from_secs(1),
1851 notarization_timeout: Duration::from_secs(2),
1852 nullify_retry: Duration::from_secs(10),
1853 fetch_timeout: Duration::from_secs(1),
1854 activity_timeout,
1855 skip_timeout,
1856 max_fetch_count: 1,
1857 max_participants: n as usize,
1858 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1859 fetch_concurrent: 1,
1860 replay_buffer: NZUsize!(1024 * 1024),
1861 write_buffer: NZUsize!(1024 * 1024),
1862 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1863 };
1864 let (voter, resolver) = registrations
1865 .remove(&validator)
1866 .expect("validator should be registered");
1867 let engine = Engine::new(context.with_label("engine"), cfg);
1868 engine_handlers.push(engine.start(voter, resolver));
1869 }
1870
1871 let mut finalizers = Vec::new();
1873 for supervisor in supervisors.iter_mut() {
1874 let (mut latest, mut monitor) = supervisor.subscribe().await;
1875 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1876 while latest < required_containers {
1877 latest = monitor.next().await.expect("event missing");
1878 }
1879 }));
1880 }
1881 join_all(finalizers).await;
1882
1883 for supervisor in supervisors.iter() {
1885 {
1887 let faults = supervisor.faults.lock().unwrap();
1888 assert!(faults.is_empty());
1889 }
1890 }
1891
1892 context.auditor().state()
1893 })
1894 }
1895
1896 #[test_traced]
1897 fn test_slow_and_lossy_links() {
1898 slow_and_lossy_links(0);
1899 }
1900
1901 #[test_traced]
1902 #[ignore]
1903 fn test_determinism() {
1904 for seed in 1..6 {
1907 let state_1 = slow_and_lossy_links(seed);
1909
1910 let state_2 = slow_and_lossy_links(seed);
1912
1913 assert_eq!(state_1, state_2);
1915 }
1916 }
1917
1918 fn conflicter(seed: u64) {
1919 let n = 4;
1921 let required_containers = 50;
1922 let activity_timeout = 10;
1923 let skip_timeout = 5;
1924 let namespace = b"consensus".to_vec();
1925 let cfg = deterministic::Config::new()
1926 .with_seed(seed)
1927 .with_timeout(Some(Duration::from_secs(30)));
1928 let executor = deterministic::Runner::new(cfg);
1929 executor.start(|context| async move {
1930 let (network, mut oracle) = Network::new(
1932 context.with_label("network"),
1933 Config {
1934 max_size: 1024 * 1024,
1935 },
1936 );
1937
1938 network.start();
1940
1941 let mut schemes = Vec::new();
1943 let mut validators = Vec::new();
1944 for i in 0..n {
1945 let scheme = PrivateKey::from_seed(i as u64);
1946 let pk = scheme.public_key();
1947 schemes.push(scheme);
1948 validators.push(pk);
1949 }
1950 validators.sort();
1951 schemes.sort_by_key(|s| s.public_key());
1952 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1953 let mut registrations = register_validators(&mut oracle, &validators).await;
1954
1955 let link = Link {
1957 latency: Duration::from_millis(10),
1958 jitter: Duration::from_millis(1),
1959 success_rate: 1.0,
1960 };
1961 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1962
1963 let relay = Arc::new(mocks::relay::Relay::new());
1965 let mut supervisors = Vec::new();
1966 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1967 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1969
1970 let validator = scheme.public_key();
1972 let supervisor_config = mocks::supervisor::Config {
1973 namespace: namespace.clone(),
1974 participants: view_validators.clone(),
1975 };
1976 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1977 supervisor_config,
1978 );
1979 if idx_scheme == 0 {
1980 let cfg = mocks::conflicter::Config {
1981 crypto: scheme,
1982 supervisor,
1983 namespace: namespace.clone(),
1984 };
1985 let (voter, _) = registrations
1986 .remove(&validator)
1987 .expect("validator should be registered");
1988 let engine: mocks::conflicter::Conflicter<_, _, Sha256, _> =
1989 mocks::conflicter::Conflicter::new(
1990 context.with_label("byzantine_engine"),
1991 cfg,
1992 );
1993 engine.start(voter);
1994 } else {
1995 supervisors.push(supervisor.clone());
1996 let application_cfg = mocks::application::Config {
1997 hasher: Sha256::default(),
1998 relay: relay.clone(),
1999 participant: validator.clone(),
2000 propose_latency: (10.0, 5.0),
2001 verify_latency: (10.0, 5.0),
2002 };
2003 let (actor, application) = mocks::application::Application::new(
2004 context.with_label("application"),
2005 application_cfg,
2006 );
2007 actor.start();
2008 let cfg = config::Config {
2009 crypto: scheme,
2010 automaton: application.clone(),
2011 relay: application.clone(),
2012 reporter: supervisor.clone(),
2013 partition: validator.to_string(),
2014 supervisor,
2015 mailbox_size: 1024,
2016 namespace: namespace.clone(),
2017 leader_timeout: Duration::from_secs(1),
2018 notarization_timeout: Duration::from_secs(2),
2019 nullify_retry: Duration::from_secs(10),
2020 fetch_timeout: Duration::from_secs(1),
2021 activity_timeout,
2022 skip_timeout,
2023 max_fetch_count: 1,
2024 max_participants: n as usize,
2025 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2026 fetch_concurrent: 1,
2027 replay_buffer: NZUsize!(1024 * 1024),
2028 write_buffer: NZUsize!(1024 * 1024),
2029 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2030 };
2031 let (voter, resolver) = registrations
2032 .remove(&validator)
2033 .expect("validator should be registered");
2034 let engine = Engine::new(context.with_label("engine"), cfg);
2035 engine.start(voter, resolver);
2036 }
2037 }
2038
2039 let mut finalizers = Vec::new();
2041 for supervisor in supervisors.iter_mut() {
2042 let (mut latest, mut monitor) = supervisor.subscribe().await;
2043 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2044 while latest < required_containers {
2045 latest = monitor.next().await.expect("event missing");
2046 }
2047 }));
2048 }
2049 join_all(finalizers).await;
2050
2051 let byz = &validators[0];
2053 let mut count_conflicting_notarize = 0;
2054 let mut count_conflicting_finalize = 0;
2055 for supervisor in supervisors.iter() {
2056 {
2058 let faults = supervisor.faults.lock().unwrap();
2059 assert_eq!(faults.len(), 1);
2060 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2061 for (_, faults) in faulter.iter() {
2062 for fault in faults.iter() {
2063 match fault {
2064 Activity::ConflictingNotarize(_) => {
2065 count_conflicting_notarize += 1;
2066 }
2067 Activity::ConflictingFinalize(_) => {
2068 count_conflicting_finalize += 1;
2069 }
2070 _ => panic!("unexpected fault: {fault:?}"),
2071 }
2072 }
2073 }
2074 }
2075 }
2076 assert!(count_conflicting_notarize > 0);
2077 assert!(count_conflicting_finalize > 0);
2078 });
2079 }
2080
2081 #[test_traced]
2082 #[ignore]
2083 fn test_conflicter() {
2084 for seed in 0..5 {
2085 conflicter(seed);
2086 }
2087 }
2088
2089 fn nuller(seed: u64) {
2090 let n = 4;
2092 let required_containers = 50;
2093 let activity_timeout = 10;
2094 let skip_timeout = 5;
2095 let namespace = b"consensus".to_vec();
2096 let cfg = deterministic::Config::new()
2097 .with_seed(seed)
2098 .with_timeout(Some(Duration::from_secs(30)));
2099 let executor = deterministic::Runner::new(cfg);
2100 executor.start(|context| async move {
2101 let (network, mut oracle) = Network::new(
2103 context.with_label("network"),
2104 Config {
2105 max_size: 1024 * 1024,
2106 },
2107 );
2108
2109 network.start();
2111
2112 let mut schemes = Vec::new();
2114 let mut validators = Vec::new();
2115 for i in 0..n {
2116 let scheme = PrivateKey::from_seed(i as u64);
2117 let pk = scheme.public_key();
2118 schemes.push(scheme);
2119 validators.push(pk);
2120 }
2121 validators.sort();
2122 schemes.sort_by_key(|s| s.public_key());
2123 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2124 let mut registrations = register_validators(&mut oracle, &validators).await;
2125
2126 let link = Link {
2128 latency: Duration::from_millis(10),
2129 jitter: Duration::from_millis(1),
2130 success_rate: 1.0,
2131 };
2132 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2133
2134 let relay = Arc::new(mocks::relay::Relay::new());
2136 let mut supervisors = Vec::new();
2137 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2138 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2140
2141 let validator = scheme.public_key();
2143 let supervisor_config = mocks::supervisor::Config {
2144 namespace: namespace.clone(),
2145 participants: view_validators.clone(),
2146 };
2147 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2148 supervisor_config,
2149 );
2150 if idx_scheme == 0 {
2151 let cfg = mocks::nuller::Config {
2152 crypto: scheme,
2153 supervisor,
2154 namespace: namespace.clone(),
2155 };
2156 let (voter, _) = registrations
2157 .remove(&validator)
2158 .expect("validator should be registered");
2159 let engine: mocks::nuller::Nuller<_, _, Sha256, _> =
2160 mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
2161 engine.start(voter);
2162 } else {
2163 supervisors.push(supervisor.clone());
2164 let application_cfg = mocks::application::Config {
2165 hasher: Sha256::default(),
2166 relay: relay.clone(),
2167 participant: validator.clone(),
2168 propose_latency: (10.0, 5.0),
2169 verify_latency: (10.0, 5.0),
2170 };
2171 let (actor, application) = mocks::application::Application::new(
2172 context.with_label("application"),
2173 application_cfg,
2174 );
2175 actor.start();
2176 let cfg = config::Config {
2177 crypto: scheme,
2178 automaton: application.clone(),
2179 relay: application.clone(),
2180 reporter: supervisor.clone(),
2181 partition: validator.to_string(),
2182 supervisor,
2183 mailbox_size: 1024,
2184 namespace: namespace.clone(),
2185 leader_timeout: Duration::from_secs(1),
2186 notarization_timeout: Duration::from_secs(2),
2187 nullify_retry: Duration::from_secs(10),
2188 fetch_timeout: Duration::from_secs(1),
2189 activity_timeout,
2190 skip_timeout,
2191 max_fetch_count: 1,
2192 max_participants: n as usize,
2193 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2194 fetch_concurrent: 1,
2195 replay_buffer: NZUsize!(1024 * 1024),
2196 write_buffer: NZUsize!(1024 * 1024),
2197 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2198 };
2199 let (voter, resolver) = registrations
2200 .remove(&validator)
2201 .expect("validator should be registered");
2202 let engine = Engine::new(context.with_label("engine"), cfg);
2203 engine.start(voter, resolver);
2204 }
2205 }
2206
2207 let mut finalizers = Vec::new();
2209 for supervisor in supervisors.iter_mut() {
2210 let (mut latest, mut monitor) = supervisor.subscribe().await;
2211 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2212 while latest < required_containers {
2213 latest = monitor.next().await.expect("event missing");
2214 }
2215 }));
2216 }
2217 join_all(finalizers).await;
2218
2219 let byz = &validators[0];
2221 let mut count_nullify_and_finalize = 0;
2222 for supervisor in supervisors.iter() {
2223 {
2225 let faults = supervisor.faults.lock().unwrap();
2226 assert_eq!(faults.len(), 1);
2227 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2228 for (_, faults) in faulter.iter() {
2229 for fault in faults.iter() {
2230 match fault {
2231 Activity::NullifyFinalize(_) => {
2232 count_nullify_and_finalize += 1;
2233 }
2234 _ => panic!("unexpected fault: {fault:?}"),
2235 }
2236 }
2237 }
2238 }
2239 }
2240 assert!(count_nullify_and_finalize > 0);
2241 });
2242 }
2243
2244 #[test_traced]
2245 #[ignore]
2246 fn test_nuller() {
2247 for seed in 0..5 {
2248 nuller(seed);
2249 }
2250 }
2251
2252 fn outdated(seed: u64) {
2253 let n = 4;
2255 let required_containers = 100;
2256 let activity_timeout = 10;
2257 let skip_timeout = 5;
2258 let namespace = b"consensus".to_vec();
2259 let cfg = deterministic::Config::new()
2260 .with_seed(seed)
2261 .with_timeout(Some(Duration::from_secs(30)));
2262 let executor = deterministic::Runner::new(cfg);
2263 executor.start(|context| async move {
2264 let (network, mut oracle) = Network::new(
2266 context.with_label("network"),
2267 Config {
2268 max_size: 1024 * 1024,
2269 },
2270 );
2271
2272 network.start();
2274
2275 let mut schemes = Vec::new();
2277 let mut validators = Vec::new();
2278 for i in 0..n {
2279 let scheme = PrivateKey::from_seed(i as u64);
2280 let pk = scheme.public_key();
2281 schemes.push(scheme);
2282 validators.push(pk);
2283 }
2284 validators.sort();
2285 schemes.sort_by_key(|s| s.public_key());
2286 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2287 let mut registrations = register_validators(&mut oracle, &validators).await;
2288
2289 let link = Link {
2291 latency: Duration::from_millis(10),
2292 jitter: Duration::from_millis(1),
2293 success_rate: 1.0,
2294 };
2295 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2296
2297 let relay = Arc::new(mocks::relay::Relay::new());
2299 let mut supervisors = Vec::new();
2300 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2301 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2303
2304 let validator = scheme.public_key();
2306 let supervisor_config = mocks::supervisor::Config {
2307 namespace: namespace.clone(),
2308 participants: view_validators.clone(),
2309 };
2310 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2311 supervisor_config,
2312 );
2313 if idx_scheme == 0 {
2314 let cfg = mocks::outdated::Config {
2315 crypto: scheme,
2316 supervisor,
2317 namespace: namespace.clone(),
2318 view_delta: activity_timeout * 4,
2319 };
2320 let (voter, _) = registrations
2321 .remove(&validator)
2322 .expect("validator should be registered");
2323 let engine: mocks::outdated::Outdated<_, _, Sha256, _> =
2324 mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
2325 engine.start(voter);
2326 } else {
2327 supervisors.push(supervisor.clone());
2328 let application_cfg = mocks::application::Config {
2329 hasher: Sha256::default(),
2330 relay: relay.clone(),
2331 participant: validator.clone(),
2332 propose_latency: (10.0, 5.0),
2333 verify_latency: (10.0, 5.0),
2334 };
2335 let (actor, application) = mocks::application::Application::new(
2336 context.with_label("application"),
2337 application_cfg,
2338 );
2339 actor.start();
2340 let cfg = config::Config {
2341 crypto: scheme,
2342 automaton: application.clone(),
2343 relay: application.clone(),
2344 reporter: supervisor.clone(),
2345 supervisor,
2346 partition: validator.to_string(),
2347 mailbox_size: 1024,
2348 namespace: namespace.clone(),
2349 leader_timeout: Duration::from_secs(1),
2350 notarization_timeout: Duration::from_secs(2),
2351 nullify_retry: Duration::from_secs(10),
2352 fetch_timeout: Duration::from_secs(1),
2353 activity_timeout,
2354 skip_timeout,
2355 max_fetch_count: 1,
2356 max_participants: n as usize,
2357 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2358 fetch_concurrent: 1,
2359 replay_buffer: NZUsize!(1024 * 1024),
2360 write_buffer: NZUsize!(1024 * 1024),
2361 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2362 };
2363 let (voter, resolver) = registrations
2364 .remove(&validator)
2365 .expect("validator should be registered");
2366 let engine = Engine::new(context.with_label("engine"), cfg);
2367 engine.start(voter, resolver);
2368 }
2369 }
2370
2371 let mut finalizers = Vec::new();
2373 for supervisor in supervisors.iter_mut() {
2374 let (mut latest, mut monitor) = supervisor.subscribe().await;
2375 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2376 while latest < required_containers {
2377 latest = monitor.next().await.expect("event missing");
2378 }
2379 }));
2380 }
2381 join_all(finalizers).await;
2382
2383 for supervisor in supervisors.iter() {
2385 {
2386 let faults = supervisor.faults.lock().unwrap();
2387 assert!(faults.is_empty());
2388 }
2389 }
2390 });
2391 }
2392
2393 #[test_traced]
2394 #[ignore]
2395 fn test_outdated() {
2396 for seed in 0..5 {
2397 outdated(seed);
2398 }
2399 }
2400
2401 #[test_traced]
2402 #[ignore]
2403 fn test_1k() {
2404 let n = 10;
2406 let required_containers = 1_000;
2407 let activity_timeout = 10;
2408 let skip_timeout = 5;
2409 let namespace = b"consensus".to_vec();
2410 let cfg = deterministic::Config::new();
2411 let executor = deterministic::Runner::new(cfg);
2412 executor.start(|context| async move {
2413 let (network, mut oracle) = Network::new(
2415 context.with_label("network"),
2416 Config {
2417 max_size: 1024 * 1024,
2418 },
2419 );
2420
2421 network.start();
2423
2424 let mut schemes = Vec::new();
2426 let mut validators = Vec::new();
2427 for i in 0..n {
2428 let scheme = PrivateKey::from_seed(i as u64);
2429 let pk = scheme.public_key();
2430 schemes.push(scheme);
2431 validators.push(pk);
2432 }
2433 validators.sort();
2434 schemes.sort_by_key(|s| s.public_key());
2435 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2436 let mut registrations = register_validators(&mut oracle, &validators).await;
2437
2438 let link = Link {
2440 latency: Duration::from_millis(80),
2441 jitter: Duration::from_millis(10),
2442 success_rate: 0.98,
2443 };
2444 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2445
2446 let relay = Arc::new(mocks::relay::Relay::new());
2448 let mut supervisors = Vec::new();
2449 let mut engine_handlers = Vec::new();
2450 for scheme in schemes.into_iter() {
2451 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2453
2454 let validator = scheme.public_key();
2456 let supervisor_config = mocks::supervisor::Config {
2457 namespace: namespace.clone(),
2458 participants: view_validators.clone(),
2459 };
2460 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2461 supervisor_config,
2462 );
2463 supervisors.push(supervisor.clone());
2464 let application_cfg = mocks::application::Config {
2465 hasher: Sha256::default(),
2466 relay: relay.clone(),
2467 participant: validator.clone(),
2468 propose_latency: (100.0, 50.0),
2469 verify_latency: (50.0, 40.0),
2470 };
2471 let (actor, application) = mocks::application::Application::new(
2472 context.with_label("application"),
2473 application_cfg,
2474 );
2475 actor.start();
2476 let cfg = config::Config {
2477 crypto: scheme,
2478 automaton: application.clone(),
2479 relay: application.clone(),
2480 reporter: supervisor.clone(),
2481 partition: validator.to_string(),
2482 supervisor,
2483 mailbox_size: 1024,
2484 namespace: namespace.clone(),
2485 leader_timeout: Duration::from_secs(1),
2486 notarization_timeout: Duration::from_secs(2),
2487 nullify_retry: Duration::from_secs(10),
2488 fetch_timeout: Duration::from_secs(1),
2489 activity_timeout,
2490 skip_timeout,
2491 max_fetch_count: 1,
2492 max_participants: n as usize,
2493 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2494 fetch_concurrent: 1,
2495 replay_buffer: NZUsize!(1024 * 1024),
2496 write_buffer: NZUsize!(1024 * 1024),
2497 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2498 };
2499 let (voter, resolver) = registrations
2500 .remove(&validator)
2501 .expect("validator should be registered");
2502 let engine = Engine::new(context.with_label("engine"), cfg);
2503 engine_handlers.push(engine.start(voter, resolver));
2504 }
2505
2506 let mut finalizers = Vec::new();
2508 for supervisor in supervisors.iter_mut() {
2509 let (mut latest, mut monitor) = supervisor.subscribe().await;
2510 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2511 while latest < required_containers {
2512 latest = monitor.next().await.expect("event missing");
2513 }
2514 }));
2515 }
2516 join_all(finalizers).await;
2517
2518 for supervisor in supervisors.iter() {
2520 {
2522 let faults = supervisor.faults.lock().unwrap();
2523 assert!(faults.is_empty());
2524 }
2525 }
2526 })
2527 }
2528}