// Public submodules that are declared unconditionally (outside the `cfg_if!` gate below).
pub mod scheme;
pub mod types;

cfg_if::cfg_if! {
    // Everything in this branch is compiled only when the target architecture is not `wasm32`.
    if #[cfg(not(target_arch = "wasm32"))] {
        // `Config` and `Engine` are re-exported; their defining modules stay private.
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        // Private modules with no re-exports from here.
        mod metrics;
        mod safe_tip;

        // Mock implementations, compiled for test builds only.
        #[cfg(test)]
        pub mod mocks;
    }
}
85
86#[cfg(test)]
87mod tests {
88 use super::{mocks, Config, Engine};
89 use crate::{
90 aggregation::scheme::{bls12381_multisig, bls12381_threshold, ed25519, secp256r1, Scheme},
91 types::{Epoch, EpochDelta, Height, HeightDelta},
92 };
93 use commonware_cryptography::{
94 bls12381::primitives::variant::{MinPk, MinSig},
95 certificate::mocks::Fixture,
96 ed25519::PublicKey,
97 sha256::Digest as Sha256Digest,
98 };
99 use commonware_macros::{select, test_group, test_traced};
100 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
101 use commonware_parallel::Sequential;
102 use commonware_runtime::{
103 buffer::paged::CacheRef,
104 deterministic::{self, Context},
105 Clock, Metrics, Quota, Runner, Spawner,
106 };
107 use commonware_utils::{
108 channel::{fallible::OneshotExt, oneshot},
109 test_rng, NZUsize, NonZeroDuration, NZU16,
110 };
111 use futures::future::join_all;
112 use rand::{rngs::StdRng, Rng};
113 use std::{
114 collections::BTreeMap,
115 num::{NonZeroU16, NonZeroU32, NonZeroUsize},
116 time::Duration,
117 };
118 use tracing::debug;
119
    /// Sender/receiver pair for each participant, keyed by the participant's public key.
    type Registrations<P> = BTreeMap<P, (Sender<P, deterministic::Context>, Receiver<P>)>;

    // Arguments handed to `CacheRef::new` for every engine's `journal_page_cache`.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
    // Quota passed to `register` when a participant opens its channel.
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
    // Namespace passed to every fixture constructor.
    const TEST_NAMESPACE: &[u8] = b"my testing namespace";

    /// Link with 10ms latency, 1ms jitter and a success rate of 1.0.
    const RELIABLE_LINK: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
133
134 async fn register_participants(
136 oracle: &mut Oracle<PublicKey, deterministic::Context>,
137 participants: &[PublicKey],
138 ) -> Registrations<PublicKey> {
139 let mut registrations = BTreeMap::new();
140 for participant in participants.iter() {
141 let (sender, receiver) = oracle
142 .control(participant.clone())
143 .register(0, TEST_QUOTA)
144 .await
145 .unwrap();
146 registrations.insert(participant.clone(), (sender, receiver));
147 }
148 registrations
149 }
150
151 async fn link_participants(
153 oracle: &mut Oracle<PublicKey, deterministic::Context>,
154 participants: &[PublicKey],
155 link: Link,
156 ) {
157 for v1 in participants.iter() {
158 for v2 in participants.iter() {
159 if v2 == v1 {
160 continue;
161 }
162 oracle
163 .add_link(v1.clone(), v2.clone(), link.clone())
164 .await
165 .unwrap();
166 }
167 }
168 }
169
170 async fn initialize_simulation<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
172 context: Context,
173 fixture: &Fixture<S>,
174 link: Link,
175 ) -> (
176 Oracle<PublicKey, deterministic::Context>,
177 Registrations<PublicKey>,
178 ) {
179 let (network, mut oracle) = Network::new(
180 context.with_label("network"),
181 commonware_p2p::simulated::Config {
182 max_size: 1024 * 1024,
183 disconnect_on_block: true,
184 tracked_peer_sets: None,
185 },
186 );
187 network.start();
188
189 let registrations = register_participants(&mut oracle, &fixture.participants).await;
190 link_participants(&mut oracle, &fixture.participants, link).await;
191
192 (oracle, registrations)
193 }
194
195 fn spawn_validator_engines<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
197 context: Context,
198 fixture: &Fixture<S>,
199 registrations: &mut Registrations<PublicKey>,
200 oracle: &mut Oracle<PublicKey, deterministic::Context>,
201 epoch: Epoch,
202 rebroadcast_timeout: Duration,
203 incorrect: Vec<usize>,
204 ) -> BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>> {
205 let mut reporters = BTreeMap::new();
206
207 for (idx, participant) in fixture.participants.iter().enumerate() {
208 let context = context.with_label(&format!("participant_{participant}"));
209
210 let provider = mocks::Provider::new();
212 assert!(provider.register(epoch, fixture.schemes[idx].clone()));
213
214 let monitor = mocks::Monitor::new(epoch);
216
217 let strategy = if incorrect.contains(&idx) {
219 mocks::Strategy::Incorrect
220 } else {
221 mocks::Strategy::Correct
222 };
223 let automaton = mocks::Application::new(strategy);
224
225 let (reporter, reporter_mailbox) =
227 mocks::Reporter::new(context.clone(), fixture.verifier.clone());
228 context.with_label("reporter").spawn(|_| reporter.run());
229 reporters.insert(participant.clone(), reporter_mailbox.clone());
230
231 let blocker = oracle.control(participant.clone());
233
234 let engine = Engine::new(
236 context.with_label("engine"),
237 Config {
238 monitor,
239 provider,
240 automaton,
241 reporter: reporter_mailbox,
242 blocker,
243 priority_acks: false,
244 rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
245 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
246 window: std::num::NonZeroU64::new(10).unwrap(),
247 activity_timeout: HeightDelta::new(100),
248 journal_partition: format!("aggregation-{participant}"),
249 journal_write_buffer: NZUsize!(4096),
250 journal_replay_buffer: NZUsize!(4096),
251 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
252 journal_compression: Some(3),
253 journal_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
254 strategy: Sequential,
255 },
256 );
257
258 let (sender, receiver) = registrations.remove(participant).unwrap();
259 engine.start((sender, receiver));
260 }
261
262 reporters
263 }
264
265 async fn await_reporters<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
267 context: Context,
268 reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>,
269 threshold_height: Height,
270 threshold_epoch: Epoch,
271 ) {
272 let mut receivers = Vec::new();
273 for (reporter, mailbox) in reporters.iter() {
274 let (tx, rx) = oneshot::channel();
276 receivers.push(rx);
277
278 context.with_label("reporter_watcher").spawn({
279 let reporter = reporter.clone();
280 let mut mailbox = mailbox.clone();
281 move |context| async move {
282 loop {
283 let (height, epoch) = mailbox
284 .get_tip()
285 .await
286 .unwrap_or((Height::zero(), Epoch::zero()));
287 debug!(
288 %height,
289 epoch = %epoch,
290 %threshold_height,
291 threshold_epoch = %threshold_epoch,
292 ?reporter,
293 "reporter status"
294 );
295 if height >= threshold_height && epoch >= threshold_epoch {
296 debug!(
297 ?reporter,
298 "reporter reached threshold, signaling completion"
299 );
300 tx.send_lossy(reporter.clone());
301 break;
302 }
303 context.sleep(Duration::from_millis(100)).await;
304 }
305 }
306 });
307 }
308
309 let results = join_all(receivers).await;
311 assert_eq!(results.len(), reporters.len());
312
313 for result in results {
315 assert!(result.is_ok(), "reporter was cancelled");
316 }
317 }
318
319 fn all_online<S, F>(fixture: F)
321 where
322 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
323 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
324 {
325 let runner = deterministic::Runner::timed(Duration::from_secs(30));
326
327 runner.start(|mut context| async move {
328 let num_validators = 4;
329 let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
330 let epoch = Epoch::new(111);
331
332 let (mut oracle, mut registrations) =
333 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
334 .await;
335
336 let reporters = spawn_validator_engines(
337 context.with_label("validator"),
338 &fixture,
339 &mut registrations,
340 &mut oracle,
341 epoch,
342 Duration::from_secs(5),
343 vec![],
344 );
345
346 await_reporters(
347 context.with_label("reporter"),
348 &reporters,
349 Height::new(100),
350 epoch,
351 )
352 .await;
353 });
354 }
355
    // Runs the `all_online` scenario once per fixture constructor: BLS12-381 threshold and
    // multisig (each with `MinPk` and `MinSig`), then ed25519 and secp256r1.
    #[test_traced("INFO")]
    fn test_all_online() {
        all_online(bls12381_threshold::fixture::<MinPk, _>);
        all_online(bls12381_threshold::fixture::<MinSig, _>);
        all_online(bls12381_multisig::fixture::<MinPk, _>);
        all_online(bls12381_multisig::fixture::<MinSig, _>);
        all_online(ed25519::fixture);
        all_online(secp256r1::fixture);
    }
