1pub mod types;
118
// `Config`, `Engine`, and the private modules backing them are only compiled
// (and re-exported) when the target architecture is not `wasm32`.
cfg_if::cfg_if! {
    if #[cfg(not(target_arch = "wasm32"))] {
        mod actors;
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        mod metrics;
    }
}
129
130#[cfg(test)]
131pub mod mocks;
132
133#[cfg(test)]
134mod tests {
135 use super::*;
136 use crate::Monitor;
137 use commonware_cryptography::{
138 ed25519::{PrivateKey, PublicKey},
139 sha256::Digest as Sha256Digest,
140 PrivateKeyExt as _, PublicKey as CPublicKey, Sha256, Signer as _,
141 };
142 use commonware_macros::{select, test_traced};
143 use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
144 use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner};
145 use commonware_utils::{quorum, NZU32};
146 use engine::Engine;
147 use futures::{future::join_all, StreamExt};
148 use governor::Quota;
149 use rand::Rng as _;
150 use std::{
151 collections::{BTreeMap, HashMap},
152 sync::{Arc, Mutex},
153 time::Duration,
154 };
155 use tracing::debug;
156 use types::Activity;
157
158 async fn register_validators<P: CPublicKey>(
160 oracle: &mut Oracle<P>,
161 validators: &[P],
162 ) -> HashMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))> {
163 let mut registrations = HashMap::new();
164 for validator in validators.iter() {
165 let (voter_sender, voter_receiver) =
166 oracle.register(validator.clone(), 0).await.unwrap();
167 let (resolver_sender, resolver_receiver) =
168 oracle.register(validator.clone(), 1).await.unwrap();
169 registrations.insert(
170 validator.clone(),
171 (
172 (voter_sender, voter_receiver),
173 (resolver_sender, resolver_receiver),
174 ),
175 );
176 }
177 registrations
178 }
179
    /// Describes what `link_validators` should do for each selected pair of
    /// validators.
    enum Action {
        /// Add a link with the given parameters.
        Link(Link),
        /// Remove the existing link and then add a link with the given parameters.
        Update(Link), Unlink,
        // `Unlink` (declared on the line above): remove the existing link
        // without adding a replacement.
    }
