1pub mod types;
118
cfg_if::cfg_if! {
    // The engine and its supporting modules are only compiled when the target
    // architecture is not `wasm32`. `Config` and `Engine` are the only items
    // re-exported from this gated section; `actors` and `metrics` stay private.
    if #[cfg(not(target_arch = "wasm32"))] {
        mod actors;
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        mod metrics;
    }
}
129
130#[cfg(test)]
131pub mod mocks;
132
133#[cfg(test)]
134mod tests {
135 use super::*;
136 use crate::Monitor;
137 use commonware_cryptography::{
138 ed25519::{PrivateKey, PublicKey},
139 sha256::Digest as Sha256Digest,
140 PrivateKeyExt as _, PublicKey as CPublicKey, Sha256, Signer as _,
141 };
142 use commonware_macros::{select, test_traced};
143 use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
144 use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner, Spawner};
145 use commonware_utils::{quorum, NZUsize, NZU32};
146 use engine::Engine;
147 use futures::{future::join_all, StreamExt};
148 use governor::Quota;
149 use rand::Rng as _;
150 use std::{
151 collections::{BTreeMap, HashMap},
152 num::NonZeroUsize,
153 sync::{Arc, Mutex},
154 time::Duration,
155 };
156 use tracing::debug;
157 use types::Activity;
158
    /// First argument passed to `PoolRef::new` when building each engine's
    /// `buffer_pool` in the tests below.
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    /// Second argument passed to `PoolRef::new` in the tests below.
    /// NOTE(review): presumably the number of pages kept in the cache — confirm
    /// against the `PoolRef` documentation.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
161
162 async fn register_validators<P: CPublicKey>(
164 oracle: &mut Oracle<P>,
165 validators: &[P],
166 ) -> HashMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))> {
167 let mut registrations = HashMap::new();
168 for validator in validators.iter() {
169 let (voter_sender, voter_receiver) =
170 oracle.register(validator.clone(), 0).await.unwrap();
171 let (resolver_sender, resolver_receiver) =
172 oracle.register(validator.clone(), 1).await.unwrap();
173 registrations.insert(
174 validator.clone(),
175 (
176 (voter_sender, voter_receiver),
177 (resolver_sender, resolver_receiver),
178 ),
179 );
180 }
181 registrations
182 }
183
    /// Operation that `link_validators` applies to each selected pair of validators.
    enum Action {
        /// Add a link with the given parameters.
        Link(Link),
        /// Remove the existing link, then add a new one with the given parameters.
        Update(Link),
        /// Remove the existing link.
        Unlink,
    }
