1use commonware_utils::Array;
150
151mod encoder;
152mod prover;
153pub use prover::Prover;
154mod wire {
155 include!(concat!(env!("OUT_DIR"), "/threshold_simplex.wire.rs"));
156}
157
158cfg_if::cfg_if! {
159 if #[cfg(not(target_arch = "wasm32"))] {
160 mod actors;
161 mod config;
162 pub use config::Config;
163 mod engine;
164 pub use engine::Engine;
165 mod metrics;
166 mod verifier;
167 }
168}
169
170#[cfg(test)]
171pub mod mocks;
172
173pub type View = u64;
175
176use crate::Activity;
177
178#[derive(Clone)]
180pub struct Context<D: Array> {
181 pub view: View,
183
184 pub parent: (View, D),
191}
192
193pub const NOTARIZE: Activity = 0;
201pub const FINALIZE: Activity = 1;
203pub const CONFLICTING_NOTARIZE: Activity = 2;
205pub const CONFLICTING_FINALIZE: Activity = 3;
207pub const NULLIFY_AND_FINALIZE: Activity = 4;
209
210#[cfg(test)]
211mod tests {
212 use super::*;
213 use commonware_cryptography::{
214 bls12381::{dkg::ops, primitives::poly},
215 Ed25519, Scheme, Sha256,
216 };
217 use commonware_macros::{select, test_traced};
218 use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
219 use commonware_runtime::{
220 deterministic::{self, Executor},
221 Clock, Metrics, Runner, Spawner,
222 };
223 use commonware_storage::journal::variable::{Config as JConfig, Journal};
224 use commonware_utils::quorum;
225 use engine::Engine;
226 use futures::{channel::mpsc, StreamExt};
227 use governor::Quota;
228 use rand::{rngs::StdRng, Rng, SeedableRng};
229 use std::{
230 collections::{BTreeMap, HashMap, HashSet},
231 num::NonZeroU32,
232 sync::{Arc, Mutex},
233 time::Duration,
234 };
235 use tracing::debug;
236
237 async fn register_validators<P: Array>(
239 oracle: &mut Oracle<P>,
240 validators: &[P],
241 ) -> HashMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))> {
242 let mut registrations = HashMap::new();
243 for validator in validators.iter() {
244 let (voter_sender, voter_receiver) =
245 oracle.register(validator.clone(), 0).await.unwrap();
246 let (resolver_sender, resolver_receiver) =
247 oracle.register(validator.clone(), 1).await.unwrap();
248 registrations.insert(
249 validator.clone(),
250 (
251 (voter_sender, voter_receiver),
252 (resolver_sender, resolver_receiver),
253 ),
254 );
255 }
256 registrations
257 }
258
259 enum Action {
261 Link(Link),
262 Update(Link), Unlink,
264 }
265
266 async fn link_validators<P: Array>(
272 oracle: &mut Oracle<P>,
273 validators: &[P],
274 action: Action,
275 restrict_to: Option<fn(usize, usize, usize) -> bool>,
276 ) {
277 for (i1, v1) in validators.iter().enumerate() {
278 for (i2, v2) in validators.iter().enumerate() {
279 if v2 == v1 {
281 continue;
282 }
283
284 if let Some(f) = restrict_to {
286 if !f(validators.len(), i1, i2) {
287 continue;
288 }
289 }
290
291 match action {
293 Action::Update(_) | Action::Unlink => {
294 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
295 }
296 _ => {}
297 }
298
299 match action {
301 Action::Link(ref link) | Action::Update(ref link) => {
302 oracle
303 .add_link(v1.clone(), v2.clone(), link.clone())
304 .await
305 .unwrap();
306 }
307 _ => {}
308 }
309 }
310 }
311 }
312
313 #[test_traced]
314 fn test_all_online() {
315 let n = 5;
317 let threshold = quorum(n).expect("unable to calculate threshold");
318 let max_exceptions = 4;
319 let required_containers = 100;
320 let activity_timeout = 10;
321 let namespace = b"consensus".to_vec();
322 let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
323 executor.start(async move {
324 let (network, mut oracle) = Network::new(
326 context.with_label("network"),
327 Config {
328 max_size: 1024 * 1024,
329 },
330 );
331
332 network.start();
334
335 let mut schemes = Vec::new();
337 let mut validators = Vec::new();
338 for i in 0..n {
339 let scheme = Ed25519::from_seed(i as u64);
340 let pk = scheme.public_key();
341 schemes.push(scheme);
342 validators.push(pk);
343 }
344 validators.sort();
345 schemes.sort_by_key(|s| s.public_key());
346 let mut registrations = register_validators(&mut oracle, &validators).await;
347
348 let link = Link {
350 latency: 10.0,
351 jitter: 1.0,
352 success_rate: 1.0,
353 };
354 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
355
356 let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
358 let pk = poly::public(&public);
359 let prover = Prover::new(pk, &namespace);
360
361 let relay = Arc::new(mocks::relay::Relay::new());
363 let mut supervisors = Vec::new();
364 let (done_sender, mut done_receiver) = mpsc::unbounded();
365 let mut engine_handlers = Vec::new();
366 for (idx, scheme) in schemes.into_iter().enumerate() {
367 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
369
370 let validator = scheme.public_key();
372 let mut participants = BTreeMap::new();
373 participants.insert(0, (public.clone(), validators.clone(), shares[idx]));
374 let supervisor_config = mocks::supervisor::Config {
375 prover: prover.clone(),
376 participants,
377 };
378 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
379 supervisors.push(supervisor.clone());
380 let application_cfg = mocks::application::Config {
381 hasher: Sha256::default(),
382 relay: relay.clone(),
383 participant: validator.clone(),
384 tracker: done_sender.clone(),
385 propose_latency: (10.0, 5.0),
386 verify_latency: (10.0, 5.0),
387 };
388 let (actor, application) = mocks::application::Application::new(
389 context.with_label("application"),
390 application_cfg,
391 );
392 actor.start();
393 let cfg = JConfig {
394 partition: validator.to_string(),
395 };
396 let journal = Journal::init(context.with_label("journal"), cfg)
397 .await
398 .expect("unable to create journal");
399 let cfg = config::Config {
400 crypto: scheme,
401 automaton: application.clone(),
402 relay: application.clone(),
403 committer: application,
404 supervisor,
405 mailbox_size: 1024,
406 namespace: namespace.clone(),
407 leader_timeout: Duration::from_secs(1),
408 notarization_timeout: Duration::from_secs(2),
409 nullify_retry: Duration::from_secs(10),
410 fetch_timeout: Duration::from_secs(1),
411 activity_timeout,
412 max_fetch_count: 1,
413 max_fetch_size: 1024 * 512,
414 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
415 fetch_concurrent: 1,
416 replay_concurrency: 1,
417 };
418 let engine = Engine::new(context.with_label("engine"), journal, cfg);
419
420 let (voter, resolver) = registrations
422 .remove(&validator)
423 .expect("validator should be registered");
424 engine_handlers.push(engine.start(voter, resolver));
425 }
426
427 let mut completed = HashSet::new();
429 let mut finalized = HashMap::new();
430 loop {
431 let (validator, event) = done_receiver.next().await.unwrap();
432 if let mocks::application::Progress::Finalized(proof, digest) = event {
433 let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
434 if digest != payload {
435 panic!(
436 "finalization mismatch digest: {:?}, payload: {:?}",
437 digest, payload
438 );
439 }
440 if let Some(previous) = finalized.insert(view, digest.clone()) {
441 if previous != digest {
442 panic!(
443 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
444 view, previous, digest
445 );
446 }
447 }
448 if (finalized.len() as u64) < required_containers {
449 continue;
450 }
451 completed.insert(validator);
452 }
453 if completed.len() == n as usize {
454 break;
455 }
456 }
457
458 let latest_complete = required_containers - activity_timeout;
460 for supervisor in supervisors.iter() {
461 {
463 let faults = supervisor.faults.lock().unwrap();
464 assert!(faults.is_empty());
465 }
466
467 let mut exceptions = 0;
469 {
470 let notarizes = supervisor.notarizes.lock().unwrap();
471 for (view, payloads) in notarizes.iter() {
472 if payloads.len() > 1 {
474 panic!("view: {}", view);
475 }
476
477 if *view > latest_complete {
479 continue;
480 }
481
482 let digest = finalized.get(view).expect("view should be finalized");
484 let voters = payloads.get(digest).expect("digest should exist");
485 if voters.len() < threshold as usize {
486 panic!("view: {}", view);
489 }
490 if voters.len() != n as usize {
491 exceptions += 1;
492 }
493 }
494 }
495 {
496 let finalizes = supervisor.finalizes.lock().unwrap();
497 for (view, payloads) in finalizes.iter() {
498 if payloads.len() > 1 {
500 panic!("view: {}", view);
501 }
502
503 if *view > latest_complete {
505 continue;
506 }
507
508 let digest = finalized.get(view).expect("view should be finalized");
510 let finalizers = payloads.get(digest).expect("digest should exist");
511 if finalizers.len() < threshold as usize {
512 panic!("view: {}", view);
515 }
516 if finalizers.len() != n as usize {
517 exceptions += 1;
518 }
519 }
520 }
521
522 assert!(exceptions <= max_exceptions);
524 }
525 });
526 }
527
528 #[test_traced]
529 fn test_unclean_shutdown() {
530 let n = 5;
532 let threshold = quorum(n).expect("unable to calculate threshold");
533 let required_containers = 100;
534 let activity_timeout = 10;
535 let namespace = b"consensus".to_vec();
536
537 let mut rng = StdRng::seed_from_u64(0);
539 let (public, shares) = ops::generate_shares(&mut rng, None, n, threshold);
540 let pk = poly::public(&public);
541
542 let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
544 let notarized = Arc::new(Mutex::new(HashMap::new()));
545 let finalized = Arc::new(Mutex::new(HashMap::new()));
546 let completed = Arc::new(Mutex::new(HashSet::new()));
547 let supervised = Arc::new(Mutex::new(Vec::new()));
548 let (mut executor, mut context, _) = Executor::timed(Duration::from_secs(300));
549 while completed.lock().unwrap().len() != n as usize {
550 let namespace = namespace.clone();
551 let shutdowns = shutdowns.clone();
552 let notarized = notarized.clone();
553 let finalized = finalized.clone();
554 let completed = completed.clone();
555 let supervised = supervised.clone();
556 executor.start({
557 let mut context = context.clone();
558 let public = public.clone();
559 let shares = shares.clone();
560 async move {
561 let (network, mut oracle) = Network::new(
563 context.with_label("network"),
564 Config {
565 max_size: 1024 * 1024,
566 },
567 );
568
569 network.start();
571
572 let mut schemes = Vec::new();
574 let mut validators = Vec::new();
575 for i in 0..n {
576 let scheme = Ed25519::from_seed(i as u64);
577 let pk = scheme.public_key();
578 schemes.push(scheme);
579 validators.push(pk);
580 }
581 validators.sort();
582 schemes.sort_by_key(|s| s.public_key());
583 let mut registrations = register_validators(&mut oracle, &validators).await;
584
585 let link = Link {
587 latency: 50.0,
588 jitter: 50.0,
589 success_rate: 1.0,
590 };
591 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
592
593 let prover = Prover::new(pk, &namespace);
595 let relay = Arc::new(mocks::relay::Relay::new());
596 let mut supervisors = HashMap::new();
597 let (done_sender, mut done_receiver) = mpsc::unbounded();
598 let mut engine_handlers = Vec::new();
599 for (idx, scheme) in schemes.into_iter().enumerate() {
600 let context = context
602 .clone()
603 .with_label(&format!("validator-{}", scheme.public_key()));
604
605 let validator = scheme.public_key();
607 let mut participants = BTreeMap::new();
608 participants.insert(0, (public.clone(), validators.clone(), shares[idx]));
609 let supervisor_config = mocks::supervisor::Config {
610 prover: prover.clone(),
611 participants,
612 };
613 let supervisor =
614 mocks::supervisor::Supervisor::new(supervisor_config);
615 supervisors.insert(validator.clone(), supervisor.clone());
616 let application_cfg = mocks::application::Config {
617 hasher: Sha256::default(),
618 relay: relay.clone(),
619 participant: validator.clone(),
620 tracker: done_sender.clone(),
621 propose_latency: (10.0, 5.0),
622 verify_latency: (10.0, 5.0),
623 };
624 let (actor, application) =
625 mocks::application::Application::new(context.with_label("application"), application_cfg);
626 actor.start();
627 let cfg = JConfig {
628 partition: validator.to_string(),
629 };
630 let journal = Journal::init(context.with_label("journal"), cfg)
631 .await
632 .expect("unable to create journal");
633 let cfg = config::Config {
634 crypto: scheme,
635 automaton: application.clone(),
636 relay: application.clone(),
637 committer: application,
638 supervisor,
639 mailbox_size: 1024,
640 namespace: namespace.clone(),
641 leader_timeout: Duration::from_secs(1),
642 notarization_timeout: Duration::from_secs(2),
643 nullify_retry: Duration::from_secs(10),
644 fetch_timeout: Duration::from_secs(1),
645 activity_timeout,
646 max_fetch_count: 1,
647 max_fetch_size: 1024 * 512,
648 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
649 fetch_concurrent: 1,
650 replay_concurrency: 1,
651 };
652 let engine = Engine::new(context.with_label("engine"), journal, cfg);
653
654 let (voter, resolver) = registrations
656 .remove(&validator)
657 .expect("validator should be registered");
658 engine_handlers.push(engine.start(voter, resolver));
659 }
660
661 context.with_label("confirmed").spawn(move |_| async move {
663 loop {
664 let (validator, event) = done_receiver.next().await.unwrap();
666 match event {
667 mocks::application::Progress::Notarized(proof, digest) => {
668 let (view, _, payload, _, _) =
670 prover.deserialize_notarization(proof).unwrap();
671 if digest != payload {
672 panic!(
673 "notarization mismatch digest: {:?}, payload: {:?}",
674 digest, payload
675 );
676 }
677
678 {
680 let mut notarized = notarized.lock().unwrap();
681 if let Some(previous) = notarized.insert(view, digest.clone())
682 {
683 if previous != digest {
684 panic!(
685 "notarization mismatch at {:?} previous: {:?}, current: {:?}",
686 view, previous, digest
687 );
688 }
689 }
690 if (notarized.len() as u64) < required_containers {
691 continue;
692 }
693 }
694 }
695 mocks::application::Progress::Finalized(proof, digest) => {
696 let (view, _, payload, _, _) =
698 prover.deserialize_finalization(proof).unwrap();
699 if digest != payload {
700 panic!(
701 "finalization mismatch digest: {:?}, payload: {:?}",
702 digest, payload
703 );
704 }
705
706 {
708 let mut finalized = finalized.lock().unwrap();
709 if let Some(previous) = finalized.insert(view, digest.clone()) {
710 if previous != digest {
711 panic!(
712 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
713 view, previous, digest
714 );
715 }
716 }
717 if (finalized.len() as u64) < required_containers {
718 continue;
719 }
720 }
721 completed.lock().unwrap().insert(validator);
722 }
723 }
724 }
725 });
726
727 let wait =
729 context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
730 context.sleep(wait).await;
731 {
732 let mut shutdowns = shutdowns.lock().unwrap();
733 debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
734 *shutdowns += 1;
735 }
736
737 supervised.lock().unwrap().push(supervisors);
739 }});
740
741 (executor, context, _) = context.recover();
743 }
744
745 let supervised = supervised.lock().unwrap();
747 for supervisors in supervised.iter() {
748 for (_, supervisor) in supervisors.iter() {
749 let faults = supervisor.faults.lock().unwrap();
750 assert!(faults.is_empty());
751 }
752 }
753 }
754
755 #[test_traced]
756 fn test_backfill() {
757 let n = 4;
759 let threshold = quorum(n).expect("unable to calculate threshold");
760 let required_containers = 100;
761 let activity_timeout = 10;
762 let namespace = b"consensus".to_vec();
763 let (executor, mut context, _) = Executor::timed(Duration::from_secs(360));
764 executor.start(async move {
765 let (network, mut oracle) = Network::new(
767 context.with_label("network"),
768 Config {
769 max_size: 1024 * 1024,
770 },
771 );
772
773 network.start();
775
776 let mut schemes = Vec::new();
778 let mut validators = Vec::new();
779 for i in 0..n {
780 let scheme = Ed25519::from_seed(i as u64);
781 let pk = scheme.public_key();
782 schemes.push(scheme);
783 validators.push(pk);
784 }
785 validators.sort();
786 schemes.sort_by_key(|s| s.public_key());
787 let mut registrations = register_validators(&mut oracle, &validators).await;
788
789 let link = Link {
791 latency: 10.0,
792 jitter: 1.0,
793 success_rate: 1.0,
794 };
795 link_validators(
796 &mut oracle,
797 &validators,
798 Action::Link(link),
799 Some(|_, i, j| ![i, j].contains(&0usize)),
800 )
801 .await;
802
803 let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
805 let pk = poly::public(&public);
806 let prover = Prover::new(pk, &namespace);
807
808 let relay = Arc::new(mocks::relay::Relay::new());
810 let mut supervisors = Vec::new();
811 let (done_sender, mut done_receiver) = mpsc::unbounded();
812 let mut engine_handlers = Vec::new();
813 for (idx_scheme, scheme) in schemes.iter().enumerate() {
814 if idx_scheme == 0 {
816 continue;
817 }
818
819 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
821
822 let validator = scheme.public_key();
824 let mut participants = BTreeMap::new();
825 participants.insert(0, (public.clone(), validators.clone(), shares[idx_scheme]));
826 let supervisor_config = mocks::supervisor::Config {
827 prover: prover.clone(),
828 participants,
829 };
830 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
831 supervisors.push(supervisor.clone());
832 let application_cfg = mocks::application::Config {
833 hasher: Sha256::default(),
834 relay: relay.clone(),
835 participant: validator.clone(),
836 tracker: done_sender.clone(),
837 propose_latency: (10.0, 5.0),
838 verify_latency: (10.0, 5.0),
839 };
840 let (actor, application) = mocks::application::Application::new(
841 context.with_label("application"),
842 application_cfg,
843 );
844 actor.start();
845 let cfg = JConfig {
846 partition: validator.to_string(),
847 };
848 let journal = Journal::init(context.with_label("journal"), cfg)
849 .await
850 .expect("unable to create journal");
851 let cfg = config::Config {
852 crypto: scheme.clone(),
853 automaton: application.clone(),
854 relay: application.clone(),
855 committer: application,
856 supervisor,
857 mailbox_size: 1024,
858 namespace: namespace.clone(),
859 leader_timeout: Duration::from_secs(1),
860 notarization_timeout: Duration::from_secs(2),
861 nullify_retry: Duration::from_secs(10),
862 fetch_timeout: Duration::from_secs(1),
863 activity_timeout,
864 max_fetch_count: 1, max_fetch_size: 1024 * 1024,
866 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
867 fetch_concurrent: 1,
868 replay_concurrency: 1,
869 };
870 let engine = Engine::new(context.with_label("engine"), journal, cfg);
871
872 let (voter, resolver) = registrations
874 .remove(&validator)
875 .expect("validator should be registered");
876 engine_handlers.push(engine.start(voter, resolver));
877 }
878
879 let mut completed = HashSet::new();
881 let mut finalized = HashMap::new();
882 loop {
883 let (validator, event) = done_receiver.next().await.unwrap();
884 if let mocks::application::Progress::Finalized(proof, digest) = event {
885 let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
886 if digest != payload {
887 panic!(
888 "finalization mismatch digest: {:?}, payload: {:?}",
889 digest, payload
890 );
891 }
892 finalized.insert(view, digest);
893 if (finalized.len() as u64) < required_containers {
894 continue;
895 }
896 completed.insert(validator);
897 }
898 if completed.len() == (n - 1) as usize {
899 break;
900 }
901 }
902
903 let link = Link {
905 latency: 3_000.0,
906 jitter: 0.0,
907 success_rate: 1.0,
908 };
909 link_validators(
910 &mut oracle,
911 &validators,
912 Action::Update(link.clone()),
913 Some(|_, i, j| ![i, j].contains(&0usize)),
914 )
915 .await;
916
917 context.sleep(Duration::from_secs(120)).await;
919
920 link_validators(
922 &mut oracle,
923 &validators,
924 Action::Unlink,
925 Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
926 )
927 .await;
928
929 let scheme = schemes[0].clone();
931 let validator = scheme.public_key();
932 {
933 let context = context.with_label(&format!("validator-{}", validator));
935
936 link_validators(
938 &mut oracle,
939 &validators,
940 Action::Link(link),
941 Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
942 )
943 .await;
944
945 let link = Link {
947 latency: 10.0,
948 jitter: 2.5,
949 success_rate: 1.0,
950 };
951 link_validators(
952 &mut oracle,
953 &validators,
954 Action::Update(link),
955 Some(|_, i, j| ![i, j].contains(&1usize)),
956 )
957 .await;
958
959 let mut participants = BTreeMap::new();
961 participants.insert(0, (public.clone(), validators.clone(), shares[0]));
962 let supervisor_config = mocks::supervisor::Config {
963 prover: prover.clone(),
964 participants,
965 };
966 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
967 supervisors.push(supervisor.clone());
968 let application_cfg = mocks::application::Config {
969 hasher: Sha256::default(),
970 relay: relay.clone(),
971 participant: validator.clone(),
972 tracker: done_sender.clone(),
973 propose_latency: (10.0, 5.0),
974 verify_latency: (10.0, 5.0),
975 };
976 let (actor, application) = mocks::application::Application::new(
977 context.with_label("application"),
978 application_cfg,
979 );
980 actor.start();
981 let cfg = JConfig {
982 partition: validator.to_string(),
983 };
984 let journal = Journal::init(context.with_label("journal"), cfg)
985 .await
986 .expect("unable to create journal");
987 let cfg = config::Config {
988 crypto: scheme,
989 automaton: application.clone(),
990 relay: application.clone(),
991 committer: application,
992 supervisor,
993 mailbox_size: 1024,
994 namespace: namespace.clone(),
995 leader_timeout: Duration::from_secs(1),
996 notarization_timeout: Duration::from_secs(2),
997 nullify_retry: Duration::from_secs(10),
998 fetch_timeout: Duration::from_secs(1),
999 activity_timeout,
1000 max_fetch_count: 1,
1001 max_fetch_size: 1024 * 512,
1002 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1003 fetch_concurrent: 1,
1004 replay_concurrency: 1,
1005 };
1006 let engine = Engine::new(context.with_label("engine"), journal, cfg);
1007
1008 let (voter, resolver) = registrations
1010 .remove(&validator)
1011 .expect("validator should be registered");
1012 engine_handlers.push(engine.start(voter, resolver));
1013 }
1014
1015 let mut finalized = HashMap::new();
1017 let mut validator_finalized = HashSet::new();
1018 loop {
1019 let (candidate, event) = done_receiver.next().await.unwrap();
1020 if let mocks::application::Progress::Finalized(proof, digest) = event {
1021 let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
1022 if digest != payload {
1023 panic!(
1024 "finalization mismatch digest: {:?}, payload: {:?}",
1025 digest, payload
1026 );
1027 }
1028 if let Some(previous) = finalized.insert(view, digest.clone()) {
1029 if previous != digest {
1030 panic!(
1031 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1032 view, previous, digest
1033 );
1034 }
1035 }
1036 if validator == candidate {
1037 validator_finalized.insert(view);
1038 }
1039 }
1040 if validator_finalized.len() == required_containers as usize {
1041 break;
1042 }
1043 }
1044 });
1045 }
1046
1047 #[test_traced]
1048 fn test_one_offline() {
1049 let n = 5;
1051 let threshold = quorum(n).expect("unable to calculate threshold");
1052 let required_containers = 100;
1053 let activity_timeout = 10;
1054 let namespace = b"consensus".to_vec();
1055 let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
1056 executor.start(async move {
1057 let (network, mut oracle) = Network::new(
1059 context.with_label("network"),
1060 Config {
1061 max_size: 1024 * 1024,
1062 },
1063 );
1064
1065 network.start();
1067
1068 let mut schemes = Vec::new();
1070 let mut validators = Vec::new();
1071 for i in 0..n {
1072 let scheme = Ed25519::from_seed(i as u64);
1073 let pk = scheme.public_key();
1074 schemes.push(scheme);
1075 validators.push(pk);
1076 }
1077 validators.sort();
1078 schemes.sort_by_key(|s| s.public_key());
1079 let mut registrations = register_validators(&mut oracle, &validators).await;
1080
1081 let link = Link {
1083 latency: 10.0,
1084 jitter: 1.0,
1085 success_rate: 1.0,
1086 };
1087 link_validators(
1088 &mut oracle,
1089 &validators,
1090 Action::Link(link),
1091 Some(|_, i, j| ![i, j].contains(&0usize)),
1092 )
1093 .await;
1094
1095 let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
1097 let pk = poly::public(&public);
1098 let prover = Prover::new(pk, &namespace);
1099
1100 let relay = Arc::new(mocks::relay::Relay::new());
1102 let mut supervisors = Vec::new();
1103 let (done_sender, mut done_receiver) = mpsc::unbounded();
1104 let mut engine_handlers = Vec::new();
1105 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1106 if idx_scheme == 0 {
1108 continue;
1109 }
1110
1111 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1113
1114 let validator = scheme.public_key();
1116 let mut participants = BTreeMap::new();
1117 participants.insert(0, (public.clone(), validators.clone(), shares[idx_scheme]));
1118 let supervisor_config = mocks::supervisor::Config {
1119 prover: prover.clone(),
1120 participants,
1121 };
1122 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1123 supervisors.push(supervisor.clone());
1124 let application_cfg = mocks::application::Config {
1125 hasher: Sha256::default(),
1126 relay: relay.clone(),
1127 participant: validator.clone(),
1128 tracker: done_sender.clone(),
1129 propose_latency: (10.0, 5.0),
1130 verify_latency: (10.0, 5.0),
1131 };
1132 let (actor, application) = mocks::application::Application::new(
1133 context.with_label("application"),
1134 application_cfg,
1135 );
1136 actor.start();
1137 let cfg = JConfig {
1138 partition: validator.to_string(),
1139 };
1140 let journal = Journal::init(context.with_label("journal"), cfg)
1141 .await
1142 .expect("unable to create journal");
1143 let cfg = config::Config {
1144 crypto: scheme,
1145 automaton: application.clone(),
1146 relay: application.clone(),
1147 committer: application,
1148 supervisor,
1149 mailbox_size: 1024,
1150 namespace: namespace.clone(),
1151 leader_timeout: Duration::from_secs(1),
1152 notarization_timeout: Duration::from_secs(2),
1153 nullify_retry: Duration::from_secs(10),
1154 fetch_timeout: Duration::from_secs(1),
1155 activity_timeout,
1156 max_fetch_count: 1,
1157 max_fetch_size: 1024 * 512,
1158 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1159 fetch_concurrent: 1,
1160 replay_concurrency: 1,
1161 };
1162 let engine = Engine::new(context.with_label("engine"), journal, cfg);
1163
1164 let (voter, resolver) = registrations
1166 .remove(&validator)
1167 .expect("validator should be registered");
1168 engine_handlers.push(engine.start(voter, resolver));
1169 }
1170
1171 let mut completed = HashSet::new();
1173 let mut finalized = HashMap::new();
1174 loop {
1175 let (validator, event) = done_receiver.next().await.unwrap();
1176 if let mocks::application::Progress::Finalized(proof, digest) = event {
1177 let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
1178 if digest != payload {
1179 panic!(
1180 "finalization mismatch digest: {:?}, payload: {:?}",
1181 digest, payload
1182 );
1183 }
1184 if let Some(previous) = finalized.insert(view, digest.clone()) {
1185 if previous != digest {
1186 panic!(
1187 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1188 view, previous, digest
1189 );
1190 }
1191 }
1192 if (finalized.len() as u64) < required_containers {
1193 continue;
1194 }
1195 completed.insert(validator);
1196 }
1197 if completed.len() == (n - 1) as usize {
1198 break;
1199 }
1200 }
1201
1202 let offline = &validators[0];
1204 for supervisor in supervisors.iter() {
1205 {
1207 let faults = supervisor.faults.lock().unwrap();
1208 assert!(faults.is_empty());
1209 }
1210
1211 {
1213 let notarizes = supervisor.notarizes.lock().unwrap();
1214 for (view, payloads) in notarizes.iter() {
1215 for (_, participants) in payloads.iter() {
1216 if participants.contains(offline) {
1217 panic!("view: {}", view);
1218 }
1219 }
1220 }
1221 }
1222 {
1223 let finalizes = supervisor.finalizes.lock().unwrap();
1224 for (view, payloads) in finalizes.iter() {
1225 for (_, finalizers) in payloads.iter() {
1226 if finalizers.contains(offline) {
1227 panic!("view: {}", view);
1228 }
1229 }
1230 }
1231 }
1232 }
1233 });
1234 }
1235
1236 #[test_traced]
1237 fn test_slow_validator() {
1238 let n = 5;
1240 let threshold = quorum(n).expect("unable to calculate threshold");
1241 let required_containers = 50;
1242 let activity_timeout = 10;
1243 let namespace = b"consensus".to_vec();
1244 let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
1245 executor.start(async move {
1246 let (network, mut oracle) = Network::new(
1248 context.with_label("network"),
1249 Config {
1250 max_size: 1024 * 1024,
1251 },
1252 );
1253
1254 network.start();
1256
1257 let mut schemes = Vec::new();
1259 let mut validators = Vec::new();
1260 for i in 0..n {
1261 let scheme = Ed25519::from_seed(i as u64);
1262 let pk = scheme.public_key();
1263 schemes.push(scheme);
1264 validators.push(pk);
1265 }
1266 validators.sort();
1267 schemes.sort_by_key(|s| s.public_key());
1268 let mut registrations = register_validators(&mut oracle, &validators).await;
1269
1270 let link = Link {
1272 latency: 10.0,
1273 jitter: 1.0,
1274 success_rate: 1.0,
1275 };
1276 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1277
1278 let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
1280 let pk = poly::public(&public);
1281 let prover = Prover::new(pk, &namespace);
1282
1283 let relay = Arc::new(mocks::relay::Relay::new());
1285 let mut supervisors = Vec::new();
1286 let (done_sender, mut done_receiver) = mpsc::unbounded();
1287 let mut engine_handlers = Vec::new();
1288 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
1289 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1291
1292 let validator = scheme.public_key();
1294 let mut participants = BTreeMap::new();
1295 participants.insert(0, (public.clone(), validators.clone(), shares[idx_scheme]));
1296 let supervisor_config = mocks::supervisor::Config {
1297 prover: prover.clone(),
1298 participants,
1299 };
1300 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1301 supervisors.push(supervisor.clone());
1302 let application_cfg = if idx_scheme == 0 {
1303 mocks::application::Config {
1304 hasher: Sha256::default(),
1305 relay: relay.clone(),
1306 participant: validator.clone(),
1307 tracker: done_sender.clone(),
1308 propose_latency: (3_000.0, 0.0),
1309 verify_latency: (3_000.0, 5.0),
1310 }
1311 } else {
1312 mocks::application::Config {
1313 hasher: Sha256::default(),
1314 relay: relay.clone(),
1315 participant: validator.clone(),
1316 tracker: done_sender.clone(),
1317 propose_latency: (10.0, 5.0),
1318 verify_latency: (10.0, 5.0),
1319 }
1320 };
1321 let (actor, application) = mocks::application::Application::new(
1322 context.with_label("application"),
1323 application_cfg,
1324 );
1325 actor.start();
1326 let cfg = JConfig {
1327 partition: validator.to_string(),
1328 };
1329 let journal = Journal::init(context.with_label("journal"), cfg)
1330 .await
1331 .expect("unable to create journal");
1332 let cfg = config::Config {
1333 crypto: scheme,
1334 automaton: application.clone(),
1335 relay: application.clone(),
1336 committer: application,
1337 supervisor,
1338 mailbox_size: 1024,
1339 namespace: namespace.clone(),
1340 leader_timeout: Duration::from_secs(1),
1341 notarization_timeout: Duration::from_secs(2),
1342 nullify_retry: Duration::from_secs(10),
1343 fetch_timeout: Duration::from_secs(1),
1344 activity_timeout,
1345 max_fetch_count: 1,
1346 max_fetch_size: 1024 * 512,
1347 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1348 fetch_concurrent: 1,
1349 replay_concurrency: 1,
1350 };
1351 let engine = Engine::new(context.with_label("engine"), journal, cfg);
1352
1353 let (voter, resolver) = registrations
1355 .remove(&validator)
1356 .expect("validator should be registered");
1357 engine_handlers.push(engine.start(voter, resolver));
1358 }
1359
1360 let mut completed = HashSet::new();
1362 let mut finalized = HashMap::new();
1363 loop {
1364 let (validator, event) = done_receiver.next().await.unwrap();
1365 if let mocks::application::Progress::Finalized(proof, digest) = event {
1366 let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
1367 if digest != payload {
1368 panic!(
1369 "finalization mismatch digest: {:?}, payload: {:?}",
1370 digest, payload
1371 );
1372 }
1373 if let Some(previous) = finalized.insert(view, digest.clone()) {
1374 if previous != digest {
1375 panic!(
1376 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1377 view, previous, digest
1378 );
1379 }
1380 }
1381 if (finalized.len() as u64) < required_containers {
1382 continue;
1383 }
1384 completed.insert(validator);
1385 }
1386 if completed.len() == n as usize {
1387 break;
1388 }
1389 }
1390
1391 let slow = &validators[0];
1393 for supervisor in supervisors.iter() {
1394 {
1396 let faults = supervisor.faults.lock().unwrap();
1397 assert!(faults.is_empty());
1398 }
1399
1400 {
1402 let notarizes = supervisor.notarizes.lock().unwrap();
1403 for (view, payloads) in notarizes.iter() {
1404 for (_, participants) in payloads.iter() {
1405 if participants.contains(slow) {
1406 panic!("view: {}", view);
1407 }
1408 }
1409 }
1410 }
1411 {
1412 let finalizes = supervisor.finalizes.lock().unwrap();
1413 for (view, payloads) in finalizes.iter() {
1414 for (_, finalizers) in payloads.iter() {
1415 if finalizers.contains(slow) {
1416 panic!("view: {}", view);
1417 }
1418 }
1419 }
1420 }
1421 }
1422 });
1423 }
1424
1425 #[test_traced]
1426 fn test_all_recovery() {
1427 let n = 5;
1429 let threshold = quorum(n).expect("unable to calculate threshold");
1430 let required_containers = 100;
1431 let activity_timeout = 10;
1432 let namespace = b"consensus".to_vec();
1433 let (executor, mut context, _) = Executor::timed(Duration::from_secs(120));
1434 executor.start(async move {
1435 let (network, mut oracle) = Network::new(
1437 context.with_label("network"),
1438 Config {
1439 max_size: 1024 * 1024,
1440 },
1441 );
1442
1443 network.start();
1445
1446 let mut schemes = Vec::new();
1448 let mut validators = Vec::new();
1449 for i in 0..n {
1450 let scheme = Ed25519::from_seed(i as u64);
1451 let pk = scheme.public_key();
1452 schemes.push(scheme);
1453 validators.push(pk);
1454 }
1455 validators.sort();
1456 schemes.sort_by_key(|s| s.public_key());
1457 let mut registrations = register_validators(&mut oracle, &validators).await;
1458
1459 let link = Link {
1461 latency: 3_000.0,
1462 jitter: 0.0,
1463 success_rate: 1.0,
1464 };
1465 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
1466
1467 let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
1469 let pk = poly::public(&public);
1470 let prover = Prover::new(pk, &namespace);
1471
1472 let relay = Arc::new(mocks::relay::Relay::new());
1474 let mut supervisors = Vec::new();
1475 let (done_sender, mut done_receiver) = mpsc::unbounded();
1476 let mut engine_handlers = Vec::new();
1477 for (idx, scheme) in schemes.iter().enumerate() {
1478 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1480
1481 let validator = scheme.public_key();
1483 let mut participants = BTreeMap::new();
1484 participants.insert(0, (public.clone(), validators.clone(), shares[idx]));
1485 let supervisor_config = mocks::supervisor::Config {
1486 prover: prover.clone(),
1487 participants,
1488 };
1489 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1490 supervisors.push(supervisor.clone());
1491 let application_cfg = mocks::application::Config {
1492 hasher: Sha256::default(),
1493 relay: relay.clone(),
1494 participant: validator.clone(),
1495 tracker: done_sender.clone(),
1496 propose_latency: (10.0, 5.0),
1497 verify_latency: (10.0, 5.0),
1498 };
1499 let (actor, application) = mocks::application::Application::new(
1500 context.with_label("application"),
1501 application_cfg,
1502 );
1503 actor.start();
1504 let cfg = JConfig {
1505 partition: validator.to_string(),
1506 };
1507 let journal = Journal::init(context.with_label("journal"), cfg)
1508 .await
1509 .expect("unable to create journal");
1510 let cfg = config::Config {
1511 crypto: scheme.clone(),
1512 automaton: application.clone(),
1513 relay: application.clone(),
1514 committer: application,
1515 supervisor,
1516 mailbox_size: 1024,
1517 namespace: namespace.clone(),
1518 leader_timeout: Duration::from_secs(1),
1519 notarization_timeout: Duration::from_secs(2),
1520 nullify_retry: Duration::from_secs(10),
1521 fetch_timeout: Duration::from_secs(1),
1522 activity_timeout,
1523 max_fetch_count: 1,
1524 max_fetch_size: 1024 * 512,
1525 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1526 fetch_concurrent: 1,
1527 replay_concurrency: 1,
1528 };
1529 let engine = Engine::new(context.with_label("engine"), journal, cfg);
1530
1531 let (voter, resolver) = registrations
1533 .remove(&validator)
1534 .expect("validator should be registered");
1535 engine_handlers.push(engine.start(voter, resolver));
1536 }
1537
1538 select! {
1540 _timeout = context.sleep(Duration::from_secs(60)) => {},
1541 _done = done_receiver.next() => {
1542 panic!("engine should not notarize or finalize anything");
1543 }
1544 }
1545
1546 let link = Link {
1548 latency: 10.0,
1549 jitter: 1.0,
1550 success_rate: 1.0,
1551 };
1552 link_validators(&mut oracle, &validators, Action::Update(link), None).await;
1553
1554 let mut completed = HashSet::new();
1556 let mut finalized = HashMap::new();
1557 loop {
1558 let (validator, event) = done_receiver.next().await.unwrap();
1559 if let mocks::application::Progress::Finalized(proof, digest) = event {
1560 let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
1561 if digest != payload {
1562 panic!(
1563 "finalization mismatch digest: {:?}, payload: {:?}",
1564 digest, payload
1565 );
1566 }
1567 if let Some(previous) = finalized.insert(view, digest.clone()) {
1568 if previous != digest {
1569 panic!(
1570 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1571 view, previous, digest
1572 );
1573 }
1574 }
1575 if (finalized.len() as u64) < required_containers {
1576 continue;
1577 }
1578 completed.insert(validator);
1579 }
1580 if completed.len() == n as usize {
1581 break;
1582 }
1583 }
1584
1585 for supervisor in supervisors.iter() {
1587 {
1589 let faults = supervisor.faults.lock().unwrap();
1590 assert!(faults.is_empty());
1591 }
1592 }
1593 });
1594 }
1595
1596 #[test_traced]
1597 fn test_partition() {
1598 let n = 10;
1600 let threshold = quorum(n).expect("unable to calculate threshold");
1601 let required_containers = 50;
1602 let activity_timeout = 10;
1603 let namespace = b"consensus".to_vec();
1604 let (executor, mut context, _) = Executor::timed(Duration::from_secs(900));
1605 executor.start(async move {
1606 let (network, mut oracle) = Network::new(
1608 context.with_label("network"),
1609 Config {
1610 max_size: 1024 * 1024,
1611 },
1612 );
1613
1614 network.start();
1616
1617 let mut schemes = Vec::new();
1619 let mut validators = Vec::new();
1620 for i in 0..n {
1621 let scheme = Ed25519::from_seed(i as u64);
1622 let pk = scheme.public_key();
1623 schemes.push(scheme);
1624 validators.push(pk);
1625 }
1626 validators.sort();
1627 schemes.sort_by_key(|s| s.public_key());
1628 let mut registrations = register_validators(&mut oracle, &validators).await;
1629
1630 let link = Link {
1632 latency: 10.0,
1633 jitter: 1.0,
1634 success_rate: 1.0,
1635 };
1636 link_validators(&mut oracle, &validators, Action::Link(link.clone()), None).await;
1637
1638 let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
1640 let pk = poly::public(&public);
1641 let prover = Prover::new(pk, &namespace);
1642
1643 let relay = Arc::new(mocks::relay::Relay::new());
1645 let mut supervisors = Vec::new();
1646 let (done_sender, mut done_receiver) = mpsc::unbounded();
1647 let mut engine_handlers = Vec::new();
1648 for (idx, scheme) in schemes.iter().enumerate() {
1649 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1651
1652 let validator = scheme.public_key();
1654 let mut participants = BTreeMap::new();
1655 participants.insert(0, (public.clone(), validators.clone(), shares[idx]));
1656 let supervisor_config = mocks::supervisor::Config {
1657 prover: prover.clone(),
1658 participants,
1659 };
1660 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1661 supervisors.push(supervisor.clone());
1662 let application_cfg = mocks::application::Config {
1663 hasher: Sha256::default(),
1664 relay: relay.clone(),
1665 participant: validator.clone(),
1666 tracker: done_sender.clone(),
1667 propose_latency: (10.0, 5.0),
1668 verify_latency: (10.0, 5.0),
1669 };
1670 let (actor, application) = mocks::application::Application::new(
1671 context.with_label("application"),
1672 application_cfg,
1673 );
1674 actor.start();
1675 let cfg = JConfig {
1676 partition: validator.to_string(),
1677 };
1678 let journal = Journal::init(context.with_label("journal"), cfg)
1679 .await
1680 .expect("unable to create journal");
1681 let cfg = config::Config {
1682 crypto: scheme.clone(),
1683 automaton: application.clone(),
1684 relay: application.clone(),
1685 committer: application,
1686 supervisor,
1687 mailbox_size: 1024,
1688 namespace: namespace.clone(),
1689 leader_timeout: Duration::from_secs(1),
1690 notarization_timeout: Duration::from_secs(2),
1691 nullify_retry: Duration::from_secs(10),
1692 fetch_timeout: Duration::from_secs(1),
1693 activity_timeout,
1694 max_fetch_count: 1,
1695 max_fetch_size: 1024 * 512,
1696 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1697 fetch_concurrent: 1,
1698 replay_concurrency: 1,
1699 };
1700 let engine = Engine::new(context.with_label("engine"), journal, cfg);
1701
1702 let (voter, resolver) = registrations
1704 .remove(&validator)
1705 .expect("validator should be registered");
1706 engine_handlers.push(engine.start(voter, resolver));
1707 }
1708
1709 let mut completed = HashSet::new();
1711 let mut finalized = HashMap::new();
1712 let mut highest_finalized = 0;
1713 loop {
1714 let (validator, event) = done_receiver.next().await.unwrap();
1715 if let mocks::application::Progress::Finalized(proof, digest) = event {
1716 let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
1717 if digest != payload {
1718 panic!(
1719 "finalization mismatch digest: {:?}, payload: {:?}",
1720 digest, payload
1721 );
1722 }
1723 if let Some(previous) = finalized.insert(view, digest.clone()) {
1724 if previous != digest {
1725 panic!(
1726 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1727 view, previous, digest
1728 );
1729 }
1730 }
1731 if view > highest_finalized {
1732 highest_finalized = view;
1733 }
1734 if (finalized.len() as u64) < required_containers {
1735 continue;
1736 }
1737 completed.insert(validator);
1738 }
1739 if completed.len() == n as usize {
1740 break;
1741 }
1742 }
1743
1744 fn separated(n: usize, a: usize, b: usize) -> bool {
1746 let m = n / 2;
1747 (a < m && b >= m) || (a >= m && b < m)
1748 }
1749 link_validators(&mut oracle, &validators, Action::Unlink, Some(separated)).await;
1750
1751 context.sleep(Duration::from_secs(10)).await;
1753
1754 loop {
1756 if done_receiver.try_next().is_err() {
1757 break;
1758 }
1759 }
1760
1761 select! {
1763 _timeout = context.sleep(Duration::from_secs(600)) => {},
1764 _done = done_receiver.next() => {
1765 panic!("engine should not notarize or finalize anything");
1766 }
1767 }
1768
1769 link_validators(
1771 &mut oracle,
1772 &validators,
1773 Action::Link(link),
1774 Some(separated),
1775 )
1776 .await;
1777
1778 let mut completed = HashSet::new();
1780 loop {
1781 let (validator, event) = done_receiver.next().await.unwrap();
1782 if let mocks::application::Progress::Finalized(proof, digest) = event {
1783 let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
1784 if digest != payload {
1785 panic!(
1786 "finalization mismatch digest: {:?}, payload: {:?}",
1787 digest, payload
1788 );
1789 }
1790 if let Some(previous) = finalized.insert(view, digest.clone()) {
1791 if previous != digest {
1792 panic!(
1793 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1794 view, previous, digest
1795 );
1796 }
1797 }
1798 if (finalized.len() as u64) < required_containers + highest_finalized {
1799 continue;
1800 }
1801 completed.insert(validator);
1802 }
1803 if completed.len() == n as usize {
1804 break;
1805 }
1806 }
1807
1808 for supervisor in supervisors.iter() {
1810 {
1812 let faults = supervisor.faults.lock().unwrap();
1813 assert!(faults.is_empty());
1814 }
1815 }
1816 });
1817 }
1818
1819 fn slow_and_lossy_links(seed: u64) -> String {
1820 let n = 5;
1822 let threshold = quorum(n).expect("unable to calculate threshold");
1823 let required_containers = 50;
1824 let activity_timeout = 10;
1825 let namespace = b"consensus".to_vec();
1826 let cfg = deterministic::Config {
1827 seed,
1828 timeout: Some(Duration::from_secs(3_000)),
1829 ..deterministic::Config::default()
1830 };
1831 let (executor, mut context, auditor) = Executor::init(cfg);
1832 executor.start(async move {
1833 let (network, mut oracle) = Network::new(
1835 context.with_label("network"),
1836 Config {
1837 max_size: 1024 * 1024,
1838 },
1839 );
1840
1841 network.start();
1843
1844 let mut schemes = Vec::new();
1846 let mut validators = Vec::new();
1847 for i in 0..n {
1848 let scheme = Ed25519::from_seed(i as u64);
1849 let pk = scheme.public_key();
1850 schemes.push(scheme);
1851 validators.push(pk);
1852 }
1853 validators.sort();
1854 schemes.sort_by_key(|s| s.public_key());
1855 let mut registrations = register_validators(&mut oracle, &validators).await;
1856
1857 let degraded_link = Link {
1859 latency: 200.0,
1860 jitter: 150.0,
1861 success_rate: 0.5,
1862 };
1863 link_validators(&mut oracle, &validators, Action::Link(degraded_link), None).await;
1864
1865 let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
1867 let pk = poly::public(&public);
1868 let prover = Prover::new(pk, &namespace);
1869
1870 let relay = Arc::new(mocks::relay::Relay::new());
1872 let mut supervisors = Vec::new();
1873 let (done_sender, mut done_receiver) = mpsc::unbounded();
1874 let mut engine_handlers = Vec::new();
1875 for (idx, scheme) in schemes.into_iter().enumerate() {
1876 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
1878
1879 let validator = scheme.public_key();
1881 let mut participants = BTreeMap::new();
1882 participants.insert(0, (public.clone(), validators.clone(), shares[idx]));
1883 let supervisor_config = mocks::supervisor::Config {
1884 prover: prover.clone(),
1885 participants,
1886 };
1887 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
1888 supervisors.push(supervisor.clone());
1889 let application_cfg = mocks::application::Config {
1890 hasher: Sha256::default(),
1891 relay: relay.clone(),
1892 participant: validator.clone(),
1893 tracker: done_sender.clone(),
1894 propose_latency: (10.0, 5.0),
1895 verify_latency: (10.0, 5.0),
1896 };
1897 let (actor, application) = mocks::application::Application::new(
1898 context.with_label("application"),
1899 application_cfg,
1900 );
1901 actor.start();
1902 let cfg = JConfig {
1903 partition: validator.to_string(),
1904 };
1905 let journal = Journal::init(context.with_label("journal"), cfg)
1906 .await
1907 .expect("unable to create journal");
1908 let cfg = config::Config {
1909 crypto: scheme,
1910 automaton: application.clone(),
1911 relay: application.clone(),
1912 committer: application,
1913 supervisor,
1914 mailbox_size: 1024,
1915 namespace: namespace.clone(),
1916 leader_timeout: Duration::from_secs(1),
1917 notarization_timeout: Duration::from_secs(2),
1918 nullify_retry: Duration::from_secs(10),
1919 fetch_timeout: Duration::from_secs(1),
1920 activity_timeout,
1921 max_fetch_count: 1,
1922 max_fetch_size: 1024 * 512,
1923 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
1924 fetch_concurrent: 1,
1925 replay_concurrency: 1,
1926 };
1927 let engine = Engine::new(context.with_label("engine"), journal, cfg);
1928
1929 let (voter, resolver) = registrations
1931 .remove(&validator)
1932 .expect("validator should be registered");
1933 engine_handlers.push(engine.start(voter, resolver));
1934 }
1935
1936 let mut completed = HashSet::new();
1938 let mut finalized = HashMap::new();
1939 loop {
1940 let (validator, event) = done_receiver.next().await.unwrap();
1941 if let mocks::application::Progress::Finalized(proof, digest) = event {
1942 let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
1943 if digest != payload {
1944 panic!(
1945 "finalization mismatch digest: {:?}, payload: {:?}",
1946 digest, payload
1947 );
1948 }
1949 if let Some(previous) = finalized.insert(view, digest.clone()) {
1950 if previous != digest {
1951 panic!(
1952 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
1953 view, previous, digest
1954 );
1955 }
1956 }
1957 if (finalized.len() as u64) < required_containers {
1958 continue;
1959 }
1960 completed.insert(validator);
1961 }
1962 if completed.len() == n as usize {
1963 break;
1964 }
1965 }
1966
1967 for supervisor in supervisors.iter() {
1969 {
1971 let faults = supervisor.faults.lock().unwrap();
1972 assert!(faults.is_empty());
1973 }
1974 }
1975 });
1976 auditor.state()
1977 }
1978
1979 #[test_traced]
1980 fn test_slow_and_lossy_links() {
1981 slow_and_lossy_links(0);
1982 }
1983
1984 #[test_traced]
1985 fn test_determinism() {
1986 for seed in 1..6 {
1989 let state_1 = slow_and_lossy_links(seed);
1991
1992 let state_2 = slow_and_lossy_links(seed);
1994
1995 assert_eq!(state_1, state_2);
1997 }
1998 }
1999
2000 #[test_traced]
2001 fn test_conflicter() {
2002 let n = 4;
2004 let threshold = quorum(n).expect("unable to calculate threshold");
2005 let required_containers = 50;
2006 let activity_timeout = 10;
2007 let namespace = b"consensus".to_vec();
2008 let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
2009 executor.start(async move {
2010 let (network, mut oracle) = Network::new(
2012 context.with_label("network"),
2013 Config {
2014 max_size: 1024 * 1024,
2015 },
2016 );
2017
2018 network.start();
2020
2021 let mut schemes = Vec::new();
2023 let mut validators = Vec::new();
2024 for i in 0..n {
2025 let scheme = Ed25519::from_seed(i as u64);
2026 let pk = scheme.public_key();
2027 schemes.push(scheme);
2028 validators.push(pk);
2029 }
2030 validators.sort();
2031 schemes.sort_by_key(|s| s.public_key());
2032 let mut registrations = register_validators(&mut oracle, &validators).await;
2033
2034 let link = Link {
2036 latency: 10.0,
2037 jitter: 1.0,
2038 success_rate: 1.0,
2039 };
2040 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2041
2042 let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
2044 let pk = poly::public(&public);
2045 let prover = Prover::new(pk, &namespace);
2046
2047 let relay = Arc::new(mocks::relay::Relay::new());
2049 let mut supervisors = Vec::new();
2050 let (done_sender, mut done_receiver) = mpsc::unbounded();
2051 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2052 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2054
2055 let validator = scheme.public_key();
2057 let mut participants = BTreeMap::new();
2058 participants.insert(0, (public.clone(), validators.clone(), shares[idx_scheme]));
2059 let supervisor_config = mocks::supervisor::Config {
2060 prover: prover.clone(),
2061 participants,
2062 };
2063 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2064 let (voter, resolver) = registrations
2065 .remove(&validator)
2066 .expect("validator should be registered");
2067 if idx_scheme == 0 {
2068 let cfg = mocks::conflicter::Config {
2069 supervisor,
2070 namespace: namespace.clone(),
2071 };
2072 let engine: mocks::conflicter::Conflicter<_, Sha256, _> =
2073 mocks::conflicter::Conflicter::new(
2074 context.with_label("byzantine_engine"),
2075 cfg,
2076 );
2077 engine.start(voter);
2078 } else {
2079 supervisors.push(supervisor.clone());
2080 let application_cfg = mocks::application::Config {
2081 hasher: Sha256::default(),
2082 relay: relay.clone(),
2083 participant: validator.clone(),
2084 tracker: done_sender.clone(),
2085 propose_latency: (10.0, 5.0),
2086 verify_latency: (10.0, 5.0),
2087 };
2088 let (actor, application) = mocks::application::Application::new(
2089 context.with_label("application"),
2090 application_cfg,
2091 );
2092 actor.start();
2093 let cfg = JConfig {
2094 partition: validator.to_string(),
2095 };
2096 let journal = Journal::init(context.with_label("journal"), cfg)
2097 .await
2098 .expect("unable to create journal");
2099 let cfg = config::Config {
2100 crypto: scheme,
2101 automaton: application.clone(),
2102 relay: application.clone(),
2103 committer: application,
2104 supervisor,
2105 mailbox_size: 1024,
2106 namespace: namespace.clone(),
2107 leader_timeout: Duration::from_secs(1),
2108 notarization_timeout: Duration::from_secs(2),
2109 nullify_retry: Duration::from_secs(10),
2110 fetch_timeout: Duration::from_secs(1),
2111 activity_timeout,
2112 max_fetch_count: 1,
2113 max_fetch_size: 1024 * 512,
2114 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
2115 fetch_concurrent: 1,
2116 replay_concurrency: 1,
2117 };
2118 let engine = Engine::new(context.with_label("engine"), journal, cfg);
2119 engine.start(voter, resolver);
2120 }
2121 }
2122
2123 let mut completed = HashSet::new();
2125 let mut finalized = HashMap::new();
2126 loop {
2127 let (validator, event) = done_receiver.next().await.unwrap();
2128 if let mocks::application::Progress::Finalized(proof, digest) = event {
2129 let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
2130 if digest != payload {
2131 panic!(
2132 "finalization mismatch digest: {:?}, payload: {:?}",
2133 digest, payload
2134 );
2135 }
2136 if let Some(previous) = finalized.insert(view, digest.clone()) {
2137 if previous != digest {
2138 panic!(
2139 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
2140 view, previous, digest
2141 );
2142 }
2143 }
2144 if (finalized.len() as u64) < required_containers {
2145 continue;
2146 }
2147 completed.insert(validator);
2148 }
2149 if completed.len() == (n - 1) as usize {
2150 break;
2151 }
2152 }
2153
2154 let byz = &validators[0];
2156 let mut count_conflicting_notarize = 0;
2157 let mut count_conflicting_finalize = 0;
2158 for supervisor in supervisors.iter() {
2159 {
2161 let faults = supervisor.faults.lock().unwrap();
2162 assert_eq!(faults.len(), 1);
2163 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2164 for (_, faults) in faulter.iter() {
2165 for fault in faults.iter() {
2166 match *fault {
2167 CONFLICTING_NOTARIZE => {
2168 count_conflicting_notarize += 1;
2169 }
2170 CONFLICTING_FINALIZE => {
2171 count_conflicting_finalize += 1;
2172 }
2173 _ => panic!("unexpected fault: {:?}", fault),
2174 }
2175 }
2176 }
2177 }
2178 }
2179 assert!(count_conflicting_notarize > 0);
2180 assert!(count_conflicting_finalize > 0);
2181 });
2182 }
2183
2184 #[test_traced]
2185 fn test_nuller() {
2186 let n = 4;
2188 let threshold = quorum(n).expect("unable to calculate threshold");
2189 let required_containers = 50;
2190 let activity_timeout = 10;
2191 let namespace = b"consensus".to_vec();
2192 let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
2193 executor.start(async move {
2194 let (network, mut oracle) = Network::new(
2196 context.with_label("network"),
2197 Config {
2198 max_size: 1024 * 1024,
2199 },
2200 );
2201
2202 network.start();
2204
2205 let mut schemes = Vec::new();
2207 let mut validators = Vec::new();
2208 for i in 0..n {
2209 let scheme = Ed25519::from_seed(i as u64);
2210 let pk = scheme.public_key();
2211 schemes.push(scheme);
2212 validators.push(pk);
2213 }
2214 validators.sort();
2215 schemes.sort_by_key(|s| s.public_key());
2216 let mut registrations = register_validators(&mut oracle, &validators).await;
2217
2218 let link = Link {
2220 latency: 10.0,
2221 jitter: 1.0,
2222 success_rate: 1.0,
2223 };
2224 link_validators(&mut oracle, &validators, Action::Link(link), None).await;
2225
2226 let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
2228 let pk = poly::public(&public);
2229 let prover = Prover::new(pk, &namespace);
2230
2231 let relay = Arc::new(mocks::relay::Relay::new());
2233 let mut supervisors = Vec::new();
2234 let (done_sender, mut done_receiver) = mpsc::unbounded();
2235 for (idx_scheme, scheme) in schemes.into_iter().enumerate() {
2236 let context = context.with_label(&format!("validator-{}", scheme.public_key()));
2238
2239 let validator = scheme.public_key();
2241 let mut participants = BTreeMap::new();
2242 participants.insert(0, (public.clone(), validators.clone(), shares[idx_scheme]));
2243 let supervisor_config = mocks::supervisor::Config {
2244 prover: prover.clone(),
2245 participants,
2246 };
2247 let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
2248 let (voter, resolver) = registrations
2249 .remove(&validator)
2250 .expect("validator should be registered");
2251 if idx_scheme == 0 {
2252 let cfg = mocks::nuller::Config {
2253 supervisor,
2254 namespace: namespace.clone(),
2255 };
2256 let engine: mocks::nuller::Nuller<_, Sha256, _> =
2257 mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
2258 engine.start(voter);
2259 } else {
2260 supervisors.push(supervisor.clone());
2261 let application_cfg = mocks::application::Config {
2262 hasher: Sha256::default(),
2263 relay: relay.clone(),
2264 participant: validator.clone(),
2265 tracker: done_sender.clone(),
2266 propose_latency: (10.0, 5.0),
2267 verify_latency: (10.0, 5.0),
2268 };
2269 let (actor, application) = mocks::application::Application::new(
2270 context.with_label("application"),
2271 application_cfg,
2272 );
2273 actor.start();
2274 let cfg = JConfig {
2275 partition: validator.to_string(),
2276 };
2277 let journal = Journal::init(context.with_label("journal"), cfg)
2278 .await
2279 .expect("unable to create journal");
2280 let cfg = config::Config {
2281 crypto: scheme,
2282 automaton: application.clone(),
2283 relay: application.clone(),
2284 committer: application,
2285 supervisor,
2286 mailbox_size: 1024,
2287 namespace: namespace.clone(),
2288 leader_timeout: Duration::from_secs(1),
2289 notarization_timeout: Duration::from_secs(2),
2290 nullify_retry: Duration::from_secs(10),
2291 fetch_timeout: Duration::from_secs(1),
2292 activity_timeout,
2293 max_fetch_count: 1,
2294 max_fetch_size: 1024 * 512,
2295 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()),
2296 fetch_concurrent: 1,
2297 replay_concurrency: 1,
2298 };
2299 let engine = Engine::new(context.with_label("engine"), journal, cfg);
2300 engine.start(voter, resolver);
2301 }
2302 }
2303
2304 let mut completed = HashSet::new();
2306 let mut finalized = HashMap::new();
2307 loop {
2308 let (validator, event) = done_receiver.next().await.unwrap();
2309 if let mocks::application::Progress::Finalized(proof, digest) = event {
2310 let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap();
2311 if digest != payload {
2312 panic!(
2313 "finalization mismatch digest: {:?}, payload: {:?}",
2314 digest, payload
2315 );
2316 }
2317 if let Some(previous) = finalized.insert(view, digest.clone()) {
2318 if previous != digest {
2319 panic!(
2320 "finalization mismatch at {:?} previous: {:?}, current: {:?}",
2321 view, previous, digest
2322 );
2323 }
2324 }
2325 if (finalized.len() as u64) < required_containers {
2326 continue;
2327 }
2328 completed.insert(validator);
2329 }
2330 if completed.len() == (n - 1) as usize {
2331 break;
2332 }
2333 }
2334
2335 let byz = &validators[0];
2337 let mut count_nullify_and_finalize = 0;
2338 for supervisor in supervisors.iter() {
2339 {
2341 let faults = supervisor.faults.lock().unwrap();
2342 assert_eq!(faults.len(), 1);
2343 let faulter = faults.get(byz).expect("byzantine party is not faulter");
2344 for (_, faults) in faulter.iter() {
2345 for fault in faults.iter() {
2346 match *fault {
2347 NULLIFY_AND_FINALIZE => {
2348 count_nullify_and_finalize += 1;
2349 }
2350 _ => panic!("unexpected fault: {:?}", fault),
2351 }
2352 }
2353 }
2354 }
2355 }
2356 assert!(count_nullify_and_finalize > 0);
2357 });
2358 }
2359}