186
187 async fn link_validators<P: CPublicKey>(
193 oracle: &mut Oracle<P>,
194 validators: &[P],
195 action: Action,
196 restrict_to: Option<fn(usize, usize, usize) -> bool>,
197 ) {
198 for (i1, v1) in validators.iter().enumerate() {
199 for (i2, v2) in validators.iter().enumerate() {
200 if v2 == v1 {
202 continue;
203 }
204
205 if let Some(f) = restrict_to {
207 if !f(validators.len(), i1, i2) {
208 continue;
209 }
210 }
211
212 match action {
214 Action::Update(_) | Action::Unlink => {
215 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
216 }
217 _ => {}
218 }
219
220 match action {
222 Action::Link(ref link) | Action::Update(ref link) => {
223 oracle
224 .add_link(v1.clone(), v2.clone(), link.clone())
225 .await
226 .unwrap();
227 }
228 _ => {}
229 }
230 }
231 }
232 }
233
234 #[test_traced]
235 fn test_all_online() {
236 let n = 5;
238 let threshold = quorum(n);
239 let max_exceptions = 10;
240 let required_containers = 100;
241 let activity_timeout = 10;
242 let skip_timeout = 5;
243 let namespace = b"consensus".to_vec();
244 let executor = deterministic::Runner::timed(Duration::from_secs(30));
245 executor.start(|context| async move {
246 let (network, mut oracle) = Network::new(
248 context.with_label("network"),
249 Config {
250 max_size: 1024 * 1024,
251 },
252 );
253
254 network.start();
256
257 let mut schemes = Vec::new();
259 let mut validators = Vec::new();
260 for i in 0..n {
261 let scheme = PrivateKey::from_seed(i as u64);
262 let pk = scheme.public_key();
263 schemes.push(scheme);
264 validators.push(pk);
265 }
266 validators.sort();
267 schemes.sort_by_key(|s| s.public_key());
268 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
269 let mut registrations = register_validators(&mut oracle, &validators).await;
270
271 let link = Link {
273 latency: 10.0,
274 jitter: 1.0,
275 success_rate: 1.0,
276 };
277 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
278
279 let relay = Arc::new(mocks::relay::Relay::new());
281 let mut supervisors = Vec::new();
282 let mut engine_handlers = Vec::new();
283 for scheme in schemes.into_iter() {
284 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
286
287 let validator = scheme.public_key();
289 let supervisor_config = mocks::supervisor::Config {
290 namespace: namespace.clone(),
291 participants: view_validators.clone(),
292 };
293 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
294 supervisor_config,
295 );
296 supervisors.push(supervisor.clone());
297 let application_cfg = mocks::application::Config {
298 hasher: Sha256::default(),
299 relay: relay.clone(),
300 participant: validator.clone(),
301 propose_latency: (10.0, 5.0),
302 verify_latency: (10.0, 5.0),
303 };
304 let (actor, application) = mocks::application::Application::new(
305 context.with_label("application"),
306 application_cfg,
307 );
308 actor.start();
309 let cfg = config::Config {
310 crypto: scheme,
311 automaton: application.clone(),
312 relay: application.clone(),
313 reporter: supervisor.clone(),
314 supervisor,
315 partition: validator.to_string(),
316 compression: Some(3),
317 mailbox_size: 1024,
318 namespace: namespace.clone(),
319 leader_timeout: Duration::from_secs(1),
320 notarization_timeout: Duration::from_secs(2),
321 nullify_retry: Duration::from_secs(10),
322 fetch_timeout: Duration::from_secs(1),
323 activity_timeout,
324 skip_timeout,
325 max_fetch_count: 1,
326 max_participants: n as usize,
327 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
328 fetch_concurrent: 1,
329 replay_buffer: 1024 * 1024,
330 write_buffer: 1024 * 1024,
331 };
332 let engine = Engine::new(context.with_label("engine"), cfg);
333 let (voter, resolver) = registrations
334 .remove(&validator)
335 .expect("validator should be registered");
336 engine_handlers.push(engine.start(voter, resolver));
337 }
338
339 let mut finalizers = Vec::new();
341 for supervisor in supervisors.iter_mut() {
342 let (mut latest, mut monitor) = supervisor.subscribe().await;
343 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
344 while latest < required_containers {
345 latest = monitor.next().await.expect("event missing");
346 }
347 }));
348 }
349 join_all(finalizers).await;
350
351 let latest_complete = required_containers - activity_timeout;
353 for supervisor in supervisors.iter() {
354 {
356 let faults = supervisor.faults.lock().unwrap();
357 assert!(faults.is_empty());
358 }
359
360 let mut exceptions = 0;
362 let mut notarized = HashMap::new();
363 let mut finalized = HashMap::new();
364 {
365 let notarizes = supervisor.notarizes.lock().unwrap();
366 for view in 1..latest_complete {
367 let Some(payloads) = notarizes.get(&view) else {
369 exceptions += 1;
370 continue;
371 };
372 if payloads.len() > 1 {
373 panic!("view: {view}");
374 }
375 let (digest, notarizers) = payloads.iter().next().unwrap();
376 notarized.insert(view, *digest);
377
378 if notarizers.len() < threshold as usize {
379 panic!("view: {view}");
382 }
383 if notarizers.len() != n as usize {
384 exceptions += 1;
385 }
386 }
387 }
388 {
389 let notarizations = supervisor.notarizations.lock().unwrap();
390 for view in 1..latest_complete {
391 let Some(notarization) = notarizations.get(&view) else {
393 exceptions += 1;
394 continue;
395 };
396 let Some(digest) = notarized.get(&view) else {
397 exceptions += 1;
398 continue;
399 };
400 assert_eq!(¬arization.proposal.payload, digest);
401 }
402 }
403 {
404 let finalizes = supervisor.finalizes.lock().unwrap();
405 for view in 1..latest_complete {
406 let Some(payloads) = finalizes.get(&view) else {
408 exceptions += 1;
409 continue;
410 };
411 if payloads.len() > 1 {
412 panic!("view: {view}");
413 }
414 let (digest, finalizers) = payloads.iter().next().unwrap();
415 finalized.insert(view, *digest);
416
417 if view > latest_complete {
419 continue;
420 }
421
422 if finalizers.len() < threshold as usize {
424 panic!("view: {view}");
427 }
428 if finalizers.len() != n as usize {
429 exceptions += 1;
430 }
431
432 let nullifies = supervisor.nullifies.lock().unwrap();
434 let Some(nullifies) = nullifies.get(&view) else {
435 continue;
436 };
437 for (_, finalizers) in payloads.iter() {
438 for finalizer in finalizers.iter() {
439 if nullifies.contains(finalizer) {
440 panic!("should not nullify and finalize at same view");
441 }
442 }
443 }
444 }
445 }
446 {
447 let finalizations = supervisor.finalizations.lock().unwrap();
448 for view in 1..latest_complete {
449 let Some(finalization) = finalizations.get(&view) else {
451 exceptions += 1;
452 continue;
453 };
454 let Some(digest) = finalized.get(&view) else {
455 exceptions += 1;
456 continue;
457 };
458 assert_eq!(&finalization.proposal.payload, digest);
459 }
460 }
461
462 assert!(exceptions <= max_exceptions);
464 }
465 });
466 }
467
    /// Repeatedly starts all five validators, interrupts the run after a
    /// random delay, and restarts from the recovered context until one run
    /// lets every supervisor observe `required_containers`.
    ///
    /// When a run completes, the supervisors saved from all interrupted runs
    /// are checked to have recorded no faults.
    #[test_traced]
    fn test_unclean_shutdown() {
        // Test parameters
        let n = 5;
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();

        // State shared across restarts: number of interrupted runs and the
        // supervisors of each interrupted run
        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
        let supervised = Arc::new(Mutex::new(Vec::new()));
        let mut prev_ctx = None;

        loop {
            // Fresh handles for this iteration's closure (it takes ownership via `async move`)
            let namespace = namespace.clone();
            let shutdowns = shutdowns.clone();
            let supervised = supervised.clone();

            // One full run; returns `(completed, context)`
            let f = |mut context: deterministic::Context| async move {
                // Create the simulated network
                let (network, mut oracle) = Network::new(
                    context.with_label("network"),
                    Config {
                        max_size: 1024 * 1024,
                    },
                );

                // Start the network
                network.start();

                // Derive one key pair per participant and keep both lists
                // sorted by public key
                let mut schemes = Vec::new();
                let mut validators = Vec::new();
                for i in 0..n {
                    let scheme = PrivateKey::from_seed(i as u64);
                    let pk = scheme.public_key();
                    schemes.push(scheme);
                    validators.push(pk);
                }
                validators.sort();
                schemes.sort_by_key(|s| s.public_key());
                let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
                let mut registrations = register_validators(&mut oracle, &validators).await;

                // Link every validator to every other validator
                let link = Link {
                    latency: 50.0,
                    jitter: 50.0,
                    success_rate: 1.0,
                };
                link_validators(&mut oracle, &validators, Action::Link(link), None).await;

                // Create one engine per validator; supervisors are keyed by
                // validator public key
                let relay = Arc::new(mocks::relay::Relay::new());
                let mut supervisors = HashMap::new();
                let mut engine_handlers = Vec::new();
                for scheme in schemes.into_iter() {
                    // Per-validator context
                    let context = context
                        .clone()
                        .with_label(&format!("validator-{}", scheme.public_key()));

                    // Configure the mocks and the engine
                    let validator = scheme.public_key();
                    let supervisor_config = mocks::supervisor::Config {
                        namespace: namespace.clone(),
                        participants: view_validators.clone(),
                    };
                    let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
                        supervisor_config,
                    );
                    supervisors.insert(validator.clone(), supervisor.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        participant: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    // The partition name is the validator's public key, so a
                    // restarted engine uses the same partition name as before
                    let cfg = config::Config {
                        crypto: scheme,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: supervisor.clone(),
                        supervisor,
                        partition: validator.to_string(),
                        compression: Some(3),
                        mailbox_size: 1024,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        max_participants: n as usize,
                        max_fetch_count: 1,
                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                        fetch_concurrent: 1,
                        replay_buffer: 1024 * 1024,
                        write_buffer: 1024 * 1024,
                    };
                    let engine = Engine::new(context.with_label("engine"), cfg);
                    let (voter_network, resolver_network) = registrations
                        .remove(&validator)
                        .expect("validator should be registered");
                    engine_handlers.push(engine.start(voter_network, resolver_network));
                }

                // Spawn one task per supervisor that waits for `required_containers`
                let mut finalizers = Vec::new();
                for (_, supervisor) in supervisors.iter_mut() {
                    let (mut latest, mut monitor) = supervisor.subscribe().await;
                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                        while latest < required_containers {
                            latest = monitor.next().await.expect("event missing");
                        }
                    }));
                }

                // Race a random delay (10ms to 2s) against completion of all finalizers
                let wait =
                    context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
                select! {
                    _ = context.sleep(wait) => {
                        // The delay elapsed first: count the interruption and
                        // keep this run's supervisors for the final fault check
                        {
                            let mut shutdowns = shutdowns.lock().unwrap();
                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
                            *shutdowns += 1;
                        }
                        supervised.lock().unwrap().push(supervisors);
                        (false,context)
                    },
                    _ = join_all(finalizers) => {
                        // All finalizers finished: no interrupted run may have recorded a fault
                        let supervised = supervised.lock().unwrap();
                        for supervisors in supervised.iter() {
                            for (_, supervisor) in supervisors.iter() {
                                let faults = supervisor.faults.lock().unwrap();
                                assert!(faults.is_empty());
                            }
                        }
                        (true,context)
                    }
                }
            };

            // Resume from the previous run's recovered context if there is
            // one, otherwise start a new timed runner
            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
                deterministic::Runner::from(prev_ctx)
            } else {
                deterministic::Runner::timed(Duration::from_secs(30))
            }
            .start(f);

            // Stop once a run completed
            if complete {
                break;
            }

            // Otherwise carry the recovered context into the next iteration
            prev_ctx = Some(context.recover());
        }
    }
