1pub mod scheme;
67pub mod types;
68
// Everything in this group is compiled only when the target architecture is not wasm32.
cfg_if::cfg_if! {
    if #[cfg(not(target_arch = "wasm32"))] {
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        mod metrics;
        mod safe_tip;

        // The mocks module is additionally restricted to test builds.
        #[cfg(test)]
        pub mod mocks;
    }
}
82
83#[cfg(test)]
84mod tests {
85 use super::{mocks, Config, Engine};
86 use crate::{
87 aggregation::scheme::{bls12381_multisig, bls12381_threshold, ed25519, Scheme},
88 types::{Epoch, EpochDelta},
89 };
90 use commonware_cryptography::{
91 bls12381::primitives::variant::{MinPk, MinSig},
92 certificate::mocks::Fixture,
93 ed25519::PublicKey,
94 sha256::Digest as Sha256Digest,
95 };
96 use commonware_macros::{select, test_group, test_traced};
97 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
98 use commonware_runtime::{
99 buffer::PoolRef,
100 deterministic::{self, Context},
101 Clock, Metrics, Quota, Runner, Spawner,
102 };
103 use commonware_utils::{NZUsize, NonZeroDuration};
104 use futures::{channel::oneshot, future::join_all};
105 use rand::{rngs::StdRng, Rng, SeedableRng};
106 use std::{
107 collections::BTreeMap,
108 num::{NonZeroU32, NonZeroUsize},
109 time::Duration,
110 };
111 use tracing::debug;
112
    /// Network channel handles per participant: maps each participant to the
    /// `(Sender, Receiver)` pair returned when it was registered with the simulated network.
    type Registrations<P> = BTreeMap<P, (Sender<P, deterministic::Context>, Receiver<P>)>;

    /// First argument passed to `PoolRef::new` for every engine's journal buffer pool.
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    /// Second argument passed to `PoolRef::new` for every engine's journal buffer pool.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
    /// Quota supplied when registering channels, built from `NonZeroU32::MAX` per second.
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);

    /// Link with 10ms latency, 1ms jitter and a success rate of 1.0.
    const RELIABLE_LINK: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