365
366 fn byzantine_proposer<S, F>(fixture: F)
368 where
369 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
370 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
371 {
372 let runner = deterministic::Runner::timed(Duration::from_secs(30));
373
374 runner.start(|mut context| async move {
375 let num_validators = 4;
376 let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
377 let epoch = Epoch::new(111);
378
379 let (mut oracle, mut registrations) =
380 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
381 .await;
382
383 let reporters = spawn_validator_engines(
384 context.with_label("validator"),
385 &fixture,
386 &mut registrations,
387 &mut oracle,
388 epoch,
389 Duration::from_secs(5),
390 vec![0],
391 );
392
393 await_reporters(
394 context.with_label("reporter"),
395 &reporters,
396 Height::new(100),
397 epoch,
398 )
399 .await;
400 });
401 }
402
    // Runs the `byzantine_proposer` scenario once per fixture constructor: BLS12-381 threshold
    // and multisig (each with `MinPk` and `MinSig`), then ed25519 and secp256r1.
    #[test_traced("INFO")]
    fn test_byzantine_proposer() {
        byzantine_proposer(bls12381_threshold::fixture::<MinPk, _>);
        byzantine_proposer(bls12381_threshold::fixture::<MinSig, _>);
        byzantine_proposer(bls12381_multisig::fixture::<MinPk, _>);
        byzantine_proposer(bls12381_multisig::fixture::<MinSig, _>);
        byzantine_proposer(ed25519::fixture);
        byzantine_proposer(secp256r1::fixture);
    }