637
    /// Starts three of four validators, lets them reach `required_containers`,
    /// then brings the first validator online late and waits for it to reach
    /// `required_containers` as well.
    ///
    /// Before the late validator starts, links are slowed to 3s latency for
    /// 120s and the second validator is disconnected from the running peers.
    /// The late validator is therefore only ever linked to the third and
    /// fourth validators.
    #[test_traced]
    fn test_backfill() {
        // Test parameters
        let n = 4;
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(360));
        executor.start(|context| async move {
            // Create the simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start the network
            network.start();

            // Derive one key pair per participant and keep both lists sorted
            // by public key so indices line up
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators except the first (index 0)
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &validators,
                Action::Link(link),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Create engines for every validator except the first
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx_scheme, scheme) in schemes.iter().enumerate() {
                // The first validator is started later
                if idx_scheme == 0 {
                    continue;
                }

                // Per-validator context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Configure the mocks and the engine
                let validator = scheme.public_key();
                let supervisor_config = mocks::supervisor::Config {
                    namespace: namespace.clone(),
                    participants: view_validators.clone(),
                };
                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
                    supervisor_config,
                );
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let cfg = config::Config {
                    crypto: scheme.clone(),
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    supervisor,
                    partition: validator.to_string(),
                    compression: Some(3),
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1, max_participants: n as usize,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: 1024 * 1024,
                    write_buffer: 1024 * 1024,
                };
                let (voter, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                let engine = Engine::new(context.with_label("engine"), cfg);
                engine_handlers.push(engine.start(voter, resolver));
            }

            // Wait until every running supervisor reports at least `required_containers`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Replace the links between running validators with 3s-latency links
            let link = Link {
                latency: 3_000.0,
                jitter: 0.0,
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &validators,
                Action::Update(link.clone()),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Let the network run on the slow links for 120s
            // NOTE(review): presumably long enough for views to time out and
            // be nullified — confirm against the engine's timeout handling
            context.sleep(Duration::from_secs(120)).await;

            // Remove every link that involves the second validator (index 1);
            // pairs involving index 0 are excluded because they were never linked
            link_validators(
                &mut oracle,
                &validators,
                Action::Unlink,
                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
            )
            .await;

            // Prepare the first validator
            let scheme = schemes[0].clone();
            let validator = scheme.public_key();

            // Per-validator context for the late joiner
            let context = context.with_label(&format!("validator-{}", scheme.public_key()));

            // Link the first validator to everyone except the second
            // (still using the 3s-latency link)
            link_validators(
                &mut oracle,
                &validators,
                Action::Link(link),
                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
            )
            .await;

            // Restore fast links for every pair that does not involve the second validator
            let link = Link {
                latency: 10.0,
                jitter: 2.5,
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &validators,
                Action::Update(link),
                Some(|_, i, j| ![i, j].contains(&1usize)),
            )
            .await;

            // Configure the mocks and the engine for the first validator
            let supervisor_config = mocks::supervisor::Config {
                namespace: namespace.clone(),
                participants: view_validators.clone(),
            };
            let mut supervisor =
                mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(supervisor_config);
            supervisors.push(supervisor.clone());
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                participant: validator.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
            };
            let (actor, application) = mocks::application::Application::new(
                context.with_label("application"),
                application_cfg,
            );
            actor.start();
            let cfg = config::Config {
                crypto: scheme,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: supervisor.clone(),
                supervisor: supervisor.clone(),
                partition: validator.to_string(),
                compression: Some(3),
                mailbox_size: 1024,
                namespace: namespace.clone(),
                leader_timeout: Duration::from_secs(1),
                notarization_timeout: Duration::from_secs(2),
                nullify_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                max_fetch_count: 1,
                max_participants: n as usize,
                fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                fetch_concurrent: 1,
                replay_buffer: 1024 * 1024,
                write_buffer: 1024 * 1024,
            };
            let (voter, resolver) = registrations
                .remove(&validator)
                .expect("validator should be registered");
            let engine = Engine::new(context.with_label("engine"), cfg);
            engine_handlers.push(engine.start(voter, resolver));

            // Wait until the late validator's supervisor reports at least `required_containers`
            let (mut latest, mut monitor) = supervisor.subscribe().await;
            while latest < required_containers {
                latest = monitor.next().await.expect("event missing");
            }
        });
    }