125
126 async fn register_participants(
128 oracle: &mut Oracle<PublicKey, deterministic::Context>,
129 participants: &[PublicKey],
130 ) -> Registrations<PublicKey> {
131 let mut registrations = BTreeMap::new();
132 for participant in participants.iter() {
133 let (sender, receiver) = oracle
134 .control(participant.clone())
135 .register(0, TEST_QUOTA)
136 .await
137 .unwrap();
138 registrations.insert(participant.clone(), (sender, receiver));
139 }
140 registrations
141 }
142
143 async fn link_participants(
145 oracle: &mut Oracle<PublicKey, deterministic::Context>,
146 participants: &[PublicKey],
147 link: Link,
148 ) {
149 for v1 in participants.iter() {
150 for v2 in participants.iter() {
151 if v2 == v1 {
152 continue;
153 }
154 oracle
155 .add_link(v1.clone(), v2.clone(), link.clone())
156 .await
157 .unwrap();
158 }
159 }
160 }
161
162 async fn initialize_simulation<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
164 context: Context,
165 fixture: &Fixture<S>,
166 link: Link,
167 ) -> (
168 Oracle<PublicKey, deterministic::Context>,
169 Registrations<PublicKey>,
170 ) {
171 let (network, mut oracle) = Network::new(
172 context.with_label("network"),
173 commonware_p2p::simulated::Config {
174 max_size: 1024 * 1024,
175 disconnect_on_block: true,
176 tracked_peer_sets: None,
177 },
178 );
179 network.start();
180
181 let registrations = register_participants(&mut oracle, &fixture.participants).await;
182 link_participants(&mut oracle, &fixture.participants, link).await;
183
184 (oracle, registrations)
185 }
186
187 #[allow(clippy::too_many_arguments)]
189 fn spawn_validator_engines<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
190 context: Context,
191 fixture: &Fixture<S>,
192 registrations: &mut Registrations<PublicKey>,
193 oracle: &mut Oracle<PublicKey, deterministic::Context>,
194 namespace: &[u8],
195 epoch: Epoch,
196 rebroadcast_timeout: Duration,
197 incorrect: Vec<usize>,
198 ) -> BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>> {
199 let mut reporters = BTreeMap::new();
200
201 for (idx, participant) in fixture.participants.iter().enumerate() {
202 let context = context.with_label(&format!("participant_{participant}"));
203
204 let provider = mocks::Provider::new();
206 assert!(provider.register(epoch, fixture.schemes[idx].clone()));
207
208 let monitor = mocks::Monitor::new(epoch);
210
211 let strategy = if incorrect.contains(&idx) {
213 mocks::Strategy::Incorrect
214 } else {
215 mocks::Strategy::Correct
216 };
217 let automaton = mocks::Application::new(strategy);
218
219 let (reporter, reporter_mailbox) =
221 mocks::Reporter::new(context.clone(), namespace, fixture.verifier.clone());
222 context.with_label("reporter").spawn(|_| reporter.run());
223 reporters.insert(participant.clone(), reporter_mailbox.clone());
224
225 let blocker = oracle.control(participant.clone());
227
228 let engine = Engine::new(
230 context.with_label("engine"),
231 Config {
232 monitor,
233 provider,
234 automaton,
235 reporter: reporter_mailbox,
236 blocker,
237 namespace: namespace.to_vec(),
238 priority_acks: false,
239 rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
240 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
241 window: std::num::NonZeroU64::new(10).unwrap(),
242 activity_timeout: 100,
243 journal_partition: format!("aggregation-{participant}"),
244 journal_write_buffer: NZUsize!(4096),
245 journal_replay_buffer: NZUsize!(4096),
246 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
247 journal_compression: Some(3),
248 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
249 },
250 );
251
252 let (sender, receiver) = registrations.remove(participant).unwrap();
253 engine.start((sender, receiver));
254 }
255
256 reporters
257 }
258
259 async fn await_reporters<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
261 context: Context,
262 reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>,
263 threshold_index: u64,
264 threshold_epoch: Epoch,
265 ) {
266 let mut receivers = Vec::new();
267 for (reporter, mailbox) in reporters.iter() {
268 let (tx, rx) = oneshot::channel();
270 receivers.push(rx);
271
272 context.with_label("reporter_watcher").spawn({
273 let reporter = reporter.clone();
274 let mut mailbox = mailbox.clone();
275 move |context| async move {
276 loop {
277 let (index, epoch) = mailbox.get_tip().await.unwrap_or((0, Epoch::zero()));
278 debug!(
279 index,
280 epoch = %epoch,
281 threshold_index,
282 threshold_epoch = %threshold_epoch,
283 ?reporter,
284 "reporter status"
285 );
286 if index >= threshold_index && epoch >= threshold_epoch {
287 debug!(
288 ?reporter,
289 "reporter reached threshold, signaling completion"
290 );
291 let _ = tx.send(reporter.clone());
292 break;
293 }
294 context.sleep(Duration::from_millis(100)).await;
295 }
296 }
297 });
298 }
299
300 let results = join_all(receivers).await;
302 assert_eq!(results.len(), reporters.len());
303
304 for result in results {
306 assert!(result.is_ok(), "reporter was cancelled");
307 }
308 }
309
310 fn all_online<S, F>(fixture: F)
312 where
313 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
314 F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
315 {
316 let runner = deterministic::Runner::timed(Duration::from_secs(30));
317
318 runner.start(|mut context| async move {
319 let num_validators = 4;
320 let fixture = fixture(&mut context, num_validators);
321 let namespace = b"my testing namespace";
322 let epoch = Epoch::new(111);
323
324 let (mut oracle, mut registrations) =
325 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
326 .await;
327
328 let reporters = spawn_validator_engines(
329 context.with_label("validator"),
330 &fixture,
331 &mut registrations,
332 &mut oracle,
333 namespace,
334 epoch,
335 Duration::from_secs(5),
336 vec![],
337 );
338
339 await_reporters(context.with_label("reporter"), &reporters, 100, epoch).await;
340 });
341 }
342
    /// Runs the all-online scenario once per scheme fixture: BLS12-381 threshold and
    /// multisig (each with the `MinPk` and `MinSig` variants) and ed25519.
    #[test_traced("INFO")]
    fn test_all_online() {
        all_online(bls12381_threshold::fixture::<MinPk, _>);
        all_online(bls12381_threshold::fixture::<MinSig, _>);
        all_online(bls12381_multisig::fixture::<MinPk, _>);
        all_online(bls12381_multisig::fixture::<MinSig, _>);
        all_online(ed25519::fixture);
    }