412
413 fn unclean_byzantine_shutdown<S, F>(fixture: F)
414 where
415 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
416 F: Fn(&mut StdRng, &[u8], u32) -> Fixture<S>,
417 {
418 let num_validators = 4;
420 let target_height = Height::new(200); let min_shutdowns = 4; let max_shutdowns = 10; let shutdown_range_min = Duration::from_millis(100);
424 let shutdown_range_max = Duration::from_millis(1_000);
425 let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));
426
427 let mut prev_checkpoint = None;
428
429 let mut rng = test_rng();
431 let fixture = fixture(&mut rng, TEST_NAMESPACE, num_validators);
432
433 let mut shutdown_count = 0;
435 while shutdown_count < max_shutdowns {
436 let fixture = fixture.clone();
437 let f = move |mut context: Context| {
438 async move {
439 let epoch = Epoch::new(111);
440
441 let (oracle, mut registrations) = initialize_simulation(
442 context.with_label("simulation"),
443 &fixture,
444 RELIABLE_LINK,
445 )
446 .await;
447
448 let (reporter, mut reporter_mailbox) =
452 mocks::Reporter::new(context.clone(), fixture.verifier.clone());
453 context.with_label("reporter").spawn(|_| reporter.run());
454
455 for (idx, participant) in fixture.participants.iter().enumerate() {
457 let validator_context =
458 context.with_label(&format!("participant_{participant}"));
459
460 let provider = mocks::Provider::new();
462 assert!(provider.register(epoch, fixture.schemes[idx].clone()));
463
464 let monitor = mocks::Monitor::new(epoch);
466
467 let strategy = if idx == 0 {
469 mocks::Strategy::Incorrect
470 } else {
471 mocks::Strategy::Correct
472 };
473 let automaton = mocks::Application::new(strategy);
474
475 let blocker = oracle.control(participant.clone());
477
478 let engine = Engine::new(
480 validator_context.with_label("engine"),
481 Config {
482 monitor,
483 provider,
484 automaton,
485 reporter: reporter_mailbox.clone(),
486 blocker,
487 priority_acks: false,
488 rebroadcast_timeout,
489 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
490 window: std::num::NonZeroU64::new(10).unwrap(),
491 activity_timeout: HeightDelta::new(1_024), journal_partition: format!("unclean_shutdown_test_{participant}"),
493 journal_write_buffer: NZUsize!(4096),
494 journal_replay_buffer: NZUsize!(4096),
495 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
496 journal_compression: Some(3),
497 journal_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
498 strategy: Sequential,
499 },
500 );
501
502 let (sender, receiver) = registrations.remove(participant).unwrap();
503 engine.start((sender, receiver));
504 }
505
506 let completion =
508 context
509 .with_label("completion_watcher")
510 .spawn(move |context| async move {
511 loop {
512 if let Some(tip_height) =
513 reporter_mailbox.get_contiguous_tip().await
514 {
515 if tip_height >= target_height {
516 break;
517 }
518 }
519 context.sleep(Duration::from_millis(50)).await;
520 }
521 });
522
523 let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
525 select! {
526 _ = context.sleep(shutdown_wait) => {
527 debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
528 false },
530 _ = completion => {
531 debug!("Shared reporter completed normally");
532 true },
534 }
535 }
536 };
537
538 let (complete, checkpoint) = prev_checkpoint
539 .map_or_else(
540 || {
541 debug!("Starting initial run");
542 deterministic::Runner::timed(Duration::from_secs(45))
543 },
544 |prev_checkpoint| {
545 debug!(shutdown_count, "Restarting from previous context");
546 deterministic::Runner::from(prev_checkpoint)
547 },
548 )
549 .start_and_recover(f);
550
551 if complete && shutdown_count >= min_shutdowns {
552 debug!("Test completed successfully");
553 break;
554 }
555
556 prev_checkpoint = Some(checkpoint);
557 shutdown_count += 1;
558 }
559 }
560
    // Runs the `unclean_byzantine_shutdown` scenario once per fixture constructor: BLS12-381
    // threshold and multisig (each with `MinPk` and `MinSig`), then ed25519 and secp256r1.
    #[test_traced("INFO")]
    fn test_unclean_byzantine_shutdown() {
        unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinPk, _>);
        unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinSig, _>);
        unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinPk, _>);
        unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinSig, _>);
        unclean_byzantine_shutdown(ed25519::fixture);
        unclean_byzantine_shutdown(secp256r1::fixture);
    }