876
877 #[test_traced]
878 fn test_one_offline() {
879 let n = 5;
881 let threshold = quorum(n);
882 let required_containers = 100;
883 let activity_timeout = 10;
884 let skip_timeout = 5;
885 let namespace = b"consensus".to_vec();
886 let executor = deterministic::Runner::timed(Duration::from_secs(30));
887 executor.start(|context| async move {
888 let (network, mut oracle) = Network::new(
890 context.with_label("network"),
891 Config {
892 max_size: 1024 * 1024,
893 },
894 );
895
896 network.start();
898
899 let mut schemes = Vec::new();
901 let mut validators = Vec::new();
902 for i in 0..n {
903 let scheme = PrivateKey::from_seed(i as u64);
904 let pk = scheme.public_key();
905 schemes.push(scheme);
906 validators.push(pk);
907 }
908 validators.sort();
909 schemes.sort_by_key(|s| s.public_key());
910 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
911 let mut registrations = register_validators(&mut oracle, &validators).await;
912
913 let link = Link {
915 latency: 10.0,
916 jitter: 1.0,
917 success_rate: 1.0,
918 };
919 link_validators(
920 &mut oracle,
921 &validators,
922 Action::Link(link),
923 Some(|_, i, j| ![i, j].contains(&0usize)),
924 )
925 .await;
926
927 let relay = Arc::new(mocks::relay::Relay::new());
929 let mut supervisors = Vec::new();
930 let mut engine_handlers = Vec::new();
931 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
932 if idx_scheme == 0 {
934 continue;
935 }
936
937 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
939
940 let validator = scheme.public_key();
942 let supervisor_config = mocks::supervisor::Config {
943 namespace: namespace.clone(),
944 participants: view_validators.clone(),
945 };
946 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
947 supervisor_config,
948 );
949 supervisors.push(supervisor.clone());
950 let application_cfg = mocks::application::Config {
951 hasher: Sha256::default(),
952 relay: relay.clone(),
953 participant: validator.clone(),
954 propose_latency: (10.0, 5.0),
955 verify_latency: (10.0, 5.0),
956 };
957 let (actor, application) = mocks::application::Application::new(
958 context.with_label("application"),
959 application_cfg,
960 );
961 actor.start();
962 let cfg = config::Config {
963 crypto: scheme,
964 automaton: application.clone(),
965 relay: application.clone(),
966 reporter: supervisor.clone(),
967 partition: validator.to_string(),
968 compression: Some(3),
969 supervisor,
970 mailbox_size: 1024,
971 namespace: namespace.clone(),
972 leader_timeout: Duration::from_secs(1),
973 notarization_timeout: Duration::from_secs(2),
974 nullify_retry: Duration::from_secs(10),
975 fetch_timeout: Duration::from_secs(1),
976 activity_timeout,
977 skip_timeout,
978 max_participants: n as usize,
979 max_fetch_count: 1,
980 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
981 fetch_concurrent: 1,
982 replay_buffer: 1024 * 1024,
983 write_buffer: 1024 * 1024,
984 };
985 let (voter, resolver) = registrations
986 .remove(&validator)
987 .expect("validator should be registered");
988 let engine = Engine::new(context.with_label("engine"), cfg);
989 engine_handlers.push(engine.start(voter, resolver));
990 }
991
992 let mut finalizers = Vec::new();
994 for supervisor in supervisors.iter_mut() {
995 let (mut latest, mut monitor) = supervisor.subscribe().await;
996 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
997 while latest < required_containers {
998 latest = monitor.next().await.expect("event missing");
999 }
1000 }));
1001 }
1002 join_all(finalizers).await;
1003
1004 let offline = &validators[0];
1006 for supervisor in supervisors.iter() {
1007 {
1009 let faults = supervisor.faults.lock().unwrap();
1010 assert!(faults.is_empty());
1011 }
1012
1013 {
1015 let notarizes = supervisor.notarizes.lock().unwrap();
1016 for (view, payloads) in notarizes.iter() {
1017 for (_, participants) in payloads.iter() {
1018 if participants.contains(offline) {
1019 panic!("view: {view}");
1020 }
1021 }
1022 }
1023 }
1024 {
1025 let nullifies = supervisor.nullifies.lock().unwrap();
1026 for (view, participants) in nullifies.iter() {
1027 if participants.contains(offline) {
1028 panic!("view: {view}");
1029 }
1030 }
1031 }
1032 {
1033 let finalizes = supervisor.finalizes.lock().unwrap();
1034 for (view, payloads) in finalizes.iter() {
1035 for (_, finalizers) in payloads.iter() {
1036 if finalizers.contains(offline) {
1037 panic!("view: {view}");
1038 }
1039 }
1040 }
1041 }
1042
1043 let mut offline_views = Vec::new();
1045 {
1046 let leaders = supervisor.leaders.lock().unwrap();
1047 for (view, leader) in leaders.iter() {
1048 if leader == offline {
1049 offline_views.push(*view);
1050 }
1051 }
1052 }
1053 assert!(!offline_views.is_empty());
1054
1055 {
1057 let nullifies = supervisor.nullifies.lock().unwrap();
1058 for view in offline_views.iter() {
1059 let nullifies = nullifies.get(view).unwrap();
1060 if nullifies.len() < threshold as usize {
1061 panic!("view: {view}");
1062 }
1063 }
1064 }
1065 {
1066 let nullifications = supervisor.nullifications.lock().unwrap();
1067 for view in offline_views.iter() {
1068 nullifications.get(view).unwrap();
1069 }
1070 }
1071 }
1072
1073 let encoded = context.encode();
1075 let lines = encoded.lines();
1076 let mut skipped_views = 0;
1077 let mut nodes_skipping = 0;
1078 for line in lines {
1079 if line.contains("_engine_voter_skipped_views_total") {
1080 let parts: Vec<&str> = line.split_whitespace().collect();
1081 if let Some(number_str) = parts.last() {
1082 if let Ok(number) = number_str.parse::<u64>() {
1083 if number > 0 {
1084 nodes_skipping += 1;
1085 }
1086 if number > skipped_views {
1087 skipped_views = number;
1088 }
1089 }
1090 }
1091 }
1092 }
1093 assert!(
1094 skipped_views > 0,
1095 "expected skipped views to be greater than 0"
1096 );
1097 assert_eq!(
1098 nodes_skipping,
1099 n - 1,
1100 "expected all online nodes to be skipping views"
1101 );
1102 });
1103 }
1104
1105 #[test_traced]
1106 fn test_slow_validator() {
1107 let n = 5;
1109 let required_containers = 50;
1110 let activity_timeout = 10;
1111 let skip_timeout = 5;
1112 let namespace = b"consensus".to_vec();
1113 let executor = deterministic::Runner::timed(Duration::from_secs(30));
1114 executor.start(|context| async move {
1115 let (network, mut oracle) = Network::new(
1117 context.with_label("network"),
1118 Config {
1119 max_size: 1024 * 1024,
1120 },
1121 );
1122
1123 network.start();
1125
1126 let mut schemes = Vec::new();
1128 let mut validators = Vec::new();
1129 for i in 0..n {
1130 let scheme = PrivateKey::from_seed(i as u64);
1131 let pk = scheme.public_key();
1132 schemes.push(scheme);
1133 validators.push(pk);
1134 }
1135 validators.sort();
1136 schemes.sort_by_key(|s| s.public_key());
1137 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1138 let mut registrations = register_validators(&mut oracle, &validators).await;
1139
1140 let link = Link {
1142 latency: 10.0,
1143 jitter: 1.0,
1144 success_rate: 1.0,
1145 };
1146 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1147
1148 let relay = Arc::new(mocks::relay::Relay::new());
1150 let mut supervisors = Vec::new();
1151 let mut engine_handlers = Vec::new();
1152 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1153 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1155
1156 let validator = scheme.public_key();
1158 let supervisor_config = mocks::supervisor::Config {
1159 namespace: namespace.clone(),
1160 participants: view_validators.clone(),
1161 };
1162 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1163 supervisor_config,
1164 );
1165 supervisors.push(supervisor.clone());
1166 let application_cfg = if idx_scheme == 0 {
1167 mocks::application::Config {
1168 hasher: Sha256::default(),
1169 relay: relay.clone(),
1170 participant: validator.clone(),
1171 propose_latency: (3_000.0, 0.0),
1172 verify_latency: (3_000.0, 5.0),
1173 }
1174 } else {
1175 mocks::application::Config {
1176 hasher: Sha256::default(),
1177 relay: relay.clone(),
1178 participant: validator.clone(),
1179 propose_latency: (10.0, 5.0),
1180 verify_latency: (10.0, 5.0),
1181 }
1182 };
1183 let (actor, application) = mocks::application::Application::new(
1184 context.with_label("application"),
1185 application_cfg,
1186 );
1187 actor.start();
1188 let cfg = config::Config {
1189 crypto: scheme,
1190 automaton: application.clone(),
1191 relay: application.clone(),
1192 reporter: supervisor.clone(),
1193 partition: validator.to_string(),
1194 compression: Some(3),
1195 supervisor,
1196 mailbox_size: 1024,
1197 namespace: namespace.clone(),
1198 leader_timeout: Duration::from_secs(1),
1199 notarization_timeout: Duration::from_secs(2),
1200 nullify_retry: Duration::from_secs(10),
1201 fetch_timeout: Duration::from_secs(1),
1202 activity_timeout,
1203 skip_timeout,
1204 max_fetch_count: 1,
1205 max_participants: n as usize,
1206 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1207 fetch_concurrent: 1,
1208 replay_buffer: 1024 * 1024,
1209 write_buffer: 1024 * 1024,
1210 };
1211 let (voter, resolver) = registrations
1212 .remove(&validator)
1213 .expect("validator should be registered");
1214 let engine = Engine::new(context.with_label("engine"), cfg);
1215 engine_handlers.push(engine.start(voter, resolver));
1216 }
1217
1218 let mut finalizers = Vec::new();
1220 for supervisor in supervisors.iter_mut() {
1221 let (mut latest, mut monitor) = supervisor.subscribe().await;
1222 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1223 while latest < required_containers {
1224 latest = monitor.next().await.expect("event missing");
1225 }
1226 }));
1227 }
1228 join_all(finalizers).await;
1229
1230 let slow = &validators[0];
1232 for supervisor in supervisors.iter() {
1233 {
1235 let faults = supervisor.faults.lock().unwrap();
1236 assert!(faults.is_empty());
1237 }
1238
1239 {
1241 let notarizes = supervisor.notarizes.lock().unwrap();
1242 for (view, payloads) in notarizes.iter() {
1243 for (_, participants) in payloads.iter() {
1244 if participants.contains(slow) {
1245 panic!("view: {view}");
1246 }
1247 }
1248 }
1249 }
1250 {
1251 let nullifies = supervisor.nullifies.lock().unwrap();
1252 for (view, participants) in nullifies.iter() {
1253 if *view > 10 && participants.contains(slow) {
1255 panic!("view: {view}");
1256 }
1257 }
1258 }
1259 {
1260 let finalizes = supervisor.finalizes.lock().unwrap();
1261 for (view, payloads) in finalizes.iter() {
1262 for (_, finalizers) in payloads.iter() {
1263 if finalizers.contains(slow) {
1264 panic!("view: {view}");
1265 }
1266 }
1267 }
1268 }
1269 }
1270 });
1271 }
1272
    /// Checks that validators resume finalizing promptly after a period in which nothing completes.
    ///
    /// Five validators start on links with a latency of `3_000.0` (presumably milliseconds, i.e.
    /// longer than the 1 s leader and 2 s notarization timeouts — confirm against `Link`). The
    /// test panics if any supervisor monitor yields an event during the first 60 s. All links are
    /// then removed for a further 60 s, the highest view present in any supervisor's `nullifies`
    /// is recorded, and fast links are installed. Every validator must then reach
    /// `required_containers`, report no faults, and have finalized at least
    /// `activity_timeout - 2` of the `activity_timeout` views starting at the recorded view.
    #[test_traced]
    fn test_all_recovery() {
        // Scenario parameters
        let n = 5;
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 2;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(180));
        executor.start(|context| async move {
            // Create the simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start the network
            network.start();

            // Derive one key pair per validator from a fixed seed and keep both lists in
            // public-key order so that index `i` refers to the same validator in each
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators with a very high latency
            let link = Link {
                latency: 3_000.0,
                jitter: 0.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Create one engine (with its own supervisor and application) per validator
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let mut engine_handlers = Vec::new();
            for scheme in schemes.iter() {
                // Give this validator its own labelled context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Configure supervisor and application
                let validator = scheme.public_key();
                let supervisor_config = mocks::supervisor::Config {
                    namespace: namespace.clone(),
                    participants: view_validators.clone(),
                };
                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
                    supervisor_config,
                );
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let cfg = config::Config {
                    crypto: scheme.clone(),
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    partition: validator.to_string(),
                    compression: Some(3),
                    supervisor,
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    max_participants: n as usize,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: 1024 * 1024,
                    write_buffer: 1024 * 1024,
                };

                // Hand the validator's two registered channels to the engine and start it
                let (voter, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                let engine = Engine::new(context.with_label("engine"), cfg);
                engine_handlers.push(engine.start(voter, resolver));
            }

            // For 60 s no monitor may yield an event: the sleep must win every `select!`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (_, mut monitor) = supervisor.subscribe().await;
                finalizers.push(
                    context
                        .with_label("finalizer")
                        .spawn(move |context| async move {
                            select! {
                                _timeout = context.sleep(Duration::from_secs(60)) => {},
                                _done = monitor.next() => {
                                    panic!("engine should not notarize or finalize anything");
                                }
                            }
                        }),
                );
            }
            join_all(finalizers).await;

            // Remove every link
            link_validators(&mut oracle, &validators, Action::Unlink, None).await;

            // Let another 60 s pass with no connectivity at all
            context.sleep(Duration::from_secs(60)).await;

            // Record the highest view that any supervisor has a nullify entry for
            let mut latest = 0;
            for supervisor in supervisors.iter() {
                let nullifies = supervisor.nullifies.lock().unwrap();
                let max = nullifies.keys().max().unwrap();
                if *max > latest {
                    latest = *max;
                }
            }

            // Re-link all validators with a low latency
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &validators, Action::Link(link), None).await;

            // Wait until every monitor reports at least `required_containers`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check what each supervisor recorded
            for supervisor in supervisors.iter() {
                // No faults may have been reported
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Recovery must be quick: of the `activity_timeout` views starting at the view
                // recorded before re-linking, at most two may lack a finalization.
                // NOTE(review): presumably this is what exercises `skip_timeout` — confirm.
                {
                    let mut found = 0;
                    let finalizations = supervisor.finalizations.lock().unwrap();
                    for i in latest..latest + activity_timeout {
                        if finalizations.contains_key(&i) {
                            found += 1;
                        }
                    }
                    assert!(found >= activity_timeout - 2, "found: {found}");
                }
            }
        });
    }
