1pub mod scheme;
70pub mod types;
71
72cfg_if::cfg_if! {
73 if #[cfg(not(target_arch = "wasm32"))] {
74 mod config;
75 pub use config::Config;
76 mod engine;
77 pub use engine::Engine;
78 mod metrics;
79 mod safe_tip;
80
81 #[cfg(test)]
82 pub mod mocks;
83 }
84}
85
86#[cfg(test)]
87mod tests {
88 use super::{mocks, Config, Engine};
89 use crate::{
90 aggregation::scheme::{bls12381_multisig, bls12381_threshold, ed25519, secp256r1, Scheme},
91 types::{Epoch, EpochDelta, Height, HeightDelta},
92 };
93 use commonware_cryptography::{
94 bls12381::primitives::variant::{MinPk, MinSig},
95 certificate::mocks::Fixture,
96 ed25519::PublicKey,
97 sha256::Digest as Sha256Digest,
98 };
99 use commonware_macros::{select, test_group, test_traced};
100 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
101 use commonware_parallel::Sequential;
102 use commonware_runtime::{
103 buffer::paged::CacheRef,
104 deterministic::{self, Context},
105 Clock, Metrics, Quota, Runner, Spawner,
106 };
107 use commonware_utils::{
108 channel::{fallible::OneshotExt, oneshot},
109 test_rng, NZUsize, NonZeroDuration, NZU16,
110 };
111 use futures::future::join_all;
112 use rand::{rngs::StdRng, Rng};
113 use std::{
114 collections::BTreeMap,
115 num::{NonZeroU16, NonZeroU32, NonZeroUsize},
116 time::Duration,
117 };
118 use tracing::debug;
119
120 type Registrations<P> = BTreeMap<P, (Sender<P, deterministic::Context>, Receiver<P>)>;
121
122 const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
123 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
124 const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
125 const TEST_NAMESPACE: &[u8] = b"my testing namespace";
126
127 const RELIABLE_LINK: Link = Link {
129 latency: Duration::from_millis(10),
130 jitter: Duration::from_millis(1),
131 success_rate: 1.0,
132 };
133
134 async fn register_participants(
136 oracle: &mut Oracle<PublicKey, deterministic::Context>,
137 participants: &[PublicKey],
138 ) -> Registrations<PublicKey> {
139 let mut registrations = BTreeMap::new();
140 for participant in participants.iter() {
141 let (sender, receiver) = oracle
142 .control(participant.clone())
143 .register(0, TEST_QUOTA)
144 .await
145 .unwrap();
146 registrations.insert(participant.clone(), (sender, receiver));
147 }
148 registrations
149 }
150
151 async fn link_participants(
153 oracle: &mut Oracle<PublicKey, deterministic::Context>,
154 participants: &[PublicKey],
155 link: Link,
156 ) {
157 for v1 in participants.iter() {
158 for v2 in participants.iter() {
159 if v2 == v1 {
160 continue;
161 }
162 oracle
163 .add_link(v1.clone(), v2.clone(), link.clone())
164 .await
165 .unwrap();
166 }
167 }
168 }
169
170 async fn initialize_simulation<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
172 context: Context,
173 fixture: &Fixture<S>,
174 link: Link,
175 ) -> (
176 Oracle<PublicKey, deterministic::Context>,
177 Registrations<PublicKey>,
178 ) {
179 let (network, mut oracle) = Network::new_with_peers(
180 context.with_label("network"),
181 commonware_p2p::simulated::Config {
182 max_size: 1024 * 1024,
183 disconnect_on_block: true,
184 tracked_peer_sets: NZUsize!(1),
185 },
186 fixture.participants.clone(),
187 )
188 .await;
189 network.start();
190
191 let registrations = register_participants(&mut oracle, &fixture.participants).await;
192 link_participants(&mut oracle, &fixture.participants, link).await;
193
194 (oracle, registrations)
195 }
196
197 fn spawn_validator_engines<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
199 context: Context,
200 fixture: &Fixture<S>,
201 registrations: &mut Registrations<PublicKey>,
202 oracle: &mut Oracle<PublicKey, deterministic::Context>,
203 epoch: Epoch,
204 rebroadcast_timeout: Duration,
205 incorrect: Vec<usize>,
206 ) -> BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>> {
207 let mut reporters = BTreeMap::new();
208
209 for (idx, participant) in fixture.participants.iter().enumerate() {
210 let context = context.with_label(&format!("participant_{participant}"));
211
212 let provider = mocks::Provider::new();
214 assert!(provider.register(epoch, fixture.schemes[idx].clone()));
215
216 let monitor = mocks::Monitor::new(epoch);
218
219 let strategy = if incorrect.contains(&idx) {
221 mocks::Strategy::Incorrect
222 } else {
223 mocks::Strategy::Correct
224 };
225 let automaton = mocks::Application::new(strategy);
226
227 let (reporter, reporter_mailbox) =
229 mocks::Reporter::new(context.clone(), fixture.verifier.clone());
230 context.with_label("reporter").spawn(|_| reporter.run());
231 reporters.insert(participant.clone(), reporter_mailbox.clone());
232
233 let blocker = oracle.control(participant.clone());
235
236 let engine = Engine::new(
238 context.with_label("engine"),
239 Config {
240 monitor,
241 provider,
242 automaton,
243 reporter: reporter_mailbox,
244 blocker,
245 priority_acks: false,
246 rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
247 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
248 window: std::num::NonZeroU64::new(10).unwrap(),
249 activity_timeout: HeightDelta::new(100),
250 journal_partition: format!("aggregation-{participant}"),
251 journal_write_buffer: NZUsize!(4096),
252 journal_replay_buffer: NZUsize!(4096),
253 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
254 journal_compression: Some(3),
255 journal_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
256 strategy: Sequential,
257 },
258 );
259
260 let (sender, receiver) = registrations.remove(participant).unwrap();
261 engine.start((sender, receiver));
262 }
263
264 reporters
265 }
266
267 async fn await_reporters<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
269 context: Context,
270 reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>,
271 threshold_height: Height,
272 threshold_epoch: Epoch,
273 ) {
274 let mut receivers = Vec::new();
275 for (reporter, mailbox) in reporters.iter() {
276 let (tx, rx) = oneshot::channel();
278 receivers.push(rx);
279
280 context.with_label("reporter_watcher").spawn({
281 let reporter = reporter.clone();
282 let mut mailbox = mailbox.clone();
283 move |context| async move {
284 loop {
285 let (height, epoch) = mailbox
286 .get_tip()
287 .await
288 .unwrap_or((Height::zero(), Epoch::zero()));
289 debug!(
290 %height,
291 epoch = %epoch,
292 %threshold_height,
293 threshold_epoch = %threshold_epoch,
294 ?reporter,
295 "reporter status"
296 );
297 if height >= threshold_height && epoch >= threshold_epoch {
298 debug!(
299 ?reporter,
300 "reporter reached threshold, signaling completion"
301 );
302 tx.send_lossy(reporter.clone());
303 break;
304 }
305 context.sleep(Duration::from_millis(100)).await;
306 }
307 }
308 });
309 }
310
311 let results = join_all(receivers).await;
313 assert_eq!(results.len(), reporters.len());
314
315 for result in results {
317 assert!(result.is_ok(), "reporter was cancelled");
318 }
319 }
320
321 fn all_online<S, F>(fixture: F)
323 where
324 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
325 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
326 {
327 let runner = deterministic::Runner::timed(Duration::from_secs(30));
328
329 runner.start(|mut context| async move {
330 let num_validators = 4;
331 let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
332 let epoch = Epoch::new(111);
333
334 let (mut oracle, mut registrations) =
335 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
336 .await;
337
338 let reporters = spawn_validator_engines(
339 context.with_label("validator"),
340 &fixture,
341 &mut registrations,
342 &mut oracle,
343 epoch,
344 Duration::from_secs(5),
345 vec![],
346 );
347
348 await_reporters(
349 context.with_label("reporter"),
350 &reporters,
351 Height::new(100),
352 epoch,
353 )
354 .await;
355 });
356 }
357
358 #[test_group("slow")]
359 #[test_traced("INFO")]
360 fn test_all_online() {
361 all_online(bls12381_threshold::fixture::<MinPk, _>);
362 all_online(bls12381_threshold::fixture::<MinSig, _>);
363 all_online(bls12381_multisig::fixture::<MinPk, _>);
364 all_online(bls12381_multisig::fixture::<MinSig, _>);
365 all_online(ed25519::fixture);
366 all_online(secp256r1::fixture);
367 }
368
369 fn byzantine_proposer<S, F>(fixture: F)
371 where
372 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
373 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
374 {
375 let runner = deterministic::Runner::timed(Duration::from_secs(30));
376
377 runner.start(|mut context| async move {
378 let num_validators = 4;
379 let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
380 let epoch = Epoch::new(111);
381
382 let (mut oracle, mut registrations) =
383 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
384 .await;
385
386 let reporters = spawn_validator_engines(
387 context.with_label("validator"),
388 &fixture,
389 &mut registrations,
390 &mut oracle,
391 epoch,
392 Duration::from_secs(5),
393 vec![0],
394 );
395
396 await_reporters(
397 context.with_label("reporter"),
398 &reporters,
399 Height::new(100),
400 epoch,
401 )
402 .await;
403 });
404 }
405
406 #[test_traced("INFO")]
407 fn test_byzantine_proposer() {
408 byzantine_proposer(bls12381_threshold::fixture::<MinPk, _>);
409 byzantine_proposer(bls12381_threshold::fixture::<MinSig, _>);
410 byzantine_proposer(bls12381_multisig::fixture::<MinPk, _>);
411 byzantine_proposer(bls12381_multisig::fixture::<MinSig, _>);
412 byzantine_proposer(ed25519::fixture);
413 byzantine_proposer(secp256r1::fixture);
414 }
415
416 fn unclean_byzantine_shutdown<S, F>(fixture: F)
417 where
418 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
419 F: Fn(&mut StdRng, &[u8], u32) -> Fixture<S>,
420 {
421 let num_validators = 4;
423 let target_height = Height::new(200); let min_shutdowns = 4; let max_shutdowns = 10; let shutdown_range_min = Duration::from_millis(100);
427 let shutdown_range_max = Duration::from_millis(1_000);
428 let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));
429
430 let mut prev_checkpoint = None;
431
432 let mut rng = test_rng();
434 let fixture = fixture(&mut rng, TEST_NAMESPACE, num_validators);
435
436 let mut shutdown_count = 0;
438 while shutdown_count < max_shutdowns {
439 let fixture = fixture.clone();
440 let f = move |mut context: Context| {
441 async move {
442 let epoch = Epoch::new(111);
443
444 let (oracle, mut registrations) = initialize_simulation(
445 context.with_label("simulation"),
446 &fixture,
447 RELIABLE_LINK,
448 )
449 .await;
450
451 let (reporter, mut reporter_mailbox) =
455 mocks::Reporter::new(context.clone(), fixture.verifier.clone());
456 context.with_label("reporter").spawn(|_| reporter.run());
457
458 for (idx, participant) in fixture.participants.iter().enumerate() {
460 let validator_context =
461 context.with_label(&format!("participant_{participant}"));
462
463 let provider = mocks::Provider::new();
465 assert!(provider.register(epoch, fixture.schemes[idx].clone()));
466
467 let monitor = mocks::Monitor::new(epoch);
469
470 let strategy = if idx == 0 {
472 mocks::Strategy::Incorrect
473 } else {
474 mocks::Strategy::Correct
475 };
476 let automaton = mocks::Application::new(strategy);
477
478 let blocker = oracle.control(participant.clone());
480
481 let engine = Engine::new(
483 validator_context.with_label("engine"),
484 Config {
485 monitor,
486 provider,
487 automaton,
488 reporter: reporter_mailbox.clone(),
489 blocker,
490 priority_acks: false,
491 rebroadcast_timeout,
492 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
493 window: std::num::NonZeroU64::new(10).unwrap(),
494 activity_timeout: HeightDelta::new(1_024), journal_partition: format!("unclean_shutdown_test_{participant}"),
496 journal_write_buffer: NZUsize!(4096),
497 journal_replay_buffer: NZUsize!(4096),
498 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
499 journal_compression: Some(3),
500 journal_page_cache: CacheRef::from_pooler(
501 &context,
502 PAGE_SIZE,
503 PAGE_CACHE_SIZE,
504 ),
505 strategy: Sequential,
506 },
507 );
508
509 let (sender, receiver) = registrations.remove(participant).unwrap();
510 engine.start((sender, receiver));
511 }
512
513 let completion =
515 context
516 .with_label("completion_watcher")
517 .spawn(move |context| async move {
518 loop {
519 if let Some(tip_height) =
520 reporter_mailbox.get_contiguous_tip().await
521 {
522 if tip_height >= target_height {
523 break;
524 }
525 }
526 context.sleep(Duration::from_millis(50)).await;
527 }
528 });
529
530 let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
532 select! {
533 _ = context.sleep(shutdown_wait) => {
534 debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
535 false },
537 _ = completion => {
538 debug!("Shared reporter completed normally");
539 true },
541 }
542 }
543 };
544
545 let (complete, checkpoint) = prev_checkpoint
546 .map_or_else(
547 || {
548 debug!("Starting initial run");
549 deterministic::Runner::timed(Duration::from_secs(45))
550 },
551 |prev_checkpoint| {
552 debug!(shutdown_count, "Restarting from previous context");
553 deterministic::Runner::from(prev_checkpoint)
554 },
555 )
556 .start_and_recover(f);
557
558 if complete && shutdown_count >= min_shutdowns {
559 debug!("Test completed successfully");
560 break;
561 }
562
563 prev_checkpoint = Some(checkpoint);
564 shutdown_count += 1;
565 }
566 }
567
568 #[test_group("slow")]
569 #[test_traced("INFO")]
570 fn test_unclean_byzantine_shutdown() {
571 unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinPk, _>);
572 unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinSig, _>);
573 unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinPk, _>);
574 unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinSig, _>);
575 unclean_byzantine_shutdown(ed25519::fixture);
576 unclean_byzantine_shutdown(secp256r1::fixture);
577 }
578
579 fn unclean_shutdown_with_unsigned_height<S, F>(fixture: F)
580 where
581 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
582 F: Fn(&mut StdRng, &[u8], u32) -> Fixture<S>,
583 {
584 let num_validators = 4;
586 let skip_height = Height::new(50); let window = HeightDelta::new(10);
588 let target_height = Height::new(100);
589
590 let mut rng = test_rng();
592 let fixture = fixture(&mut rng, TEST_NAMESPACE, num_validators);
593
594 let f = |context: Context| {
596 let fixture = fixture.clone();
597 async move {
598 let epoch = Epoch::new(111);
599
600 let (oracle, mut registrations) = initialize_simulation(
602 context.with_label("simulation"),
603 &fixture,
604 RELIABLE_LINK,
605 )
606 .await;
607
608 let (reporter, mut reporter_mailbox) =
610 mocks::Reporter::new(context.clone(), fixture.verifier.clone());
611 context.with_label("reporter").spawn(|_| reporter.run());
612
613 for (idx, participant) in fixture.participants.iter().enumerate() {
615 let validator_context =
616 context.with_label(&format!("participant_{participant}"));
617
618 let provider = mocks::Provider::new();
620 assert!(provider.register(epoch, fixture.schemes[idx].clone()));
621
622 let monitor = mocks::Monitor::new(epoch);
624
625 let automaton = mocks::Application::new(mocks::Strategy::Skip {
627 height: skip_height,
628 });
629
630 let blocker = oracle.control(participant.clone());
632
633 let engine = Engine::new(
635 validator_context.with_label("engine"),
636 Config {
637 monitor,
638 provider,
639 automaton,
640 reporter: reporter_mailbox.clone(),
641 blocker,
642 priority_acks: false,
643 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
644 100,
645 )),
646 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
647 window: std::num::NonZeroU64::new(window.get()).unwrap(),
648 activity_timeout: HeightDelta::new(100),
649 journal_partition: format!("unsigned_height_test_{participant}"),
650 journal_write_buffer: NZUsize!(4096),
651 journal_replay_buffer: NZUsize!(4096),
652 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
653 journal_compression: Some(3),
654 journal_page_cache: CacheRef::from_pooler(
655 &context,
656 PAGE_SIZE,
657 PAGE_CACHE_SIZE,
658 ),
659 strategy: Sequential,
660 },
661 );
662
663 let (sender, receiver) = registrations.remove(participant).unwrap();
664 engine.start((sender, receiver));
665 }
666
667 loop {
669 if let Some((tip_height, _)) = reporter_mailbox.get_tip().await {
670 debug!(%tip_height, %skip_height, %target_height, "reporter status");
671 if tip_height >= skip_height.saturating_add(window).previous().unwrap() {
672 return;
674 }
675 }
676 context.sleep(Duration::from_millis(50)).await;
677 }
678 }
679 };
680
681 let (_, checkpoint) =
682 deterministic::Runner::timed(Duration::from_secs(60)).start_and_recover(f);
683
684 let f2 = |context: Context| {
686 async move {
687 let epoch = Epoch::new(111);
688
689 let (oracle, mut registrations) = initialize_simulation(
691 context.with_label("simulation"),
692 &fixture,
693 RELIABLE_LINK,
694 )
695 .await;
696
697 let (reporter, mut reporter_mailbox) =
699 mocks::Reporter::new(context.clone(), fixture.verifier.clone());
700 context.with_label("reporter").spawn(|_| reporter.run());
701
702 for (idx, participant) in fixture.participants.iter().enumerate() {
704 let validator_context =
705 context.with_label(&format!("participant_{participant}"));
706
707 let provider = mocks::Provider::new();
709 assert!(provider.register(epoch, fixture.schemes[idx].clone()));
710
711 let monitor = mocks::Monitor::new(epoch);
713
714 let automaton = mocks::Application::new(mocks::Strategy::Correct);
716
717 let blocker = oracle.control(participant.clone());
719
720 let engine = Engine::new(
722 validator_context.with_label("engine"),
723 Config {
724 monitor,
725 provider,
726 automaton,
727 reporter: reporter_mailbox.clone(),
728 blocker,
729 priority_acks: false,
730 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
731 100,
732 )),
733 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
734 window: std::num::NonZeroU64::new(10).unwrap(),
735 activity_timeout: HeightDelta::new(100),
736 journal_partition: format!("unsigned_height_test_{participant}"),
737 journal_write_buffer: NZUsize!(4096),
738 journal_replay_buffer: NZUsize!(4096),
739 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
740 journal_compression: Some(3),
741 journal_page_cache: CacheRef::from_pooler(
742 &context,
743 PAGE_SIZE,
744 PAGE_CACHE_SIZE,
745 ),
746 strategy: Sequential,
747 },
748 );
749
750 let (sender, receiver) = registrations.remove(participant).unwrap();
751 engine.start((sender, receiver));
752 }
753
754 loop {
756 if let Some(tip_height) = reporter_mailbox.get_contiguous_tip().await {
757 debug!(
758 %tip_height,
759 %skip_height, %target_height, "reporter status on restart"
760 );
761 if tip_height >= target_height {
762 break;
763 }
764 }
765 context.sleep(Duration::from_millis(50)).await;
766 }
767 }
768 };
769
770 deterministic::Runner::from(checkpoint).start(f2);
771 }
772
773 #[test_group("slow")]
774 #[test_traced("INFO")]
775 fn test_unclean_shutdown_with_unsigned_height() {
776 unclean_shutdown_with_unsigned_height(bls12381_threshold::fixture::<MinPk, _>);
777 unclean_shutdown_with_unsigned_height(bls12381_threshold::fixture::<MinSig, _>);
778 unclean_shutdown_with_unsigned_height(bls12381_multisig::fixture::<MinPk, _>);
779 unclean_shutdown_with_unsigned_height(bls12381_multisig::fixture::<MinSig, _>);
780 unclean_shutdown_with_unsigned_height(ed25519::fixture);
781 unclean_shutdown_with_unsigned_height(secp256r1::fixture);
782 }
783
784 fn slow_and_lossy_links<S, F>(fixture: F, seed: u64) -> String
785 where
786 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
787 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
788 {
789 let cfg = deterministic::Config::new()
790 .with_seed(seed)
791 .with_timeout(Some(Duration::from_secs(120)));
792 let runner = deterministic::Runner::new(cfg);
793
794 runner.start(|mut context| async move {
795 let num_validators = 4;
796 let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
797 let epoch = Epoch::new(111);
798
799 let degraded_link = Link {
801 latency: Duration::from_millis(200),
802 jitter: Duration::from_millis(150),
803 success_rate: 0.5,
804 };
805
806 let (mut oracle, mut registrations) =
807 initialize_simulation(context.with_label("simulation"), &fixture, degraded_link)
808 .await;
809
810 let reporters = spawn_validator_engines(
811 context.with_label("validator"),
812 &fixture,
813 &mut registrations,
814 &mut oracle,
815 epoch,
816 Duration::from_secs(2),
817 vec![],
818 );
819
820 await_reporters(
821 context.with_label("reporter"),
822 &reporters,
823 Height::new(100),
824 epoch,
825 )
826 .await;
827
828 context.auditor().state()
829 })
830 }
831
832 #[test_group("slow")]
833 #[test_traced("INFO")]
834 fn test_slow_and_lossy_links() {
835 slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, 0);
836 slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, 0);
837 slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, 0);
838 slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, 0);
839 slow_and_lossy_links(ed25519::fixture, 0);
840 slow_and_lossy_links(secp256r1::fixture, 0);
841 }
842
843 #[test_group("slow")]
844 #[test_traced("INFO")]
845 fn test_determinism() {
846 for seed in 1..6 {
849 let ts_pk_state_1 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
851 let ts_pk_state_2 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
852 assert_eq!(ts_pk_state_1, ts_pk_state_2);
853
854 let ts_sig_state_1 =
856 slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
857 let ts_sig_state_2 =
858 slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
859 assert_eq!(ts_sig_state_1, ts_sig_state_2);
860
861 let ms_pk_state_1 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
863 let ms_pk_state_2 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
864 assert_eq!(ms_pk_state_1, ms_pk_state_2);
865
866 let ms_sig_state_1 =
868 slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
869 let ms_sig_state_2 =
870 slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
871 assert_eq!(ms_sig_state_1, ms_sig_state_2);
872
873 let ed_state_1 = slow_and_lossy_links(ed25519::fixture, seed);
875 let ed_state_2 = slow_and_lossy_links(ed25519::fixture, seed);
876 assert_eq!(ed_state_1, ed_state_2);
877
878 let secp_state_1 = slow_and_lossy_links(secp256r1::fixture, seed);
880 let secp_state_2 = slow_and_lossy_links(secp256r1::fixture, seed);
881 assert_eq!(secp_state_1, secp_state_2);
882
883 let states = [
884 ("threshold-minpk", ts_pk_state_1),
885 ("threshold-minsig", ts_sig_state_1),
886 ("multisig-minpk", ms_pk_state_1),
887 ("multisig-minsig", ms_sig_state_1),
888 ("ed25519", ed_state_1),
889 ("secp256r1", secp_state_1),
890 ];
891
892 for pair in states.windows(2) {
894 assert_ne!(
895 pair[0].1, pair[1].1,
896 "state {} equals state {}",
897 pair[0].0, pair[1].0
898 );
899 }
900 }
901 }
902
903 fn one_offline<S, F>(fixture: F)
904 where
905 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
906 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
907 {
908 let runner = deterministic::Runner::timed(Duration::from_secs(30));
909
910 runner.start(|mut context| async move {
911 let num_validators = 5;
912 let mut fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
913 let epoch = Epoch::new(111);
914
915 fixture.participants.truncate(4);
917 fixture.schemes.truncate(4);
918
919 let (mut oracle, mut registrations) =
920 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
921 .await;
922
923 let reporters = spawn_validator_engines(
924 context.with_label("validator"),
925 &fixture,
926 &mut registrations,
927 &mut oracle,
928 epoch,
929 Duration::from_secs(5),
930 vec![],
931 );
932
933 await_reporters(
934 context.with_label("reporter"),
935 &reporters,
936 Height::new(100),
937 epoch,
938 )
939 .await;
940 });
941 }
942
943 #[test_group("slow")]
944 #[test_traced("INFO")]
945 fn test_one_offline() {
946 one_offline(bls12381_threshold::fixture::<MinPk, _>);
947 one_offline(bls12381_threshold::fixture::<MinSig, _>);
948 one_offline(bls12381_multisig::fixture::<MinPk, _>);
949 one_offline(bls12381_multisig::fixture::<MinSig, _>);
950 one_offline(ed25519::fixture);
951 one_offline(secp256r1::fixture);
952 }
953
954 fn network_partition<S, F>(fixture: F)
956 where
957 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
958 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
959 {
960 let runner = deterministic::Runner::timed(Duration::from_secs(60));
961
962 runner.start(|mut context| async move {
963 let num_validators = 4;
964 let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
965 let epoch = Epoch::new(111);
966
967 let (mut oracle, mut registrations) =
968 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
969 .await;
970
971 let reporters = spawn_validator_engines(
972 context.with_label("validator"),
973 &fixture,
974 &mut registrations,
975 &mut oracle,
976 epoch,
977 Duration::from_secs(5),
978 vec![],
979 );
980
981 for v1 in fixture.participants.iter() {
983 for v2 in fixture.participants.iter() {
984 if v2 == v1 {
985 continue;
986 }
987 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
988 }
989 }
990 context.sleep(Duration::from_secs(20)).await;
991
992 for v1 in fixture.participants.iter() {
994 for v2 in fixture.participants.iter() {
995 if v2 == v1 {
996 continue;
997 }
998 oracle
999 .add_link(v1.clone(), v2.clone(), RELIABLE_LINK)
1000 .await
1001 .unwrap();
1002 }
1003 }
1004
1005 await_reporters(
1006 context.with_label("reporter"),
1007 &reporters,
1008 Height::new(100),
1009 epoch,
1010 )
1011 .await;
1012 });
1013 }
1014
1015 #[test_traced("INFO")]
1016 fn test_network_partition() {
1017 network_partition(bls12381_threshold::fixture::<MinPk, _>);
1018 network_partition(bls12381_threshold::fixture::<MinSig, _>);
1019 network_partition(bls12381_multisig::fixture::<MinPk, _>);
1020 network_partition(bls12381_multisig::fixture::<MinSig, _>);
1021 network_partition(ed25519::fixture);
1022 network_partition(secp256r1::fixture);
1023 }
1024
1025 fn insufficient_validators<S, F>(fixture: F)
1027 where
1028 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1029 F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1030 {
1031 let runner = deterministic::Runner::timed(Duration::from_secs(15));
1032
1033 runner.start(|mut context| async move {
1034 let num_validators = 5;
1035 let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
1036 let epoch = Epoch::new(111);
1037
1038 let (oracle, mut registrations) =
1040 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
1041 .await;
1042
1043 let mut reporters =
1045 BTreeMap::<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>::new();
1046
1047 for (idx, participant) in fixture.participants.iter().take(2).enumerate() {
1049 let context = context.with_label(&format!("participant_{participant}"));
1050
1051 let provider = mocks::Provider::new();
1053 assert!(provider.register(epoch, fixture.schemes[idx].clone()));
1054
1055 let monitor = mocks::Monitor::new(epoch);
1057
1058 let automaton = mocks::Application::new(mocks::Strategy::Correct);
1060
1061 let (reporter, reporter_mailbox) =
1063 mocks::Reporter::new(context.clone(), fixture.verifier.clone());
1064 context.with_label("reporter").spawn(|_| reporter.run());
1065 reporters.insert(participant.clone(), reporter_mailbox.clone());
1066
1067 let blocker = oracle.control(participant.clone());
1069
1070 let engine = Engine::new(
1072 context.with_label("engine"),
1073 Config {
1074 monitor,
1075 provider,
1076 automaton,
1077 reporter: reporter_mailbox,
1078 blocker,
1079 priority_acks: false,
1080 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_secs(3)),
1081 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
1082 window: std::num::NonZeroU64::new(10).unwrap(),
1083 activity_timeout: HeightDelta::new(100),
1084 journal_partition: format!("aggregation-{participant}"),
1085 journal_write_buffer: NZUsize!(4096),
1086 journal_replay_buffer: NZUsize!(4096),
1087 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
1088 journal_compression: Some(3),
1089 journal_page_cache: CacheRef::from_pooler(
1090 &context,
1091 PAGE_SIZE,
1092 PAGE_CACHE_SIZE,
1093 ),
1094 strategy: Sequential,
1095 },
1096 );
1097
1098 let (sender, receiver) = registrations.remove(participant).unwrap();
1099 engine.start((sender, receiver));
1100 }
1101
1102 context.sleep(Duration::from_secs(12)).await;
1105
1106 let mut any_consensus = false;
1108 for (validator_pk, mut reporter_mailbox) in reporters {
1109 let (tip, _) = reporter_mailbox
1110 .get_tip()
1111 .await
1112 .unwrap_or((Height::zero(), Epoch::zero()));
1113 if !tip.is_zero() {
1114 any_consensus = true;
1115 tracing::warn!(
1116 ?validator_pk,
1117 %tip,
1118 "Unexpected consensus with insufficient validators"
1119 );
1120 }
1121 }
1122
1123 assert!(
1125 !any_consensus,
1126 "Consensus should not be achieved with insufficient validator participation (below quorum)"
1127 );
1128 });
1129 }
1130
1131 #[test_traced("INFO")]
1132 fn test_insufficient_validators() {
1133 insufficient_validators(bls12381_threshold::fixture::<MinPk, _>);
1134 insufficient_validators(bls12381_threshold::fixture::<MinSig, _>);
1135 insufficient_validators(bls12381_multisig::fixture::<MinPk, _>);
1136 insufficient_validators(bls12381_multisig::fixture::<MinSig, _>);
1137 insufficient_validators(ed25519::fixture);
1138 insufficient_validators(secp256r1::fixture);
1139 }
1140}