351
352 fn byzantine_proposer<S, F>(fixture: F)
354 where
355 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
356 F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
357 {
358 let runner = deterministic::Runner::timed(Duration::from_secs(30));
359
360 runner.start(|mut context| async move {
361 let num_validators = 4;
362 let fixture = fixture(&mut context, num_validators);
363 let namespace = b"my testing namespace";
364 let epoch = Epoch::new(111);
365
366 let (mut oracle, mut registrations) =
367 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
368 .await;
369
370 let reporters = spawn_validator_engines(
371 context.with_label("validator"),
372 &fixture,
373 &mut registrations,
374 &mut oracle,
375 namespace,
376 epoch,
377 Duration::from_secs(5),
378 vec![0],
379 );
380
381 await_reporters(context.with_label("reporter"), &reporters, 100, epoch).await;
382 });
383 }
384
    /// Runs the byzantine-proposer scenario once per scheme fixture: BLS12-381 threshold and
    /// multisig (each with the `MinPk` and `MinSig` variants) and ed25519.
    #[test_traced("INFO")]
    fn test_byzantine_proposer() {
        byzantine_proposer(bls12381_threshold::fixture::<MinPk, _>);
        byzantine_proposer(bls12381_threshold::fixture::<MinSig, _>);
        byzantine_proposer(bls12381_multisig::fixture::<MinPk, _>);
        byzantine_proposer(bls12381_multisig::fixture::<MinSig, _>);
        byzantine_proposer(ed25519::fixture);
    }