1457
1458 #[test_traced]
1459 #[ignore]
1460 fn test_partition() {
1461 let n = 10;
1463 let required_containers = 50;
1464 let activity_timeout = 10;
1465 let skip_timeout = 5;
1466 let namespace = b"consensus".to_vec();
1467 let executor = deterministic::Runner::timed(Duration::from_secs(900));
1468 executor.start(|context| async move {
1469 let (network, mut oracle) = Network::new(
1471 context.with_label("network"),
1472 Config {
1473 max_size: 1024 * 1024,
1474 },
1475 );
1476
1477 network.start();
1479
1480 let mut schemes = Vec::new();
1482 let mut validators = Vec::new();
1483 for i in 0..n {
1484 let scheme = PrivateKey::from_seed(i as u64);
1485 let pk = scheme.public_key();
1486 schemes.push(scheme);
1487 validators.push(pk);
1488 }
1489 validators.sort();
1490 schemes.sort_by_key(|s| s.public_key());
1491 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1492 let mut registrations = register_validators(&mut oracle, &validators).await;
1493
1494 let link = Link {
1496 latency: 10.0,
1497 jitter: 1.0,
1498 success_rate: 1.0,
1499 };
1500 link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;
1501
1502 let relay = Arc::new(mocks::relay::Relay::new());
1504 let mut supervisors = Vec::new();
1505 let mut engine_handlers = Vec::new();
1506 for scheme in schemes.iter() {
1507 let validator = scheme.public_key();
1509 let supervisor_config = mocks::supervisor::Config {
1510 namespace: namespace.clone(),
1511 participants: view_validators.clone(),
1512 };
1513 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1514 supervisor_config,
1515 );
1516 supervisors.push(supervisor.clone());
1517 let application_cfg = mocks::application::Config {
1518 hasher: Sha256::default(),
1519 relay: relay.clone(),
1520 participant: validator.clone(),
1521 propose_latency: (10.0, 5.0),
1522 verify_latency: (10.0, 5.0),
1523 };
1524 let (actor, application) = mocks::application::Application::new(
1525 context.with_label("application"),
1526 application_cfg,
1527 );
1528 actor.start();
1529 let cfg = config::Config {
1530 crypto: scheme.clone(),
1531 automaton: application.clone(),
1532 relay: application.clone(),
1533 reporter: supervisor.clone(),
1534 partition: validator.to_string(),
1535 compression: Some(3),
1536 supervisor,
1537 mailbox_size: 1024,
1538 namespace: namespace.clone(),
1539 leader_timeout: Duration::from_secs(1),
1540 notarization_timeout: Duration::from_secs(2),
1541 nullify_retry: Duration::from_secs(10),
1542 fetch_timeout: Duration::from_secs(1),
1543 activity_timeout,
1544 skip_timeout,
1545 max_fetch_count: 1,
1546 max_participants: n as usize,
1547 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1548 fetch_concurrent: 1,
1549 replay_buffer: 1024 * 1024,
1550 write_buffer: 1024 * 1024,
1551 };
1552 let (voter, resolver) = registrations
1553 .remove(&validator)
1554 .expect("validator should be registered");
1555 let engine = Engine::new(context.with_label("engine"), cfg);
1556 engine_handlers.push(engine.start(voter, resolver));
1557 }
1558
1559 let mut finalizers = Vec::new();
1561 for supervisor in supervisors.iter_mut() {
1562 let (mut latest, mut monitor) = supervisor.subscribe().await;
1563 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1564 while latest < required_containers {
1565 latest = monitor.next().await.expect("event missing");
1566 }
1567 }));
1568 }
1569 join_all(finalizers).await;
1570
1571 fn separated(n: usize, a: usize, b: usize) -> bool {
1573 let m = n / 2;
1574 (a < m && b >= m) || (a >= m && b < m)
1575 }
1576 link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;
1577
1578 context.sleep(Duration::from_secs(10)).await;
1580
1581 let mut finalizers = Vec::new();
1583 for supervisor in supervisors.iter_mut() {
1584 let (_, mut monitor) = supervisor.subscribe().await;
1585 finalizers.push(
1586 context
1587 .with_label("finalizer")
1588 .spawn(move |context| async move {
1589 select! {
1590 _timeout = context.sleep(Duration::from_secs(60)) => {},
1591 _done = monitor.next() => {
1592 panic!("engine should not notarize or finalize anything");
1593 }
1594 }
1595 }),
1596 );
1597 }
1598 join_all(finalizers).await;
1599
1600 link_validators(
1602 &mut oracle,
1603 &validators,
1604 Action::Link(link),
1605 Some(separated),
1606 )
1607 .await;
1608
1609 let mut finalizers = Vec::new();
1611 for supervisor in supervisors.iter_mut() {
1612 let (mut latest, mut monitor) = supervisor.subscribe().await;
1613 let required = latest + required_containers;
1614 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1615 while latest < required {
1616 latest = monitor.next().await.expect("event missing");
1617 }
1618 }));
1619 }
1620 join_all(finalizers).await;
1621
1622 for supervisor in supervisors.iter() {
1624 {
1626 let faults = supervisor.faults.lock().unwrap();
1627 assert!(faults.is_empty());
1628 }
1629 }
1630 });
1631 }
1632
    /// Runs five validators over degraded links and returns the runtime's auditor state.
    ///
    /// The runtime is seeded with `seed`. Every pair of validators is linked with
    /// `latency: 200.0`, `jitter: 150.0` and `success_rate: 0.5`. The function waits for every
    /// supervisor's monitor to reach `required_containers`, asserts that no supervisor recorded a
    /// fault, and returns `context.auditor().state()` so that callers can compare two runs.
    fn slow_and_lossy_links(seed: u64) -> String {
        // Scenario parameters
        let n = 5;
        let required_containers = 50;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(5_000)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|context| async move {
            // Create the simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start the network
            network.start();

            // Derive one key pair per validator from a fixed seed and keep both lists in
            // public-key order so that index `i` refers to the same validator in each
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = PrivateKey::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators with slow links that succeed only half of the time
            let degraded_link = Link {
                latency: 200.0,
                jitter: 150.0,
                success_rate: 0.5,
            };
            link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;

            // Create one engine (with its own supervisor and application) per validator
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut supervisors = Vec::new();
            let mut engine_handlers = Vec::new();
            for scheme in schemes.into_iter() {
                // Give this validator its own labelled context
                let context = context.with_label(&format!("validator-{}", scheme.public_key()));

                // Configure supervisor and application
                let validator = scheme.public_key();
                let supervisor_config = mocks::supervisor::Config {
                    namespace: namespace.clone(),
                    participants: view_validators.clone(),
                };
                let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
                    supervisor_config,
                );
                supervisors.push(supervisor.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    participant: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let cfg = config::Config {
                    crypto: scheme,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: supervisor.clone(),
                    partition: validator.to_string(),
                    compression: Some(3),
                    supervisor,
                    mailbox_size: 1024,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    max_participants: n as usize,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: 1024 * 1024,
                    write_buffer: 1024 * 1024,
                };

                // Hand the validator's two registered channels to the engine and start it
                let (voter, resolver) = registrations
                    .remove(&validator)
                    .expect("validator should be registered");
                let engine = Engine::new(context.with_label("engine"), cfg);
                engine_handlers.push(engine.start(voter, resolver));
            }

            // Wait until every monitor reports at least `required_containers`
            let mut finalizers = Vec::new();
            for supervisor in supervisors.iter_mut() {
                let (mut latest, mut monitor) = supervisor.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // No supervisor may have recorded a fault
            for supervisor in supervisors.iter() {
                {
                    let faults = supervisor.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }
            }

            // Return the auditor state as the result of the run
            context.auditor().state()
        })
    }