570
571 fn unclean_shutdown_with_unsigned_height<S, F>(fixture: F)
572 where
573 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
574 F: Fn(&mut StdRng, &[u8], u32) -> Fixture<S>,
575 {
576 let num_validators = 4;
578 let skip_height = Height::new(50); let window = HeightDelta::new(10);
580 let target_height = Height::new(100);
581
582 let mut rng = test_rng();
584 let fixture = fixture(&mut rng, TEST_NAMESPACE, num_validators);
585
586 let f = |context: Context| {
588 let fixture = fixture.clone();
589 async move {
590 let epoch = Epoch::new(111);
591
592 let (oracle, mut registrations) = initialize_simulation(
594 context.with_label("simulation"),
595 &fixture,
596 RELIABLE_LINK,
597 )
598 .await;
599
600 let (reporter, mut reporter_mailbox) =
602 mocks::Reporter::new(context.clone(), fixture.verifier.clone());
603 context.with_label("reporter").spawn(|_| reporter.run());
604
605 for (idx, participant) in fixture.participants.iter().enumerate() {
607 let validator_context =
608 context.with_label(&format!("participant_{participant}"));
609
610 let provider = mocks::Provider::new();
612 assert!(provider.register(epoch, fixture.schemes[idx].clone()));
613
614 let monitor = mocks::Monitor::new(epoch);
616
617 let automaton = mocks::Application::new(mocks::Strategy::Skip {
619 height: skip_height,
620 });
621
622 let blocker = oracle.control(participant.clone());
624
625 let engine = Engine::new(
627 validator_context.with_label("engine"),
628 Config {
629 monitor,
630 provider,
631 automaton,
632 reporter: reporter_mailbox.clone(),
633 blocker,
634 priority_acks: false,
635 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
636 100,
637 )),
638 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
639 window: std::num::NonZeroU64::new(window.get()).unwrap(),
640 activity_timeout: HeightDelta::new(100),
641 journal_partition: format!("unsigned_height_test_{participant}"),
642 journal_write_buffer: NZUsize!(4096),
643 journal_replay_buffer: NZUsize!(4096),
644 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
645 journal_compression: Some(3),
646 journal_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
647 strategy: Sequential,
648 },
649 );
650
651 let (sender, receiver) = registrations.remove(participant).unwrap();
652 engine.start((sender, receiver));
653 }
654
655 loop {
657 if let Some((tip_height, _)) = reporter_mailbox.get_tip().await {
658 debug!(%tip_height, %skip_height, %target_height, "reporter status");
659 if tip_height >= skip_height.saturating_add(window).previous().unwrap() {
660 return;
662 }
663 }
664 context.sleep(Duration::from_millis(50)).await;
665 }
666 }
667 };
668
669 let (_, checkpoint) =
670 deterministic::Runner::timed(Duration::from_secs(60)).start_and_recover(f);
671
672 let f2 = |context: Context| {
674 async move {
675 let epoch = Epoch::new(111);
676
677 let (oracle, mut registrations) = initialize_simulation(
679 context.with_label("simulation"),
680 &fixture,
681 RELIABLE_LINK,
682 )
683 .await;
684
685 let (reporter, mut reporter_mailbox) =
687 mocks::Reporter::new(context.clone(), fixture.verifier.clone());
688 context.with_label("reporter").spawn(|_| reporter.run());
689
690 for (idx, participant) in fixture.participants.iter().enumerate() {
692 let validator_context =
693 context.with_label(&format!("participant_{participant}"));
694
695 let provider = mocks::Provider::new();
697 assert!(provider.register(epoch, fixture.schemes[idx].clone()));
698
699 let monitor = mocks::Monitor::new(epoch);
701
702 let automaton = mocks::Application::new(mocks::Strategy::Correct);
704
705 let blocker = oracle.control(participant.clone());
707
708 let engine = Engine::new(
710 validator_context.with_label("engine"),
711 Config {
712 monitor,
713 provider,
714 automaton,
715 reporter: reporter_mailbox.clone(),
716 blocker,
717 priority_acks: false,
718 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
719 100,
720 )),
721 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
722 window: std::num::NonZeroU64::new(10).unwrap(),
723 activity_timeout: HeightDelta::new(100),
724 journal_partition: format!("unsigned_height_test_{participant}"),
725 journal_write_buffer: NZUsize!(4096),
726 journal_replay_buffer: NZUsize!(4096),
727 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
728 journal_compression: Some(3),
729 journal_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
730 strategy: Sequential,
731 },
732 );
733
734 let (sender, receiver) = registrations.remove(participant).unwrap();
735 engine.start((sender, receiver));
736 }
737
738 loop {
740 if let Some(tip_height) = reporter_mailbox.get_contiguous_tip().await {
741 debug!(
742 %tip_height,
743 %skip_height, %target_height, "reporter status on restart"
744 );
745 if tip_height >= target_height {
746 break;
747 }
748 }
749 context.sleep(Duration::from_millis(50)).await;
750 }
751 }
752 };
753
754 deterministic::Runner::from(checkpoint).start(f2);
755 }
756
    // Runs the `unclean_shutdown_with_unsigned_height` scenario once per fixture constructor:
    // BLS12-381 threshold and multisig (each with `MinPk` and `MinSig`), then ed25519 and
    // secp256r1.
    #[test_traced("INFO")]
    fn test_unclean_shutdown_with_unsigned_height() {
        unclean_shutdown_with_unsigned_height(bls12381_threshold::fixture::<MinPk, _>);
        unclean_shutdown_with_unsigned_height(bls12381_threshold::fixture::<MinSig, _>);
        unclean_shutdown_with_unsigned_height(bls12381_multisig::fixture::<MinPk, _>);
        unclean_shutdown_with_unsigned_height(bls12381_multisig::fixture::<MinSig, _>);
        unclean_shutdown_with_unsigned_height(ed25519::fixture);
        unclean_shutdown_with_unsigned_height(secp256r1::fixture);
    }