393
    /// Repeatedly runs four validators (position 0 using the incorrect strategy), interrupting
    /// each run after a random delay and restarting from the returned checkpoint.
    ///
    /// Each run races a completion watcher (shared reporter's contiguous tip reaching
    /// `target_index`) against a sleep drawn from `[shutdown_range_min, shutdown_range_max)`.
    /// The outer loop stops once a run completes after at least `min_shutdowns` prior
    /// iterations, or after `max_shutdowns` iterations.
    ///
    /// NOTE(review): when the loop ends because `max_shutdowns` is exhausted, nothing asserts
    /// that any run ever completed — confirm this is intended.
    fn unclean_byzantine_shutdown<S, F>(fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: Fn(&mut StdRng, u32) -> Fixture<S>,
    {
        // Test parameters.
        let num_validators = 4;
        let target_index = 200;
        let min_shutdowns = 4;
        let max_shutdowns = 10;
        let shutdown_range_min = Duration::from_millis(100);
        let shutdown_range_max = Duration::from_millis(1_000);
        let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));

        // Checkpoint of the previous run; `None` before the first run.
        let mut prev_checkpoint = None;

        // The fixture is generated once from a fixed seed and cloned into every run.
        let mut rng = StdRng::seed_from_u64(0);
        let fixture = fixture(&mut rng, num_validators);

        let mut shutdown_count = 0;
        while shutdown_count < max_shutdowns {
            let fixture = fixture.clone();
            // One run; resolves to `true` if the target was reached, `false` if interrupted.
            let f = move |mut context: Context| {
                async move {
                    let namespace = b"my testing namespace";
                    let epoch = Epoch::new(111);

                    let (oracle, mut registrations) = initialize_simulation(
                        context.with_label("simulation"),
                        &fixture,
                        RELIABLE_LINK,
                    )
                    .await;

                    // A single reporter is shared by all validators in this run.
                    let (reporter, mut reporter_mailbox) =
                        mocks::Reporter::new(context.clone(), namespace, fixture.verifier.clone());
                    context.with_label("reporter").spawn(|_| reporter.run());

                    for (idx, participant) in fixture.participants.iter().enumerate() {
                        let validator_context =
                            context.with_label(&format!("participant_{participant}"));

                        let provider = mocks::Provider::new();
                        assert!(provider.register(epoch, fixture.schemes[idx].clone()));

                        let monitor = mocks::Monitor::new(epoch);

                        // Only the first participant uses the incorrect strategy.
                        let strategy = if idx == 0 {
                            mocks::Strategy::Incorrect
                        } else {
                            mocks::Strategy::Correct
                        };
                        let automaton = mocks::Application::new(strategy);

                        let blocker = oracle.control(participant.clone());

                        let engine = Engine::new(
                            validator_context.with_label("engine"),
                            Config {
                                monitor,
                                provider,
                                automaton,
                                reporter: reporter_mailbox.clone(),
                                blocker,
                                namespace: namespace.to_vec(),
                                priority_acks: false,
                                rebroadcast_timeout,
                                epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                                window: std::num::NonZeroU64::new(10).unwrap(),
                                activity_timeout: 1_024,
                                // The partition name is the same in every run for a given
                                // participant.
                                journal_partition: format!("unclean_shutdown_test_{participant}"),
                                journal_write_buffer: NZUsize!(4096),
                                journal_replay_buffer: NZUsize!(4096),
                                journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                                journal_compression: Some(3),
                                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                            },
                        );

                        let (sender, receiver) = registrations.remove(participant).unwrap();
                        engine.start((sender, receiver));
                    }

                    // Polls the shared reporter every 50ms until its contiguous tip reaches the
                    // target index.
                    let completion =
                        context
                            .with_label("completion_watcher")
                            .spawn(move |context| async move {
                                loop {
                                    if let Some(tip_index) =
                                        reporter_mailbox.get_contiguous_tip().await
                                    {
                                        if tip_index >= target_index {
                                            break;
                                        }
                                    }
                                    context.sleep(Duration::from_millis(50)).await;
                                }
                            });

                    // Race the random shutdown delay against completion.
                    let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
                    select! {
                        _ = context.sleep(shutdown_wait) => {
                            debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
                            false },
                        _ = completion => {
                            debug!("Shared reporter completed normally");
                            true },
                    }
                }
            };

            // First iteration uses a fresh timed runner; later ones resume from the checkpoint.
            let (complete, checkpoint) = prev_checkpoint
                .map_or_else(
                    || {
                        debug!("Starting initial run");
                        deterministic::Runner::timed(Duration::from_secs(45))
                    },
                    |prev_checkpoint| {
                        debug!(shutdown_count, "Restarting from previous context");
                        deterministic::Runner::from(prev_checkpoint)
                    },
                )
                .start_and_recover(f);

            // A completed run only ends the test once enough shutdowns have happened.
            if complete && shutdown_count >= min_shutdowns {
                debug!("Test completed successfully");
                break;
            }

            prev_checkpoint = Some(checkpoint);
            shutdown_count += 1;
        }
    }
542
    /// Runs the unclean byzantine shutdown scenario once per scheme fixture: BLS12-381
    /// threshold and multisig (each with the `MinPk` and `MinSig` variants) and ed25519.
    #[test_traced("INFO")]
    fn test_unclean_byzantine_shutdown() {
        unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinPk, _>);
        unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinSig, _>);
        unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinPk, _>);
        unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinSig, _>);
        unclean_byzantine_shutdown(ed25519::fixture);
    }