190
191 async fn link_validators<P: CPublicKey>(
197 oracle: &mut Oracle<P>,
198 validators: &[P],
199 action: Action,
200 restrict_to: Option<fn(usize, usize, usize) -> bool>,
201 ) {
202 for (i1, v1) in validators.iter().enumerate() {
203 for (i2, v2) in validators.iter().enumerate() {
204 if v2 == v1 {
206 continue;
207 }
208
209 if let Some(f) = restrict_to {
211 if !f(validators.len(), i1, i2) {
212 continue;
213 }
214 }
215
216 match action {
218 Action::Update(_) | Action::Unlink => {
219 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
220 }
221 _ => {}
222 }
223
224 match action {
226 Action::Link(ref link) | Action::Update(ref link) => {
227 oracle
228 .add_link(v1.clone(), v2.clone(), link.clone())
229 .await
230 .unwrap();
231 }
232 _ => {}
233 }
234 }
235 }
236 }
237
238 #[test_traced]
239 fn test_all_online() {
240 let n = 5;
242 let threshold = quorum(n);
243 let max_exceptions = 10;
244 let required_containers = 100;
245 let activity_timeout = 10;
246 let skip_timeout = 5;
247 let namespace = b"consensus".to_vec();
248 let executor = deterministic::Runner::timed(Duration::from_secs(30));
249 executor.start(|context| async move {
250 let (network, mut oracle) = Network::new(
252 context.with_label("network"),
253 Config {
254 max_size: 1024 * 1024,
255 },
256 );
257
258 network.start();
260
261 let mut schemes = Vec::new();
263 let mut validators = Vec::new();
264 for i in 0..n {
265 let scheme = PrivateKey::from_seed(i as u64);
266 let pk = scheme.public_key();
267 schemes.push(scheme);
268 validators.push(pk);
269 }
270 validators.sort();
271 schemes.sort_by_key(|s| s.public_key());
272 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
273 let mut registrations = register_validators(&mut oracle, &validators).await;
274
275 let link = Link {
277 latency: 10.0,
278 jitter: 1.0,
279 success_rate: 1.0,
280 };
281 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
282
283 let relay = Arc::new(mocks::relay::Relay::new());
285 let mut supervisors = Vec::new();
286 let mut engine_handlers = Vec::new();
287 for scheme in schemes.into_iter() {
288 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
290
291 let validator = scheme.public_key();
293 let supervisor_config = mocks::supervisor::Config {
294 namespace: namespace.clone(),
295 participants: view_validators.clone(),
296 };
297 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
298 supervisor_config,
299 );
300 supervisors.push(supervisor.clone());
301 let application_cfg = mocks::application::Config {
302 hasher: Sha256::default(),
303 relay: relay.clone(),
304 participant: validator.clone(),
305 propose_latency: (10.0, 5.0),
306 verify_latency: (10.0, 5.0),
307 };
308 let (actor, application) = mocks::application::Application::new(
309 context.with_label("application"),
310 application_cfg,
311 );
312 actor.start();
313 let cfg = config::Config {
314 crypto: scheme,
315 automaton: application.clone(),
316 relay: application.clone(),
317 reporter: supervisor.clone(),
318 supervisor,
319 partition: validator.to_string(),
320 compression: Some(3),
321 mailbox_size: 1024,
322 namespace: namespace.clone(),
323 leader_timeout: Duration::from_secs(1),
324 notarization_timeout: Duration::from_secs(2),
325 nullify_retry: Duration::from_secs(10),
326 fetch_timeout: Duration::from_secs(1),
327 activity_timeout,
328 skip_timeout,
329 max_fetch_count: 1,
330 max_participants: n as usize,
331 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
332 fetch_concurrent: 1,
333 replay_buffer: NZUsize!(1024 * 1024),
334 write_buffer: NZUsize!(1024 * 1024),
335 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
336 };
337 let engine = Engine::new(context.with_label("engine"), cfg);
338 let (voter, resolver) = registrations
339 .remove(&validator)
340 .expect("validator should be registered");
341 engine_handlers.push(engine.start(voter, resolver));
342 }
343
344 let mut finalizers = Vec::new();
346 for supervisor in supervisors.iter_mut() {
347 let (mut latest, mut monitor) = supervisor.subscribe().await;
348 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
349 while latest < required_containers {
350 latest = monitor.next().await.expect("event missing");
351 }
352 }));
353 }
354 join_all(finalizers).await;
355
356 let latest_complete = required_containers - activity_timeout;
358 for supervisor in supervisors.iter() {
359 {
361 let faults = supervisor.faults.lock().unwrap();
362 assert!(faults.is_empty());
363 }
364
365 let mut exceptions = 0;
367 let mut notarized = HashMap::new();
368 let mut finalized = HashMap::new();
369 {
370 let notarizes = supervisor.notarizes.lock().unwrap();
371 for view in 1..latest_complete {
372 let Some(payloads) = notarizes.get(&view) else {
374 exceptions += 1;
375 continue;
376 };
377 if payloads.len() > 1 {
378 panic!("view: {view}");
379 }
380 let (digest, notarizers) = payloads.iter().next().unwrap();
381 notarized.insert(view, *digest);
382
383 if notarizers.len() < threshold as usize {
384 panic!("view: {view}");
387 }
388 if notarizers.len() != n as usize {
389 exceptions += 1;
390 }
391 }
392 }
393 {
394 let notarizations = supervisor.notarizations.lock().unwrap();
395 for view in 1..latest_complete {
396 let Some(notarization) = notarizations.get(&view) else {
398 exceptions += 1;
399 continue;
400 };
401 let Some(digest) = notarized.get(&view) else {
402 exceptions += 1;
403 continue;
404 };
405 assert_eq!(¬arization.proposal.payload, digest);
406 }
407 }
408 {
409 let finalizes = supervisor.finalizes.lock().unwrap();
410 for view in 1..latest_complete {
411 let Some(payloads) = finalizes.get(&view) else {
413 exceptions += 1;
414 continue;
415 };
416 if payloads.len() > 1 {
417 panic!("view: {view}");
418 }
419 let (digest, finalizers) = payloads.iter().next().unwrap();
420 finalized.insert(view, *digest);
421
422 if view > latest_complete {
424 continue;
425 }
426
427 if finalizers.len() < threshold as usize {
429 panic!("view: {view}");
432 }
433 if finalizers.len() != n as usize {
434 exceptions += 1;
435 }
436
437 let nullifies = supervisor.nullifies.lock().unwrap();
439 let Some(nullifies) = nullifies.get(&view) else {
440 continue;
441 };
442 for (_, finalizers) in payloads.iter() {
443 for finalizer in finalizers.iter() {
444 if nullifies.contains(finalizer) {
445 panic!("should not nullify and finalize at same view");
446 }
447 }
448 }
449 }
450 }
451 {
452 let finalizations = supervisor.finalizations.lock().unwrap();
453 for view in 1..latest_complete {
454 let Some(finalization) = finalizations.get(&view) else {
456 exceptions += 1;
457 continue;
458 };
459 let Some(digest) = finalized.get(&view) else {
460 exceptions += 1;
461 continue;
462 };
463 assert_eq!(&finalization.proposal.payload, digest);
464 }
465 }
466
467 assert!(exceptions <= max_exceptions);
469 }
470 });
471 }
472
    /// Repeatedly runs a five-validator network and aborts each run after a
    /// random delay, resuming the next run from the previous run's recovered
    /// context, until one run gets every supervisor's monitor to report at
    /// least `required_containers` before the delay elapses.
    ///
    /// When a run completes, the supervisors saved from all earlier (aborted)
    /// runs are checked to have recorded no faults.
    #[test_traced]
    fn test_unclean_shutdown() {
        // Test parameters
        let n = 5;
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();

        // State shared across restarts: a restart counter and the supervisors
        // of every aborted run
        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
        let supervised = Arc::new(Mutex::new(Vec::new()));
        let mut prev_ctx = None;

        loop {
            let namespace = namespace.clone();
            let shutdowns = shutdowns.clone();
            let supervised = supervised.clone();

            // One run of the network; returns `(complete, context)`
            let f = |mut context: deterministic::Context| async move {
                // Create the simulated network
                let (network, mut oracle) = Network::new(
                    context.with_label("network"),
                    Config {
                        max_size: 1024 * 1024,
                    },
                );

                // Start the network
                network.start();

                // Derive one key pair per validator from a fixed seed
                let mut schemes = Vec::new();
                let mut validators = Vec::new();
                for i in 0..n {
                    let scheme = PrivateKey::from_seed(i as u64);
                    let pk = scheme.public_key();
                    schemes.push(scheme);
                    validators.push(pk);
                }
                // Keep both lists in the same (public key) order
                validators.sort();
                schemes.sort_by_key(|s| s.public_key());
                let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
                let mut registrations = register_validators(&mut oracle, &validators).await;

                // Link every validator to every other validator
                let link = Link {
                    latency: 50.0,
                    jitter: 50.0,
                    success_rate: 1.0,
                };
                link_validators(&mut oracle, &validators, Action::Link(link), None).await;

                // Create and start one engine per validator
                let relay = Arc::new(mocks::relay::Relay::new());
                let mut supervisors = HashMap::new();
                let mut engine_handlers = Vec::new();
                for scheme in schemes.into_iter() {
                    // Per-validator context
                    let context = context
                        .clone()
                        .with_label(&format!("validator-{}", scheme.public_key()));

                    // Configure the mock supervisor and application
                    let validator = scheme.public_key();
                    let supervisor_config = mocks::supervisor::Config {
                        namespace: namespace.clone(),
                        participants: view_validators.clone(),
                    };
                    let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
                        supervisor_config,
                    );
                    supervisors.insert(validator.clone(), supervisor.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        participant: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    // The partition is named after the validator, so it is the
                    // same on every run
                    let cfg = config::Config {
                        crypto: scheme,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: supervisor.clone(),
                        supervisor,
                        partition: validator.to_string(),
                        compression: Some(3),
                        mailbox_size: 1024,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        max_participants: n as usize,
                        max_fetch_count: 1,
                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                        fetch_concurrent: 1,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    };
                    let engine = Engine::new(context.with_label("engine"), cfg);
                    let (voter_network, resolver_network) = registrations
                        .remove(&validator)
                        .expect("validator should be registered");
                    engine_handlers.push(engine.start(voter_network, resolver_network));
                }

                // Spawn one task per supervisor that finishes once its monitor
                // reports `required_containers`
                let mut finalizers = Vec::new();
                for (_, supervisor) in supervisors.iter_mut() {
                    let (mut latest, mut monitor) = supervisor.subscribe().await;
                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                        while latest < required_containers {
                            latest = monitor.next().await.expect("event missing");
                        }
                    }));
                }

                // Race a random delay (10ms..2s) against completion
                let wait =
                    context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
                select! {
                    _ = context.sleep(wait) => {
                        // Delay elapsed first: count the restart and keep this
                        // run's supervisors for the final fault check
                        {
                            let mut shutdowns = shutdowns.lock().unwrap();
                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
                            *shutdowns += 1;
                        }
                        supervised.lock().unwrap().push(supervisors);
                        (false,context)
                    },
                    _ = join_all(finalizers) => {
                        // Completed: no supervisor saved from an aborted run
                        // may have recorded a fault
                        let supervised = supervised.lock().unwrap();
                        for supervisors in supervised.iter() {
                            for (_, supervisor) in supervisors.iter() {
                                let faults = supervisor.faults.lock().unwrap();
                                assert!(faults.is_empty());
                            }
                        }
                        (true,context)
                    }
                }
            };

            // The first run uses a fresh timed runner; later runs are built
            // from the previous run's recovered context
            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
                deterministic::Runner::from(prev_ctx)
            } else {
                deterministic::Runner::timed(Duration::from_secs(30))
            }
            .start(f);

            // Stop once a run completed
            if complete {
                break;
            }

            // Otherwise carry the context over into the next run
            prev_ctx = Some(context.recover());
        }
    }