766
767 fn slow_and_lossy_links<S, F>(fixture: F, seed: u64) -> String
768 where
769 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
770 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
771 {
772 let cfg = deterministic::Config::new()
773 .with_seed(seed)
774 .with_timeout(Some(Duration::from_secs(120)));
775 let runner = deterministic::Runner::new(cfg);
776
777 runner.start(|mut context| async move {
778 let num_validators = 4;
779 let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
780 let epoch = Epoch::new(111);
781
782 let degraded_link = Link {
784 latency: Duration::from_millis(200),
785 jitter: Duration::from_millis(150),
786 success_rate: 0.5,
787 };
788
789 let (mut oracle, mut registrations) =
790 initialize_simulation(context.with_label("simulation"), &fixture, degraded_link)
791 .await;
792
793 let reporters = spawn_validator_engines(
794 context.with_label("validator"),
795 &fixture,
796 &mut registrations,
797 &mut oracle,
798 epoch,
799 Duration::from_secs(2),
800 vec![],
801 );
802
803 await_reporters(
804 context.with_label("reporter"),
805 &reporters,
806 Height::new(100),
807 epoch,
808 )
809 .await;
810
811 context.auditor().state()
812 })
813 }
814
    // Runs the `slow_and_lossy_links` scenario with seed 0 once per fixture constructor; the
    // returned auditor state is discarded here.
    #[test_traced("INFO")]
    fn test_slow_and_lossy_links() {
        slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, 0);
        slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, 0);
        slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, 0);
        slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, 0);
        slow_and_lossy_links(ed25519::fixture, 0);
        slow_and_lossy_links(secp256r1::fixture, 0);
    }