551
552 fn unclean_shutdown_with_unsigned_index<S, F>(fixture: F)
553 where
554 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
555 F: Fn(&mut StdRng, u32) -> Fixture<S>,
556 {
557 let num_validators = 4;
559 let skip_index = 50; let window = 10;
561 let target_index = 100;
562 let namespace = b"my testing namespace";
563
564 let mut rng = StdRng::seed_from_u64(0);
566 let fixture = fixture(&mut rng, num_validators);
567
568 let f = |context: Context| {
570 let fixture = fixture.clone();
571 async move {
572 let epoch = Epoch::new(111);
573
574 let (oracle, mut registrations) = initialize_simulation(
576 context.with_label("simulation"),
577 &fixture,
578 RELIABLE_LINK,
579 )
580 .await;
581
582 let (reporter, mut reporter_mailbox) =
584 mocks::Reporter::new(context.clone(), namespace, fixture.verifier.clone());
585 context.with_label("reporter").spawn(|_| reporter.run());
586
587 for (idx, participant) in fixture.participants.iter().enumerate() {
589 let validator_context =
590 context.with_label(&format!("participant_{participant}"));
591
592 let provider = mocks::Provider::new();
594 assert!(provider.register(epoch, fixture.schemes[idx].clone()));
595
596 let monitor = mocks::Monitor::new(epoch);
598
599 let automaton =
601 mocks::Application::new(mocks::Strategy::Skip { index: skip_index });
602
603 let blocker = oracle.control(participant.clone());
605
606 let engine = Engine::new(
608 validator_context.with_label("engine"),
609 Config {
610 monitor,
611 provider,
612 automaton,
613 reporter: reporter_mailbox.clone(),
614 blocker,
615 namespace: namespace.to_vec(),
616 priority_acks: false,
617 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
618 100,
619 )),
620 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
621 window: std::num::NonZeroU64::new(window).unwrap(),
622 activity_timeout: 100,
623 journal_partition: format!("unsigned_index_test_{participant}"),
624 journal_write_buffer: NZUsize!(4096),
625 journal_replay_buffer: NZUsize!(4096),
626 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
627 journal_compression: Some(3),
628 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
629 },
630 );
631
632 let (sender, receiver) = registrations.remove(participant).unwrap();
633 engine.start((sender, receiver));
634 }
635
636 loop {
638 if let Some((tip_index, _)) = reporter_mailbox.get_tip().await {
639 debug!(tip_index, skip_index, target_index, "reporter status");
640 if tip_index >= skip_index + window - 1 {
641 return;
643 }
644 }
645 context.sleep(Duration::from_millis(50)).await;
646 }
647 }
648 };
649
650 let (_, checkpoint) =
651 deterministic::Runner::timed(Duration::from_secs(60)).start_and_recover(f);
652
653 let f2 = |context: Context| {
655 async move {
656 let epoch = Epoch::new(111);
657
658 let (oracle, mut registrations) = initialize_simulation(
660 context.with_label("simulation"),
661 &fixture,
662 RELIABLE_LINK,
663 )
664 .await;
665
666 let (reporter, mut reporter_mailbox) =
668 mocks::Reporter::new(context.clone(), namespace, fixture.verifier.clone());
669 context.with_label("reporter").spawn(|_| reporter.run());
670
671 for (idx, participant) in fixture.participants.iter().enumerate() {
673 let validator_context =
674 context.with_label(&format!("participant_{participant}"));
675
676 let provider = mocks::Provider::new();
678 assert!(provider.register(epoch, fixture.schemes[idx].clone()));
679
680 let monitor = mocks::Monitor::new(epoch);
682
683 let automaton = mocks::Application::new(mocks::Strategy::Correct);
685
686 let blocker = oracle.control(participant.clone());
688
689 let engine = Engine::new(
691 validator_context.with_label("engine"),
692 Config {
693 monitor,
694 provider,
695 automaton,
696 reporter: reporter_mailbox.clone(),
697 blocker,
698 namespace: namespace.to_vec(),
699 priority_acks: false,
700 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
701 100,
702 )),
703 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
704 window: std::num::NonZeroU64::new(10).unwrap(),
705 activity_timeout: 100,
706 journal_partition: format!("unsigned_index_test_{participant}"),
707 journal_write_buffer: NZUsize!(4096),
708 journal_replay_buffer: NZUsize!(4096),
709 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
710 journal_compression: Some(3),
711 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
712 },
713 );
714
715 let (sender, receiver) = registrations.remove(participant).unwrap();
716 engine.start((sender, receiver));
717 }
718
719 loop {
721 if let Some(tip_index) = reporter_mailbox.get_contiguous_tip().await {
722 debug!(
723 tip_index,
724 skip_index, target_index, "reporter status on restart"
725 );
726 if tip_index >= target_index {
727 break;
728 }
729 }
730 context.sleep(Duration::from_millis(50)).await;
731 }
732 }
733 };
734
735 deterministic::Runner::from(checkpoint).start(f2);
736 }
737
    /// Runs the unsigned-index restart scenario once per scheme fixture: BLS12-381 threshold
    /// and multisig (each with the `MinPk` and `MinSig` variants) and ed25519.
    #[test_traced("INFO")]
    fn test_unclean_shutdown_with_unsigned_index() {
        unclean_shutdown_with_unsigned_index(bls12381_threshold::fixture::<MinPk, _>);
        unclean_shutdown_with_unsigned_index(bls12381_threshold::fixture::<MinSig, _>);
        unclean_shutdown_with_unsigned_index(bls12381_multisig::fixture::<MinPk, _>);
        unclean_shutdown_with_unsigned_index(bls12381_multisig::fixture::<MinSig, _>);
        unclean_shutdown_with_unsigned_index(ed25519::fixture);
    }