643
    /// Starts three of four validators (index `0` is left out and unlinked),
    /// lets them reach `required_containers`, degrades their links and lets
    /// simulated time pass, then cuts validator `1` off, connects validator `0`
    /// to the remaining validators and starts it.
    ///
    /// The test passes once the late validator's monitor reports at least
    /// `required_containers`.
    #[test_traced]
    fn test_backfill() {
        // Test parameters
        let n = 4;
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(360));
        executor.start(|context| async move {
            // Create the simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start the network
            network.start();

            // Derive one key pair per validator from a fixed seed
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            // Keep both lists in the same (public key) order
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators except index 0
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &validators,
                Action::Link(link),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Create and start engines for all validators except index 0
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx_scheme, scheme) in schemes.iter().enumerate() {
                // Index 0 is started later
                if idx_scheme == 0 {
                    continue;
                }

                // Per-validator context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Configure the mock supervisor and application
                let validator = scheme.public_key();
                let supervisor_config = mocks::supervisor::Config {
                    namespace: namespace.clone(),
                    participants: view_validators.clone(),
                };
                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
                    supervisor_config,
                );
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let cfg = config::Config {
                    crypto: scheme.clone(),
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    supervisor,
                    partition: validator.to_string(),
                    compression: Some(3),
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    max_participants: n as usize,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let (voter, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                let engine = Engine::new(context.with_label("engine"), cfg);
                engine_handlers.push(engine.start(voter, resolver));
            }

            // Wait until every running supervisor's monitor reports
            // `required_containers`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Replace the links between running validators with much higher
            // latency ones (index 0 still excluded)
            let link = Link {
                latency: 3_000.0,
                jitter: 0.0,
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &validators,
                Action::Update(link.clone()),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Let simulated time pass with the degraded links
            context.sleep(Duration::from_secs(120)).await;

            // Remove every link that involves validator 1 (and not validator 0)
            link_validators(
                &mut oracle,
                &validators,
                Action::Unlink,
                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
            )
            .await;

            // Prepare the late validator (index 0)
            let scheme = schemes[0].clone();
            let validator = scheme.public_key();

            // Per-validator context
            let context = context.with_label(&format!("validator-{}", scheme.public_key()));

            // Link validator 0 with every validator except validator 1
            link_validators(
                &mut oracle,
                &validators,
                Action::Link(link),
                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
            )
            .await;

            // Replace every link that does not involve validator 1 with a
            // low-latency one
            let link = Link {
                latency: 10.0,
                jitter: 2.5,
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &validators,
                Action::Update(link),
                Some(|_, i, j| ![i, j].contains(&1usize)),
            )
            .await;

            // Configure and start the late validator's engine
            let supervisor_config = mocks::supervisor::Config {
                namespace: namespace.clone(),
                participants: view_validators.clone(),
            };
            let mut supervisor =
                mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(supervisor_config);
            supervisors.push(supervisor.clone());
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                participant: validator.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
            };
            let (actor, application) = mocks::application::Application::new(
                context.with_label("application"),
                application_cfg,
            );
            actor.start();
            let cfg = config::Config {
                crypto: scheme,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: supervisor.clone(),
                supervisor: supervisor.clone(),
                partition: validator.to_string(),
                compression: Some(3),
                mailbox_size: 1024,
                namespace: namespace.clone(),
                leader_timeout: Duration::from_secs(1),
                notarization_timeout: Duration::from_secs(2),
                nullify_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                max_fetch_count: 1,
                max_participants: n as usize,
                fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                fetch_concurrent: 1,
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            };
            let (voter, resolver) = registrations
                .remove(&validator)
                .expect("validator should be registered");
            let engine = Engine::new(context.with_label("engine"), cfg);
            engine_handlers.push(engine.start(voter, resolver));

            // Wait until the late validator's monitor reports
            // `required_containers`
            let (mut latest, mut monitor) = supervisor.subscribe().await;
            while latest < required_containers {
                latest = monitor.next().await.expect("event missing");
            }
        });
    }
884
    /// Runs four of five validators (index `0` is never started or linked)
    /// until every running supervisor's monitor reports at least
    /// `required_containers`, then checks for each supervisor that:
    /// * no faults were recorded,
    /// * the offline validator never appears in any notarize, nullify or
    ///   finalize record,
    /// * every view led by the offline validator has at least `quorum(n)`
    ///   nullifies and a recorded nullification.
    ///
    /// Finally the encoded metrics are scanned for
    /// `_engine_voter_skipped_views_total` and every online validator is
    /// required to report a non-zero value.
    #[test_traced]
    fn test_one_offline() {
        // Test parameters
        let n = 5;
        let threshold = quorum(n);
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|context| async move {
            // Create the simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start the network
            network.start();

            // Derive one key pair per validator from a fixed seed
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            // Keep both lists in the same (public key) order
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators except index 0
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &validators,
                Action::Link(link),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Create and start engines for all validators except index 0
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
                // Index 0 stays offline
                if idx_scheme == 0 {
                    continue;
                }

                // Per-validator context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Configure the mock supervisor and application
                let validator = scheme.public_key();
                let supervisor_config = mocks::supervisor::Config {
                    namespace: namespace.clone(),
                    participants: view_validators.clone(),
                };
                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
                    supervisor_config,
                );
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let cfg = config::Config {
                    crypto: scheme,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    partition: validator.to_string(),
                    compression: Some(3),
                    supervisor,
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_participants: n as usize,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let (voter, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                let engine = Engine::new(context.with_label("engine"), cfg);
                engine_handlers.push(engine.start(voter, resolver));
            }

            // Wait until every running supervisor's monitor reports
            // `required_containers`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Audit what each supervisor recorded
            let offline = &validators[0];
            for supervisor in supervisors.iter() {
                // No faults may have been recorded
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // The offline validator must not appear in any recorded vote
                {
                    let notarizes = supervisor.notarizes.lock().unwrap();
                    for (view, payloads) in notarizes.iter() {
                        for (_, participants) in payloads.iter() {
                            if participants.contains(offline) {
                                panic!("view: {view}");
                            }
                        }
                    }
                }
                {
                    let nullifies = supervisor.nullifies.lock().unwrap();
                    for (view, participants) in nullifies.iter() {
                        if participants.contains(offline) {
                            panic!("view: {view}");
                        }
                    }
                }
                {
                    let finalizes = supervisor.finalizes.lock().unwrap();
                    for (view, payloads) in finalizes.iter() {
                        for (_, finalizers) in payloads.iter() {
                            if finalizers.contains(offline) {
                                panic!("view: {view}");
                            }
                        }
                    }
                }

                // Collect the views whose leader was the offline validator
                let mut offline_views = Vec::new();
                {
                    let leaders = supervisor.leaders.lock().unwrap();
                    for (view, leader) in leaders.iter() {
                        if leader == offline {
                            offline_views.push(*view);
                        }
                    }
                }
                assert!(!offline_views.is_empty());

                // Each such view must have a quorum of nullifies and a
                // recorded nullification
                {
                    let nullifies = supervisor.nullifies.lock().unwrap();
                    for view in offline_views.iter() {
                        let nullifies = nullifies.get(view).unwrap();
                        if nullifies.len() < threshold as usize {
                            panic!("view: {view}");
                        }
                    }
                }
                {
                    let nullifications = supervisor.nullifications.lock().unwrap();
                    for view in offline_views.iter() {
                        nullifications.get(view).unwrap();
                    }
                }
            }

            // Scan the encoded metrics: track the largest skipped-views value
            // and how many metric lines report a non-zero value
            let encoded = context.encode();
            let lines = encoded.lines();
            let mut skipped_views = 0;
            let mut nodes_skipping = 0;
            for line in lines {
                if line.contains("_engine_voter_skipped_views_total") {
                    // The value is the last whitespace-separated token; lines
                    // whose last token is not a `u64` are ignored
                    let parts: Vec<&str> = line.split_whitespace().collect();
                    if let Some(number_str) = parts.last() {
                        if let Ok(number) = number_str.parse::<u64>() {
                            if number > 0 {
                                nodes_skipping += 1;
                            }
                            if number > skipped_views {
                                skipped_views = number;
                            }
                        }
                    }
                }
            }
            assert!(
                skipped_views > 0,
                "expected skipped views to be greater than 0"
            );
            assert_eq!(
                nodes_skipping,
                n - 1,
                "expected all online nodes to be skipping views"
            );
        });
    }