824
825 #[test_group("slow")]
826 #[test_traced("INFO")]
827 fn test_determinism() {
828 for seed in 1..6 {
831 let ts_pk_state_1 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
833 let ts_pk_state_2 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
834 assert_eq!(ts_pk_state_1, ts_pk_state_2);
835
836 let ts_sig_state_1 =
838 slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
839 let ts_sig_state_2 =
840 slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
841 assert_eq!(ts_sig_state_1, ts_sig_state_2);
842
843 let ms_pk_state_1 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
845 let ms_pk_state_2 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
846 assert_eq!(ms_pk_state_1, ms_pk_state_2);
847
848 let ms_sig_state_1 =
850 slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
851 let ms_sig_state_2 =
852 slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
853 assert_eq!(ms_sig_state_1, ms_sig_state_2);
854
855 let ed_state_1 = slow_and_lossy_links(ed25519::fixture, seed);
857 let ed_state_2 = slow_and_lossy_links(ed25519::fixture, seed);
858 assert_eq!(ed_state_1, ed_state_2);
859
860 let secp_state_1 = slow_and_lossy_links(secp256r1::fixture, seed);
862 let secp_state_2 = slow_and_lossy_links(secp256r1::fixture, seed);
863 assert_eq!(secp_state_1, secp_state_2);
864
865 let states = [
866 ("threshold-minpk", ts_pk_state_1),
867 ("threshold-minsig", ts_sig_state_1),
868 ("multisig-minpk", ms_pk_state_1),
869 ("multisig-minsig", ms_sig_state_1),
870 ("ed25519", ed_state_1),
871 ("secp256r1", secp_state_1),
872 ];
873
874 for pair in states.windows(2) {
876 assert_ne!(
877 pair[0].1, pair[1].1,
878 "state {} equals state {}",
879 pair[0].0, pair[1].0
880 );
881 }
882 }
883 }
884
885 fn one_offline<S, F>(fixture: F)
886 where
887 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
888 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
889 {
890 let runner = deterministic::Runner::timed(Duration::from_secs(30));
891
892 runner.start(|mut context| async move {
893 let num_validators = 5;
894 let mut fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
895 let epoch = Epoch::new(111);
896
897 fixture.participants.truncate(4);
899 fixture.schemes.truncate(4);
900
901 let (mut oracle, mut registrations) =
902 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
903 .await;
904
905 let reporters = spawn_validator_engines(
906 context.with_label("validator"),
907 &fixture,
908 &mut registrations,
909 &mut oracle,
910 epoch,
911 Duration::from_secs(5),
912 vec![],
913 );
914
915 await_reporters(
916 context.with_label("reporter"),
917 &reporters,
918 Height::new(100),
919 epoch,
920 )
921 .await;
922 });
923 }
924
    // Runs the `one_offline` scenario once per fixture constructor: BLS12-381 threshold and
    // multisig (each with `MinPk` and `MinSig`), then ed25519 and secp256r1.
    #[test_traced("INFO")]
    fn test_one_offline() {
        one_offline(bls12381_threshold::fixture::<MinPk, _>);
        one_offline(bls12381_threshold::fixture::<MinSig, _>);
        one_offline(bls12381_multisig::fixture::<MinPk, _>);
        one_offline(bls12381_multisig::fixture::<MinSig, _>);
        one_offline(ed25519::fixture);
        one_offline(secp256r1::fixture);
    }