746
747 fn slow_and_lossy_links<S, F>(fixture: F, seed: u64) -> String
748 where
749 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
750 F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
751 {
752 let cfg = deterministic::Config::new()
753 .with_seed(seed)
754 .with_timeout(Some(Duration::from_secs(120)));
755 let runner = deterministic::Runner::new(cfg);
756
757 runner.start(|mut context| async move {
758 let num_validators = 4;
759 let fixture = fixture(&mut context, num_validators);
760 let namespace = b"my testing namespace";
761 let epoch = Epoch::new(111);
762
763 let degraded_link = Link {
765 latency: Duration::from_millis(200),
766 jitter: Duration::from_millis(150),
767 success_rate: 0.5,
768 };
769
770 let (mut oracle, mut registrations) =
771 initialize_simulation(context.with_label("simulation"), &fixture, degraded_link)
772 .await;
773
774 let reporters = spawn_validator_engines(
775 context.with_label("validator"),
776 &fixture,
777 &mut registrations,
778 &mut oracle,
779 namespace,
780 epoch,
781 Duration::from_secs(2),
782 vec![],
783 );
784
785 await_reporters(context.with_label("reporter"), &reporters, 100, epoch).await;
786
787 context.auditor().state()
788 })
789 }
790
    /// Runs the slow-and-lossy-links scenario with seed 0 once per scheme fixture: BLS12-381
    /// threshold and multisig (each with the `MinPk` and `MinSig` variants) and ed25519.
    /// The returned auditor states are discarded.
    #[test_traced("INFO")]
    fn test_slow_and_lossy_links() {
        slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, 0);
        slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, 0);
        slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, 0);
        slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, 0);
        slow_and_lossy_links(ed25519::fixture, 0);
    }