1113
    /// Runs five fully-connected validators where the validator at index `0`
    /// is configured with much larger application propose/verify latencies
    /// than the others, until every supervisor's monitor reports at least
    /// `required_containers`.
    ///
    /// For each supervisor the test then checks that no faults were recorded,
    /// that the slow validator never appears in a notarize or finalize record,
    /// and that it does not appear in a nullify record for any view above `10`.
    #[test_traced]
    fn test_slow_validator() {
        // Test parameters
        let n = 5;
        let required_containers = 50;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|context| async move {
            // Create the simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start the network
            network.start();

            // Derive one key pair per validator from a fixed seed
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            // Keep both lists in the same (public key) order
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link every validator to every other validator
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Create and start one engine per validator
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
                // Per-validator context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Configure the mock supervisor and application
                let validator = scheme.public_key();
                let supervisor_config = mocks::supervisor::Config {
                    namespace: namespace.clone(),
                    participants: view_validators.clone(),
                };
                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
                    supervisor_config,
                );
                supervisors.push(supervisor.clone());
                // Index 0 gets the slow application; everyone else the fast one
                let application_cfg = if idx_scheme == 0 {
                    mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        participant: validator.clone(),
                        propose_latency: (3_000.0, 0.0),
                        verify_latency: (3_000.0, 5.0),
                    }
                } else {
                    mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        participant: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    }
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let cfg = config::Config {
                    crypto: scheme,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    partition: validator.to_string(),
                    compression: Some(3),
                    supervisor,
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    max_participants: n as usize,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let (voter, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                let engine = Engine::new(context.with_label("engine"), cfg);
                engine_handlers.push(engine.start(voter, resolver));
            }

            // Wait until every supervisor's monitor reports
            // `required_containers`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Audit what each supervisor recorded
            let slow = &validators[0];
            for supervisor in supervisors.iter() {
                // No faults may have been recorded
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // The slow validator must not appear in any notarize record
                {
                    let notarizes = supervisor.notarizes.lock().unwrap();
                    for (view, payloads) in notarizes.iter() {
                        for (_, participants) in payloads.iter() {
                            if participants.contains(slow) {
                                panic!("view: {view}");
                            }
                        }
                    }
                }
                {
                    let nullifies = supervisor.nullifies.lock().unwrap();
                    for (view, participants) in nullifies.iter() {
                        // Nullifies from the slow validator are tolerated only
                        // in views up to and including 10
                        if *view > 10 && participants.contains(slow) {
                            panic!("view: {view}");
                        }
                    }
                }
                // The slow validator must not appear in any finalize record
                {
                    let finalizes = supervisor.finalizes.lock().unwrap();
                    for (view, payloads) in finalizes.iter() {
                        for (_, finalizers) in payloads.iter() {
                            if finalizers.contains(slow) {
                                panic!("view: {view}");
                            }
                        }
                    }
                }
            }
        });
    }
