// Submodules compiled for every target.
pub mod scheme;
pub mod types;

cfg_if::cfg_if! {
    // The items below are compiled only when the target architecture is not `wasm32`.
    if #[cfg(not(target_arch = "wasm32"))] {
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        mod metrics;
        mod safe_tip;

        // Mock components; only built for test builds.
        #[cfg(test)]
        pub mod mocks;
    }
}
85
86#[cfg(test)]
87mod tests {
88 use super::{mocks, Config, Engine};
89 use crate::{
90 aggregation::scheme::{bls12381_multisig, bls12381_threshold, ed25519, secp256r1, Scheme},
91 types::{Epoch, EpochDelta, Height, HeightDelta},
92 };
93 use commonware_cryptography::{
94 bls12381::primitives::variant::{MinPk, MinSig},
95 certificate::mocks::Fixture,
96 ed25519::PublicKey,
97 sha256::Digest as Sha256Digest,
98 };
99 use commonware_macros::{select, test_group, test_traced};
100 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
101 use commonware_parallel::Sequential;
102 use commonware_runtime::{
103 buffer::paged::CacheRef,
104 deterministic::{self, Context},
105 Clock, Quota, Runner, Spawner, Supervisor as _,
106 };
107 use commonware_utils::{
108 channel::{fallible::OneshotExt, oneshot},
109 test_rng, NZUsize, NonZeroDuration, NZU16,
110 };
111 use futures::future::join_all;
112 use rand::{rngs::StdRng, Rng};
113 use std::{
114 collections::BTreeMap,
115 num::{NonZeroU16, NonZeroU32, NonZeroUsize},
116 time::Duration,
117 };
118 use tracing::debug;
119
/// Network channels keyed by participant: each public key maps to the
/// `(Sender, Receiver)` pair obtained when that participant was registered.
type Registrations<P> = BTreeMap<P, (Sender<P, deterministic::Context>, Receiver<P>)>;

/// Page size passed to `CacheRef::from_pooler` when building each engine's journal page cache.
const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
/// Capacity argument passed alongside `PAGE_SIZE` to `CacheRef::from_pooler`.
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
/// Quota supplied when registering channels: `NonZeroU32::MAX` per second
/// (presumably chosen so the quota never limits a test; confirm).
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
/// Namespace bytes handed to every fixture constructor.
const TEST_NAMESPACE: &[u8] = b"my testing namespace";

/// Link with 10ms latency, 1ms jitter, and a success rate of 1.0.
const RELIABLE_LINK: Link = Link {
    latency: Duration::from_millis(10),
    jitter: Duration::from_millis(1),
    success_rate: 1.0,
};
133
134 async fn register_participants(
136 oracle: &mut Oracle<PublicKey, deterministic::Context>,
137 participants: &[PublicKey],
138 ) -> Registrations<PublicKey> {
139 let mut registrations = BTreeMap::new();
140 for participant in participants.iter() {
141 let (sender, receiver) = oracle
142 .control(participant.clone())
143 .register(0, TEST_QUOTA)
144 .await
145 .unwrap();
146 registrations.insert(participant.clone(), (sender, receiver));
147 }
148 registrations
149 }
150
151 async fn link_participants(
153 oracle: &mut Oracle<PublicKey, deterministic::Context>,
154 participants: &[PublicKey],
155 link: Link,
156 ) {
157 for v1 in participants.iter() {
158 for v2 in participants.iter() {
159 if v2 == v1 {
160 continue;
161 }
162 oracle
163 .add_link(v1.clone(), v2.clone(), link.clone())
164 .await
165 .unwrap();
166 }
167 }
168 }
169
170 async fn initialize_simulation<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
172 context: Context,
173 fixture: &Fixture<S>,
174 link: Link,
175 ) -> (
176 Oracle<PublicKey, deterministic::Context>,
177 Registrations<PublicKey>,
178 ) {
179 let (network, mut oracle) = Network::new_with_peers(
180 context.child("network"),
181 commonware_p2p::simulated::Config {
182 max_size: 1024 * 1024,
183 disconnect_on_block: true,
184 tracked_peer_sets: NZUsize!(1),
185 },
186 fixture.participants.clone(),
187 )
188 .await;
189 network.start();
190
191 let registrations = register_participants(&mut oracle, &fixture.participants).await;
192 link_participants(&mut oracle, &fixture.participants, link).await;
193
194 (oracle, registrations)
195 }
196
197 fn spawn_validator_engines<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
199 context: Context,
200 fixture: &Fixture<S>,
201 registrations: &mut Registrations<PublicKey>,
202 oracle: &mut Oracle<PublicKey, deterministic::Context>,
203 epoch: Epoch,
204 rebroadcast_timeout: Duration,
205 incorrect: Vec<usize>,
206 ) -> BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>> {
207 let mut reporters = BTreeMap::new();
208
209 for (idx, participant) in fixture.participants.iter().enumerate() {
210 let context = context
211 .child("participant")
212 .with_attribute("public_key", participant);
213
214 let provider = mocks::Provider::new();
216 assert!(provider.register(epoch, fixture.schemes[idx].clone()));
217
218 let monitor = mocks::Monitor::new(epoch);
220
221 let strategy = if incorrect.contains(&idx) {
223 mocks::Strategy::Incorrect
224 } else {
225 mocks::Strategy::Correct
226 };
227 let automaton = mocks::Application::new(strategy);
228
229 let (reporter, reporter_mailbox) =
231 mocks::Reporter::new(context.child("reporter"), fixture.verifier.clone());
232 reporter.start();
233 reporters.insert(participant.clone(), reporter_mailbox.clone());
234
235 let blocker = oracle.control(participant.clone());
237
238 let engine = Engine::new(
240 context.child("engine"),
241 Config {
242 monitor,
243 provider,
244 automaton,
245 reporter: reporter_mailbox,
246 blocker,
247 priority_acks: false,
248 rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
249 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
250 window: std::num::NonZeroU64::new(10).unwrap(),
251 activity_timeout: HeightDelta::new(100),
252 journal_partition: format!("aggregation-{participant}"),
253 journal_write_buffer: NZUsize!(4096),
254 journal_replay_buffer: NZUsize!(4096),
255 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
256 journal_compression: Some(3),
257 journal_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
258 strategy: Sequential,
259 },
260 );
261
262 let (sender, receiver) = registrations.remove(participant).unwrap();
263 engine.start((sender, receiver));
264 }
265
266 reporters
267 }
268
269 async fn await_reporters<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
271 context: Context,
272 reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>,
273 threshold_height: Height,
274 threshold_epoch: Epoch,
275 ) {
276 let mut receivers = Vec::new();
277 for (reporter, mailbox) in reporters.iter() {
278 let (tx, rx) = oneshot::channel();
280 receivers.push(rx);
281
282 context
283 .child("reporter_watcher")
284 .with_attribute("reporter", reporter)
285 .spawn({
286 let reporter = reporter.clone();
287 let mut mailbox = mailbox.clone();
288 move |context| async move {
289 loop {
290 let (height, epoch) = mailbox
291 .get_tip()
292 .await
293 .unwrap_or((Height::zero(), Epoch::zero()));
294 debug!(
295 %height,
296 epoch = %epoch,
297 %threshold_height,
298 threshold_epoch = %threshold_epoch,
299 ?reporter,
300 "reporter status"
301 );
302 if height >= threshold_height && epoch >= threshold_epoch {
303 debug!(
304 ?reporter,
305 "reporter reached threshold, signaling completion"
306 );
307 tx.send_lossy(reporter.clone());
308 break;
309 }
310 context.sleep(Duration::from_millis(100)).await;
311 }
312 }
313 });
314 }
315
316 let results = join_all(receivers).await;
318 assert_eq!(results.len(), reporters.len());
319
320 for result in results {
322 assert!(result.is_ok(), "reporter was cancelled");
323 }
324 }
325
326 fn all_online<S, F>(fixture: F)
328 where
329 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
330 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
331 {
332 let runner = deterministic::Runner::timed(Duration::from_secs(30));
333
334 runner.start(|mut context| async move {
335 let num_validators = 4;
336 let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
337 let epoch = Epoch::new(111);
338
339 let (mut oracle, mut registrations) =
340 initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK).await;
341
342 let reporters = spawn_validator_engines(
343 context.child("validator"),
344 &fixture,
345 &mut registrations,
346 &mut oracle,
347 epoch,
348 Duration::from_secs(5),
349 vec![],
350 );
351
352 await_reporters(
353 context.child("reporter"),
354 &reporters,
355 Height::new(100),
356 epoch,
357 )
358 .await;
359 });
360 }
361
/// Runs `all_online` once per scheme fixture: BLS12-381 threshold (MinPk, MinSig),
/// BLS12-381 multisig (MinPk, MinSig), ed25519, and secp256r1.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_all_online() {
    all_online(bls12381_threshold::fixture::<MinPk, _>);
    all_online(bls12381_threshold::fixture::<MinSig, _>);
    all_online(bls12381_multisig::fixture::<MinPk, _>);
    all_online(bls12381_multisig::fixture::<MinSig, _>);
    all_online(ed25519::fixture);
    all_online(secp256r1::fixture);
}
372
373 fn byzantine_proposer<S, F>(fixture: F)
375 where
376 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
377 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
378 {
379 let runner = deterministic::Runner::timed(Duration::from_secs(30));
380
381 runner.start(|mut context| async move {
382 let num_validators = 4;
383 let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
384 let epoch = Epoch::new(111);
385
386 let (mut oracle, mut registrations) =
387 initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK).await;
388
389 let reporters = spawn_validator_engines(
390 context.child("validator"),
391 &fixture,
392 &mut registrations,
393 &mut oracle,
394 epoch,
395 Duration::from_secs(5),
396 vec![0],
397 );
398
399 await_reporters(
400 context.child("reporter"),
401 &reporters,
402 Height::new(100),
403 epoch,
404 )
405 .await;
406 });
407 }
408
/// Runs `byzantine_proposer` once per scheme fixture: BLS12-381 threshold (MinPk, MinSig),
/// BLS12-381 multisig (MinPk, MinSig), ed25519, and secp256r1.
#[test_traced("INFO")]
fn test_byzantine_proposer() {
    byzantine_proposer(bls12381_threshold::fixture::<MinPk, _>);
    byzantine_proposer(bls12381_threshold::fixture::<MinSig, _>);
    byzantine_proposer(bls12381_multisig::fixture::<MinPk, _>);
    byzantine_proposer(bls12381_multisig::fixture::<MinSig, _>);
    byzantine_proposer(ed25519::fixture);
    byzantine_proposer(secp256r1::fixture);
}
418
/// Repeatedly interrupts and restarts a four-validator network in which the
/// participant at index 0 uses `mocks::Strategy::Incorrect`.
///
/// Each iteration runs the whole network until either a randomly chosen wait in
/// `[shutdown_range_min, shutdown_range_max)` elapses (the run yields `false`) or the
/// shared reporter's contiguous tip reaches `target_height` (the run yields `true`).
/// The checkpoint returned by `start_and_recover` seeds the next iteration's runner.
///
/// The loop stops as soon as a run completes after at least `min_shutdowns` earlier
/// iterations, or after `max_shutdowns` iterations.
///
/// NOTE(review): if no run completes within `max_shutdowns` iterations the loop simply
/// ends and the test passes without asserting completion. Confirm this is intended.
fn unclean_byzantine_shutdown<S, F>(fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: Fn(&mut StdRng, &[u8], u32) -> Fixture<S>,
{
    // Test parameters.
    let num_validators = 4;
    let target_height = Height::new(200);
    let min_shutdowns = 4;
    let max_shutdowns = 10;
    let shutdown_range_min = Duration::from_millis(100);
    let shutdown_range_max = Duration::from_millis(1_000);
    let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));

    // Checkpoint of the previous run; `None` until the first run has finished.
    let mut prev_checkpoint = None;

    // The fixture is generated once and cloned into every run, so all runs share it.
    let mut rng = test_rng();
    let fixture = fixture(&mut rng, TEST_NAMESPACE, num_validators);

    // Number of runs started so far.
    let mut shutdown_count = 0;
    while shutdown_count < max_shutdowns {
        let fixture = fixture.clone();
        // One full run of the network; resolves to `true` only if the target was reached.
        let f = move |mut context: Context| {
            async move {
                let epoch = Epoch::new(111);

                let (oracle, mut registrations) =
                    initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK)
                        .await;

                // A single reporter is shared by all validators in this test.
                let (reporter, mut reporter_mailbox) =
                    mocks::Reporter::new(context.child("reporter"), fixture.verifier.clone());
                reporter.start();

                for (idx, participant) in fixture.participants.iter().enumerate() {
                    let validator_context = context
                        .child("participant")
                        .with_attribute("public_key", participant);

                    // Provider that knows this validator's scheme for `epoch`.
                    let provider = mocks::Provider::new();
                    assert!(provider.register(epoch, fixture.schemes[idx].clone()));

                    let monitor = mocks::Monitor::new(epoch);

                    // The first participant is the misbehaving one.
                    let strategy = if idx == 0 {
                        mocks::Strategy::Incorrect
                    } else {
                        mocks::Strategy::Correct
                    };
                    let automaton = mocks::Application::new(strategy);

                    let blocker = oracle.control(participant.clone());

                    let engine = Engine::new(
                        validator_context.child("engine"),
                        Config {
                            monitor,
                            provider,
                            automaton,
                            reporter: reporter_mailbox.clone(),
                            blocker,
                            priority_acks: false,
                            rebroadcast_timeout,
                            epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                            window: std::num::NonZeroU64::new(10).unwrap(),
                            activity_timeout: HeightDelta::new(1_024),
                            // The partition name depends only on the participant, so it is
                            // identical in every run (presumably so a restarted engine
                            // finds its earlier journal; confirm).
                            journal_partition: format!("unclean_shutdown_test_{participant}"),
                            journal_write_buffer: NZUsize!(4096),
                            journal_replay_buffer: NZUsize!(4096),
                            journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                            journal_compression: Some(3),
                            journal_page_cache: CacheRef::from_pooler(
                                &context,
                                PAGE_SIZE,
                                PAGE_CACHE_SIZE,
                            ),
                            strategy: Sequential,
                        },
                    );

                    let (sender, receiver) = registrations.remove(participant).unwrap();
                    engine.start((sender, receiver));
                }

                // Watcher that finishes once the shared reporter's contiguous tip reaches
                // `target_height`, polling every 50ms.
                let completion =
                    context
                        .child("completion_watcher")
                        .spawn(move |context| async move {
                            loop {
                                if let Some(tip_height) =
                                    reporter_mailbox.get_contiguous_tip().await
                                {
                                    if tip_height >= target_height {
                                        break;
                                    }
                                }
                                context.sleep(Duration::from_millis(50)).await;
                            }
                        });

                // Randomly chosen run time before the simulated shutdown.
                let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
                // Whichever happens first decides the run's result.
                select! {
                    _ = context.sleep(shutdown_wait) => {
                        debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
                        false
                    },
                    _ = completion => {
                        debug!("Shared reporter completed normally");
                        true
                    },
                }
            }
        };

        // The first run uses a fresh timed runner; later runs are built from the
        // previous checkpoint.
        let (complete, checkpoint) = prev_checkpoint
            .map_or_else(
                || {
                    debug!("Starting initial run");
                    deterministic::Runner::timed(Duration::from_secs(45))
                },
                |prev_checkpoint| {
                    debug!(shutdown_count, "Restarting from previous context");
                    deterministic::Runner::from(prev_checkpoint)
                },
            )
            .start_and_recover(f);

        // A completed run only ends the test once enough earlier iterations have happened.
        if complete && shutdown_count >= min_shutdowns {
            debug!("Test completed successfully");
            break;
        }

        prev_checkpoint = Some(checkpoint);
        shutdown_count += 1;
    }
}
568
/// Runs `unclean_byzantine_shutdown` once per scheme fixture: BLS12-381 threshold
/// (MinPk, MinSig), BLS12-381 multisig (MinPk, MinSig), ed25519, and secp256r1.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_unclean_byzantine_shutdown() {
    unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinPk, _>);
    unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinSig, _>);
    unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinPk, _>);
    unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinSig, _>);
    unclean_byzantine_shutdown(ed25519::fixture);
    unclean_byzantine_shutdown(secp256r1::fixture);
}
579
/// Restarts a four-validator network after a run in which every application used
/// `mocks::Strategy::Skip { height: skip_height }`.
///
/// Phase one runs until the shared reporter's tip is at least
/// `skip_height.saturating_add(window).previous()` and then returns, which yields a
/// checkpoint from `start_and_recover`. Phase two builds a runner from that
/// checkpoint, starts all validators with `mocks::Strategy::Correct` on the same
/// journal partitions, and waits until the reporter's contiguous tip reaches
/// `target_height`.
fn unclean_shutdown_with_unsigned_height<S, F>(fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: Fn(&mut StdRng, &[u8], u32) -> Fixture<S>,
{
    // Test parameters.
    let num_validators = 4;
    let skip_height = Height::new(50);
    let window = HeightDelta::new(10);
    let target_height = Height::new(100);

    // The same fixture is used by both phases.
    let mut rng = test_rng();
    let fixture = fixture(&mut rng, TEST_NAMESPACE, num_validators);

    // Phase one: every validator's application is configured to skip `skip_height`.
    let f = |context: Context| {
        let fixture = fixture.clone();
        async move {
            let epoch = Epoch::new(111);

            let (oracle, mut registrations) =
                initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK)
                    .await;

            // A single reporter is shared by all validators.
            let (reporter, mut reporter_mailbox) =
                mocks::Reporter::new(context.child("reporter"), fixture.verifier.clone());
            reporter.start();

            for (idx, participant) in fixture.participants.iter().enumerate() {
                let validator_context = context
                    .child("participant")
                    .with_attribute("public_key", participant);

                // Provider that knows this validator's scheme for `epoch`.
                let provider = mocks::Provider::new();
                assert!(provider.register(epoch, fixture.schemes[idx].clone()));

                let monitor = mocks::Monitor::new(epoch);

                let automaton = mocks::Application::new(mocks::Strategy::Skip {
                    height: skip_height,
                });

                let blocker = oracle.control(participant.clone());

                let engine = Engine::new(
                    validator_context.child("engine"),
                    Config {
                        monitor,
                        provider,
                        automaton,
                        reporter: reporter_mailbox.clone(),
                        blocker,
                        priority_acks: false,
                        rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
                            100,
                        )),
                        epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                        window: std::num::NonZeroU64::new(window.get()).unwrap(),
                        activity_timeout: HeightDelta::new(100),
                        // Phase two uses the same partition name.
                        journal_partition: format!("unsigned_height_test_{participant}"),
                        journal_write_buffer: NZUsize!(4096),
                        journal_replay_buffer: NZUsize!(4096),
                        journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                        journal_compression: Some(3),
                        journal_page_cache: CacheRef::from_pooler(
                            &context,
                            PAGE_SIZE,
                            PAGE_CACHE_SIZE,
                        ),
                        strategy: Sequential,
                    },
                );

                let (sender, receiver) = registrations.remove(participant).unwrap();
                engine.start((sender, receiver));
            }

            // Poll every 50ms and stop the run once the (not necessarily contiguous) tip
            // is at least `skip_height + window` stepped back by `previous()`
            // (presumably the last height reachable within the window while
            // `skip_height` is missing; confirm).
            loop {
                if let Some((tip_height, _)) = reporter_mailbox.get_tip().await {
                    debug!(%tip_height, %skip_height, %target_height, "reporter status");
                    if tip_height >= skip_height.saturating_add(window).previous().unwrap() {
                        return;
                    }
                }
                context.sleep(Duration::from_millis(50)).await;
            }
        }
    };

    // Only the checkpoint of phase one is needed.
    let (_, checkpoint) =
        deterministic::Runner::timed(Duration::from_secs(60)).start_and_recover(f);

    // Phase two: same network and partitions, but every application is correct.
    let f2 = |context: Context| {
        async move {
            let epoch = Epoch::new(111);

            let (oracle, mut registrations) =
                initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK)
                    .await;

            let (reporter, mut reporter_mailbox) =
                mocks::Reporter::new(context.child("reporter"), fixture.verifier.clone());
            reporter.start();

            for (idx, participant) in fixture.participants.iter().enumerate() {
                let validator_context = context
                    .child("participant")
                    .with_attribute("public_key", participant);

                // Provider that knows this validator's scheme for `epoch`.
                let provider = mocks::Provider::new();
                assert!(provider.register(epoch, fixture.schemes[idx].clone()));

                let monitor = mocks::Monitor::new(epoch);

                let automaton = mocks::Application::new(mocks::Strategy::Correct);

                let blocker = oracle.control(participant.clone());

                let engine = Engine::new(
                    validator_context.child("engine"),
                    Config {
                        monitor,
                        provider,
                        automaton,
                        reporter: reporter_mailbox.clone(),
                        blocker,
                        priority_acks: false,
                        rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
                            100,
                        )),
                        epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                        // NOTE(review): hard-coded 10 here, whereas phase one uses
                        // `window.get()`; the values currently agree.
                        window: std::num::NonZeroU64::new(10).unwrap(),
                        activity_timeout: HeightDelta::new(100),
                        journal_partition: format!("unsigned_height_test_{participant}"),
                        journal_write_buffer: NZUsize!(4096),
                        journal_replay_buffer: NZUsize!(4096),
                        journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                        journal_compression: Some(3),
                        journal_page_cache: CacheRef::from_pooler(
                            &context,
                            PAGE_SIZE,
                            PAGE_CACHE_SIZE,
                        ),
                        strategy: Sequential,
                    },
                );

                let (sender, receiver) = registrations.remove(participant).unwrap();
                engine.start((sender, receiver));
            }

            // Poll every 50ms until the contiguous tip reaches `target_height`.
            loop {
                if let Some(tip_height) = reporter_mailbox.get_contiguous_tip().await {
                    debug!(
                        %tip_height,
                        %skip_height, %target_height, "reporter status on restart"
                    );
                    if tip_height >= target_height {
                        break;
                    }
                }
                context.sleep(Duration::from_millis(50)).await;
            }
        }
    };

    // Run phase two on a runner built from the phase-one checkpoint.
    deterministic::Runner::from(checkpoint).start(f2);
}
769
/// Runs `unclean_shutdown_with_unsigned_height` once per scheme fixture: BLS12-381
/// threshold (MinPk, MinSig), BLS12-381 multisig (MinPk, MinSig), ed25519, and secp256r1.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_unclean_shutdown_with_unsigned_height() {
    unclean_shutdown_with_unsigned_height(bls12381_threshold::fixture::<MinPk, _>);
    unclean_shutdown_with_unsigned_height(bls12381_threshold::fixture::<MinSig, _>);
    unclean_shutdown_with_unsigned_height(bls12381_multisig::fixture::<MinPk, _>);
    unclean_shutdown_with_unsigned_height(bls12381_multisig::fixture::<MinSig, _>);
    unclean_shutdown_with_unsigned_height(ed25519::fixture);
    unclean_shutdown_with_unsigned_height(secp256r1::fixture);
}
780
781 fn slow_and_lossy_links<S, F>(fixture: F, seed: u64) -> String
782 where
783 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
784 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
785 {
786 let cfg = deterministic::Config::new()
787 .with_seed(seed)
788 .with_timeout(Some(Duration::from_secs(120)));
789 let runner = deterministic::Runner::new(cfg);
790
791 runner.start(|mut context| async move {
792 let num_validators = 4;
793 let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
794 let epoch = Epoch::new(111);
795
796 let degraded_link = Link {
798 latency: Duration::from_millis(200),
799 jitter: Duration::from_millis(150),
800 success_rate: 0.5,
801 };
802
803 let (mut oracle, mut registrations) =
804 initialize_simulation(context.child("simulation"), &fixture, degraded_link).await;
805
806 let reporters = spawn_validator_engines(
807 context.child("validator"),
808 &fixture,
809 &mut registrations,
810 &mut oracle,
811 epoch,
812 Duration::from_secs(2),
813 vec![],
814 );
815
816 await_reporters(
817 context.child("reporter"),
818 &reporters,
819 Height::new(100),
820 epoch,
821 )
822 .await;
823
824 context.auditor().state()
825 })
826 }
827
/// Runs `slow_and_lossy_links` with seed 0 once per scheme fixture: BLS12-381 threshold
/// (MinPk, MinSig), BLS12-381 multisig (MinPk, MinSig), ed25519, and secp256r1.
/// The returned auditor state is discarded.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_slow_and_lossy_links() {
    slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, 0);
    slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, 0);
    slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, 0);
    slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, 0);
    slow_and_lossy_links(ed25519::fixture, 0);
    slow_and_lossy_links(secp256r1::fixture, 0);
}
838
839 #[test_group("slow")]
840 #[test_traced("INFO")]
841 fn test_determinism() {
842 for seed in 1..6 {
845 let ts_pk_state_1 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
847 let ts_pk_state_2 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
848 assert_eq!(ts_pk_state_1, ts_pk_state_2);
849
850 let ts_sig_state_1 =
852 slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
853 let ts_sig_state_2 =
854 slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
855 assert_eq!(ts_sig_state_1, ts_sig_state_2);
856
857 let ms_pk_state_1 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
859 let ms_pk_state_2 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
860 assert_eq!(ms_pk_state_1, ms_pk_state_2);
861
862 let ms_sig_state_1 =
864 slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
865 let ms_sig_state_2 =
866 slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
867 assert_eq!(ms_sig_state_1, ms_sig_state_2);
868
869 let ed_state_1 = slow_and_lossy_links(ed25519::fixture, seed);
871 let ed_state_2 = slow_and_lossy_links(ed25519::fixture, seed);
872 assert_eq!(ed_state_1, ed_state_2);
873
874 let secp_state_1 = slow_and_lossy_links(secp256r1::fixture, seed);
876 let secp_state_2 = slow_and_lossy_links(secp256r1::fixture, seed);
877 assert_eq!(secp_state_1, secp_state_2);
878
879 let states = [
880 ("threshold-minpk", ts_pk_state_1),
881 ("threshold-minsig", ts_sig_state_1),
882 ("multisig-minpk", ms_pk_state_1),
883 ("multisig-minsig", ms_sig_state_1),
884 ("ed25519", ed_state_1),
885 ("secp256r1", secp_state_1),
886 ];
887
888 for pair in states.windows(2) {
890 assert_ne!(
891 pair[0].1, pair[1].1,
892 "state {} equals state {}",
893 pair[0].0, pair[1].0
894 );
895 }
896 }
897 }
898
899 fn one_offline<S, F>(fixture: F)
900 where
901 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
902 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
903 {
904 let runner = deterministic::Runner::timed(Duration::from_secs(30));
905
906 runner.start(|mut context| async move {
907 let num_validators = 5;
908 let mut fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
909 let epoch = Epoch::new(111);
910
911 fixture.participants.truncate(4);
913 fixture.schemes.truncate(4);
914
915 let (mut oracle, mut registrations) =
916 initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK).await;
917
918 let reporters = spawn_validator_engines(
919 context.child("validator"),
920 &fixture,
921 &mut registrations,
922 &mut oracle,
923 epoch,
924 Duration::from_secs(5),
925 vec![],
926 );
927
928 await_reporters(
929 context.child("reporter"),
930 &reporters,
931 Height::new(100),
932 epoch,
933 )
934 .await;
935 });
936 }
937
/// Runs `one_offline` once per scheme fixture: BLS12-381 threshold (MinPk, MinSig),
/// BLS12-381 multisig (MinPk, MinSig), ed25519, and secp256r1.
#[test_group("slow")]
#[test_traced("INFO")]
fn test_one_offline() {
    one_offline(bls12381_threshold::fixture::<MinPk, _>);
    one_offline(bls12381_threshold::fixture::<MinSig, _>);
    one_offline(bls12381_multisig::fixture::<MinPk, _>);
    one_offline(bls12381_multisig::fixture::<MinSig, _>);
    one_offline(ed25519::fixture);
    one_offline(secp256r1::fixture);
}
948
949 fn network_partition<S, F>(fixture: F)
951 where
952 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
953 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
954 {
955 let runner = deterministic::Runner::timed(Duration::from_secs(60));
956
957 runner.start(|mut context| async move {
958 let num_validators = 4;
959 let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
960 let epoch = Epoch::new(111);
961
962 let (mut oracle, mut registrations) =
963 initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK).await;
964
965 let reporters = spawn_validator_engines(
966 context.child("validator"),
967 &fixture,
968 &mut registrations,
969 &mut oracle,
970 epoch,
971 Duration::from_secs(5),
972 vec![],
973 );
974
975 for v1 in fixture.participants.iter() {
977 for v2 in fixture.participants.iter() {
978 if v2 == v1 {
979 continue;
980 }
981 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
982 }
983 }
984 context.sleep(Duration::from_secs(20)).await;
985
986 for v1 in fixture.participants.iter() {
988 for v2 in fixture.participants.iter() {
989 if v2 == v1 {
990 continue;
991 }
992 oracle
993 .add_link(v1.clone(), v2.clone(), RELIABLE_LINK)
994 .await
995 .unwrap();
996 }
997 }
998
999 await_reporters(
1000 context.child("reporter"),
1001 &reporters,
1002 Height::new(100),
1003 epoch,
1004 )
1005 .await;
1006 });
1007 }
1008
/// Runs `network_partition` once per scheme fixture: BLS12-381 threshold (MinPk, MinSig),
/// BLS12-381 multisig (MinPk, MinSig), ed25519, and secp256r1.
#[test_traced("INFO")]
fn test_network_partition() {
    network_partition(bls12381_threshold::fixture::<MinPk, _>);
    network_partition(bls12381_threshold::fixture::<MinSig, _>);
    network_partition(bls12381_multisig::fixture::<MinPk, _>);
    network_partition(bls12381_multisig::fixture::<MinSig, _>);
    network_partition(ed25519::fixture);
    network_partition(secp256r1::fixture);
}
1018
/// Checks that no tip is reported when too few validators participate.
///
/// The fixture is generated for five validators and the network is set up for all of
/// them, but engines are started only for the first two. After sleeping for 12 seconds
/// every started validator's reporter must still report a zero tip height (a missing
/// tip counts as zero). The assertion message describes this as being below quorum.
fn insufficient_validators<S, F>(fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
    let runner = deterministic::Runner::timed(Duration::from_secs(15));

    runner.start(|mut context| async move {
        let num_validators = 5;
        let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
        let epoch = Epoch::new(111);

        // Network channels and links are created for all five participants.
        let (oracle, mut registrations) =
            initialize_simulation(context.child("simulation"), &fixture, RELIABLE_LINK)
                .await;

        // Reporter mailboxes of the validators that are actually started.
        let mut reporters =
            BTreeMap::<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>::new();

        // Start engines for the first two participants only.
        for (idx, participant) in fixture.participants.iter().take(2).enumerate() {
            let context = context.child("participant").with_attribute("public_key", participant);

            // Provider that knows this validator's scheme for `epoch`.
            let provider = mocks::Provider::new();
            assert!(provider.register(epoch, fixture.schemes[idx].clone()));

            let monitor = mocks::Monitor::new(epoch);

            let automaton = mocks::Application::new(mocks::Strategy::Correct);

            // Each started validator gets its own reporter.
            let (reporter, reporter_mailbox) =
                mocks::Reporter::new(context.child("reporter"), fixture.verifier.clone());
            reporter.start();
            reporters.insert(participant.clone(), reporter_mailbox.clone());

            let blocker = oracle.control(participant.clone());

            let engine = Engine::new(
                context.child("engine"),
                Config {
                    monitor,
                    provider,
                    automaton,
                    reporter: reporter_mailbox,
                    blocker,
                    priority_acks: false,
                    rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_secs(3)),
                    epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                    window: std::num::NonZeroU64::new(10).unwrap(),
                    activity_timeout: HeightDelta::new(100),
                    journal_partition: format!("aggregation-{participant}"),
                    journal_write_buffer: NZUsize!(4096),
                    journal_replay_buffer: NZUsize!(4096),
                    journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                    journal_compression: Some(3),
                    journal_page_cache: CacheRef::from_pooler(
                        &context,
                        PAGE_SIZE,
                        PAGE_CACHE_SIZE,
                    ),
                    strategy: Sequential,
                },
            );

            let (sender, receiver) = registrations.remove(participant).unwrap();
            engine.start((sender, receiver));
        }

        // Give the two validators time to (fail to) make progress; this stays below the
        // 15 second duration passed to `Runner::timed`.
        context.sleep(Duration::from_secs(12)).await;

        // Any non-zero tip means something was reported despite too few validators.
        let mut any_consensus = false;
        for (validator_pk, mut reporter_mailbox) in reporters {
            let (tip, _) = reporter_mailbox
                .get_tip()
                .await
                .unwrap_or((Height::zero(), Epoch::zero()));
            if !tip.is_zero() {
                any_consensus = true;
                tracing::warn!(
                    ?validator_pk,
                    %tip,
                    "Unexpected consensus with insufficient validators"
                );
            }
        }

        // All offenders are logged above before the single failure is raised here.
        assert!(
            !any_consensus,
            "Consensus should not be achieved with insufficient validator participation (below quorum)"
        );
    });
}
1124
/// Runs `insufficient_validators` once per scheme fixture: BLS12-381 threshold
/// (MinPk, MinSig), BLS12-381 multisig (MinPk, MinSig), ed25519, and secp256r1.
#[test_traced("INFO")]
fn test_insufficient_validators() {
    insufficient_validators(bls12381_threshold::fixture::<MinPk, _>);
    insufficient_validators(bls12381_threshold::fixture::<MinSig, _>);
    insufficient_validators(bls12381_multisig::fixture::<MinPk, _>);
    insufficient_validators(bls12381_multisig::fixture::<MinSig, _>);
    insufficient_validators(ed25519::fixture);
    insufficient_validators(secp256r1::fixture);
}
1134}