1use commonware_utils::Array;
119
// Private submodules; `Prover` is the only item re-exported from them here.
mod encoder;
mod prover;
pub use prover::Prover;
// `wire` textually includes `simplex.wire.rs` from the build output directory
// (`OUT_DIR`), so that file must exist there at compile time.
// NOTE(review): presumably emitted by the crate's build script -- confirm.
mod wire {
    include!(concat!(env!("OUT_DIR"), "/simplex.wire.rs"));
}
126
// Everything in this block is compiled only when the target architecture is
// not `wasm32`. On `wasm32` builds none of these modules exist and `Config` /
// `Engine` are not exported.
cfg_if::cfg_if! {
    if #[cfg(not(target_arch = "wasm32"))] {
        mod actors;
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        mod metrics;
        mod verifier;
    }
}
138
// Mock components, compiled only in test builds. The tests in this file use
// `mocks::relay`, `mocks::supervisor` and `mocks::application`.
#[cfg(test)]
pub mod mocks;
141
/// Type used for view numbers in this module; an alias for `u64`.
pub type View = u64;
144
145use crate::Activity;
146
/// Metadata pairing a view with a reference to a parent.
///
/// `D` must implement [`Array`].
/// NOTE(review): `D` is presumably the payload digest type -- confirm against callers.
#[derive(Clone)]
pub struct Context<D: Array> {
    /// The view this context describes.
    pub view: View,

    /// The parent, given as its view together with a `D` value
    /// (presumably the parent's digest -- confirm).
    pub parent: (View, D),
}
161
// Distinct `Activity` codes defined by this module (values 0 through 4).
// NOTE(review): the descriptions below are read off the constant names; the code
// that emits or interprets them is not visible here -- confirm before relying on them.

/// Activity code `0` (by name: a notarize vote).
pub const NOTARIZE: Activity = 0;
/// Activity code `1` (by name: a finalize vote).
pub const FINALIZE: Activity = 1;
/// Activity code `2` (by name: conflicting notarize votes, presumably a fault).
pub const CONFLICTING_NOTARIZE: Activity = 2;
/// Activity code `3` (by name: conflicting finalize votes, presumably a fault).
pub const CONFLICTING_FINALIZE: Activity = 3;
/// Activity code `4` (by name: a nullify and a finalize together, presumably a fault).
pub const NULLIFY_AND_FINALIZE: Activity = 4;
178
179#[cfg(test)]
180mod tests {
181 use super::*;
182 use commonware_cryptography::{sha256::Digest as Sha256Digest, Ed25519, Scheme, Sha256};
183 use commonware_macros::{select, test_traced};
184 use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
185 use commonware_runtime::{
186 deterministic::{self, Executor},
187 Clock, Metrics, Runner, Spawner,
188 };
189 use commonware_storage::journal::variable::{Config as JConfig, Journal};
190 use commonware_utils::quorum;
191 use engine::Engine;
192 use futures::{channel::mpsc, StreamExt};
193 use governor::Quota;
194 use prover::Prover;
195 use rand::Rng;
196 use std::{
197 collections::{BTreeMap, HashMap, HashSet},
198 num::NonZeroU32,
199 sync::{Arc, Mutex},
200 time::Duration,
201 };
202 use tracing::debug;
203
204 async fn register_validators<P: Array>(
206 oracle: &mut Oracle<P>,
207 validators: &[P],
208 ) -> HashMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))> {
209 let mut registrations = HashMap::new();
210 for validator in validators.iter() {
211 let (voter_sender, voter_receiver) =
212 oracle.register(validator.clone(), 0).await.unwrap();
213 let (resolver_sender, resolver_receiver) =
214 oracle.register(validator.clone(), 1).await.unwrap();
215 registrations.insert(
216 validator.clone(),
217 (
218 (voter_sender, voter_receiver),
219 (resolver_sender, resolver_receiver),
220 ),
221 );
222 }
223 registrations
224 }
225
    /// What `link_validators` should do to each selected pair of validators.
    enum Action {
        /// Add the given link.
        Link(Link),
        /// Remove the existing link, then add the given one.
        Update(Link),
        /// Remove the existing link.
        Unlink,
    }