1762
1763 #[test_traced]
1764 fn test_slow_and_lossy_links() {
1765 slow_and_lossy_links(0);
1766 }
1767
1768 #[test_traced]
1769 #[ignore]
1770 fn test_determinism() {
1771 for seed in 1..6 {
1774 let state_1 = slow_and_lossy_links(seed);
1776
1777 let state_2 = slow_and_lossy_links(seed);
1779
1780 assert_eq!(state_1, state_2);
1782 }
1783 }
1784
1785 fn conflicter(seed: u64) {
1786 let n = 4;
1788 let required_containers = 50;
1789 let activity_timeout = 10;
1790 let skip_timeout = 5;
1791 let namespace = b"consensus".to_vec();
1792 let cfg = deterministic::Config::new()
1793 .with_seed(seed)
1794 .with_timeout(Some(Duration::from_secs(30)));
1795 let executor = deterministic::Runner::new(cfg);
1796 executor.start(|context| async move {
1797 let (network, mut oracle) = Network::new(
1799 context.with_label("network"),
1800 Config {
1801 max_size: 1024 * 1024,
1802 },
1803 );
1804
1805 network.start();
1807
1808 let mut schemes = Vec::new();
1810 let mut validators = Vec::new();
1811 for i in 0..n {
1812 let scheme = PrivateKey::from_seed(i as u64);
1813 let pk = scheme.public_key();
1814 schemes.push(scheme);
1815 validators.push(pk);
1816 }
1817 validators.sort();
1818 schemes.sort_by_key(|s| s.public_key());
1819 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1820 let mut registrations = register_validators(&mut oracle, &validators).await;
1821
1822 let link = Link {
1824 latency: 10.0,
1825 jitter: 1.0,
1826 success_rate: 1.0,
1827 };
1828 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1829
1830 let relay = Arc::new(mocks::relay::Relay::new());
1832 let mut supervisors = Vec::new();
1833 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1834 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1836
1837 let validator = scheme.public_key();
1839 let supervisor_config = mocks::supervisor::Config {
1840 namespace: namespace.clone(),
1841 participants: view_validators.clone(),
1842 };
1843 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
1844 supervisor_config,
1845 );
1846 if idx_scheme == 0 {
1847 let cfg = mocks::conflicter::Config {
1848 crypto: scheme,
1849 supervisor,
1850 namespace: namespace.clone(),
1851 };
1852 let (voter, _) = registrations
1853 .remove(&validator)
1854 .expect("validator should be registered");
1855 let engine: mocks::conflicter::Conflicter<_, _, Sha256, _> =
1856 mocks::conflicter::Conflicter::new(
1857 context.with_label("byzantine_engine"),
1858 cfg,
1859 );
1860 engine.start(voter);
1861 } else {
1862 supervisors.push(supervisor.clone());
1863 let application_cfg = mocks::application::Config {
1864 hasher: Sha256::default(),
1865 relay: relay.clone(),
1866 participant: validator.clone(),
1867 propose_latency: (10.0, 5.0),
1868 verify_latency: (10.0, 5.0),
1869 };
1870 let (actor, application) = mocks::application::Application::new(
1871 context.with_label("application"),
1872 application_cfg,
1873 );
1874 actor.start();
1875 let cfg = config::Config {
1876 crypto: scheme,
1877 automaton: application.clone(),
1878 relay: application.clone(),
1879 reporter: supervisor.clone(),
1880 partition: validator.to_string(),
1881 compression: Some(3),
1882 supervisor,
1883 mailbox_size: 1024,
1884 namespace: namespace.clone(),
1885 leader_timeout: Duration::from_secs(1),
1886 notarization_timeout: Duration::from_secs(2),
1887 nullify_retry: Duration::from_secs(10),
1888 fetch_timeout: Duration::from_secs(1),
1889 activity_timeout,
1890 skip_timeout,
1891 max_fetch_count: 1,
1892 max_participants: n as usize,
1893 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1894 fetch_concurrent: 1,
1895 replay_buffer: 1024 * 1024,
1896 write_buffer: 1024 * 1024,
1897 };
1898 let (voter, resolver) = registrations
1899 .remove(&validator)
1900 .expect("validator should be registered");
1901 let engine = Engine::new(context.with_label("engine"), cfg);
1902 engine.start(voter, resolver);
1903 }
1904 }
1905
1906 let mut finalizers = Vec::new();
1908 for supervisor in supervisors.iter_mut() {
1909 let (mut latest, mut monitor) = supervisor.subscribe().await;
1910 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1911 while latest < required_containers {
1912 latest = monitor.next().await.expect("event missing");
1913 }
1914 }));
1915 }
1916 join_all(finalizers).await;
1917
1918 let byz = &validators[0];
1920 let mut count_conflicting_notarize = 0;
1921 let mut count_conflicting_finalize = 0;
1922 for supervisor in supervisors.iter() {
1923 {
1925 let faults = supervisor.faults.lock().unwrap();
1926 assert_eq!(faults.len(), 1);
1927 let faulter = faults.get(byz).expect("byzantine party is not faulter");
1928 for (_, faults) in faulter.iter() {
1929 for fault in faults.iter() {
1930 match fault {
1931 Activity::ConflictingNotarize(_) => {
1932 count_conflicting_notarize += 1;
1933 }
1934 Activity::ConflictingFinalize(_) => {
1935 count_conflicting_finalize += 1;
1936 }
1937 _ => panic!("unexpected fault: {fault:?}"),
1938 }
1939 }
1940 }
1941 }
1942 }
1943 assert!(count_conflicting_notarize > 0);
1944 assert!(count_conflicting_finalize > 0);
1945 });
1946 }
1947
1948 #[test_traced]
1949 #[ignore]
1950 fn test_conflicter() {
1951 for seed in 0..5 {
1952 conflicter(seed);
1953 }
1954 }
1955
1956 fn nuller(seed: u64) {
1957 let n = 4;
1959 let required_containers = 50;
1960 let activity_timeout = 10;
1961 let skip_timeout = 5;
1962 let namespace = b"consensus".to_vec();
1963 let cfg = deterministic::Config::new()
1964 .with_seed(seed)
1965 .with_timeout(Some(Duration::from_secs(30)));
1966 let executor = deterministic::Runner::new(cfg);
1967 executor.start(|context| async move {
1968 let (network, mut oracle) = Network::new(
1970 context.with_label("network"),
1971 Config {
1972 max_size: 1024 * 1024,
1973 },
1974 );
1975
1976 network.start();
1978
1979 let mut schemes = Vec::new();
1981 let mut validators = Vec::new();
1982 for i in 0..n {
1983 let scheme = PrivateKey::from_seed(i as u64);
1984 let pk = scheme.public_key();
1985 schemes.push(scheme);
1986 validators.push(pk);
1987 }
1988 validators.sort();
1989 schemes.sort_by_key(|s| s.public_key());
1990 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1991 let mut registrations = register_validators(&mut oracle, &validators).await;
1992
1993 let link = Link {
1995 latency: 10.0,
1996 jitter: 1.0,
1997 success_rate: 1.0,
1998 };
1999 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2000
2001 let relay = Arc::new(mocks::relay::Relay::new());
2003 let mut supervisors = Vec::new();
2004 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2005 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2007
2008 let validator = scheme.public_key();
2010 let supervisor_config = mocks::supervisor::Config {
2011 namespace: namespace.clone(),
2012 participants: view_validators.clone(),
2013 };
2014 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2015 supervisor_config,
2016 );
2017 if idx_scheme == 0 {
2018 let cfg = mocks::nuller::Config {
2019 crypto: scheme,
2020 supervisor,
2021 namespace: namespace.clone(),
2022 };
2023 let (voter, _) = registrations
2024 .remove(&validator)
2025 .expect("validator should be registered");
2026 let engine: mocks::nuller::Nuller<_, _, Sha256, _> =
2027 mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
2028 engine.start(voter);
2029 } else {
2030 supervisors.push(supervisor.clone());
2031 let application_cfg = mocks::application::Config {
2032 hasher: Sha256::default(),
2033 relay: relay.clone(),
2034 participant: validator.clone(),
2035 propose_latency: (10.0, 5.0),
2036 verify_latency: (10.0, 5.0),
2037 };
2038 let (actor, application) = mocks::application::Application::new(
2039 context.with_label("application"),
2040 application_cfg,
2041 );
2042 actor.start();
2043 let cfg = config::Config {
2044 crypto: scheme,
2045 automaton: application.clone(),
2046 relay: application.clone(),
2047 reporter: supervisor.clone(),
2048 partition: validator.to_string(),
2049 compression: Some(3),
2050 supervisor,
2051 mailbox_size: 1024,
2052 namespace: namespace.clone(),
2053 leader_timeout: Duration::from_secs(1),
2054 notarization_timeout: Duration::from_secs(2),
2055 nullify_retry: Duration::from_secs(10),
2056 fetch_timeout: Duration::from_secs(1),
2057 activity_timeout,
2058 skip_timeout,
2059 max_fetch_count: 1,
2060 max_participants: n as usize,
2061 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2062 fetch_concurrent: 1,
2063 replay_buffer: 1024 * 1024,
2064 write_buffer: 1024 * 1024,
2065 };
2066 let (voter, resolver) = registrations
2067 .remove(&validator)
2068 .expect("validator should be registered");
2069 let engine = Engine::new(context.with_label("engine"), cfg);
2070 engine.start(voter, resolver);
2071 }
2072 }
2073
2074 let mut finalizers = Vec::new();
2076 for supervisor in supervisors.iter_mut() {
2077 let (mut latest, mut monitor) = supervisor.subscribe().await;
2078 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2079 while latest < required_containers {
2080 latest = monitor.next().await.expect("event missing");
2081 }
2082 }));
2083 }
2084 join_all(finalizers).await;
2085
2086 let byz = &validators[0];
2088 let mut count_nullify_and_finalize = 0;
2089 for supervisor in supervisors.iter() {
2090 {
2092 let faults = supervisor.faults.lock().unwrap();
2093 assert_eq!(faults.len(), 1);
2094 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2095 for (_, faults) in faulter.iter() {
2096 for fault in faults.iter() {
2097 match fault {
2098 Activity::NullifyFinalize(_) => {
2099 count_nullify_and_finalize += 1;
2100 }
2101 _ => panic!("unexpected fault: {fault:?}"),
2102 }
2103 }
2104 }
2105 }
2106 }
2107 assert!(count_nullify_and_finalize > 0);
2108 });
2109 }
2110
2111 #[test_traced]
2112 #[ignore]
2113 fn test_nuller() {
2114 for seed in 0..5 {
2115 nuller(seed);
2116 }
2117 }
2118
2119 fn outdated(seed: u64) {
2120 let n = 4;
2122 let required_containers = 100;
2123 let activity_timeout = 10;
2124 let skip_timeout = 5;
2125 let namespace = b"consensus".to_vec();
2126 let cfg = deterministic::Config::new()
2127 .with_seed(seed)
2128 .with_timeout(Some(Duration::from_secs(30)));
2129 let executor = deterministic::Runner::new(cfg);
2130 executor.start(|context| async move {
2131 let (network, mut oracle) = Network::new(
2133 context.with_label("network"),
2134 Config {
2135 max_size: 1024 * 1024,
2136 },
2137 );
2138
2139 network.start();
2141
2142 let mut schemes = Vec::new();
2144 let mut validators = Vec::new();
2145 for i in 0..n {
2146 let scheme = PrivateKey::from_seed(i as u64);
2147 let pk = scheme.public_key();
2148 schemes.push(scheme);
2149 validators.push(pk);
2150 }
2151 validators.sort();
2152 schemes.sort_by_key(|s| s.public_key());
2153 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2154 let mut registrations = register_validators(&mut oracle, &validators).await;
2155
2156 let link = Link {
2158 latency: 10.0,
2159 jitter: 1.0,
2160 success_rate: 1.0,
2161 };
2162 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2163
2164 let relay = Arc::new(mocks::relay::Relay::new());
2166 let mut supervisors = Vec::new();
2167 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2168 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2170
2171 let validator = scheme.public_key();
2173 let supervisor_config = mocks::supervisor::Config {
2174 namespace: namespace.clone(),
2175 participants: view_validators.clone(),
2176 };
2177 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2178 supervisor_config,
2179 );
2180 if idx_scheme == 0 {
2181 let cfg = mocks::outdated::Config {
2182 crypto: scheme,
2183 supervisor,
2184 namespace: namespace.clone(),
2185 view_delta: activity_timeout * 4,
2186 };
2187 let (voter, _) = registrations
2188 .remove(&validator)
2189 .expect("validator should be registered");
2190 let engine: mocks::outdated::Outdated<_, _, Sha256, _> =
2191 mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
2192 engine.start(voter);
2193 } else {
2194 supervisors.push(supervisor.clone());
2195 let application_cfg = mocks::application::Config {
2196 hasher: Sha256::default(),
2197 relay: relay.clone(),
2198 participant: validator.clone(),
2199 propose_latency: (10.0, 5.0),
2200 verify_latency: (10.0, 5.0),
2201 };
2202 let (actor, application) = mocks::application::Application::new(
2203 context.with_label("application"),
2204 application_cfg,
2205 );
2206 actor.start();
2207 let cfg = config::Config {
2208 crypto: scheme,
2209 automaton: application.clone(),
2210 relay: application.clone(),
2211 reporter: supervisor.clone(),
2212 supervisor,
2213 partition: validator.to_string(),
2214 compression: Some(3),
2215 mailbox_size: 1024,
2216 namespace: namespace.clone(),
2217 leader_timeout: Duration::from_secs(1),
2218 notarization_timeout: Duration::from_secs(2),
2219 nullify_retry: Duration::from_secs(10),
2220 fetch_timeout: Duration::from_secs(1),
2221 activity_timeout,
2222 skip_timeout,
2223 max_fetch_count: 1,
2224 max_participants: n as usize,
2225 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2226 fetch_concurrent: 1,
2227 replay_buffer: 1024 * 1024,
2228 write_buffer: 1024 * 1024,
2229 };
2230 let (voter, resolver) = registrations
2231 .remove(&validator)
2232 .expect("validator should be registered");
2233 let engine = Engine::new(context.with_label("engine"), cfg);
2234 engine.start(voter, resolver);
2235 }
2236 }
2237
2238 let mut finalizers = Vec::new();
2240 for supervisor in supervisors.iter_mut() {
2241 let (mut latest, mut monitor) = supervisor.subscribe().await;
2242 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2243 while latest < required_containers {
2244 latest = monitor.next().await.expect("event missing");
2245 }
2246 }));
2247 }
2248 join_all(finalizers).await;
2249
2250 for supervisor in supervisors.iter() {
2252 {
2253 let faults = supervisor.faults.lock().unwrap();
2254 assert!(faults.is_empty());
2255 }
2256 }
2257 });
2258 }
2259
2260 #[test_traced]
2261 #[ignore]
2262 fn test_outdated() {
2263 for seed in 0..5 {
2264 outdated(seed);
2265 }
2266 }
2267
2268 #[test_traced]
2269 #[ignore]
2270 fn test_1k() {
2271 let n = 10;
2273 let required_containers = 1_000;
2274 let activity_timeout = 10;
2275 let skip_timeout = 5;
2276 let namespace = b"consensus".to_vec();
2277 let cfg = deterministic::Config::new();
2278 let executor = deterministic::Runner::new(cfg);
2279 executor.start(|context| async move {
2280 let (network, mut oracle) = Network::new(
2282 context.with_label("network"),
2283 Config {
2284 max_size: 1024 * 1024,
2285 },
2286 );
2287
2288 network.start();
2290
2291 let mut schemes = Vec::new();
2293 let mut validators = Vec::new();
2294 for i in 0..n {
2295 let scheme = PrivateKey::from_seed(i as u64);
2296 let pk = scheme.public_key();
2297 schemes.push(scheme);
2298 validators.push(pk);
2299 }
2300 validators.sort();
2301 schemes.sort_by_key(|s| s.public_key());
2302 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2303 let mut registrations = register_validators(&mut oracle, &validators).await;
2304
2305 let link = Link {
2307 latency: 80.0,
2308 jitter: 10.0,
2309 success_rate: 0.98,
2310 };
2311 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2312
2313 let relay = Arc::new(mocks::relay::Relay::new());
2315 let mut supervisors = Vec::new();
2316 let mut engine_handlers = Vec::new();
2317 for scheme in schemes.into_iter() {
2318 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2320
2321 let validator = scheme.public_key();
2323 let supervisor_config = mocks::supervisor::Config {
2324 namespace: namespace.clone(),
2325 participants: view_validators.clone(),
2326 };
2327 let supervisor = mocks::supervisor::Supervisor::<PublicKey, Sha256Digest>::new(
2328 supervisor_config,
2329 );
2330 supervisors.push(supervisor.clone());
2331 let application_cfg = mocks::application::Config {
2332 hasher: Sha256::default(),
2333 relay: relay.clone(),
2334 participant: validator.clone(),
2335 propose_latency: (100.0, 50.0),
2336 verify_latency: (50.0, 40.0),
2337 };
2338 let (actor, application) = mocks::application::Application::new(
2339 context.with_label("application"),
2340 application_cfg,
2341 );
2342 actor.start();
2343 let cfg = config::Config {
2344 crypto: scheme,
2345 automaton: application.clone(),
2346 relay: application.clone(),
2347 reporter: supervisor.clone(),
2348 partition: validator.to_string(),
2349 compression: Some(3),
2350 supervisor,
2351 mailbox_size: 1024,
2352 namespace: namespace.clone(),
2353 leader_timeout: Duration::from_secs(1),
2354 notarization_timeout: Duration::from_secs(2),
2355 nullify_retry: Duration::from_secs(10),
2356 fetch_timeout: Duration::from_secs(1),
2357 activity_timeout,
2358 skip_timeout,
2359 max_fetch_count: 1,
2360 max_participants: n as usize,
2361 fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
2362 fetch_concurrent: 1,
2363 replay_buffer: 1024 * 1024,
2364 write_buffer: 1024 * 1024,
2365 };
2366 let (voter, resolver) = registrations
2367 .remove(&validator)
2368 .expect("validator should be registered");
2369 let engine = Engine::new(context.with_label("engine"), cfg);
2370 engine_handlers.push(engine.start(voter, resolver));
2371 }
2372
2373 let mut finalizers = Vec::new();
2375 for supervisor in supervisors.iter_mut() {
2376 let (mut latest, mut monitor) = supervisor.subscribe().await;
2377 finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2378 while latest < required_containers {
2379 latest = monitor.next().await.expect("event missing");
2380 }
2381 }));
2382 }
2383 join_all(finalizers).await;
2384
2385 for supervisor in supervisors.iter() {
2387 {
2389 let faults = supervisor.faults.lock().unwrap();
2390 assert!(faults.is_empty());
2391 }
2392 }
2393 })
2394 }
2395}