934
935 fn network_partition<S, F>(fixture: F)
937 where
938 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
939 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
940 {
941 let runner = deterministic::Runner::timed(Duration::from_secs(60));
942
943 runner.start(|mut context| async move {
944 let num_validators = 4;
945 let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
946 let epoch = Epoch::new(111);
947
948 let (mut oracle, mut registrations) =
949 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
950 .await;
951
952 let reporters = spawn_validator_engines(
953 context.with_label("validator"),
954 &fixture,
955 &mut registrations,
956 &mut oracle,
957 epoch,
958 Duration::from_secs(5),
959 vec![],
960 );
961
962 for v1 in fixture.participants.iter() {
964 for v2 in fixture.participants.iter() {
965 if v2 == v1 {
966 continue;
967 }
968 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
969 }
970 }
971 context.sleep(Duration::from_secs(20)).await;
972
973 for v1 in fixture.participants.iter() {
975 for v2 in fixture.participants.iter() {
976 if v2 == v1 {
977 continue;
978 }
979 oracle
980 .add_link(v1.clone(), v2.clone(), RELIABLE_LINK)
981 .await
982 .unwrap();
983 }
984 }
985
986 await_reporters(
987 context.with_label("reporter"),
988 &reporters,
989 Height::new(100),
990 epoch,
991 )
992 .await;
993 });
994 }
995
    // Runs the `network_partition` scenario once per fixture constructor: BLS12-381 threshold
    // and multisig (each with `MinPk` and `MinSig`), then ed25519 and secp256r1.
    #[test_traced("INFO")]
    fn test_network_partition() {
        network_partition(bls12381_threshold::fixture::<MinPk, _>);
        network_partition(bls12381_threshold::fixture::<MinSig, _>);
        network_partition(bls12381_multisig::fixture::<MinPk, _>);
        network_partition(bls12381_multisig::fixture::<MinSig, _>);
        network_partition(ed25519::fixture);
        network_partition(secp256r1::fixture);
    }