232
233 async fn link_validators<P: Array>(
239 oracle: &mut Oracle<P>,
240 validators: &[P],
241 action: Action,
242 restrict_to: Option<fn(usize, usize, usize) -> bool>,
243 ) {
244 for (i1, v1) in validators.iter().enumerate() {
245 for (i2, v2) in validators.iter().enumerate() {
246 if v2 == v1 {
248 continue;
249 }
250
251 if let Some(f) = restrict_to {
253 if !f(validators.len(), i1, i2) {
254 continue;
255 }
256 }
257
258 match action {
260 Action::Update(_) | Action::Unlink => {
261 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
262 }
263 _ => {}
264 }
265
266 match action {
268 Action::Link(ref link) | Action::Update(ref link) => {
269 oracle
270 .add_link(v1.clone(), v2.clone(), link.clone())
271 .await
272 .unwrap();
273 }
274 _ => {}
275 }
276 }
277 }
278 }
279
280 #[test_traced]
281 fn test_all_online() {
282 let n = 5;
284 let threshold = quorum(n).expect("unable to calculate threshold");
285 let max_exceptions = 4;
286 let required_containers = 100;
287 let activity_timeout = 10;
288 let namespace = b"consensus".to_vec();
289 let (executor, context, _) = Executor::timed(Duration::from_secs(30));
290 executor.start(async move {
291 let (network, mut oracle) = Network::new(
293 context.with_label("network"),
294 Config {
295 max_size: 1024 * 1024,
296 },
297 );
298
299 network.start();
301
302 let mut schemes = Vec::new();
304 let mut validators = Vec::new();
305 for i in 0..n {
306 let scheme = Ed25519::from_seed(i as u64);
307 let pk = scheme.public_key();
308 schemes.push(scheme);
309 validators.push(pk);
310 }
311 validators.sort();
312 schemes.sort_by_key(|s| s.public_key());
313 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
314 let mut registrations = register_validators(&mut oracle, &validators).await;
315
316 let link = Link {
318 latency: 10.0,
319 jitter: 1.0,
320 success_rate: 1.0,
321 };
322 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
323
324 let prover = Prover::new(&namespace);
326 let relay = Arc::new(mocks::relay::Relay::new());
327 let mut supervisors = Vec::new();
328 let (done_sender, mut done_receiver) = mpsc::unbounded();
329 let mut engine_handlers = Vec::new();
330 for scheme in schemes.into_iter() {
331 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
333
334 let validator = scheme.public_key();
336 let supervisor_config = mocks::supervisor::Config {
337 prover: prover.clone(),
338 participants: view_validators.clone(),
339 };
340 let supervisor =
341 mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
342 supervisors.push(supervisor.clone());
343 let application_cfg = mocks::application::Config {
344 hasher: Sha256::default(),
345 relay: relay.clone(),
346 participant: validator.clone(),
347 tracker: done_sender.clone(),
348 propose_latency: (10.0, 5.0),
349 verify_latency: (10.0, 5.0),
350 };
351 let (actor, application) = mocks::application::Application::new(
352 context.with_label("application"),
353 application_cfg,
354 );
355 actor.start();
356 let cfg = JConfig {
357 partition: validator.to_string(),
358 };
359 let journal = Journal::init(context.with_label("journal"), cfg)
360 .await
361 .expect("unable to create journal");
362 let cfg = config::Config {
363 crypto: scheme,
364 automaton: application.clone(),
365 relay: application.clone(),
366 committer: application,
367 supervisor,
368 mailbox_size: 1024,
369 namespace: namespace.clone(),
370 leader_timeout: Duration::from_secs(1),
371 notarization_timeout: Duration::from_secs(2),
372 nullify_retry: Duration::from_secs(10),
373 fetch_timeout: Duration::from_secs(1),
374 activity_timeout,
375 max_fetch_count: 1,
376 max_fetch_size: 1024 * 512,
377 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
378 fetch_concurrent: 1,
379 replay_concurrency: 1,
380 };
381 let engine = Engine::new(context.with_label("engine"), journal, cfg);
382 let (voter, resolver) = registrations
383 .remove(&validator)
384 .expect("validator should be registered");
385 engine_handlers.push(engine.start(voter, resolver));
386 }
387
388 let mut completed = HashSet::new();
390 let mut finalized = HashMap::new();
391 loop {
392 let (validator, event) = done_receiver.next().await.unwrap();
393 if let mocks::application::Progress::Finalized(proof, digest) = event {
394 let (view, _, payload, _) =
395 prover.deserialize_finalization(proof, 5, true).unwrap();
396 if digest != payload {
397 panic!(
398 "finalization mismatch digest: {:?}, payload: {:?}",
399 digest, payload
400 );
401 }
402 if let Some(previous) = finalized.insert(view, digest.clone()) {
403 if previous != digest {
404 panic!(
405 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
406 view, previous, digest
407 );
408 }
409 }
410 if (finalized.len() as u64) < required_containers {
411 continue;
412 }
413 completed.insert(validator);
414 }
415 if completed.len() == n as usize {
416 break;
417 }
418 }
419
420 let latest_complete = required_containers - activity_timeout;
422 for supervisor in supervisors.iter() {
423 {
425 let faults = supervisor.faults.lock().unwrap();
426 assert!(faults.is_empty());
427 }
428
429 let mut exceptions = 0;
431 {
432 let notarizes = supervisor.notarizes.lock().unwrap();
433 for (view, payloads) in notarizes.iter() {
434 if payloads.len() > 1 {
436 panic!("view: {}", view);
437 }
438
439 if *view > latest_complete {
441 continue;
442 }
443
444 let digest = finalized.get(view).expect("view should be finalized");
446 let voters = payloads.get(digest).expect("digest should exist");
447 if voters.len() < threshold as usize {
448 panic!("view: {}", view);
451 }
452 if voters.len() != n as usize {
453 exceptions += 1;
454 }
455 }
456 }
457 {
458 let finalizes = supervisor.finalizes.lock().unwrap();
459 for (view, payloads) in finalizes.iter() {
460 if payloads.len() > 1 {
462 panic!("view: {}", view);
463 }
464
465 if *view > latest_complete {
467 continue;
468 }
469
470 let digest = finalized.get(view).expect("view should be finalized");
472 let finalizers = payloads.get(digest).expect("digest should exist");
473 if finalizers.len() < threshold as usize {
474 panic!("view: {}", view);
477 }
478 if finalizers.len() != n as usize {
479 exceptions += 1;
480 }
481 }
482 }
483
484 assert!(exceptions <= max_exceptions);
486 }
487 });
488 }
489
    // Repeatedly runs a five-validator network for a random slice of time
    // (10ms..2s), lets the root task return, recovers the executor/context via
    // `context.recover()`, and starts over -- until all `n` validators have been
    // recorded in `completed`. Finally asserts that no supervisor from any run
    // recorded a fault.
    //
    // Journals are opened with a partition named after the validator's key, so
    // every run uses the same partition names.
    // NOTE(review): presumably `recover()` preserves storage so engines replay
    // their journals after each restart -- confirm against the runtime docs.
    #[test_traced]
    fn test_unclean_shutdown() {
        let n = 5;
        let required_containers = 100;
        let activity_timeout = 10;
        let namespace = b"consensus".to_vec();

        // State shared across runs (each run gets clones of these handles).
        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
        let notarized = Arc::new(Mutex::new(HashMap::new()));
        let finalized = Arc::new(Mutex::new(HashMap::new()));
        let completed = Arc::new(Mutex::new(HashSet::new()));
        let supervised = Arc::new(Mutex::new(Vec::new()));
        let (mut executor, mut context, _) = Executor::timed(Duration::from_secs(30));
        while completed.lock().unwrap().len() != n as usize {
            let namespace = namespace.clone();
            let shutdowns = shutdowns.clone();
            let notarized = notarized.clone();
            let finalized = finalized.clone();
            let completed = completed.clone();
            let supervised = supervised.clone();
            executor.start({
                // The outer `context` is kept so it can be recovered after this run.
                let mut context = context.clone();
                async move {
                    // Create simulated network
                    let (network, mut oracle) = Network::new(
                        context.with_label("network"),
                        Config {
                            max_size: 1024 * 1024,
                        },
                    );

                    // Start network
                    network.start();

                    // Register participants (same seeds, hence same keys, every run)
                    let mut schemes = Vec::new();
                    let mut validators = Vec::new();
                    for i in 0..n {
                        let scheme = Ed25519::from_seed(i as u64);
                        let pk = scheme.public_key();
                        schemes.push(scheme);
                        validators.push(pk);
                    }
                    validators.sort();
                    schemes.sort_by_key(|s| s.public_key());
                    let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
                    let mut registrations = register_validators(&mut oracle, &validators).await;

                    // Link all validators
                    let link = Link {
                        latency: 50.0,
                        jitter: 50.0,
                        success_rate: 1.0,
                    };
                    link_validators(&mut oracle, &validators, Action::Link(link), None).await;

                    // Create engines
                    let prover = Prover::new(&namespace);
                    let relay = Arc::new(mocks::relay::Relay::new());
                    let mut supervisors = HashMap::new();
                    let (done_sender, mut done_receiver) = mpsc::unbounded();
                    let mut engine_handlers = Vec::new();
                    for scheme in schemes.into_iter() {
                        // Per-validator context label
                        let context = context
                            .clone()
                            .with_label(&format!("validator-{}", scheme.public_key()));

                        // Configure and start the engine for this validator
                        let validator = scheme.public_key();
                        let supervisor_config = mocks::supervisor::Config {
                            prover: prover.clone(),
                            participants: view_validators.clone(),
                        };
                        let supervisor =
                            mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
                        supervisors.insert(validator.clone(), supervisor.clone());
                        let application_cfg = mocks::application::Config {
                            hasher: Sha256::default(),
                            relay: relay.clone(),
                            participant: validator.clone(),
                            tracker: done_sender.clone(),
                            propose_latency: (10.0, 5.0),
                            verify_latency: (10.0, 5.0),
                        };
                        let (actor, application) =
                            mocks::application::Application::new(context.with_label("application"), application_cfg);
                        actor.start();
                        let cfg = JConfig {
                            partition: validator.to_string(),
                        };
                        let journal = Journal::init(context.with_label("journal"), cfg)
                            .await
                            .expect("unable to create journal");
                        let cfg = config::Config {
                            crypto: scheme,
                            automaton: application.clone(),
                            relay: application.clone(),
                            committer: application,
                            supervisor,
                            mailbox_size: 1024,
                            namespace: namespace.clone(),
                            leader_timeout: Duration::from_secs(1),
                            notarization_timeout: Duration::from_secs(2),
                            nullify_retry: Duration::from_secs(10),
                            fetch_timeout: Duration::from_secs(1),
                            activity_timeout,
                            max_fetch_count: 1,
                            max_fetch_size: 1024 * 512,
                            fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
                            fetch_concurrent: 1,
                            replay_concurrency: 1,
                        };
                        let engine = Engine::new(context.with_label("engine"), journal, cfg);
                        let (voter_network, resolver_network) =
                            registrations.remove(&validator).expect("validator should be registered");
                        engine_handlers.push(engine.start(voter_network, resolver_network));
                    }

                    // Background task that records progress into the shared maps.
                    context.with_label("confirmed").spawn(move |_| async move {
                        loop {
                            // Parse the next progress event
                            let (validator, event) = done_receiver.next().await.unwrap();
                            match event {
                                mocks::application::Progress::Notarized(proof, digest) => {
                                    // The proof must carry the reported digest
                                    let (view, _, payload, _) =
                                        prover.deserialize_notarization(proof, 5, true).unwrap();
                                    if digest != payload {
                                        panic!(
                                            "notarization mismatch digest: {:?}, payload: {:?}",
                                            digest, payload
                                        );
                                    }

                                    // A view may only ever be notarized with one digest
                                    {
                                        let mut notarized = notarized.lock().unwrap();
                                        if let Some(previous) = notarized.insert(view, digest.clone())
                                        {
                                            if previous != digest {
                                                panic!(
                                                    "notarization mismatch at {:?} previous: {:?}, current: {:?}",
                                                    view, previous, digest
                                                );
                                            }
                                        }
                                        // Nothing follows this arm, so this `continue` has the
                                        // same effect as falling through to the next iteration.
                                        if (notarized.len() as u64) < required_containers {
                                            continue;
                                        }
                                    }
                                }
                                mocks::application::Progress::Finalized(proof, digest) => {
                                    // The proof must carry the reported digest
                                    let (view, _, payload, _) =
                                        prover.deserialize_finalization(proof, 5, true).unwrap();
                                    if digest != payload {
                                        panic!(
                                            "finalization mismatch digest: {:?}, payload: {:?}",
                                            digest, payload
                                        );
                                    }

                                    // A view may only ever be finalized with one digest
                                    {
                                        let mut finalized = finalized.lock().unwrap();
                                        if let Some(previous) = finalized.insert(view, digest.clone()) {
                                            if previous != digest {
                                                panic!(
                                                    "finalization mismatch at {:?} previous: {:?}, current: {:?}",
                                                    view, previous, digest
                                                );
                                            }
                                        }
                                        // Not enough finalized views yet: do not mark as completed
                                        if (finalized.len() as u64) < required_containers {
                                            continue;
                                        }
                                    }
                                    completed.lock().unwrap().insert(validator);
                                }
                            }
                        }
                    });

                    // Let the network run for a random duration before this run ends
                    let wait =
                        context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
                    context.sleep(wait).await;
                    {
                        let mut shutdowns = shutdowns.lock().unwrap();
                        debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
                        *shutdowns += 1;
                    }

                    // Keep this run's supervisors so their faults can be checked at the end
                    supervised.lock().unwrap().push(supervisors);
                }
            });

            // Obtain a fresh executor/context pair for the next run
            (executor, context, _) = context.recover();
        }

        // No supervisor from any run may have recorded a fault
        let supervised = supervised.lock().unwrap();
        for supervisors in supervised.iter() {
            for (_, supervisor) in supervisors.iter() {
                let faults = supervisor.faults.lock().unwrap();
                assert!(faults.is_empty());
            }
        }
    }
