1pub mod types;
53
54cfg_if::cfg_if! {
55 if #[cfg(not(target_arch = "wasm32"))] {
56 mod config;
57 pub use config::Config;
58 mod engine;
59 pub use engine::Engine;
60 mod metrics;
61 mod safe_tip;
62
63 #[cfg(test)]
64 pub mod mocks;
65 }
66}
67
68#[cfg(test)]
69mod tests {
70 use super::{mocks, types::Epoch, Config, Engine};
71 use crate::aggregation::mocks::Strategy;
72 use commonware_cryptography::{
73 bls12381::{
74 dkg::ops,
75 primitives::{
76 group::Share,
77 poly,
78 variant::{MinPk, MinSig, Variant},
79 },
80 },
81 ed25519::{PrivateKey, PublicKey},
82 sha256::Digest as Sha256Digest,
83 PrivateKeyExt as _, Signer as _,
84 };
85 use commonware_macros::{select, test_traced};
86 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
87 use commonware_runtime::{
88 buffer::PoolRef,
89 deterministic::{self, Context},
90 Clock, Metrics, Runner, Spawner,
91 };
92 use commonware_utils::{NZUsize, NonZeroDuration};
93 use futures::{channel::oneshot, future::join_all};
94 use rand::{rngs::StdRng, Rng, SeedableRng};
95 use std::{
96 collections::{BTreeMap, HashMap},
97 num::NonZeroUsize,
98 sync::{Arc, Mutex},
99 time::Duration,
100 };
101 use tracing::debug;
102
103 type Registrations<P> = BTreeMap<P, (Sender<P>, Receiver<P>)>;
104
105 const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
106 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
107
108 const RELIABLE_LINK: Link = Link {
110 latency: Duration::from_millis(10),
111 jitter: Duration::from_millis(1),
112 success_rate: 1.0,
113 };
114
115 async fn register_participants(
117 oracle: &mut Oracle<PublicKey>,
118 participants: &[PublicKey],
119 ) -> Registrations<PublicKey> {
120 let mut registrations = BTreeMap::new();
121 for participant in participants.iter() {
122 let (sender, receiver) = oracle.register(participant.clone(), 0).await.unwrap();
123 registrations.insert(participant.clone(), (sender, receiver));
124 }
125 registrations
126 }
127
128 async fn link_participants(
130 oracle: &mut Oracle<PublicKey>,
131 participants: &[PublicKey],
132 link: Link,
133 ) {
134 for v1 in participants.iter() {
135 for v2 in participants.iter() {
136 if v2 == v1 {
137 continue;
138 }
139 oracle
140 .add_link(v1.clone(), v2.clone(), link.clone())
141 .await
142 .unwrap();
143 }
144 }
145 }
146
147 async fn initialize_simulation(
149 context: Context,
150 num_validators: u32,
151 shares_vec: &mut [Share],
152 link: Link,
153 ) -> (
154 Oracle<PublicKey>,
155 Vec<(PublicKey, PrivateKey, Share)>,
156 Vec<PublicKey>,
157 Registrations<PublicKey>,
158 ) {
159 let (network, mut oracle) = Network::new(
160 context.with_label("network"),
161 commonware_p2p::simulated::Config {
162 max_size: 1024 * 1024,
163 },
164 );
165 network.start();
166
167 let mut schemes = (0..num_validators)
168 .map(|i| PrivateKey::from_seed(i as u64))
169 .collect::<Vec<_>>();
170 schemes.sort_by_key(|s| s.public_key());
171 let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
172 .iter()
173 .enumerate()
174 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
175 .collect();
176 let pks = validators
177 .iter()
178 .map(|(pk, _, _)| pk.clone())
179 .collect::<Vec<_>>();
180
181 let registrations = register_participants(&mut oracle, &pks).await;
182 link_participants(&mut oracle, &pks, link).await;
183 (oracle, validators, pks, registrations)
184 }
185
186 #[allow(clippy::too_many_arguments)]
188 fn spawn_validator_engines<V: Variant>(
189 context: Context,
190 polynomial: poly::Public<V>,
191 validator_pks: &[PublicKey],
192 validators: &[(PublicKey, PrivateKey, Share)],
193 registrations: &mut Registrations<PublicKey>,
194 automatons: &mut BTreeMap<PublicKey, mocks::Application>,
195 reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
196 oracle: &mut Oracle<PublicKey>,
197 rebroadcast_timeout: Duration,
198 incorrect: Vec<usize>,
199 ) -> HashMap<PublicKey, mocks::Monitor> {
200 let mut monitors = HashMap::new();
201 let namespace = b"my testing namespace";
202
203 for (i, (validator, _, share)) in validators.iter().enumerate() {
204 let context = context.with_label(&validator.to_string());
205 let monitor = mocks::Monitor::new(111);
206 monitors.insert(validator.clone(), monitor.clone());
207 let supervisor = {
208 let identity = *poly::public::<V>(&polynomial);
209 let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
210 s.add_epoch(
211 111,
212 share.clone(),
213 polynomial.clone(),
214 validator_pks.to_vec(),
215 );
216 s
217 };
218
219 let blocker = oracle.control(validator.clone());
220
221 let automaton = mocks::Application::new(if incorrect.contains(&i) {
222 Strategy::Incorrect
223 } else {
224 Strategy::Correct
225 });
226 automatons.insert(validator.clone(), automaton.clone());
227
228 let (reporter, reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
229 namespace,
230 validator_pks.len() as u32,
231 polynomial.clone(),
232 );
233 context.with_label("reporter").spawn(|_| reporter.run());
234 reporters.insert(validator.clone(), reporter_mailbox);
235
236 let engine = Engine::new(
237 context.with_label("engine"),
238 Config {
239 monitor,
240 validators: supervisor,
241 automaton: automaton.clone(),
242 reporter: reporters.get(validator).unwrap().clone(),
243 blocker,
244 namespace: namespace.to_vec(),
245 priority_acks: false,
246 rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
247 epoch_bounds: (1, 1),
248 window: std::num::NonZeroU64::new(10).unwrap(),
249 activity_timeout: 100,
250 journal_partition: format!("aggregation/{validator}"),
251 journal_write_buffer: NZUsize!(4096),
252 journal_replay_buffer: NZUsize!(4096),
253 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
254 journal_compression: Some(3),
255 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
256 },
257 );
258
259 let (sender, receiver) = registrations.remove(validator).unwrap();
260 engine.start((sender, receiver));
261 }
262 monitors
263 }
264
265 async fn await_reporters<V: Variant>(
267 context: Context,
268 reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
269 threshold_index: u64,
270 threshold_epoch: Epoch,
271 ) {
272 let mut receivers = Vec::new();
273 for (reporter, mailbox) in reporters.iter() {
274 let (tx, rx) = oneshot::channel();
276 receivers.push(rx);
277
278 context.with_label("reporter_watcher").spawn({
279 let reporter = reporter.clone();
280 let mut mailbox = mailbox.clone();
281 move |context| async move {
282 loop {
283 let (index, epoch) = mailbox.get_tip().await.unwrap_or((0, 0));
284 debug!(
285 index,
286 epoch,
287 threshold_index,
288 threshold_epoch,
289 ?reporter,
290 "reporter status"
291 );
292 if index >= threshold_index && epoch >= threshold_epoch {
293 debug!(
294 ?reporter,
295 "reporter reached threshold, signaling completion"
296 );
297 let _ = tx.send(reporter.clone());
298 break;
299 }
300 context.sleep(Duration::from_millis(100)).await;
301 }
302 }
303 });
304 }
305
306 let results = join_all(receivers).await;
308 assert_eq!(results.len(), reporters.len());
309
310 for result in results {
312 assert!(result.is_ok(), "reporter was cancelled");
313 }
314 }
315
316 fn all_online<V: Variant>() {
318 let num_validators: u32 = 4;
319 let quorum: u32 = 3;
320 let runner = deterministic::Runner::timed(Duration::from_secs(30));
321
322 runner.start(|mut context| async move {
323 let (polynomial, mut shares_vec) =
324 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
325 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
326
327 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
328 context.with_label("simulation"),
329 num_validators,
330 &mut shares_vec,
331 RELIABLE_LINK,
332 )
333 .await;
334 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
335 let mut reporters =
336 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
337 spawn_validator_engines::<V>(
338 context.with_label("validator"),
339 polynomial.clone(),
340 &pks,
341 &validators,
342 &mut registrations,
343 &mut automatons.lock().unwrap(),
344 &mut reporters,
345 &mut oracle,
346 Duration::from_secs(5),
347 vec![],
348 );
349 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
350 });
351 }
352
353 #[test_traced("INFO")]
354 fn test_all_online() {
355 all_online::<MinPk>();
356 all_online::<MinSig>();
357 }
358
359 fn byzantine_proposer<V: Variant>() {
361 let num_validators: u32 = 4;
362 let quorum: u32 = 3;
363 let runner = deterministic::Runner::timed(Duration::from_secs(30));
364
365 runner.start(|mut context| async move {
366 let (polynomial, mut shares_vec) =
367 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
368 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
369
370 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
371 context.with_label("simulation"),
372 num_validators,
373 &mut shares_vec,
374 RELIABLE_LINK,
375 )
376 .await;
377 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
378 let mut reporters =
379 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
380
381 spawn_validator_engines::<V>(
382 context.with_label("validator"),
383 polynomial.clone(),
384 &pks,
385 &validators,
386 &mut registrations,
387 &mut automatons.lock().unwrap(),
388 &mut reporters,
389 &mut oracle,
390 Duration::from_secs(5),
391 vec![0],
392 );
393
394 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
395 });
396 }
397
398 #[test_traced("INFO")]
399 fn test_byzantine_proposer() {
400 byzantine_proposer::<MinPk>();
401 byzantine_proposer::<MinSig>();
402 }
403
404 fn unclean_byzantine_shutdown<V: Variant>() {
405 let num_validators: u32 = 4;
407 let quorum: u32 = 3;
408 let target_index = 200; let min_shutdowns = 4; let max_shutdowns = 10; let shutdown_range_min = Duration::from_millis(100);
412 let shutdown_range_max = Duration::from_millis(1_000);
413
414 let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));
416
417 let mut prev_ctx = None;
418 let all_validators = Arc::new(Mutex::new(Vec::new()));
419
420 let mut rng = StdRng::seed_from_u64(0);
422 let (polynomial, mut shares_vec) =
423 ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
424 let identity = *poly::public::<V>(&polynomial);
425 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
426
427 let mut shutdown_count = 0;
429 while shutdown_count < max_shutdowns {
430 let all_validators = all_validators.clone();
431 let mut shares_vec = shares_vec.clone();
432 let polynomial = polynomial.clone();
433 let f = move |mut context: Context| {
434 async move {
435 let (oracle, validators, pks, mut registrations) = initialize_simulation(
436 context.with_label("simulation"),
437 num_validators,
438 &mut shares_vec,
439 RELIABLE_LINK,
440 )
441 .await;
442 if all_validators.lock().unwrap().is_empty() {
444 let mut pks_lock = all_validators.lock().unwrap();
445 *pks_lock = pks.clone();
446 }
447 let automatons =
448 Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
449
450 let mut engine_monitors = HashMap::new();
452 let namespace = b"my testing namespace";
453
454 let (reporter, mut reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
458 namespace,
459 num_validators,
460 polynomial.clone(),
461 );
462 context.with_label("reporter").spawn(|_| reporter.run());
463
464 for (i, (validator, _, share)) in validators.iter().enumerate() {
466 let validator_context = context.with_label(&validator.to_string());
467 let monitor = mocks::Monitor::new(111);
468 engine_monitors.insert(validator.clone(), monitor.clone());
469 let supervisor = {
470 let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
471 s.add_epoch(111, share.clone(), polynomial.clone(), pks.to_vec());
472 s
473 };
474
475 let blocker = oracle.control(validator.clone());
476 let automaton = mocks::Application::new(if i == 0 {
477 Strategy::Incorrect
478 } else {
479 Strategy::Correct
480 });
481 automatons
482 .lock()
483 .unwrap()
484 .insert(validator.clone(), automaton.clone());
485
486 let engine = Engine::new(
487 validator_context.with_label("engine"),
488 Config {
489 monitor,
490 validators: supervisor,
491 automaton,
492 reporter: reporter_mailbox.clone(),
493 blocker,
494 namespace: namespace.to_vec(),
495 priority_acks: false,
496 rebroadcast_timeout,
497 epoch_bounds: (1, 1),
498 window: std::num::NonZeroU64::new(10).unwrap(),
499 activity_timeout: 1_024, journal_partition: format!("unclean_shutdown_test/{validator}"),
501 journal_write_buffer: NZUsize!(4096),
502 journal_replay_buffer: NZUsize!(4096),
503 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
504 journal_compression: Some(3),
505 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
506 },
507 );
508
509 let (sender, receiver) = registrations.remove(validator).unwrap();
510 engine.start((sender, receiver));
511 }
512
513 let completion =
515 context
516 .with_label("completion_watcher")
517 .spawn(move |context| async move {
518 loop {
519 if let Some(tip_index) =
520 reporter_mailbox.get_contiguous_tip().await
521 {
522 if tip_index >= target_index {
523 break;
524 }
525 }
526 context.sleep(Duration::from_millis(50)).await;
527 }
528 });
529
530 let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
532 select! {
533 _ = context.sleep(shutdown_wait) => {
534 debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
535 (false, context) },
537 _ = completion => {
538 debug!("Shared reporter completed normally");
539 (true, context) },
541 }
542 }
543 };
544
545 let (complete, context) = if let Some(prev_ctx) = prev_ctx {
546 debug!(shutdown_count, "Restarting from previous context");
547 deterministic::Runner::from(prev_ctx)
548 } else {
549 debug!("Starting initial run");
550 deterministic::Runner::timed(Duration::from_secs(45))
551 }
552 .start(f);
553 if complete && shutdown_count >= min_shutdowns {
554 debug!("Test completed successfully");
555 break;
556 }
557
558 prev_ctx = Some(context.recover());
559 shutdown_count += 1;
560 }
561 }
562
563 #[test_traced("INFO")]
564 fn test_unclean_byzantine_shutdown() {
565 unclean_byzantine_shutdown::<MinPk>();
566 unclean_byzantine_shutdown::<MinSig>();
567 }
568
569 fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
570 let num_validators: u32 = 4;
571 let quorum: u32 = 3;
572 let cfg = deterministic::Config::new()
573 .with_seed(seed)
574 .with_timeout(Some(Duration::from_secs(120)));
575 let runner = deterministic::Runner::new(cfg);
576
577 runner.start(|mut context| async move {
578 let (polynomial, mut shares_vec) =
579 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
580 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
581
582 let degraded_link = Link {
584 latency: Duration::from_millis(200),
585 jitter: Duration::from_millis(150),
586 success_rate: 0.5,
587 };
588
589 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
590 context.with_label("simulation"),
591 num_validators,
592 &mut shares_vec,
593 degraded_link,
594 )
595 .await;
596 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
597 let mut reporters =
598 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
599
600 spawn_validator_engines::<V>(
601 context.with_label("validator"),
602 polynomial.clone(),
603 &pks,
604 &validators,
605 &mut registrations,
606 &mut automatons.lock().unwrap(),
607 &mut reporters,
608 &mut oracle,
609 Duration::from_secs(2),
610 vec![],
611 );
612
613 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
614
615 context.auditor().state()
616 })
617 }
618
619 #[test_traced("INFO")]
620 fn test_slow_and_lossy_links() {
621 slow_and_lossy_links::<MinPk>(0);
622 slow_and_lossy_links::<MinSig>(0);
623 }
624
625 #[test_traced("INFO")]
626 fn test_determinism() {
627 for seed in 1..6 {
630 let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
631 let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
632 assert_eq!(pk_state_1, pk_state_2);
633
634 let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
635 let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
636 assert_eq!(sig_state_1, sig_state_2);
637
638 assert_ne!(pk_state_1, sig_state_1);
640 }
641 }
642
643 fn one_offline<V: Variant>() {
644 let num_validators: u32 = 5;
645 let quorum: u32 = 3;
646 let runner = deterministic::Runner::timed(Duration::from_secs(30));
647
648 runner.start(|mut context| async move {
649 let (polynomial, mut shares_vec) =
650 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
651 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
652
653 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
654 context.with_label("simulation"),
655 num_validators,
656 &mut shares_vec,
657 RELIABLE_LINK,
658 )
659 .await;
660 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
661 let mut reporters =
662 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
663
664 let online_validators: Vec<_> = validators.iter().take(4).cloned().collect();
666 let online_pks: Vec<_> = pks.iter().take(4).cloned().collect();
667
668 spawn_validator_engines::<V>(
669 context.with_label("validator"),
670 polynomial.clone(),
671 &online_pks,
672 &online_validators,
673 &mut registrations,
674 &mut automatons.lock().unwrap(),
675 &mut reporters,
676 &mut oracle,
677 Duration::from_secs(5),
678 vec![],
679 );
680 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
681 });
682 }
683
684 #[test_traced("INFO")]
685 fn test_one_offline() {
686 one_offline::<MinPk>();
687 one_offline::<MinSig>();
688 }
689
690 fn network_partition<V: Variant>() {
692 let num_validators: u32 = 4;
693 let quorum: u32 = 3;
694 let runner = deterministic::Runner::timed(Duration::from_secs(60));
695
696 runner.start(|mut context| async move {
697 let (polynomial, mut shares_vec) =
698 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
699 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
700
701 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
702 context.with_label("simulation"),
703 num_validators,
704 &mut shares_vec,
705 RELIABLE_LINK,
706 )
707 .await;
708 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
709 let mut reporters =
710 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
711
712 spawn_validator_engines::<V>(
713 context.with_label("validator"),
714 polynomial.clone(),
715 &pks,
716 &validators,
717 &mut registrations,
718 &mut automatons.lock().unwrap(),
719 &mut reporters,
720 &mut oracle,
721 Duration::from_secs(5),
722 vec![],
723 );
724
725 for v1 in pks.iter() {
726 for v2 in pks.iter() {
727 if v2 == v1 {
728 continue;
729 }
730 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
731 }
732 }
733 context.sleep(Duration::from_secs(20)).await;
734
735 let link = Link {
736 latency: Duration::from_millis(10),
737 jitter: Duration::from_millis(1),
738 success_rate: 1.0,
739 };
740 for v1 in pks.iter() {
741 for v2 in pks.iter() {
742 if v2 == v1 {
743 continue;
744 }
745 oracle
746 .add_link(v1.clone(), v2.clone(), link.clone())
747 .await
748 .unwrap();
749 }
750 }
751
752 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
753 });
754 }
755
756 #[test_traced("INFO")]
757 fn test_network_partition() {
758 network_partition::<MinPk>();
759 network_partition::<MinSig>();
760 }
761
762 fn insufficient_validators<V: Variant>() {
764 let num_validators: u32 = 5;
765 let quorum: u32 = 3;
766 let runner = deterministic::Runner::timed(Duration::from_secs(15));
767
768 runner.start(|mut context| async move {
769 let (polynomial, mut shares_vec) =
770 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
771 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
772 let identity = *poly::public::<V>(&polynomial);
773
774 let (oracle, validators, pks, mut registrations) = initialize_simulation(
775 context.with_label("simulation"),
776 num_validators,
777 &mut shares_vec,
778 RELIABLE_LINK,
779 )
780 .await;
781 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
782 let mut reporters =
783 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
784
785 let namespace = b"my testing namespace";
787 for (validator, _scheme, share) in validators.iter().take(2) {
788 let context = context.with_label(&validator.to_string());
789 let monitor = mocks::Monitor::new(111);
790 let supervisor = {
791 let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
792 s.add_epoch(
793 111,
794 share.clone(),
795 polynomial.clone(),
796 pks.to_vec(),
797 );
798 s
799 };
800
801 let blocker = oracle.control(validator.clone());
802
803 let automaton = mocks::Application::new(Strategy::Correct);
804 automatons.lock().unwrap().insert(validator.clone(), automaton.clone());
805
806 let (reporter, reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
807 namespace,
808 pks.len() as u32,
809 polynomial.clone(),
810 );
811 context.with_label("reporter").spawn(|_| reporter.run());
812 reporters.insert(validator.clone(), reporter_mailbox);
813
814 let engine = Engine::new(
815 context.with_label("engine"),
816 Config {
817 monitor,
818 validators: supervisor,
819 automaton: automaton.clone(),
820 reporter: reporters.get(validator).unwrap().clone(),
821 blocker,
822 namespace: namespace.to_vec(),
823 priority_acks: false,
824 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_secs(3)),
825 epoch_bounds: (1, 1),
826 window: std::num::NonZeroU64::new(10).unwrap(),
827 activity_timeout: 100,
828 journal_partition: format!("aggregation/{validator}"),
829 journal_write_buffer: NZUsize!(4096),
830 journal_replay_buffer: NZUsize!(4096),
831 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
832 journal_compression: Some(3),
833 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
834 },
835 );
836
837 let (sender, receiver) = registrations.remove(validator).unwrap();
838 engine.start((sender, receiver));
839 }
840
841 context.sleep(Duration::from_secs(12)).await;
844
845 let mut any_consensus = false;
847 for (validator_pk, mut reporter_mailbox) in reporters {
848 let (tip, _) = reporter_mailbox.get_tip().await.unwrap_or((0, 0));
849 if tip > 0 {
850 any_consensus = true;
851 tracing::warn!(
852 ?validator_pk,
853 tip,
854 "Unexpected threshold signature consensus with insufficient validators"
855 );
856 }
857 }
858
859 assert!(
861 !any_consensus,
862 "Consensus should not be achieved with insufficient validator participation (below quorum)"
863 );
864 });
865 }
866
867 #[test_traced("INFO")]
868 fn test_insufficient_validators() {
869 insufficient_validators::<MinPk>();
870 insufficient_validators::<MinSig>();
871 }
872}