1282
1283 #[test_traced]
1284 fn test_all_recovery() {
1285 let n = 5;
1287 let required_containers = 100;
1288 let activity_timeout = 10;
1289 let skip_timeout = 2;
1290 let namespace = b"consensus".to_vec();
1291 let executor = deterministic::Runner::timed(Duration::from_secs(180));
1292 executor.start(|context| async move {
1293 let (network, mut oracle) = Network::new(
1295 context.with_label("network"),
1296 Config {
1297 max_size: 1024 * 1024,
1298 },
1299 );
1300
1301 network.start();
1303
1304 let mut schemes = Vec::new();
1306 let mut validators = Vec::new();
1307 for i in 0..n {
1308 let scheme = PrivateKey::from_seed(i as u64);
1309 let pk = scheme.public_key();
1310 schemes.push(scheme);
1311 validators.push(pk);
1312 }
1313 validators.sort();
1314 schemes.sort_by_key(|s| s.public_key());
1315 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1316 let mut registrations = register_validators(&mut oracle, &validators).await;
1317
1318 let link = Link {
1320 latency: 3_000.0,
1321 jitter: 0.0,
1322 success_rate: 1.0,
1323 };
1324 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1325
1326 let relay = Arc::new(mocks::relay::Relay::new());
1328 let mut supervisors = Vec::new();
1329 let mut engine_handlers = Vec::new();
1330 for scheme in schemes.iter() {
1331 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1333
1334 let validator = scheme.public_key();
1336 let supervisor_config = mocks::supervisor::Config {
1337 namespace: namespace.clone(),
1338 participants: view_validators.clone(),
1339 };
1340 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1341 supervisor_config,
1342 );
1343 supervisors.push(supervisor.clone());
1344 let application_cfg = mocks::application::Config {
1345 hasher: Sha256::default(),
1346 relay: relay.clone(),
1347 participant: validator.clone(),
1348 propose_latency: (10.0, 5.0),
1349 verify_latency: (10.0, 5.0),
1350 };
1351 let (actor, application) = mocks::application::Application::new(
1352 context.with_label("application"),
1353 application_cfg,
1354 );
1355 actor.start();
1356 let cfg = config::Config {
1357 crypto: scheme.clone(),
1358 automaton: application.clone(),
1359 relay: application.clone(),
1360 reporter: supervisor.clone(),
1361 partition: validator.to_string(),
1362 compression: Some(3),
1363 supervisor,
1364 mailbox_size: 1024,
1365 namespace: namespace.clone(),
1366 leader_timeout: Duration::from_secs(1),
1367 notarization_timeout: Duration::from_secs(2),
1368 nullify_retry: Duration::from_secs(10),
1369 fetch_timeout: Duration::from_secs(1),
1370 activity_timeout,
1371 skip_timeout,
1372 max_fetch_count: 1,
1373 max_participants: n as usize,
1374 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1375 fetch_concurrent: 1,
1376 replay_buffer: NZUsize!(1024 * 1024),
1377 write_buffer: NZUsize!(1024 * 1024),
1378 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1379 };
1380 let (voter, resolver) = registrations
1381 .remove(&validator)
1382 .expect("validator should be registered");
1383 let engine = Engine::new(context.with_label("engine"), cfg);
1384 engine_handlers.push(engine.start(voter, resolver));
1385 }
1386
1387 let mut finalizers = Vec::new();
1389 for supervisor in supervisors.iter_mut() {
1390 let (_, mut monitor) = supervisor.subscribe().await;
1391 finalizers.push(
1392 context
1393 .with_label("finalizer")
1394 .spawn(move |context| async move {
1395 select! {
1396 _timeout = context.sleep(Duration::from_secs(60)) => {},
1397 _done = monitor.next() => {
1398 panic!("engine should not notarize or finalize anything");
1399 }
1400 }
1401 }),
1402 );
1403 }
1404 join_all(finalizers).await;
1405
1406 link_validators(&mut oracle, &validators, Action::Unlink, None).await;
1408
1409 context.sleep(Duration::from_secs(60)).await;
1411
1412 let mut latest = 0;
1414 for supervisor in supervisors.iter() {
1415 let nullifies = supervisor.nullifies.lock().unwrap();
1416 let max = nullifies.keys().max().unwrap();
1417 if *max > latest {
1418 latest = *max;
1419 }
1420 }
1421
1422 let link = Link {
1424 latency: 10.0,
1425 jitter: 1.0,
1426 success_rate: 1.0,
1427 };
1428 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1429
1430 let mut finalizers = Vec::new();
1432 for supervisor in supervisors.iter_mut() {
1433 let (mut latest, mut monitor) = supervisor.subscribe().await;
1434 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1435 while latest < required_containers {
1436 latest = monitor.next().await.expect("event missing");
1437 }
1438 }));
1439 }
1440 join_all(finalizers).await;
1441
1442 for supervisor in supervisors.iter() {
1444 {
1446 let faults = supervisor.faults.lock().unwrap();
1447 assert!(faults.is_empty());
1448 }
1449
1450 {
1455 let mut found = 0;
1457 let finalizations = supervisor.finalizations.lock().unwrap();
1458 for i in latest..latest + activity_timeout {
1459 if finalizations.contains_key(&i) {
1460 found += 1;
1461 }
1462 }
1463 assert!(found >= activity_timeout - 2, "found: {found}");
1464 }
1465 }
1466 });
1467 }
1468
1469 #[test_traced]
1470 #[ignore]
1471 fn test_partition() {
1472 let n = 10;
1474 let required_containers = 50;
1475 let activity_timeout = 10;
1476 let skip_timeout = 5;
1477 let namespace = b"consensus".to_vec();
1478 let executor = deterministic::Runner::timed(Duration::from_secs(900));
1479 executor.start(|context| async move {
1480 let (network, mut oracle) = Network::new(
1482 context.with_label("network"),
1483 Config {
1484 max_size: 1024 * 1024,
1485 },
1486 );
1487
1488 network.start();
1490
1491 let mut schemes = Vec::new();
1493 let mut validators = Vec::new();
1494 for i in 0..n {
1495 let scheme = PrivateKey::from_seed(i as u64);
1496 let pk = scheme.public_key();
1497 schemes.push(scheme);
1498 validators.push(pk);
1499 }
1500 validators.sort();
1501 schemes.sort_by_key(|s| s.public_key());
1502 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1503 let mut registrations = register_validators(&mut oracle, &validators).await;
1504
1505 let link = Link {
1507 latency: 10.0,
1508 jitter: 1.0,
1509 success_rate: 1.0,
1510 };
1511 link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;
1512
1513 let relay = Arc::new(mocks::relay::Relay::new());
1515 let mut supervisors = Vec::new();
1516 let mut engine_handlers = Vec::new();
1517 for scheme in schemes.iter() {
1518 let validator = scheme.public_key();
1520 let supervisor_config = mocks::supervisor::Config {
1521 namespace: namespace.clone(),
1522 participants: view_validators.clone(),
1523 };
1524 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1525 supervisor_config,
1526 );
1527 supervisors.push(supervisor.clone());
1528 let application_cfg = mocks::application::Config {
1529 hasher: Sha256::default(),
1530 relay: relay.clone(),
1531 participant: validator.clone(),
1532 propose_latency: (10.0, 5.0),
1533 verify_latency: (10.0, 5.0),
1534 };
1535 let (actor, application) = mocks::application::Application::new(
1536 context.with_label("application"),
1537 application_cfg,
1538 );
1539 actor.start();
1540 let cfg = config::Config {
1541 crypto: scheme.clone(),
1542 automaton: application.clone(),
1543 relay: application.clone(),
1544 reporter: supervisor.clone(),
1545 partition: validator.to_string(),
1546 compression: Some(3),
1547 supervisor,
1548 mailbox_size: 1024,
1549 namespace: namespace.clone(),
1550 leader_timeout: Duration::from_secs(1),
1551 notarization_timeout: Duration::from_secs(2),
1552 nullify_retry: Duration::from_secs(10),
1553 fetch_timeout: Duration::from_secs(1),
1554 activity_timeout,
1555 skip_timeout,
1556 max_fetch_count: 1,
1557 max_participants: n as usize,
1558 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1559 fetch_concurrent: 1,
1560 replay_buffer: NZUsize!(1024 * 1024),
1561 write_buffer: NZUsize!(1024 * 1024),
1562 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1563 };
1564 let (voter, resolver) = registrations
1565 .remove(&validator)
1566 .expect("validator should be registered");
1567 let engine = Engine::new(context.with_label("engine"), cfg);
1568 engine_handlers.push(engine.start(voter, resolver));
1569 }
1570
1571 let mut finalizers = Vec::new();
1573 for supervisor in supervisors.iter_mut() {
1574 let (mut latest, mut monitor) = supervisor.subscribe().await;
1575 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1576 while latest < required_containers {
1577 latest = monitor.next().await.expect("event missing");
1578 }
1579 }));
1580 }
1581 join_all(finalizers).await;
1582
1583 fn separated(n: usize, a: usize, b: usize) -> bool {
1585 let m = n / 2;
1586 (a < m && b >= m) || (a >= m && b < m)
1587 }
1588 link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;
1589
1590 context.sleep(Duration::from_secs(10)).await;
1592
1593 let mut finalizers = Vec::new();
1595 for supervisor in supervisors.iter_mut() {
1596 let (_, mut monitor) = supervisor.subscribe().await;
1597 finalizers.push(
1598 context
1599 .with_label("finalizer")
1600 .spawn(move |context| async move {
1601 select! {
1602 _timeout = context.sleep(Duration::from_secs(60)) => {},
1603 _done = monitor.next() => {
1604 panic!("engine should not notarize or finalize anything");
1605 }
1606 }
1607 }),
1608 );
1609 }
1610 join_all(finalizers).await;
1611
1612 link_validators(
1614 &mut oracle,
1615 &validators,
1616 Action::Link(link),
1617 Some(separated),
1618 )
1619 .await;
1620
1621 let mut finalizers = Vec::new();
1623 for supervisor in supervisors.iter_mut() {
1624 let (mut latest, mut monitor) = supervisor.subscribe().await;
1625 let required = latest + required_containers;
1626 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1627 while latest < required {
1628 latest = monitor.next().await.expect("event missing");
1629 }
1630 }));
1631 }
1632 join_all(finalizers).await;
1633
1634 for supervisor in supervisors.iter() {
1636 {
1638 let faults = supervisor.faults.lock().unwrap();
1639 assert!(faults.is_empty());
1640 }
1641 }
1642 });
1643 }
1644
1645 fn slow_and_lossy_links(seed: u64) -> String {
1646 let n = 5;
1648 let required_containers = 50;
1649 let activity_timeout = 10;
1650 let skip_timeout = 5;
1651 let namespace = b"consensus".to_vec();
1652 let cfg = deterministic::Config::new()
1653 .with_seed(seed)
1654 .with_timeout(Some(Duration::from_secs(5_000)));
1655 let executor = deterministic::Runner::new(cfg);
1656 executor.start(|context| async move {
1657 let (network, mut oracle) = Network::new(
1659 context.with_label("network"),
1660 Config {
1661 max_size: 1024 * 1024,
1662 },
1663 );
1664
1665 network.start();
1667
1668 let mut schemes = Vec::new();
1670 let mut validators = Vec::new();
1671 for i in 0..n {
1672 let scheme = PrivateKey::from_seed(i as u64);
1673 let pk = scheme.public_key();
1674 schemes.push(scheme);
1675 validators.push(pk);
1676 }
1677 validators.sort();
1678 schemes.sort_by_key(|s| s.public_key());
1679 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1680 let mut registrations = register_validators(&mut oracle, &validators).await;
1681
1682 let degraded_link = Link {
1684 latency: 200.0,
1685 jitter: 150.0,
1686 success_rate: 0.5,
1687 };
1688 link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;
1689
1690 let relay = Arc::new(mocks::relay::Relay::new());
1692 let mut supervisors = Vec::new();
1693 let mut engine_handlers = Vec::new();
1694 for scheme in schemes.into_iter() {
1695 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1697
1698 let validator = scheme.public_key();
1700 let supervisor_config = mocks::supervisor::Config {
1701 namespace: namespace.clone(),
1702 participants: view_validators.clone(),
1703 };
1704 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1705 supervisor_config,
1706 );
1707 supervisors.push(supervisor.clone());
1708 let application_cfg = mocks::application::Config {
1709 hasher: Sha256::default(),
1710 relay: relay.clone(),
1711 participant: validator.clone(),
1712 propose_latency: (10.0, 5.0),
1713 verify_latency: (10.0, 5.0),
1714 };
1715 let (actor, application) = mocks::application::Application::new(
1716 context.with_label("application"),
1717 application_cfg,
1718 );
1719 actor.start();
1720 let cfg = config::Config {
1721 crypto: scheme,
1722 automaton: application.clone(),
1723 relay: application.clone(),
1724 reporter: supervisor.clone(),
1725 partition: validator.to_string(),
1726 compression: Some(3),
1727 supervisor,
1728 mailbox_size: 1024,
1729 namespace: namespace.clone(),
1730 leader_timeout: Duration::from_secs(1),
1731 notarization_timeout: Duration::from_secs(2),
1732 nullify_retry: Duration::from_secs(10),
1733 fetch_timeout: Duration::from_secs(1),
1734 activity_timeout,
1735 skip_timeout,
1736 max_fetch_count: 1,
1737 max_participants: n as usize,
1738 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1739 fetch_concurrent: 1,
1740 replay_buffer: NZUsize!(1024 * 1024),
1741 write_buffer: NZUsize!(1024 * 1024),
1742 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1743 };
1744 let (voter, resolver) = registrations
1745 .remove(&validator)
1746 .expect("validator should be registered");
1747 let engine = Engine::new(context.with_label("engine"), cfg);
1748 engine_handlers.push(engine.start(voter, resolver));
1749 }
1750
1751 let mut finalizers = Vec::new();
1753 for supervisor in supervisors.iter_mut() {
1754 let (mut latest, mut monitor) = supervisor.subscribe().await;
1755 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1756 while latest < required_containers {
1757 latest = monitor.next().await.expect("event missing");
1758 }
1759 }));
1760 }
1761 join_all(finalizers).await;
1762
1763 for supervisor in supervisors.iter() {
1765 {
1767 let faults = supervisor.faults.lock().unwrap();
1768 assert!(faults.is_empty());
1769 }
1770 }
1771
1772 context.auditor().state()
1773 })
1774 }
1775
1776 #[test_traced]
1777 fn test_slow_and_lossy_links() {
1778 slow_and_lossy_links(0);
1779 }
1780
1781 #[test_traced]
1782 #[ignore]
1783 fn test_determinism() {
1784 for seed in 1..6 {
1787 let state_1 = slow_and_lossy_links(seed);
1789
1790 let state_2 = slow_and_lossy_links(seed);
1792
1793 assert_eq!(state_1, state_2);
1795 }
1796 }
1797
1798 fn conflicter(seed: u64) {
1799 let n = 4;
1801 let required_containers = 50;
1802 let activity_timeout = 10;
1803 let skip_timeout = 5;
1804 let namespace = b"consensus".to_vec();
1805 let cfg = deterministic::Config::new()
1806 .with_seed(seed)
1807 .with_timeout(Some(Duration::from_secs(30)));
1808 let executor = deterministic::Runner::new(cfg);
1809 executor.start(|context| async move {
1810 let (network, mut oracle) = Network::new(
1812 context.with_label("network"),
1813 Config {
1814 max_size: 1024 * 1024,
1815 },
1816 );
1817
1818 network.start();
1820
1821 let mut schemes = Vec::new();
1823 let mut validators = Vec::new();
1824 for i in 0..n {
1825 let scheme = PrivateKey::from_seed(i as u64);
1826 let pk = scheme.public_key();
1827 schemes.push(scheme);
1828 validators.push(pk);
1829 }
1830 validators.sort();
1831 schemes.sort_by_key(|s| s.public_key());
1832 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1833 let mut registrations = register_validators(&mut oracle, &validators).await;
1834
1835 let link = Link {
1837 latency: 10.0,
1838 jitter: 1.0,
1839 success_rate: 1.0,
1840 };
1841 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1842
1843 let relay = Arc::new(mocks::relay::Relay::new());
1845 let mut supervisors = Vec::new();
1846 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1847 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1849
1850 let validator = scheme.public_key();
1852 let supervisor_config = mocks::supervisor::Config {
1853 namespace: namespace.clone(),
1854 participants: view_validators.clone(),
1855 };
1856 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1857 supervisor_config,
1858 );
1859 if idx_scheme == 0 {
1860 let cfg = mocks::conflicter::Config {
1861 crypto: scheme,
1862 supervisor,
1863 namespace: namespace.clone(),
1864 };
1865 let (voter, _) = registrations
1866 .remove(&validator)
1867 .expect("validator should be registered");
1868 let engine: mocks::conflicter::Conflicter<_, _, Sha256, _> =
1869 mocks::conflicter::Conflicter::new(
1870 context.with_label("byzantine_engine"),
1871 cfg,
1872 );
1873 engine.start(voter);
1874 } else {
1875 supervisors.push(supervisor.clone());
1876 let application_cfg = mocks::application::Config {
1877 hasher: Sha256::default(),
1878 relay: relay.clone(),
1879 participant: validator.clone(),
1880 propose_latency: (10.0, 5.0),
1881 verify_latency: (10.0, 5.0),
1882 };
1883 let (actor, application) = mocks::application::Application::new(
1884 context.with_label("application"),
1885 application_cfg,
1886 );
1887 actor.start();
1888 let cfg = config::Config {
1889 crypto: scheme,
1890 automaton: application.clone(),
1891 relay: application.clone(),
1892 reporter: supervisor.clone(),
1893 partition: validator.to_string(),
1894 compression: Some(3),
1895 supervisor,
1896 mailbox_size: 1024,
1897 namespace: namespace.clone(),
1898 leader_timeout: Duration::from_secs(1),
1899 notarization_timeout: Duration::from_secs(2),
1900 nullify_retry: Duration::from_secs(10),
1901 fetch_timeout: Duration::from_secs(1),
1902 activity_timeout,
1903 skip_timeout,
1904 max_fetch_count: 1,
1905 max_participants: n as usize,
1906 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1907 fetch_concurrent: 1,
1908 replay_buffer: NZUsize!(1024 * 1024),
1909 write_buffer: NZUsize!(1024 * 1024),
1910 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1911 };
1912 let (voter, resolver) = registrations
1913 .remove(&validator)
1914 .expect("validator should be registered");
1915 let engine = Engine::new(context.with_label("engine"), cfg);
1916 engine.start(voter, resolver);
1917 }
1918 }
1919
1920 let mut finalizers = Vec::new();
1922 for supervisor in supervisors.iter_mut() {
1923 let (mut latest, mut monitor) = supervisor.subscribe().await;
1924 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1925 while latest < required_containers {
1926 latest = monitor.next().await.expect("event missing");
1927 }
1928 }));
1929 }
1930 join_all(finalizers).await;
1931
1932 let byz = &validators[0];
1934 let mut count_conflicting_notarize = 0;
1935 let mut count_conflicting_finalize = 0;
1936 for supervisor in supervisors.iter() {
1937 {
1939 let faults = supervisor.faults.lock().unwrap();
1940 assert_eq!(faults.len(), 1);
1941 let faulter = faults.get(byz).expect("byzantine party is not faulter");
1942 for (_, faults) in faulter.iter() {
1943 for fault in faults.iter() {
1944 match fault {
1945 Activity::ConflictingNotarize(_) => {
1946 count_conflicting_notarize += 1;
1947 }
1948 Activity::ConflictingFinalize(_) => {
1949 count_conflicting_finalize += 1;
1950 }
1951 _ => panic!("unexpected fault: {fault:?}"),
1952 }
1953 }
1954 }
1955 }
1956 }
1957 assert!(count_conflicting_notarize > 0);
1958 assert!(count_conflicting_finalize > 0);
1959 });
1960 }
1961
1962 #[test_traced]
1963 #[ignore]
1964 fn test_conflicter() {
1965 for seed in 0..5 {
1966 conflicter(seed);
1967 }
1968 }
1969
1970 fn nuller(seed: u64) {
1971 let n = 4;
1973 let required_containers = 50;
1974 let activity_timeout = 10;
1975 let skip_timeout = 5;
1976 let namespace = b"consensus".to_vec();
1977 let cfg = deterministic::Config::new()
1978 .with_seed(seed)
1979 .with_timeout(Some(Duration::from_secs(30)));
1980 let executor = deterministic::Runner::new(cfg);
1981 executor.start(|context| async move {
1982 let (network, mut oracle) = Network::new(
1984 context.with_label("network"),
1985 Config {
1986 max_size: 1024 * 1024,
1987 },
1988 );
1989
1990 network.start();
1992
1993 let mut schemes = Vec::new();
1995 let mut validators = Vec::new();
1996 for i in 0..n {
1997 let scheme = PrivateKey::from_seed(i as u64);
1998 let pk = scheme.public_key();
1999 schemes.push(scheme);
2000 validators.push(pk);
2001 }
2002 validators.sort();
2003 schemes.sort_by_key(|s| s.public_key());
2004 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2005 let mut registrations = register_validators(&mut oracle, &validators).await;
2006
2007 let link = Link {
2009 latency: 10.0,
2010 jitter: 1.0,
2011 success_rate: 1.0,
2012 };
2013 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2014
2015 let relay = Arc::new(mocks::relay::Relay::new());
2017 let mut supervisors = Vec::new();
2018 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2019 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2021
2022 let validator = scheme.public_key();
2024 let supervisor_config = mocks::supervisor::Config {
2025 namespace: namespace.clone(),
2026 participants: view_validators.clone(),
2027 };
2028 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2029 supervisor_config,
2030 );
2031 if idx_scheme == 0 {
2032 let cfg = mocks::nuller::Config {
2033 crypto: scheme,
2034 supervisor,
2035 namespace: namespace.clone(),
2036 };
2037 let (voter, _) = registrations
2038 .remove(&validator)
2039 .expect("validator should be registered");
2040 let engine: mocks::nuller::Nuller<_, _, Sha256, _> =
2041 mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
2042 engine.start(voter);
2043 } else {
2044 supervisors.push(supervisor.clone());
2045 let application_cfg = mocks::application::Config {
2046 hasher: Sha256::default(),
2047 relay: relay.clone(),
2048 participant: validator.clone(),
2049 propose_latency: (10.0, 5.0),
2050 verify_latency: (10.0, 5.0),
2051 };
2052 let (actor, application) = mocks::application::Application::new(
2053 context.with_label("application"),
2054 application_cfg,
2055 );
2056 actor.start();
2057 let cfg = config::Config {
2058 crypto: scheme,
2059 automaton: application.clone(),
2060 relay: application.clone(),
2061 reporter: supervisor.clone(),
2062 partition: validator.to_string(),
2063 compression: Some(3),
2064 supervisor,
2065 mailbox_size: 1024,
2066 namespace: namespace.clone(),
2067 leader_timeout: Duration::from_secs(1),
2068 notarization_timeout: Duration::from_secs(2),
2069 nullify_retry: Duration::from_secs(10),
2070 fetch_timeout: Duration::from_secs(1),
2071 activity_timeout,
2072 skip_timeout,
2073 max_fetch_count: 1,
2074 max_participants: n as usize,
2075 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2076 fetch_concurrent: 1,
2077 replay_buffer: NZUsize!(1024 * 1024),
2078 write_buffer: NZUsize!(1024 * 1024),
2079 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2080 };
2081 let (voter, resolver) = registrations
2082 .remove(&validator)
2083 .expect("validator should be registered");
2084 let engine = Engine::new(context.with_label("engine"), cfg);
2085 engine.start(voter, resolver);
2086 }
2087 }
2088
2089 let mut finalizers = Vec::new();
2091 for supervisor in supervisors.iter_mut() {
2092 let (mut latest, mut monitor) = supervisor.subscribe().await;
2093 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2094 while latest < required_containers {
2095 latest = monitor.next().await.expect("event missing");
2096 }
2097 }));
2098 }
2099 join_all(finalizers).await;
2100
2101 let byz = &validators[0];
2103 let mut count_nullify_and_finalize = 0;
2104 for supervisor in supervisors.iter() {
2105 {
2107 let faults = supervisor.faults.lock().unwrap();
2108 assert_eq!(faults.len(), 1);
2109 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2110 for (_, faults) in faulter.iter() {
2111 for fault in faults.iter() {
2112 match fault {
2113 Activity::NullifyFinalize(_) => {
2114 count_nullify_and_finalize += 1;
2115 }
2116 _ => panic!("unexpected fault: {fault:?}"),
2117 }
2118 }
2119 }
2120 }
2121 }
2122 assert!(count_nullify_and_finalize > 0);
2123 });
2124 }
2125
2126 #[test_traced]
2127 #[ignore]
2128 fn test_nuller() {
2129 for seed in 0..5 {
2130 nuller(seed);
2131 }
2132 }
2133
2134 fn outdated(seed: u64) {
2135 let n = 4;
2137 let required_containers = 100;
2138 let activity_timeout = 10;
2139 let skip_timeout = 5;
2140 let namespace = b"consensus".to_vec();
2141 let cfg = deterministic::Config::new()
2142 .with_seed(seed)
2143 .with_timeout(Some(Duration::from_secs(30)));
2144 let executor = deterministic::Runner::new(cfg);
2145 executor.start(|context| async move {
2146 let (network, mut oracle) = Network::new(
2148 context.with_label("network"),
2149 Config {
2150 max_size: 1024 * 1024,
2151 },
2152 );
2153
2154 network.start();
2156
2157 let mut schemes = Vec::new();
2159 let mut validators = Vec::new();
2160 for i in 0..n {
2161 let scheme = PrivateKey::from_seed(i as u64);
2162 let pk = scheme.public_key();
2163 schemes.push(scheme);
2164 validators.push(pk);
2165 }
2166 validators.sort();
2167 schemes.sort_by_key(|s| s.public_key());
2168 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2169 let mut registrations = register_validators(&mut oracle, &validators).await;
2170
2171 let link = Link {
2173 latency: 10.0,
2174 jitter: 1.0,
2175 success_rate: 1.0,
2176 };
2177 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2178
2179 let relay = Arc::new(mocks::relay::Relay::new());
2181 let mut supervisors = Vec::new();
2182 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2183 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2185
2186 let validator = scheme.public_key();
2188 let supervisor_config = mocks::supervisor::Config {
2189 namespace: namespace.clone(),
2190 participants: view_validators.clone(),
2191 };
2192 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2193 supervisor_config,
2194 );
2195 if idx_scheme == 0 {
2196 let cfg = mocks::outdated::Config {
2197 crypto: scheme,
2198 supervisor,
2199 namespace: namespace.clone(),
2200 view_delta: activity_timeout * 4,
2201 };
2202 let (voter, _) = registrations
2203 .remove(&validator)
2204 .expect("validator should be registered");
2205 let engine: mocks::outdated::Outdated<_, _, Sha256, _> =
2206 mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
2207 engine.start(voter);
2208 } else {
2209 supervisors.push(supervisor.clone());
2210 let application_cfg = mocks::application::Config {
2211 hasher: Sha256::default(),
2212 relay: relay.clone(),
2213 participant: validator.clone(),
2214 propose_latency: (10.0, 5.0),
2215 verify_latency: (10.0, 5.0),
2216 };
2217 let (actor, application) = mocks::application::Application::new(
2218 context.with_label("application"),
2219 application_cfg,
2220 );
2221 actor.start();
2222 let cfg = config::Config {
2223 crypto: scheme,
2224 automaton: application.clone(),
2225 relay: application.clone(),
2226 reporter: supervisor.clone(),
2227 supervisor,
2228 partition: validator.to_string(),
2229 compression: Some(3),
2230 mailbox_size: 1024,
2231 namespace: namespace.clone(),
2232 leader_timeout: Duration::from_secs(1),
2233 notarization_timeout: Duration::from_secs(2),
2234 nullify_retry: Duration::from_secs(10),
2235 fetch_timeout: Duration::from_secs(1),
2236 activity_timeout,
2237 skip_timeout,
2238 max_fetch_count: 1,
2239 max_participants: n as usize,
2240 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2241 fetch_concurrent: 1,
2242 replay_buffer: NZUsize!(1024 * 1024),
2243 write_buffer: NZUsize!(1024 * 1024),
2244 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2245 };
2246 let (voter, resolver) = registrations
2247 .remove(&validator)
2248 .expect("validator should be registered");
2249 let engine = Engine::new(context.with_label("engine"), cfg);
2250 engine.start(voter, resolver);
2251 }
2252 }
2253
2254 let mut finalizers = Vec::new();
2256 for supervisor in supervisors.iter_mut() {
2257 let (mut latest, mut monitor) = supervisor.subscribe().await;
2258 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2259 while latest < required_containers {
2260 latest = monitor.next().await.expect("event missing");
2261 }
2262 }));
2263 }
2264 join_all(finalizers).await;
2265
2266 for supervisor in supervisors.iter() {
2268 {
2269 let faults = supervisor.faults.lock().unwrap();
2270 assert!(faults.is_empty());
2271 }
2272 }
2273 });
2274 }
2275
2276 #[test_traced]
2277 #[ignore]
2278 fn test_outdated() {
2279 for seed in 0..5 {
2280 outdated(seed);
2281 }
2282 }
2283
2284 #[test_traced]
2285 #[ignore]
2286 fn test_1k() {
2287 let n = 10;
2289 let required_containers = 1_000;
2290 let activity_timeout = 10;
2291 let skip_timeout = 5;
2292 let namespace = b"consensus".to_vec();
2293 let cfg = deterministic::Config::new();
2294 let executor = deterministic::Runner::new(cfg);
2295 executor.start(|context| async move {
2296 let (network, mut oracle) = Network::new(
2298 context.with_label("network"),
2299 Config {
2300 max_size: 1024 * 1024,
2301 },
2302 );
2303
2304 network.start();
2306
2307 let mut schemes = Vec::new();
2309 let mut validators = Vec::new();
2310 for i in 0..n {
2311 let scheme = PrivateKey::from_seed(i as u64);
2312 let pk = scheme.public_key();
2313 schemes.push(scheme);
2314 validators.push(pk);
2315 }
2316 validators.sort();
2317 schemes.sort_by_key(|s| s.public_key());
2318 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2319 let mut registrations = register_validators(&mut oracle, &validators).await;
2320
2321 let link = Link {
2323 latency: 80.0,
2324 jitter: 10.0,
2325 success_rate: 0.98,
2326 };
2327 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2328
2329 let relay = Arc::new(mocks::relay::Relay::new());
2331 let mut supervisors = Vec::new();
2332 let mut engine_handlers = Vec::new();
2333 for scheme in schemes.into_iter() {
2334 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2336
2337 let validator = scheme.public_key();
2339 let supervisor_config = mocks::supervisor::Config {
2340 namespace: namespace.clone(),
2341 participants: view_validators.clone(),
2342 };
2343 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2344 supervisor_config,
2345 );
2346 supervisors.push(supervisor.clone());
2347 let application_cfg = mocks::application::Config {
2348 hasher: Sha256::default(),
2349 relay: relay.clone(),
2350 participant: validator.clone(),
2351 propose_latency: (100.0, 50.0),
2352 verify_latency: (50.0, 40.0),
2353 };
2354 let (actor, application) = mocks::application::Application::new(
2355 context.with_label("application"),
2356 application_cfg,
2357 );
2358 actor.start();
2359 let cfg = config::Config {
2360 crypto: scheme,
2361 automaton: application.clone(),
2362 relay: application.clone(),
2363 reporter: supervisor.clone(),
2364 partition: validator.to_string(),
2365 compression: Some(3),
2366 supervisor,
2367 mailbox_size: 1024,
2368 namespace: namespace.clone(),
2369 leader_timeout: Duration::from_secs(1),
2370 notarization_timeout: Duration::from_secs(2),
2371 nullify_retry: Duration::from_secs(10),
2372 fetch_timeout: Duration::from_secs(1),
2373 activity_timeout,
2374 skip_timeout,
2375 max_fetch_count: 1,
2376 max_participants: n as usize,
2377 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2378 fetch_concurrent: 1,
2379 replay_buffer: NZUsize!(1024 * 1024),
2380 write_buffer: NZUsize!(1024 * 1024),
2381 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2382 };
2383 let (voter, resolver) = registrations
2384 .remove(&validator)
2385 .expect("validator should be registered");
2386 let engine = Engine::new(context.with_label("engine"), cfg);
2387 engine_handlers.push(engine.start(voter, resolver));
2388 }
2389
2390 let mut finalizers = Vec::new();
2392 for supervisor in supervisors.iter_mut() {
2393 let (mut latest, mut monitor) = supervisor.subscribe().await;
2394 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2395 while latest < required_containers {
2396 latest = monitor.next().await.expect("event missing");
2397 }
2398 }));
2399 }
2400 join_all(finalizers).await;
2401
2402 for supervisor in supervisors.iter() {
2404 {
2406 let faults = supervisor.faults.lock().unwrap();
2407 assert!(faults.is_empty());
2408 }
2409 }
2410 })
2411 }
2412}