704
705 #[test_traced]
706 fn test_backfill() {
707 let n = 4;
709 let required_containers = 100;
710 let activity_timeout = 10;
711 let namespace = b"consensus".to_vec();
712 let (executor, context, _) = Executor::timed(Duration::from_secs(360));
713 executor.start(async move {
714 let (network, mut oracle) = Network::new(
716 context.with_label("network"),
717 Config {
718 max_size: 1024 * 1024,
719 },
720 );
721
722 network.start();
724
725 let mut schemes = Vec::new();
727 let mut validators = Vec::new();
728 for i in 0..n {
729 let scheme = Ed25519::from_seed(i as u64);
730 let pk = scheme.public_key();
731 schemes.push(scheme);
732 validators.push(pk);
733 }
734 validators.sort();
735 schemes.sort_by_key(|s| s.public_key());
736 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
737 let mut registrations = register_validators(&mut oracle, &validators).await;
738
739 let link = Link {
741 latency: 10.0,
742 jitter: 1.0,
743 success_rate: 1.0,
744 };
745 link_validators(
746 &mut oracle,
747 &validators,
748 Action::Link(link),
749 Some(|_, i, j| ![i, j].contains(&0usize)),
750 )
751 .await;
752
753 let prover = Prover::new(&namespace);
755 let relay = Arc::new(mocks::relay::Relay::new());
756 let mut supervisors = Vec::new();
757 let (done_sender, mut done_receiver) = mpsc::unbounded();
758 let mut engine_handlers = Vec::new();
759 for (idx_scheme, scheme) in schemes.iter().enumerate() {
760 if idx_scheme == 0 {
762 continue;
763 }
764
765 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
767
768 let validator = scheme.public_key();
770 let supervisor_config = mocks::supervisor::Config {
771 prover: prover.clone(),
772 participants: view_validators.clone(),
773 };
774 let supervisor =
775 mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
776 supervisors.push(supervisor.clone());
777 let application_cfg = mocks::application::Config {
778 hasher: Sha256::default(),
779 relay: relay.clone(),
780 participant: validator.clone(),
781 tracker: done_sender.clone(),
782 propose_latency: (10.0, 5.0),
783 verify_latency: (10.0, 5.0),
784 };
785 let (actor, application) = mocks::application::Application::new(
786 context.with_label("application"),
787 application_cfg,
788 );
789 actor.start();
790 let cfg = JConfig {
791 partition: validator.to_string(),
792 };
793 let journal = Journal::init(context.with_label("journal"), cfg)
794 .await
795 .expect("unable to create journal");
796 let cfg = config::Config {
797 crypto: scheme.clone(),
798 automaton: application.clone(),
799 relay: application.clone(),
800 committer: application,
801 supervisor,
802 mailbox_size: 1024,
803 namespace: namespace.clone(),
804 leader_timeout: Duration::from_secs(1),
805 notarization_timeout: Duration::from_secs(2),
806 nullify_retry: Duration::from_secs(10),
807 fetch_timeout: Duration::from_secs(1),
808 activity_timeout,
809 max_fetch_count: 1, max_fetch_size: 1024 * 1024,
811 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
812 fetch_concurrent: 1,
813 replay_concurrency: 1,
814 };
815 let (voter, resolver) = registrations
816 .remove(&validator)
817 .expect("validator should be registered");
818 let engine = Engine::new(context.with_label("engine"), journal, cfg);
819 engine_handlers.push(engine.start(voter, resolver));
820 }
821
822 let mut completed = HashSet::new();
824 let mut finalized = HashMap::new();
825 loop {
826 let (validator, event) = done_receiver.next().await.unwrap();
827 if let mocks::application::Progress::Finalized(proof, digest) = event {
828 let (view, _, payload, _) =
829 prover.deserialize_finalization(proof, 5, true).unwrap();
830 if digest != payload {
831 panic!(
832 "finalization mismatch digest: {:?}, payload: {:?}",
833 digest, payload
834 );
835 }
836 finalized.insert(view, digest);
837 if (finalized.len() as u64) < required_containers {
838 continue;
839 }
840 completed.insert(validator);
841 }
842 if completed.len() == (n - 1) as usize {
843 break;
844 }
845 }
846
847 let link = Link {
849 latency: 3_000.0,
850 jitter: 0.0,
851 success_rate: 1.0,
852 };
853 link_validators(
854 &mut oracle,
855 &validators,
856 Action::Update(link.clone()),
857 Some(|_, i, j| ![i, j].contains(&0usize)),
858 )
859 .await;
860
861 context.sleep(Duration::from_secs(120)).await;
863
864 link_validators(
866 &mut oracle,
867 &validators,
868 Action::Unlink,
869 Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
870 )
871 .await;
872
873 let scheme = schemes[0].clone();
875 let validator = scheme.public_key();
876 {
877 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
879
880 link_validators(
882 &mut oracle,
883 &validators,
884 Action::Link(link),
885 Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
886 )
887 .await;
888
889 let link = Link {
891 latency: 10.0,
892 jitter: 2.5,
893 success_rate: 1.0,
894 };
895 link_validators(
896 &mut oracle,
897 &validators,
898 Action::Update(link),
899 Some(|_, i, j| ![i, j].contains(&1usize)),
900 )
901 .await;
902
903 let supervisor_config = mocks::supervisor::Config {
905 prover: prover.clone(),
906 participants: view_validators.clone(),
907 };
908 let supervisor =
909 mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
910 supervisors.push(supervisor.clone());
911 let application_cfg = mocks::application::Config {
912 hasher: Sha256::default(),
913 relay: relay.clone(),
914 participant: validator.clone(),
915 tracker: done_sender.clone(),
916 propose_latency: (10.0, 5.0),
917 verify_latency: (10.0, 5.0),
918 };
919 let (actor, application) = mocks::application::Application::new(
920 context.with_label("application"),
921 application_cfg,
922 );
923 actor.start();
924 let cfg = JConfig {
925 partition: validator.to_string(),
926 };
927 let journal = Journal::init(context.with_label("journal"), cfg)
928 .await
929 .expect("unable to create journal");
930 let cfg = config::Config {
931 crypto: scheme,
932 automaton: application.clone(),
933 relay: application.clone(),
934 committer: application,
935 supervisor,
936 mailbox_size: 1024,
937 namespace: namespace.clone(),
938 leader_timeout: Duration::from_secs(1),
939 notarization_timeout: Duration::from_secs(2),
940 nullify_retry: Duration::from_secs(10),
941 fetch_timeout: Duration::from_secs(1),
942 activity_timeout,
943 max_fetch_count: 1,
944 max_fetch_size: 1024 * 512,
945 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
946 fetch_concurrent: 1,
947 replay_concurrency: 1,
948 };
949 let (voter, resolver) = registrations
950 .remove(&validator)
951 .expect("validator should be registered");
952 let engine = Engine::new(context.with_label("engine"), journal, cfg);
953 engine_handlers.push(engine.start(voter, resolver));
954 }
955
956 let mut finalized = HashMap::new();
958 let mut validator_finalized = HashSet::new();
959 loop {
960 let (candidate, event) = done_receiver.next().await.unwrap();
961 if let mocks::application::Progress::Finalized(proof, digest) = event {
962 let (view, _, payload, _) =
963 prover.deserialize_finalization(proof, 5, true).unwrap();
964 if digest != payload {
965 panic!(
966 "finalization mismatch digest: {:?}, payload: {:?}",
967 digest, payload
968 );
969 }
970 if let Some(previous) = finalized.insert(view, digest.clone()) {
971 if previous != digest {
972 panic!(
973 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
974 view, previous, digest
975 );
976 }
977 }
978 if validator == candidate {
979 validator_finalized.insert(view);
980 }
981 }
982 if validator_finalized.len() == required_containers as usize {
983 break;
984 }
985 }
986 });
987 }
988
989 #[test_traced]
990 fn test_one_offline() {
991 let n = 5;
993 let required_containers = 100;
994 let activity_timeout = 10;
995 let namespace = b"consensus".to_vec();
996 let (executor, context, _) = Executor::timed(Duration::from_secs(30));
997 executor.start(async move {
998 let (network, mut oracle) = Network::new(
1000 context.with_label("network"),
1001 Config {
1002 max_size: 1024 * 1024,
1003 },
1004 );
1005
1006 network.start();
1008
1009 let mut schemes = Vec::new();
1011 let mut validators = Vec::new();
1012 for i in 0..n {
1013 let scheme = Ed25519::from_seed(i as u64);
1014 let pk = scheme.public_key();
1015 schemes.push(scheme);
1016 validators.push(pk);
1017 }
1018 validators.sort();
1019 schemes.sort_by_key(|s| s.public_key());
1020 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1021 let mut registrations = register_validators(&mut oracle, &validators).await;
1022
1023 let link = Link {
1025 latency: 10.0,
1026 jitter: 1.0,
1027 success_rate: 1.0,
1028 };
1029 link_validators(
1030 &mut oracle,
1031 &validators,
1032 Action::Link(link),
1033 Some(|_, i, j| ![i, j].contains(&0usize)),
1034 )
1035 .await;
1036
1037 let prover = Prover::new(&namespace);
1039 let relay = Arc::new(mocks::relay::Relay::new());
1040 let mut supervisors = Vec::new();
1041 let (done_sender, mut done_receiver) = mpsc::unbounded();
1042 let mut engine_handlers = Vec::new();
1043 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1044 if idx_scheme == 0 {
1046 continue;
1047 }
1048
1049 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1051
1052 let validator = scheme.public_key();
1054 let supervisor_config = mocks::supervisor::Config {
1055 prover: prover.clone(),
1056 participants: view_validators.clone(),
1057 };
1058 let supervisor =
1059 mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
1060 supervisors.push(supervisor.clone());
1061 let application_cfg = mocks::application::Config {
1062 hasher: Sha256::default(),
1063 relay: relay.clone(),
1064 participant: validator.clone(),
1065 tracker: done_sender.clone(),
1066 propose_latency: (10.0, 5.0),
1067 verify_latency: (10.0, 5.0),
1068 };
1069 let (actor, application) = mocks::application::Application::new(
1070 context.with_label("application"),
1071 application_cfg,
1072 );
1073 actor.start();
1074 let cfg = JConfig {
1075 partition: validator.to_string(),
1076 };
1077 let journal = Journal::init(context.with_label("journal"), cfg)
1078 .await
1079 .expect("unable to create journal");
1080 let cfg = config::Config {
1081 crypto: scheme,
1082 automaton: application.clone(),
1083 relay: application.clone(),
1084 committer: application,
1085 supervisor,
1086 mailbox_size: 1024,
1087 namespace: namespace.clone(),
1088 leader_timeout: Duration::from_secs(1),
1089 notarization_timeout: Duration::from_secs(2),
1090 nullify_retry: Duration::from_secs(10),
1091 fetch_timeout: Duration::from_secs(1),
1092 activity_timeout,
1093 max_fetch_count: 1,
1094 max_fetch_size: 1024 * 512,
1095 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1096 fetch_concurrent: 1,
1097 replay_concurrency: 1,
1098 };
1099 let (voter, resolver) = registrations
1100 .remove(&validator)
1101 .expect("validator should be registered");
1102 let engine = Engine::new(context.with_label("engine"), journal, cfg);
1103 engine_handlers.push(engine.start(voter, resolver));
1104 }
1105
1106 let mut completed = HashSet::new();
1108 let mut finalized = HashMap::new();
1109 loop {
1110 let (validator, event) = done_receiver.next().await.unwrap();
1111 if let mocks::application::Progress::Finalized(proof, digest) = event {
1112 let (view, _, payload, _) =
1113 prover.deserialize_finalization(proof, 5, true).unwrap();
1114 if digest != payload {
1115 panic!(
1116 "finalization mismatch digest: {:?}, payload: {:?}",
1117 digest, payload
1118 );
1119 }
1120 if let Some(previous) = finalized.insert(view, digest.clone()) {
1121 if previous != digest {
1122 panic!(
1123 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1124 view, previous, digest
1125 );
1126 }
1127 }
1128 if (finalized.len() as u64) < required_containers {
1129 continue;
1130 }
1131 completed.insert(validator);
1132 }
1133 if completed.len() == (n - 1) as usize {
1134 break;
1135 }
1136 }
1137
1138 let offline = &validators[0];
1140 for supervisor in supervisors.iter() {
1141 {
1143 let faults = supervisor.faults.lock().unwrap();
1144 assert!(faults.is_empty());
1145 }
1146
1147 {
1149 let notarizes = supervisor.notarizes.lock().unwrap();
1150 for (view, payloads) in notarizes.iter() {
1151 for (_, participants) in payloads.iter() {
1152 if participants.contains(offline) {
1153 panic!("view: {}", view);
1154 }
1155 }
1156 }
1157 }
1158 {
1159 let finalizes = supervisor.finalizes.lock().unwrap();
1160 for (view, payloads) in finalizes.iter() {
1161 for (_, finalizers) in payloads.iter() {
1162 if finalizers.contains(offline) {
1163 panic!("view: {}", view);
1164 }
1165 }
1166 }
1167 }
1168 }
1169 });
1170 }
1171
1172 #[test_traced]
1173 fn test_slow_validator() {
1174 let n = 5;
1176 let required_containers = 50;
1177 let activity_timeout = 10;
1178 let namespace = b"consensus".to_vec();
1179 let (executor, context, _) = Executor::timed(Duration::from_secs(30));
1180 executor.start(async move {
1181 let (network, mut oracle) = Network::new(
1183 context.with_label("network"),
1184 Config {
1185 max_size: 1024 * 1024,
1186 },
1187 );
1188
1189 network.start();
1191
1192 let mut schemes = Vec::new();
1194 let mut validators = Vec::new();
1195 for i in 0..n {
1196 let scheme = Ed25519::from_seed(i as u64);
1197 let pk = scheme.public_key();
1198 schemes.push(scheme);
1199 validators.push(pk);
1200 }
1201 validators.sort();
1202 schemes.sort_by_key(|s| s.public_key());
1203 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1204 let mut registrations = register_validators(&mut oracle, &validators).await;
1205
1206 let link = Link {
1208 latency: 10.0,
1209 jitter: 1.0,
1210 success_rate: 1.0,
1211 };
1212 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1213
1214 let prover = Prover::new(&namespace);
1216 let relay = Arc::new(mocks::relay::Relay::new());
1217 let mut supervisors = Vec::new();
1218 let (done_sender, mut done_receiver) = mpsc::unbounded();
1219 let mut engine_handlers = Vec::new();
1220 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1221 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1223
1224 let validator = scheme.public_key();
1226 let supervisor_config = mocks::supervisor::Config {
1227 prover: prover.clone(),
1228 participants: view_validators.clone(),
1229 };
1230 let supervisor =
1231 mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
1232 supervisors.push(supervisor.clone());
1233 let application_cfg = if idx_scheme == 0 {
1234 mocks::application::Config {
1235 hasher: Sha256::default(),
1236 relay: relay.clone(),
1237 participant: validator.clone(),
1238 tracker: done_sender.clone(),
1239 propose_latency: (3_000.0, 0.0),
1240 verify_latency: (3_000.0, 5.0),
1241 }
1242 } else {
1243 mocks::application::Config {
1244 hasher: Sha256::default(),
1245 relay: relay.clone(),
1246 participant: validator.clone(),
1247 tracker: done_sender.clone(),
1248 propose_latency: (10.0, 5.0),
1249 verify_latency: (10.0, 5.0),
1250 }
1251 };
1252 let (actor, application) = mocks::application::Application::new(
1253 context.with_label("application"),
1254 application_cfg,
1255 );
1256 actor.start();
1257 let cfg = JConfig {
1258 partition: validator.to_string(),
1259 };
1260 let journal = Journal::init(context.with_label("journal"), cfg)
1261 .await
1262 .expect("unable to create journal");
1263 let cfg = config::Config {
1264 crypto: scheme,
1265 automaton: application.clone(),
1266 relay: application.clone(),
1267 committer: application,
1268 supervisor,
1269 mailbox_size: 1024,
1270 namespace: namespace.clone(),
1271 leader_timeout: Duration::from_secs(1),
1272 notarization_timeout: Duration::from_secs(2),
1273 nullify_retry: Duration::from_secs(10),
1274 fetch_timeout: Duration::from_secs(1),
1275 activity_timeout,
1276 max_fetch_count: 1,
1277 max_fetch_size: 1024 * 512,
1278 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1279 fetch_concurrent: 1,
1280 replay_concurrency: 1,
1281 };
1282 let (voter, resolver) = registrations
1283 .remove(&validator)
1284 .expect("validator should be registered");
1285 let engine = Engine::new(context.with_label("engine"), journal, cfg);
1286 engine_handlers.push(engine.start(voter, resolver));
1287 }
1288
1289 let mut completed = HashSet::new();
1291 let mut finalized = HashMap::new();
1292 loop {
1293 let (validator, event) = done_receiver.next().await.unwrap();
1294 if let mocks::application::Progress::Finalized(proof, digest) = event {
1295 let (view, _, payload, _) =
1296 prover.deserialize_finalization(proof, 5, true).unwrap();
1297 if digest != payload {
1298 panic!(
1299 "finalization mismatch digest: {:?}, payload: {:?}",
1300 digest, payload
1301 );
1302 }
1303 if let Some(previous) = finalized.insert(view, digest.clone()) {
1304 if previous != digest {
1305 panic!(
1306 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1307 view, previous, digest
1308 );
1309 }
1310 }
1311 if (finalized.len() as u64) < required_containers {
1312 continue;
1313 }
1314 completed.insert(validator);
1315 }
1316 if completed.len() == n as usize {
1317 break;
1318 }
1319 }
1320
1321 let slow = &validators[0];
1323 for supervisor in supervisors.iter() {
1324 {
1326 let faults = supervisor.faults.lock().unwrap();
1327 assert!(faults.is_empty());
1328 }
1329
1330 {
1332 let notarizes = supervisor.notarizes.lock().unwrap();
1333 for (view, payloads) in notarizes.iter() {
1334 for (_, participants) in payloads.iter() {
1335 if participants.contains(slow) {
1336 panic!("view: {}", view);
1337 }
1338 }
1339 }
1340 }
1341 {
1342 let finalizes = supervisor.finalizes.lock().unwrap();
1343 for (view, payloads) in finalizes.iter() {
1344 for (_, finalizers) in payloads.iter() {
1345 if finalizers.contains(slow) {
1346 panic!("view: {}", view);
1347 }
1348 }
1349 }
1350 }
1351 }
1352 });
1353 }
1354
1355 #[test_traced]
1356 fn test_all_recovery() {
1357 let n = 5;
1359 let required_containers = 100;
1360 let activity_timeout = 10;
1361 let namespace = b"consensus".to_vec();
1362 let (executor, context, _) = Executor::timed(Duration::from_secs(120));
1363 executor.start(async move {
1364 let (network, mut oracle) = Network::new(
1366 context.with_label("network"),
1367 Config {
1368 max_size: 1024 * 1024,
1369 },
1370 );
1371
1372 network.start();
1374
1375 let mut schemes = Vec::new();
1377 let mut validators = Vec::new();
1378 for i in 0..n {
1379 let scheme = Ed25519::from_seed(i as u64);
1380 let pk = scheme.public_key();
1381 schemes.push(scheme);
1382 validators.push(pk);
1383 }
1384 validators.sort();
1385 schemes.sort_by_key(|s| s.public_key());
1386 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1387 let mut registrations = register_validators(&mut oracle, &validators).await;
1388
1389 let link = Link {
1391 latency: 3_000.0,
1392 jitter: 0.0,
1393 success_rate: 1.0,
1394 };
1395 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1396
1397 let prover = Prover::new(&namespace);
1399 let relay = Arc::new(mocks::relay::Relay::new());
1400 let mut supervisors = Vec::new();
1401 let (done_sender, mut done_receiver) = mpsc::unbounded();
1402 let mut engine_handlers = Vec::new();
1403 for scheme in schemes.iter() {
1404 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1406
1407 let validator = scheme.public_key();
1409 let supervisor_config = mocks::supervisor::Config {
1410 prover: prover.clone(),
1411 participants: view_validators.clone(),
1412 };
1413 let supervisor =
1414 mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
1415 supervisors.push(supervisor.clone());
1416 let application_cfg = mocks::application::Config {
1417 hasher: Sha256::default(),
1418 relay: relay.clone(),
1419 participant: validator.clone(),
1420 tracker: done_sender.clone(),
1421 propose_latency: (10.0, 5.0),
1422 verify_latency: (10.0, 5.0),
1423 };
1424 let (actor, application) = mocks::application::Application::new(
1425 context.with_label("application"),
1426 application_cfg,
1427 );
1428 actor.start();
1429 let cfg = JConfig {
1430 partition: validator.to_string(),
1431 };
1432 let journal = Journal::init(context.with_label("journal"), cfg)
1433 .await
1434 .expect("unable to create journal");
1435 let cfg = config::Config {
1436 crypto: scheme.clone(),
1437 automaton: application.clone(),
1438 relay: application.clone(),
1439 committer: application,
1440 supervisor,
1441 mailbox_size: 1024,
1442 namespace: namespace.clone(),
1443 leader_timeout: Duration::from_secs(1),
1444 notarization_timeout: Duration::from_secs(2),
1445 nullify_retry: Duration::from_secs(10),
1446 fetch_timeout: Duration::from_secs(1),
1447 activity_timeout,
1448 max_fetch_count: 1,
1449 max_fetch_size: 1024 * 512,
1450 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1451 fetch_concurrent: 1,
1452 replay_concurrency: 1,
1453 };
1454 let (voter, resolver) = registrations
1455 .remove(&validator)
1456 .expect("validator should be registered");
1457 let engine = Engine::new(context.with_label("engine"), journal, cfg);
1458 engine_handlers.push(engine.start(voter, resolver));
1459 }
1460
1461 select! {
1463 _timeout = context.sleep(Duration::from_secs(60)) => {},
1464 _done = done_receiver.next() => {
1465 panic!("engine should not notarize or finalize anything");
1466 }
1467 }
1468
1469 let link = Link {
1471 latency: 10.0,
1472 jitter: 1.0,
1473 success_rate: 1.0,
1474 };
1475 link_validators(&mut oracle, &validators, Action::Update(link), None).await;
1476
1477 let mut completed = HashSet::new();
1479 let mut finalized = HashMap::new();
1480 loop {
1481 let (validator, event) = done_receiver.next().await.unwrap();
1482 if let mocks::application::Progress::Finalized(proof, digest) = event {
1483 let (view, _, payload, _) =
1484 prover.deserialize_finalization(proof, 5, true).unwrap();
1485 if digest != payload {
1486 panic!(
1487 "finalization mismatch digest: {:?}, payload: {:?}",
1488 digest, payload
1489 );
1490 }
1491 if let Some(previous) = finalized.insert(view, digest.clone()) {
1492 if previous != digest {
1493 panic!(
1494 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1495 view, previous, digest
1496 );
1497 }
1498 }
1499 if (finalized.len() as u64) < required_containers {
1500 continue;
1501 }
1502 completed.insert(validator);
1503 }
1504 if completed.len() == n as usize {
1505 break;
1506 }
1507 }
1508
1509 for supervisor in supervisors.iter() {
1511 {
1513 let faults = supervisor.faults.lock().unwrap();
1514 assert!(faults.is_empty());
1515 }
1516 }
1517 });
1518 }
1519
1520 #[test_traced]
1521 fn test_partition() {
1522 let n = 10;
1524 let required_containers = 50;
1525 let activity_timeout = 10;
1526 let namespace = b"consensus".to_vec();
1527 let (executor, context, _) = Executor::timed(Duration::from_secs(900));
1528 executor.start(async move {
1529 let (network, mut oracle) = Network::new(
1531 context.with_label("network"),
1532 Config {
1533 max_size: 1024 * 1024,
1534 },
1535 );
1536
1537 network.start();
1539
1540 let mut schemes = Vec::new();
1542 let mut validators = Vec::new();
1543 for i in 0..n {
1544 let scheme = Ed25519::from_seed(i as u64);
1545 let pk = scheme.public_key();
1546 schemes.push(scheme);
1547 validators.push(pk);
1548 }
1549 validators.sort();
1550 schemes.sort_by_key(|s| s.public_key());
1551 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1552 let mut registrations = register_validators(&mut oracle, &validators).await;
1553
1554 let link = Link {
1556 latency: 10.0,
1557 jitter: 1.0,
1558 success_rate: 1.0,
1559 };
1560 link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;
1561
1562 let prover = Prover::new(&namespace);
1564 let relay = Arc::new(mocks::relay::Relay::new());
1565 let mut supervisors = Vec::new();
1566 let (done_sender, mut done_receiver) = mpsc::unbounded();
1567 let mut engine_handlers = Vec::new();
1568 for scheme in schemes.iter() {
1569 let validator = scheme.public_key();
1571 let supervisor_config = mocks::supervisor::Config {
1572 prover: prover.clone(),
1573 participants: view_validators.clone(),
1574 };
1575 let supervisor =
1576 mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
1577 supervisors.push(supervisor.clone());
1578 let application_cfg = mocks::application::Config {
1579 hasher: Sha256::default(),
1580 relay: relay.clone(),
1581 participant: validator.clone(),
1582 tracker: done_sender.clone(),
1583 propose_latency: (10.0, 5.0),
1584 verify_latency: (10.0, 5.0),
1585 };
1586 let (actor, application) = mocks::application::Application::new(
1587 context.with_label("application"),
1588 application_cfg,
1589 );
1590 actor.start();
1591 let cfg = JConfig {
1592 partition: validator.to_string(),
1593 };
1594 let journal = Journal::init(context.with_label("journal"), cfg)
1595 .await
1596 .expect("unable to create journal");
1597 let cfg = config::Config {
1598 crypto: scheme.clone(),
1599 automaton: application.clone(),
1600 relay: application.clone(),
1601 committer: application,
1602 supervisor,
1603 mailbox_size: 1024,
1604 namespace: namespace.clone(),
1605 leader_timeout: Duration::from_secs(1),
1606 notarization_timeout: Duration::from_secs(2),
1607 nullify_retry: Duration::from_secs(10),
1608 fetch_timeout: Duration::from_secs(1),
1609 activity_timeout,
1610 max_fetch_count: 1,
1611 max_fetch_size: 1024 * 512,
1612 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1613 fetch_concurrent: 1,
1614 replay_concurrency: 1,
1615 };
1616 let (voter, resolver) = registrations
1617 .remove(&validator)
1618 .expect("validator should be registered");
1619 let engine = Engine::new(context.with_label("engine"), journal, cfg);
1620 engine_handlers.push(engine.start(voter, resolver));
1621 }
1622
1623 let mut completed = HashSet::new();
1625 let mut finalized = HashMap::new();
1626 let mut highest_finalized = 0;
1627 loop {
1628 let (validator, event) = done_receiver.next().await.unwrap();
1629 if let mocks::application::Progress::Finalized(proof, digest) = event {
1630 let (view, _, payload, _) =
1631 prover.deserialize_finalization(proof, 10, true).unwrap();
1632 if digest != payload {
1633 panic!(
1634 "finalization mismatch digest: {:?}, payload: {:?}",
1635 digest, payload
1636 );
1637 }
1638 if let Some(previous) = finalized.insert(view, digest.clone()) {
1639 if previous != digest {
1640 panic!(
1641 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1642 view, previous, digest
1643 );
1644 }
1645 }
1646 if view > highest_finalized {
1647 highest_finalized = view;
1648 }
1649 if (finalized.len() as u64) < required_containers {
1650 continue;
1651 }
1652 completed.insert(validator);
1653 }
1654 if completed.len() == n {
1655 break;
1656 }
1657 }
1658
1659 fn separated(n: usize, a: usize, b: usize) -> bool {
1661 let m = n / 2;
1662 (a < m && b >= m) || (a >= m && b < m)
1663 }
1664 link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;
1665
1666 context.sleep(Duration::from_secs(10)).await;
1668
1669 loop {
1671 if done_receiver.try_next().is_err() {
1672 break;
1673 }
1674 }
1675
1676 select! {
1678 _timeout = context.sleep(Duration::from_secs(600)) => {},
1679 _done = done_receiver.next() => {
1680 panic!("engine should not notarize or finalize anything");
1681 }
1682 }
1683
1684 link_validators(
1686 &mut oracle,
1687 &validators,
1688 Action::Link(link),
1689 Some(separated),
1690 )
1691 .await;
1692
1693 let mut completed = HashSet::new();
1695 loop {
1696 let (validator, event) = done_receiver.next().await.unwrap();
1697 if let mocks::application::Progress::Finalized(proof, digest) = event {
1698 let (view, _, payload, _) =
1699 prover.deserialize_finalization(proof, 10, true).unwrap();
1700 if digest != payload {
1701 panic!(
1702 "finalization mismatch digest: {:?}, payload: {:?}",
1703 digest, payload
1704 );
1705 }
1706 if let Some(previous) = finalized.insert(view, digest.clone()) {
1707 if previous != digest {
1708 panic!(
1709 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1710 view, previous, digest
1711 );
1712 }
1713 }
1714 if (finalized.len() as u64) < required_containers + highest_finalized {
1715 continue;
1716 }
1717 completed.insert(validator);
1718 }
1719 if completed.len() == n {
1720 break;
1721 }
1722 }
1723
1724 for supervisor in supervisors.iter() {
1726 {
1728 let faults = supervisor.faults.lock().unwrap();
1729 assert!(faults.is_empty());
1730 }
1731 }
1732 });
1733 }
1734
1735 fn slow_and_lossy_links(seed: u64) -> String {
1736 let n = 5;
1738 let required_containers = 50;
1739 let activity_timeout = 10;
1740 let namespace = b"consensus".to_vec();
1741 let cfg = deterministic::Config {
1742 seed,
1743 timeout: Some(Duration::from_secs(3_000)),
1744 ..deterministic::Config::default()
1745 };
1746 let (executor, context, auditor) = Executor::init(cfg);
1747 executor.start(async move {
1748 let (network, mut oracle) = Network::new(
1750 context.with_label("network"),
1751 Config {
1752 max_size: 1024 * 1024,
1753 },
1754 );
1755
1756 network.start();
1758
1759 let mut schemes = Vec::new();
1761 let mut validators = Vec::new();
1762 for i in 0..n {
1763 let scheme = Ed25519::from_seed(i as u64);
1764 let pk = scheme.public_key();
1765 schemes.push(scheme);
1766 validators.push(pk);
1767 }
1768 validators.sort();
1769 schemes.sort_by_key(|s| s.public_key());
1770 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1771 let mut registrations = register_validators(&mut oracle, &validators).await;
1772
1773 let degraded_link = Link {
1775 latency: 200.0,
1776 jitter: 150.0,
1777 success_rate: 0.5,
1778 };
1779 link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;
1780
1781 let prover = Prover::new(&namespace);
1783 let relay = Arc::new(mocks::relay::Relay::new());
1784 let mut supervisors = Vec::new();
1785 let (done_sender, mut done_receiver) = mpsc::unbounded();
1786 let mut engine_handlers = Vec::new();
1787 for scheme in schemes.into_iter() {
1788 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1790
1791 let validator = scheme.public_key();
1793 let supervisor_config = mocks::supervisor::Config {
1794 prover: prover.clone(),
1795 participants: view_validators.clone(),
1796 };
1797 let supervisor =
1798 mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
1799 supervisors.push(supervisor.clone());
1800 let application_cfg = mocks::application::Config {
1801 hasher: Sha256::default(),
1802 relay: relay.clone(),
1803 participant: validator.clone(),
1804 tracker: done_sender.clone(),
1805 propose_latency: (10.0, 5.0),
1806 verify_latency: (10.0, 5.0),
1807 };
1808 let (actor, application) = mocks::application::Application::new(
1809 context.with_label("application"),
1810 application_cfg,
1811 );
1812 actor.start();
1813 let cfg = JConfig {
1814 partition: validator.to_string(),
1815 };
1816 let journal = Journal::init(context.with_label("journal"), cfg)
1817 .await
1818 .expect("unable to create journal");
1819 let cfg = config::Config {
1820 crypto: scheme,
1821 automaton: application.clone(),
1822 relay: application.clone(),
1823 committer: application,
1824 supervisor,
1825 mailbox_size: 1024,
1826 namespace: namespace.clone(),
1827 leader_timeout: Duration::from_secs(1),
1828 notarization_timeout: Duration::from_secs(2),
1829 nullify_retry: Duration::from_secs(10),
1830 fetch_timeout: Duration::from_secs(1),
1831 activity_timeout,
1832 max_fetch_count: 1,
1833 max_fetch_size: 1024 * 512,
1834 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1835 fetch_concurrent: 1,
1836 replay_concurrency: 1,
1837 };
1838 let (voter, resolver) = registrations
1839 .remove(&validator)
1840 .expect("validator should be registered");
1841 let engine = Engine::new(context.with_label("engine"), journal, cfg);
1842 engine_handlers.push(engine.start(voter, resolver));
1843 }
1844
1845 let mut completed = HashSet::new();
1847 let mut finalized = HashMap::new();
1848 loop {
1849 let (validator, event) = done_receiver.next().await.unwrap();
1850 if let mocks::application::Progress::Finalized(proof, digest) = event {
1851 let (view, _, payload, _) =
1852 prover.deserialize_finalization(proof, 5, true).unwrap();
1853 if digest != payload {
1854 panic!(
1855 "finalization mismatch digest: {:?}, payload: {:?}",
1856 digest, payload
1857 );
1858 }
1859 if let Some(previous) = finalized.insert(view, digest.clone()) {
1860 if previous != digest {
1861 panic!(
1862 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1863 view, previous, digest
1864 );
1865 }
1866 }
1867 if (finalized.len() as u64) < required_containers {
1868 continue;
1869 }
1870 completed.insert(validator);
1871 }
1872 if completed.len() == n as usize {
1873 break;
1874 }
1875 }
1876
1877 for supervisor in supervisors.iter() {
1879 {
1881 let faults = supervisor.faults.lock().unwrap();
1882 assert!(faults.is_empty());
1883 }
1884 }
1885 });
1886 auditor.state()
1887 }
1888
1889 #[test_traced]
1890 fn test_slow_and_lossy_links() {
1891 slow_and_lossy_links(0);
1892 }
1893
1894 #[test_traced]
1895 fn test_determinism() {
1896 for seed in 1..6 {
1899 let state_1 = slow_and_lossy_links(seed);
1901
1902 let state_2 = slow_and_lossy_links(seed);
1904
1905 assert_eq!(state_1, state_2);
1907 }
1908 }
1909
1910 #[test_traced]
1911 fn test_conflicter() {
1912 let n = 4;
1914 let required_containers = 50;
1915 let activity_timeout = 10;
1916 let namespace = b"consensus".to_vec();
1917 let (executor, context, _) = Executor::timed(Duration::from_secs(30));
1918 executor.start(async move {
1919 let (network, mut oracle) = Network::new(
1921 context.with_label("network"),
1922 Config {
1923 max_size: 1024 * 1024,
1924 },
1925 );
1926
1927 network.start();
1929
1930 let mut schemes = Vec::new();
1932 let mut validators = Vec::new();
1933 for i in 0..n {
1934 let scheme = Ed25519::from_seed(i as u64);
1935 let pk = scheme.public_key();
1936 schemes.push(scheme);
1937 validators.push(pk);
1938 }
1939 validators.sort();
1940 schemes.sort_by_key(|s| s.public_key());
1941 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
1942 let mut registrations = register_validators(&mut oracle, &validators).await;
1943
1944 let link = Link {
1946 latency: 10.0,
1947 jitter: 1.0,
1948 success_rate: 1.0,
1949 };
1950 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1951
1952 let prover = Prover::new(&namespace);
1954 let relay = Arc::new(mocks::relay::Relay::new());
1955 let mut supervisors = Vec::new();
1956 let (done_sender, mut done_receiver) = mpsc::unbounded();
1957 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1958 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1960
1961 let validator = scheme.public_key();
1963 let supervisor_config = mocks::supervisor::Config {
1964 prover: prover.clone(),
1965 participants: view_validators.clone(),
1966 };
1967 let supervisor =
1968 mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
1969 if idx_scheme == 0 {
1970 let cfg = mocks::conflicter::Config {
1971 crypto: scheme,
1972 supervisor,
1973 namespace: namespace.clone(),
1974 };
1975 let (voter, _) = registrations
1976 .remove(&validator)
1977 .expect("validator should be registered");
1978 let engine: mocks::conflicter::Conflicter<_, _, Sha256, _> =
1979 mocks::conflicter::Conflicter::new(
1980 context.with_label("byzantine_engine"),
1981 cfg,
1982 );
1983 engine.start(voter);
1984 } else {
1985 supervisors.push(supervisor.clone());
1986 let application_cfg = mocks::application::Config {
1987 hasher: Sha256::default(),
1988 relay: relay.clone(),
1989 participant: validator.clone(),
1990 tracker: done_sender.clone(),
1991 propose_latency: (10.0, 5.0),
1992 verify_latency: (10.0, 5.0),
1993 };
1994 let (actor, application) = mocks::application::Application::new(
1995 context.with_label("application"),
1996 application_cfg,
1997 );
1998 actor.start();
1999 let cfg = JConfig {
2000 partition: validator.to_string(),
2001 };
2002 let journal = Journal::init(context.with_label("journal"), cfg)
2003 .await
2004 .expect("unable to create journal");
2005 let cfg = config::Config {
2006 crypto: scheme,
2007 automaton: application.clone(),
2008 relay: application.clone(),
2009 committer: application,
2010 supervisor,
2011 mailbox_size: 1024,
2012 namespace: namespace.clone(),
2013 leader_timeout: Duration::from_secs(1),
2014 notarization_timeout: Duration::from_secs(2),
2015 nullify_retry: Duration::from_secs(10),
2016 fetch_timeout: Duration::from_secs(1),
2017 activity_timeout,
2018 max_fetch_count: 1,
2019 max_fetch_size: 1024 * 512,
2020 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
2021 fetch_concurrent: 1,
2022 replay_concurrency: 1,
2023 };
2024 let (voter, resolver) = registrations
2025 .remove(&validator)
2026 .expect("validator should be registered");
2027 let engine = Engine::new(context.with_label("engine"), journal, cfg);
2028 engine.start(voter, resolver);
2029 }
2030 }
2031
2032 let mut completed = HashSet::new();
2034 let mut finalized = HashMap::new();
2035 loop {
2036 let (validator, event) = done_receiver.next().await.unwrap();
2037 if let mocks::application::Progress::Finalized(proof, digest) = event {
2038 let (view, _, payload, _) =
2039 prover.deserialize_finalization(proof, 5, true).unwrap();
2040 if digest != payload {
2041 panic!(
2042 "finalization mismatch digest: {:?}, payload: {:?}",
2043 digest, payload
2044 );
2045 }
2046 if let Some(previous) = finalized.insert(view, digest.clone()) {
2047 if previous != digest {
2048 panic!(
2049 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
2050 view, previous, digest
2051 );
2052 }
2053 }
2054 if (finalized.len() as u64) < required_containers {
2055 continue;
2056 }
2057 completed.insert(validator);
2058 }
2059 if completed.len() == (n - 1) as usize {
2060 break;
2061 }
2062 }
2063
2064 let byz = &validators[0];
2066 let mut count_conflicting_notarize = 0;
2067 let mut count_conflicting_finalize = 0;
2068 for supervisor in supervisors.iter() {
2069 {
2071 let faults = supervisor.faults.lock().unwrap();
2072 assert_eq!(faults.len(), 1);
2073 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2074 for (_, faults) in faulter.iter() {
2075 for fault in faults.iter() {
2076 match *fault {
2077 CONFLICTING_NOTARIZE => {
2078 count_conflicting_notarize += 1;
2079 }
2080 CONFLICTING_FINALIZE => {
2081 count_conflicting_finalize += 1;
2082 }
2083 _ => panic!("unexpected fault: {:?}", fault),
2084 }
2085 }
2086 }
2087 }
2088 }
2089 assert!(count_conflicting_notarize > 0);
2090 assert!(count_conflicting_finalize > 0);
2091 });
2092 }
2093
2094 #[test_traced]
2095 fn test_nuller() {
2096 let n = 4;
2098 let required_containers = 50;
2099 let activity_timeout = 10;
2100 let namespace = b"consensus".to_vec();
2101 let (executor, context, _) = Executor::timed(Duration::from_secs(30));
2102 executor.start(async move {
2103 let (network, mut oracle) = Network::new(
2105 context.with_label("network"),
2106 Config {
2107 max_size: 1024 * 1024,
2108 },
2109 );
2110
2111 network.start();
2113
2114 let mut schemes = Vec::new();
2116 let mut validators = Vec::new();
2117 for i in 0..n {
2118 let scheme = Ed25519::from_seed(i as u64);
2119 let pk = scheme.public_key();
2120 schemes.push(scheme);
2121 validators.push(pk);
2122 }
2123 validators.sort();
2124 schemes.sort_by_key(|s| s.public_key());
2125 let view_validators = BTreeMap::from_iter(vec![(0, validators.clone())]);
2126 let mut registrations = register_validators(&mut oracle, &validators).await;
2127
2128 let link = Link {
2130 latency: 10.0,
2131 jitter: 1.0,
2132 success_rate: 1.0,
2133 };
2134 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2135
2136 let prover = Prover::new(&namespace);
2138 let relay = Arc::new(mocks::relay::Relay::new());
2139 let mut supervisors = Vec::new();
2140 let (done_sender, mut done_receiver) = mpsc::unbounded();
2141 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2142 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2144
2145 let validator = scheme.public_key();
2147 let supervisor_config = mocks::supervisor::Config {
2148 prover: prover.clone(),
2149 participants: view_validators.clone(),
2150 };
2151 let supervisor =
2152 mocks::supervisor::Supervisor::<Ed25519, Sha256Digest>::new(supervisor_config);
2153 if idx_scheme == 0 {
2154 let cfg = mocks::nuller::Config {
2155 crypto: scheme,
2156 supervisor,
2157 namespace: namespace.clone(),
2158 };
2159 let (voter, _) = registrations
2160 .remove(&validator)
2161 .expect("validator should be registered");
2162 let engine: mocks::nuller::Nuller<_, _, Sha256, _> =
2163 mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
2164 engine.start(voter);
2165 } else {
2166 supervisors.push(supervisor.clone());
2167 let application_cfg = mocks::application::Config {
2168 hasher: Sha256::default(),
2169 relay: relay.clone(),
2170 participant: validator.clone(),
2171 tracker: done_sender.clone(),
2172 propose_latency: (10.0, 5.0),
2173 verify_latency: (10.0, 5.0),
2174 };
2175 let (actor, application) = mocks::application::Application::new(
2176 context.with_label("application"),
2177 application_cfg,
2178 );
2179 actor.start();
2180 let cfg = JConfig {
2181 partition: validator.to_string(),
2182 };
2183 let journal = Journal::init(context.with_label("journal"), cfg)
2184 .await
2185 .expect("unable to create journal");
2186 let cfg = config::Config {
2187 crypto: scheme,
2188 automaton: application.clone(),
2189 relay: application.clone(),
2190 committer: application,
2191 supervisor,
2192 mailbox_size: 1024,
2193 namespace: namespace.clone(),
2194 leader_timeout: Duration::from_secs(1),
2195 notarization_timeout: Duration::from_secs(2),
2196 nullify_retry: Duration::from_secs(10),
2197 fetch_timeout: Duration::from_secs(1),
2198 activity_timeout,
2199 max_fetch_count: 1,
2200 max_fetch_size: 1024 * 512,
2201 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
2202 fetch_concurrent: 1,
2203 replay_concurrency: 1,
2204 };
2205 let (voter, resolver) = registrations
2206 .remove(&validator)
2207 .expect("validator should be registered");
2208 let engine = Engine::new(context.with_label("engine"), journal, cfg);
2209 engine.start(voter, resolver);
2210 }
2211 }
2212
2213 let mut completed = HashSet::new();
2215 let mut finalized = HashMap::new();
2216 loop {
2217 let (validator, event) = done_receiver.next().await.unwrap();
2218 if let mocks::application::Progress::Finalized(proof, digest) = event {
2219 let (view, _, payload, _) =
2220 prover.deserialize_finalization(proof, 5, true).unwrap();
2221 if digest != payload {
2222 panic!(
2223 "finalization mismatch digest: {:?}, payload: {:?}",
2224 digest, payload
2225 );
2226 }
2227 if let Some(previous) = finalized.insert(view, digest.clone()) {
2228 if previous != digest {
2229 panic!(
2230 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
2231 view, previous, digest
2232 );
2233 }
2234 }
2235 if (finalized.len() as u64) < required_containers {
2236 continue;
2237 }
2238 completed.insert(validator);
2239 }
2240 if completed.len() == (n - 1) as usize {
2241 break;
2242 }
2243 }
2244
2245 let byz = &validators[0];
2247 let mut count_nullify_and_finalize = 0;
2248 for supervisor in supervisors.iter() {
2249 {
2251 let faults = supervisor.faults.lock().unwrap();
2252 assert_eq!(faults.len(), 1);
2253 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2254 for (_, faults) in faulter.iter() {
2255 for fault in faults.iter() {
2256 match *fault {
2257 NULLIFY_AND_FINALIZE => {
2258 count_nullify_and_finalize += 1;
2259 }
2260 _ => panic!("unexpected fault: {:?}", fault),
2261 }
2262 }
2263 }
2264 }
2265 }
2266 assert!(count_nullify_and_finalize > 0);
2267 });
2268 }
2269}