799
800 #[test_group("slow")]
801 #[test_traced("INFO")]
802 fn test_determinism() {
803 for seed in 1..6 {
806 let ts_pk_state_1 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
808 let ts_pk_state_2 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
809 assert_eq!(ts_pk_state_1, ts_pk_state_2);
810
811 let ts_sig_state_1 =
813 slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
814 let ts_sig_state_2 =
815 slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
816 assert_eq!(ts_sig_state_1, ts_sig_state_2);
817
818 let ms_pk_state_1 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
820 let ms_pk_state_2 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
821 assert_eq!(ms_pk_state_1, ms_pk_state_2);
822
823 let ms_sig_state_1 =
825 slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
826 let ms_sig_state_2 =
827 slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
828 assert_eq!(ms_sig_state_1, ms_sig_state_2);
829
830 let ed_state_1 = slow_and_lossy_links(ed25519::fixture, seed);
832 let ed_state_2 = slow_and_lossy_links(ed25519::fixture, seed);
833 assert_eq!(ed_state_1, ed_state_2);
834
835 let states = [
836 ("threshold-minpk", ts_pk_state_1),
837 ("threshold-minsig", ts_sig_state_1),
838 ("multisig-minpk", ms_pk_state_1),
839 ("multisig-minsig", ms_sig_state_1),
840 ("ed25519", ed_state_1),
841 ];
842
843 for pair in states.windows(2) {
845 assert_ne!(
846 pair[0].1, pair[1].1,
847 "state {} equals state {}",
848 pair[0].0, pair[1].0
849 );
850 }
851 }
852 }
853
854 fn one_offline<S, F>(fixture: F)
855 where
856 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
857 F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
858 {
859 let runner = deterministic::Runner::timed(Duration::from_secs(30));
860
861 runner.start(|mut context| async move {
862 let num_validators = 5;
863 let mut fixture = fixture(&mut context, num_validators);
864 let namespace = b"my testing namespace";
865 let epoch = Epoch::new(111);
866
867 fixture.participants.truncate(4);
869 fixture.schemes.truncate(4);
870
871 let (mut oracle, mut registrations) =
872 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
873 .await;
874
875 let reporters = spawn_validator_engines(
876 context.with_label("validator"),
877 &fixture,
878 &mut registrations,
879 &mut oracle,
880 namespace,
881 epoch,
882 Duration::from_secs(5),
883 vec![],
884 );
885
886 await_reporters(context.with_label("reporter"), &reporters, 100, epoch).await;
887 });
888 }
889
    /// Runs the one-offline scenario once per scheme fixture: BLS12-381 threshold and
    /// multisig (each with the `MinPk` and `MinSig` variants) and ed25519.
    #[test_traced("INFO")]
    fn test_one_offline() {
        one_offline(bls12381_threshold::fixture::<MinPk, _>);
        one_offline(bls12381_threshold::fixture::<MinSig, _>);
        one_offline(bls12381_multisig::fixture::<MinPk, _>);
        one_offline(bls12381_multisig::fixture::<MinSig, _>);
        one_offline(ed25519::fixture);
    }
898
899 fn network_partition<S, F>(fixture: F)
901 where
902 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
903 F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
904 {
905 let runner = deterministic::Runner::timed(Duration::from_secs(60));
906
907 runner.start(|mut context| async move {
908 let num_validators = 4;
909 let fixture = fixture(&mut context, num_validators);
910 let namespace = b"my testing namespace";
911 let epoch = Epoch::new(111);
912
913 let (mut oracle, mut registrations) =
914 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
915 .await;
916
917 let reporters = spawn_validator_engines(
918 context.with_label("validator"),
919 &fixture,
920 &mut registrations,
921 &mut oracle,
922 namespace,
923 epoch,
924 Duration::from_secs(5),
925 vec![],
926 );
927
928 for v1 in fixture.participants.iter() {
930 for v2 in fixture.participants.iter() {
931 if v2 == v1 {
932 continue;
933 }
934 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
935 }
936 }
937 context.sleep(Duration::from_secs(20)).await;
938
939 for v1 in fixture.participants.iter() {
941 for v2 in fixture.participants.iter() {
942 if v2 == v1 {
943 continue;
944 }
945 oracle
946 .add_link(v1.clone(), v2.clone(), RELIABLE_LINK)
947 .await
948 .unwrap();
949 }
950 }
951
952 await_reporters(context.with_label("reporter"), &reporters, 100, epoch).await;
953 });
954 }
955
    /// Runs the network-partition scenario once per scheme fixture: BLS12-381 threshold and
    /// multisig (each with the `MinPk` and `MinSig` variants) and ed25519.
    #[test_traced("INFO")]
    fn test_network_partition() {
        network_partition(bls12381_threshold::fixture::<MinPk, _>);
        network_partition(bls12381_threshold::fixture::<MinSig, _>);
        network_partition(bls12381_multisig::fixture::<MinPk, _>);
        network_partition(bls12381_multisig::fixture::<MinSig, _>);
        network_partition(ed25519::fixture);
    }