1005
1006 fn insufficient_validators<S, F>(fixture: F)
1008 where
1009 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1010 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1011 {
1012 let runner = deterministic::Runner::timed(Duration::from_secs(15));
1013
1014 runner.start(|mut context| async move {
1015 let num_validators = 5;
1016 let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
1017 let epoch = Epoch::new(111);
1018
1019 let (oracle, mut registrations) =
1021 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
1022 .await;
1023
1024 let mut reporters =
1026 BTreeMap::<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>::new();
1027
1028 for (idx, participant) in fixture.participants.iter().take(2).enumerate() {
1030 let context = context.with_label(&format!("participant_{participant}"));
1031
1032 let provider = mocks::Provider::new();
1034 assert!(provider.register(epoch, fixture.schemes[idx].clone()));
1035
1036 let monitor = mocks::Monitor::new(epoch);
1038
1039 let automaton = mocks::Application::new(mocks::Strategy::Correct);
1041
1042 let (reporter, reporter_mailbox) =
1044 mocks::Reporter::new(context.clone(), fixture.verifier.clone());
1045 context.with_label("reporter").spawn(|_| reporter.run());
1046 reporters.insert(participant.clone(), reporter_mailbox.clone());
1047
1048 let blocker = oracle.control(participant.clone());
1050
1051 let engine = Engine::new(
1053 context.with_label("engine"),
1054 Config {
1055 monitor,
1056 provider,
1057 automaton,
1058 reporter: reporter_mailbox,
1059 blocker,
1060 priority_acks: false,
1061 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_secs(3)),
1062 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
1063 window: std::num::NonZeroU64::new(10).unwrap(),
1064 activity_timeout: HeightDelta::new(100),
1065 journal_partition: format!("aggregation-{participant}"),
1066 journal_write_buffer: NZUsize!(4096),
1067 journal_replay_buffer: NZUsize!(4096),
1068 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
1069 journal_compression: Some(3),
1070 journal_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1071 strategy: Sequential,
1072 },
1073 );
1074
1075 let (sender, receiver) = registrations.remove(participant).unwrap();
1076 engine.start((sender, receiver));
1077 }
1078
1079 context.sleep(Duration::from_secs(12)).await;
1082
1083 let mut any_consensus = false;
1085 for (validator_pk, mut reporter_mailbox) in reporters {
1086 let (tip, _) = reporter_mailbox
1087 .get_tip()
1088 .await
1089 .unwrap_or((Height::zero(), Epoch::zero()));
1090 if !tip.is_zero() {
1091 any_consensus = true;
1092 tracing::warn!(
1093 ?validator_pk,
1094 %tip,
1095 "Unexpected consensus with insufficient validators"
1096 );
1097 }
1098 }
1099
1100 assert!(
1102 !any_consensus,
1103 "Consensus should not be achieved with insufficient validator participation (below quorum)"
1104 );
1105 });
1106 }
1107
    // Runs the `insufficient_validators` scenario once per fixture constructor: BLS12-381
    // threshold and multisig (each with `MinPk` and `MinSig`), then ed25519 and secp256r1.
    #[test_traced("INFO")]
    fn test_insufficient_validators() {
        insufficient_validators(bls12381_threshold::fixture::<MinPk, _>);
        insufficient_validators(bls12381_threshold::fixture::<MinSig, _>);
        insufficient_validators(bls12381_multisig::fixture::<MinPk, _>);
        insufficient_validators(bls12381_multisig::fixture::<MinSig, _>);
        insufficient_validators(ed25519::fixture);
        insufficient_validators(secp256r1::fixture);
    }
1117}