964
965 fn insufficient_validators<S, F>(fixture: F)
967 where
968 S: Scheme<Sha256Digest, PublicKey = PublicKey>,
969 F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
970 {
971 let runner = deterministic::Runner::timed(Duration::from_secs(15));
972
973 runner.start(|mut context| async move {
974 let num_validators = 5;
975 let fixture = fixture(&mut context, num_validators);
976 let namespace = b"my testing namespace";
977 let epoch = Epoch::new(111);
978
979 let (oracle, mut registrations) =
981 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
982 .await;
983
984 let mut reporters =
986 BTreeMap::<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>::new();
987
988 for (idx, participant) in fixture.participants.iter().take(2).enumerate() {
990 let context = context.with_label(&format!("participant_{participant}"));
991
992 let provider = mocks::Provider::new();
994 assert!(provider.register(epoch, fixture.schemes[idx].clone()));
995
996 let monitor = mocks::Monitor::new(epoch);
998
999 let automaton = mocks::Application::new(mocks::Strategy::Correct);
1001
1002 let (reporter, reporter_mailbox) =
1004 mocks::Reporter::new(context.clone(), namespace, fixture.verifier.clone());
1005 context.with_label("reporter").spawn(|_| reporter.run());
1006 reporters.insert(participant.clone(), reporter_mailbox.clone());
1007
1008 let blocker = oracle.control(participant.clone());
1010
1011 let engine = Engine::new(
1013 context.with_label("engine"),
1014 Config {
1015 monitor,
1016 provider,
1017 automaton,
1018 reporter: reporter_mailbox,
1019 blocker,
1020 namespace: namespace.to_vec(),
1021 priority_acks: false,
1022 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_secs(3)),
1023 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
1024 window: std::num::NonZeroU64::new(10).unwrap(),
1025 activity_timeout: 100,
1026 journal_partition: format!("aggregation-{participant}"),
1027 journal_write_buffer: NZUsize!(4096),
1028 journal_replay_buffer: NZUsize!(4096),
1029 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
1030 journal_compression: Some(3),
1031 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1032 },
1033 );
1034
1035 let (sender, receiver) = registrations.remove(participant).unwrap();
1036 engine.start((sender, receiver));
1037 }
1038
1039 context.sleep(Duration::from_secs(12)).await;
1042
1043 let mut any_consensus = false;
1045 for (validator_pk, mut reporter_mailbox) in reporters {
1046 let (tip, _) = reporter_mailbox
1047 .get_tip()
1048 .await
1049 .unwrap_or((0, Epoch::zero()));
1050 if tip > 0 {
1051 any_consensus = true;
1052 tracing::warn!(
1053 ?validator_pk,
1054 tip,
1055 "Unexpected consensus with insufficient validators"
1056 );
1057 }
1058 }
1059
1060 assert!(
1062 !any_consensus,
1063 "Consensus should not be achieved with insufficient validator participation (below quorum)"
1064 );
1065 });
1066 }
1067
    /// Runs the insufficient-validators scenario once per scheme fixture: BLS12-381 threshold
    /// and multisig (each with the `MinPk` and `MinSig` variants) and ed25519.
    #[test_traced("INFO")]
    fn test_insufficient_validators() {
        insufficient_validators(bls12381_threshold::fixture::<MinPk, _>);
        insufficient_validators(bls12381_threshold::fixture::<MinSig, _>);
        insufficient_validators(bls12381_multisig::fixture::<MinPk, _>);
        insufficient_validators(bls12381_multisig::fixture::<MinSig, _>);
        insufficient_validators(ed25519::fixture);
    }
1076}