1pub mod types;
53
54cfg_if::cfg_if! {
55 if #[cfg(not(target_arch = "wasm32"))] {
56 mod config;
57 pub use config::Config;
58 mod engine;
59 pub use engine::Engine;
60 mod metrics;
61 mod safe_tip;
62
63 #[cfg(test)]
64 pub mod mocks;
65 }
66}
67
68#[cfg(test)]
69mod tests {
70 use super::{mocks, types::Epoch, Config, Engine};
71 use crate::aggregation::mocks::Strategy;
72 use commonware_cryptography::{
73 bls12381::{
74 dkg::ops,
75 primitives::{
76 group::Share,
77 poly,
78 variant::{MinPk, MinSig, Variant},
79 },
80 },
81 ed25519::{PrivateKey, PublicKey},
82 sha256::Digest as Sha256Digest,
83 PrivateKeyExt as _, Signer as _,
84 };
85 use commonware_macros::{select, test_traced};
86 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
87 use commonware_runtime::{
88 buffer::PoolRef,
89 deterministic::{self, Context},
90 Clock, Metrics, Runner, Spawner,
91 };
92 use commonware_utils::{NZUsize, NonZeroDuration};
93 use futures::{channel::oneshot, future::join_all};
94 use rand::{rngs::StdRng, Rng, SeedableRng};
95 use std::{
96 collections::{BTreeMap, HashMap},
97 num::NonZeroUsize,
98 sync::{Arc, Mutex},
99 time::Duration,
100 };
101 use tracing::debug;
102
103 type Registrations<P> = BTreeMap<P, (Sender<P>, Receiver<P>)>;
104
105 const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
106 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
107
108 const RELIABLE_LINK: Link = Link {
110 latency: Duration::from_millis(10),
111 jitter: Duration::from_millis(1),
112 success_rate: 1.0,
113 };
114
115 async fn register_participants(
117 oracle: &mut Oracle<PublicKey>,
118 participants: &[PublicKey],
119 ) -> Registrations<PublicKey> {
120 let mut registrations = BTreeMap::new();
121 for participant in participants.iter() {
122 let (sender, receiver) = oracle.register(participant.clone(), 0).await.unwrap();
123 registrations.insert(participant.clone(), (sender, receiver));
124 }
125 registrations
126 }
127
128 async fn link_participants(
130 oracle: &mut Oracle<PublicKey>,
131 participants: &[PublicKey],
132 link: Link,
133 ) {
134 for v1 in participants.iter() {
135 for v2 in participants.iter() {
136 if v2 == v1 {
137 continue;
138 }
139 oracle
140 .add_link(v1.clone(), v2.clone(), link.clone())
141 .await
142 .unwrap();
143 }
144 }
145 }
146
147 async fn initialize_simulation(
149 context: Context,
150 num_validators: u32,
151 shares_vec: &mut [Share],
152 link: Link,
153 ) -> (
154 Oracle<PublicKey>,
155 Vec<(PublicKey, PrivateKey, Share)>,
156 Vec<PublicKey>,
157 Registrations<PublicKey>,
158 ) {
159 let (network, mut oracle) = Network::new(
160 context.with_label("network"),
161 commonware_p2p::simulated::Config {
162 max_size: 1024 * 1024,
163 },
164 );
165 network.start();
166
167 let mut schemes = (0..num_validators)
168 .map(|i| PrivateKey::from_seed(i as u64))
169 .collect::<Vec<_>>();
170 schemes.sort_by_key(|s| s.public_key());
171 let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
172 .iter()
173 .enumerate()
174 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
175 .collect();
176 let pks = validators
177 .iter()
178 .map(|(pk, _, _)| pk.clone())
179 .collect::<Vec<_>>();
180
181 let registrations = register_participants(&mut oracle, &pks).await;
182 link_participants(&mut oracle, &pks, link).await;
183 (oracle, validators, pks, registrations)
184 }
185
186 #[allow(clippy::too_many_arguments)]
188 fn spawn_validator_engines<V: Variant>(
189 context: Context,
190 polynomial: poly::Public<V>,
191 validator_pks: &[PublicKey],
192 validators: &[(PublicKey, PrivateKey, Share)],
193 registrations: &mut Registrations<PublicKey>,
194 automatons: &mut BTreeMap<PublicKey, mocks::Application>,
195 reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
196 oracle: &mut Oracle<PublicKey>,
197 rebroadcast_timeout: Duration,
198 incorrect: Vec<usize>,
199 ) -> HashMap<PublicKey, mocks::Monitor> {
200 let mut monitors = HashMap::new();
201 let namespace = b"my testing namespace";
202
203 for (i, (validator, _, share)) in validators.iter().enumerate() {
204 let context = context.with_label(&validator.to_string());
205 let monitor = mocks::Monitor::new(111);
206 monitors.insert(validator.clone(), monitor.clone());
207 let supervisor = {
208 let identity = *poly::public::<V>(&polynomial);
209 let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
210 s.add_epoch(
211 111,
212 share.clone(),
213 polynomial.clone(),
214 validator_pks.to_vec(),
215 );
216 s
217 };
218
219 let blocker = oracle.control(validator.clone());
220
221 let automaton = mocks::Application::new(if incorrect.contains(&i) {
222 Strategy::Incorrect
223 } else {
224 Strategy::Correct
225 });
226 automatons.insert(validator.clone(), automaton.clone());
227
228 let (reporter, reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
229 namespace,
230 validator_pks.len() as u32,
231 polynomial.clone(),
232 );
233 context.with_label("reporter").spawn(|_| reporter.run());
234 reporters.insert(validator.clone(), reporter_mailbox);
235
236 let engine = Engine::new(
237 context.with_label("engine"),
238 Config {
239 monitor,
240 validators: supervisor,
241 automaton: automaton.clone(),
242 reporter: reporters.get(validator).unwrap().clone(),
243 blocker,
244 namespace: namespace.to_vec(),
245 priority_acks: false,
246 rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
247 epoch_bounds: (1, 1),
248 window: std::num::NonZeroU64::new(10).unwrap(),
249 activity_timeout: 100,
250 journal_partition: format!("aggregation/{validator}"),
251 journal_write_buffer: NZUsize!(4096),
252 journal_replay_buffer: NZUsize!(4096),
253 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
254 journal_compression: Some(3),
255 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
256 },
257 );
258
259 let (sender, receiver) = registrations.remove(validator).unwrap();
260 engine.start((sender, receiver));
261 }
262 monitors
263 }
264
265 async fn await_reporters<V: Variant>(
267 context: Context,
268 reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
269 threshold_index: u64,
270 threshold_epoch: Epoch,
271 ) {
272 let mut receivers = Vec::new();
273 for (reporter, mailbox) in reporters.iter() {
274 let (tx, rx) = oneshot::channel();
276 receivers.push(rx);
277
278 context.with_label("reporter_watcher").spawn({
279 let reporter = reporter.clone();
280 let mut mailbox = mailbox.clone();
281 move |context| async move {
282 loop {
283 let (index, epoch) = mailbox.get_tip().await.unwrap_or((0, 0));
284 debug!(
285 index,
286 epoch,
287 threshold_index,
288 threshold_epoch,
289 ?reporter,
290 "reporter status"
291 );
292 if index >= threshold_index && epoch >= threshold_epoch {
293 debug!(
294 ?reporter,
295 "reporter reached threshold, signaling completion"
296 );
297 let _ = tx.send(reporter.clone());
298 break;
299 }
300 context.sleep(Duration::from_millis(100)).await;
301 }
302 }
303 });
304 }
305
306 let results = join_all(receivers).await;
308 assert_eq!(results.len(), reporters.len());
309
310 for result in results {
312 assert!(result.is_ok(), "reporter was cancelled");
313 }
314 }
315
316 fn all_online<V: Variant>() {
318 let num_validators: u32 = 4;
319 let quorum: u32 = 3;
320 let runner = deterministic::Runner::timed(Duration::from_secs(30));
321
322 runner.start(|mut context| async move {
323 let (polynomial, mut shares_vec) =
324 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
325 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
326
327 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
328 context.with_label("simulation"),
329 num_validators,
330 &mut shares_vec,
331 RELIABLE_LINK,
332 )
333 .await;
334 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
335 let mut reporters =
336 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
337 spawn_validator_engines::<V>(
338 context.with_label("validator"),
339 polynomial.clone(),
340 &pks,
341 &validators,
342 &mut registrations,
343 &mut automatons.lock().unwrap(),
344 &mut reporters,
345 &mut oracle,
346 Duration::from_secs(5),
347 vec![],
348 );
349 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
350 });
351 }
352
353 #[test_traced("INFO")]
354 fn test_all_online() {
355 all_online::<MinPk>();
356 all_online::<MinSig>();
357 }
358
359 fn byzantine_proposer<V: Variant>() {
361 let num_validators: u32 = 4;
362 let quorum: u32 = 3;
363 let runner = deterministic::Runner::timed(Duration::from_secs(30));
364
365 runner.start(|mut context| async move {
366 let (polynomial, mut shares_vec) =
367 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
368 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
369
370 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
371 context.with_label("simulation"),
372 num_validators,
373 &mut shares_vec,
374 RELIABLE_LINK,
375 )
376 .await;
377 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
378 let mut reporters =
379 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
380
381 spawn_validator_engines::<V>(
382 context.with_label("validator"),
383 polynomial.clone(),
384 &pks,
385 &validators,
386 &mut registrations,
387 &mut automatons.lock().unwrap(),
388 &mut reporters,
389 &mut oracle,
390 Duration::from_secs(5),
391 vec![0],
392 );
393
394 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
395 });
396 }
397
398 #[test_traced("INFO")]
399 fn test_byzantine_proposer() {
400 byzantine_proposer::<MinPk>();
401 byzantine_proposer::<MinSig>();
402 }
403
404 fn unclean_byzantine_shutdown<V: Variant>() {
405 let num_validators: u32 = 4;
407 let quorum: u32 = 3;
408 let target_index = 200; let min_shutdowns = 4; let max_shutdowns = 10; let shutdown_range_min = Duration::from_millis(100);
412 let shutdown_range_max = Duration::from_millis(1_000);
413
414 let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));
416
417 let mut prev_ctx = None;
418 let all_validators = Arc::new(Mutex::new(Vec::new()));
419
420 let mut rng = StdRng::seed_from_u64(0);
422 let (polynomial, mut shares_vec) =
423 ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
424 let identity = *poly::public::<V>(&polynomial);
425 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
426
427 let mut shutdown_count = 0;
429 while shutdown_count < max_shutdowns {
430 let all_validators = all_validators.clone();
431 let mut shares_vec = shares_vec.clone();
432 let polynomial = polynomial.clone();
433 let f = move |mut context: Context| {
434 async move {
435 let (oracle, validators, pks, mut registrations) = initialize_simulation(
436 context.with_label("simulation"),
437 num_validators,
438 &mut shares_vec,
439 RELIABLE_LINK,
440 )
441 .await;
442 if all_validators.lock().unwrap().is_empty() {
444 let mut pks_lock = all_validators.lock().unwrap();
445 *pks_lock = pks.clone();
446 }
447 let automatons =
448 Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
449
450 let mut engine_monitors = HashMap::new();
452 let namespace = b"my testing namespace";
453
454 let (reporter, mut reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
458 namespace,
459 num_validators,
460 polynomial.clone(),
461 );
462 context.with_label("reporter").spawn(|_| reporter.run());
463
464 for (i, (validator, _, share)) in validators.iter().enumerate() {
466 let validator_context = context.with_label(&validator.to_string());
467 let monitor = mocks::Monitor::new(111);
468 engine_monitors.insert(validator.clone(), monitor.clone());
469 let supervisor = {
470 let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
471 s.add_epoch(111, share.clone(), polynomial.clone(), pks.to_vec());
472 s
473 };
474
475 let blocker = oracle.control(validator.clone());
476 let automaton = mocks::Application::new(if i == 0 {
477 Strategy::Incorrect
478 } else {
479 Strategy::Correct
480 });
481 automatons
482 .lock()
483 .unwrap()
484 .insert(validator.clone(), automaton.clone());
485
486 let engine = Engine::new(
487 validator_context.with_label("engine"),
488 Config {
489 monitor,
490 validators: supervisor,
491 automaton,
492 reporter: reporter_mailbox.clone(),
493 blocker,
494 namespace: namespace.to_vec(),
495 priority_acks: false,
496 rebroadcast_timeout,
497 epoch_bounds: (1, 1),
498 window: std::num::NonZeroU64::new(10).unwrap(),
499 activity_timeout: 1_024, journal_partition: format!("unclean_shutdown_test/{validator}"),
501 journal_write_buffer: NZUsize!(4096),
502 journal_replay_buffer: NZUsize!(4096),
503 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
504 journal_compression: Some(3),
505 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
506 },
507 );
508
509 let (sender, receiver) = registrations.remove(validator).unwrap();
510 engine.start((sender, receiver));
511 }
512
513 let completion =
515 context
516 .with_label("completion_watcher")
517 .spawn(move |context| async move {
518 loop {
519 if let Some(tip_index) =
520 reporter_mailbox.get_contiguous_tip().await
521 {
522 if tip_index >= target_index {
523 break;
524 }
525 }
526 context.sleep(Duration::from_millis(50)).await;
527 }
528 });
529
530 let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
532 select! {
533 _ = context.sleep(shutdown_wait) => {
534 debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
535 (false, context) },
537 _ = completion => {
538 debug!("Shared reporter completed normally");
539 (true, context) },
541 }
542 }
543 };
544
545 let (complete, context) = if let Some(prev_ctx) = prev_ctx {
546 debug!(shutdown_count, "Restarting from previous context");
547 deterministic::Runner::from(prev_ctx)
548 } else {
549 debug!("Starting initial run");
550 deterministic::Runner::timed(Duration::from_secs(45))
551 }
552 .start(f);
553 if complete && shutdown_count >= min_shutdowns {
554 debug!("Test completed successfully");
555 break;
556 }
557
558 prev_ctx = Some(context.recover());
559 shutdown_count += 1;
560 }
561 }
562
563 #[test_traced("INFO")]
564 fn test_unclean_byzantine_shutdown() {
565 unclean_byzantine_shutdown::<MinPk>();
566 unclean_byzantine_shutdown::<MinSig>();
567 }
568
569 fn unclean_shutdown_with_unsigned_index<V: Variant>() {
570 let num_validators: u32 = 4;
572 let quorum: u32 = 3;
573 let skip_index = 50u64; let window = 10u64;
575 let target_index = 100u64;
576 let namespace = b"my testing namespace";
577
578 let all_validators = Arc::new(Mutex::new(Vec::new()));
580 let mut rng = StdRng::seed_from_u64(0);
581 let (polynomial, mut shares_vec) =
582 ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
583 let identity = *poly::public::<V>(&polynomial);
584 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
585
586 let f = |context: Context| {
588 let all_validators_clone = all_validators.clone();
589 let mut shares_vec_clone = shares_vec.clone();
590 let polynomial_clone = polynomial.clone();
591 async move {
592 let (oracle, validators, pks, mut registrations) = initialize_simulation(
593 context.with_label("simulation"),
594 num_validators,
595 &mut shares_vec_clone,
596 RELIABLE_LINK,
597 )
598 .await;
599
600 {
602 let mut pks_lock = all_validators_clone.lock().unwrap();
603 *pks_lock = pks.clone();
604 }
605
606 let (reporter, mut reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
608 namespace,
609 num_validators,
610 polynomial_clone.clone(),
611 );
612 context.with_label("reporter").spawn(|_| reporter.run());
613
614 let mut engine_monitors = HashMap::new();
616 let automatons =
617 Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
618 for (validator, _, share) in validators.iter() {
619 let validator_context = context.with_label(&validator.to_string());
620 let monitor = mocks::Monitor::new(111);
621 engine_monitors.insert(validator.clone(), monitor.clone());
622 let supervisor = {
623 let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
624 s.add_epoch(111, share.clone(), polynomial_clone.clone(), pks.to_vec());
625 s
626 };
627 let blocker = oracle.control(validator.clone());
628
629 let automaton = mocks::Application::new(Strategy::Skip { index: skip_index });
631 automatons
632 .lock()
633 .unwrap()
634 .insert(validator.clone(), automaton.clone());
635
636 let engine = Engine::new(
637 validator_context.with_label("engine"),
638 Config {
639 monitor,
640 validators: supervisor,
641 automaton,
642 reporter: reporter_mailbox.clone(),
643 blocker,
644 namespace: namespace.to_vec(),
645 priority_acks: false,
646 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
647 100,
648 )),
649 epoch_bounds: (1, 1),
650 window: std::num::NonZeroU64::new(window).unwrap(),
651 activity_timeout: 100,
652 journal_partition: format!("unsigned_index_test/{validator}"),
653 journal_write_buffer: NZUsize!(4096),
654 journal_replay_buffer: NZUsize!(4096),
655 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
656 journal_compression: Some(3),
657 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
658 },
659 );
660
661 let (sender, receiver) = registrations.remove(validator).unwrap();
662 engine.start((sender, receiver));
663 }
664
665 loop {
667 if let Some((tip_index, _)) = reporter_mailbox.get_tip().await {
668 debug!(tip_index, skip_index, target_index, "reporter status");
669 if tip_index >= skip_index + window - 1 {
670 return context;
672 }
673 }
674 context.sleep(Duration::from_millis(50)).await;
675 }
676 }
677 };
678 let context = deterministic::Runner::timed(Duration::from_secs(60)).start(f);
679 let prev_ctx = context.recover();
680
681 let f2 = move |context: Context| {
683 async move {
684 let (oracle, validators, pks, mut registrations) = initialize_simulation(
685 context.with_label("simulation"),
686 num_validators,
687 &mut shares_vec,
688 RELIABLE_LINK,
689 )
690 .await;
691
692 let (reporter, mut reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
694 namespace,
695 num_validators,
696 polynomial.clone(),
697 );
698 context.with_label("reporter").spawn(|_| reporter.run());
699
700 let automatons =
702 Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
703 for (validator, _, share) in validators.iter() {
704 let validator_context = context.with_label(&validator.to_string());
705 let monitor = mocks::Monitor::new(111);
706 let supervisor = {
707 let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
708 s.add_epoch(111, share.clone(), polynomial.clone(), pks.to_vec());
709 s
710 };
711
712 let blocker = oracle.control(validator.clone());
713
714 let automaton = mocks::Application::new(Strategy::Correct);
716 automatons
717 .lock()
718 .unwrap()
719 .insert(validator.clone(), automaton.clone());
720
721 let engine = Engine::new(
722 validator_context.with_label("engine"),
723 Config {
724 monitor,
725 validators: supervisor,
726 automaton,
727 reporter: reporter_mailbox.clone(),
728 blocker,
729 namespace: namespace.to_vec(),
730 priority_acks: false,
731 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
732 100,
733 )),
734 epoch_bounds: (1, 1),
735 window: std::num::NonZeroU64::new(10).unwrap(),
736 activity_timeout: 100,
737 journal_partition: format!("unsigned_index_test/{validator}"),
738 journal_write_buffer: NZUsize!(4096),
739 journal_replay_buffer: NZUsize!(4096),
740 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
741 journal_compression: Some(3),
742 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
743 },
744 );
745
746 let (sender, receiver) = registrations.remove(validator).unwrap();
747 engine.start((sender, receiver));
748 }
749
750 loop {
752 if let Some(tip_index) = reporter_mailbox.get_contiguous_tip().await {
753 debug!(
754 tip_index,
755 skip_index, target_index, "reporter status on restart",
756 );
757 if tip_index >= target_index {
758 break;
759 }
760 }
761 context.sleep(Duration::from_millis(50)).await;
762 }
763 }
764 };
765 deterministic::Runner::from(prev_ctx).start(f2);
766 }
767
768 #[test_traced("INFO")]
769 fn test_unclean_shutdown_with_unsigned_index() {
770 unclean_shutdown_with_unsigned_index::<MinPk>();
771 unclean_shutdown_with_unsigned_index::<MinSig>();
772 }
773
774 fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
775 let num_validators: u32 = 4;
776 let quorum: u32 = 3;
777 let cfg = deterministic::Config::new()
778 .with_seed(seed)
779 .with_timeout(Some(Duration::from_secs(120)));
780 let runner = deterministic::Runner::new(cfg);
781
782 runner.start(|mut context| async move {
783 let (polynomial, mut shares_vec) =
784 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
785 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
786
787 let degraded_link = Link {
789 latency: Duration::from_millis(200),
790 jitter: Duration::from_millis(150),
791 success_rate: 0.5,
792 };
793
794 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
795 context.with_label("simulation"),
796 num_validators,
797 &mut shares_vec,
798 degraded_link,
799 )
800 .await;
801 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
802 let mut reporters =
803 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
804
805 spawn_validator_engines::<V>(
806 context.with_label("validator"),
807 polynomial.clone(),
808 &pks,
809 &validators,
810 &mut registrations,
811 &mut automatons.lock().unwrap(),
812 &mut reporters,
813 &mut oracle,
814 Duration::from_secs(2),
815 vec![],
816 );
817
818 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
819
820 context.auditor().state()
821 })
822 }
823
824 #[test_traced("INFO")]
825 fn test_slow_and_lossy_links() {
826 slow_and_lossy_links::<MinPk>(0);
827 slow_and_lossy_links::<MinSig>(0);
828 }
829
830 #[test_traced("INFO")]
831 fn test_determinism() {
832 for seed in 1..6 {
835 let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
836 let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
837 assert_eq!(pk_state_1, pk_state_2);
838
839 let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
840 let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
841 assert_eq!(sig_state_1, sig_state_2);
842
843 assert_ne!(pk_state_1, sig_state_1);
845 }
846 }
847
848 fn one_offline<V: Variant>() {
849 let num_validators: u32 = 5;
850 let quorum: u32 = 3;
851 let runner = deterministic::Runner::timed(Duration::from_secs(30));
852
853 runner.start(|mut context| async move {
854 let (polynomial, mut shares_vec) =
855 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
856 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
857
858 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
859 context.with_label("simulation"),
860 num_validators,
861 &mut shares_vec,
862 RELIABLE_LINK,
863 )
864 .await;
865 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
866 let mut reporters =
867 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
868
869 let online_validators: Vec<_> = validators.iter().take(4).cloned().collect();
871 let online_pks: Vec<_> = pks.iter().take(4).cloned().collect();
872
873 spawn_validator_engines::<V>(
874 context.with_label("validator"),
875 polynomial.clone(),
876 &online_pks,
877 &online_validators,
878 &mut registrations,
879 &mut automatons.lock().unwrap(),
880 &mut reporters,
881 &mut oracle,
882 Duration::from_secs(5),
883 vec![],
884 );
885 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
886 });
887 }
888
889 #[test_traced("INFO")]
890 fn test_one_offline() {
891 one_offline::<MinPk>();
892 one_offline::<MinSig>();
893 }
894
895 fn network_partition<V: Variant>() {
897 let num_validators: u32 = 4;
898 let quorum: u32 = 3;
899 let runner = deterministic::Runner::timed(Duration::from_secs(60));
900
901 runner.start(|mut context| async move {
902 let (polynomial, mut shares_vec) =
903 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
904 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
905
906 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
907 context.with_label("simulation"),
908 num_validators,
909 &mut shares_vec,
910 RELIABLE_LINK,
911 )
912 .await;
913 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
914 let mut reporters =
915 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
916
917 spawn_validator_engines::<V>(
918 context.with_label("validator"),
919 polynomial.clone(),
920 &pks,
921 &validators,
922 &mut registrations,
923 &mut automatons.lock().unwrap(),
924 &mut reporters,
925 &mut oracle,
926 Duration::from_secs(5),
927 vec![],
928 );
929
930 for v1 in pks.iter() {
931 for v2 in pks.iter() {
932 if v2 == v1 {
933 continue;
934 }
935 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
936 }
937 }
938 context.sleep(Duration::from_secs(20)).await;
939
940 let link = Link {
941 latency: Duration::from_millis(10),
942 jitter: Duration::from_millis(1),
943 success_rate: 1.0,
944 };
945 for v1 in pks.iter() {
946 for v2 in pks.iter() {
947 if v2 == v1 {
948 continue;
949 }
950 oracle
951 .add_link(v1.clone(), v2.clone(), link.clone())
952 .await
953 .unwrap();
954 }
955 }
956
957 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
958 });
959 }
960
961 #[test_traced("INFO")]
962 fn test_network_partition() {
963 network_partition::<MinPk>();
964 network_partition::<MinSig>();
965 }
966
967 fn insufficient_validators<V: Variant>() {
969 let num_validators: u32 = 5;
970 let quorum: u32 = 3;
971 let runner = deterministic::Runner::timed(Duration::from_secs(15));
972
973 runner.start(|mut context| async move {
974 let (polynomial, mut shares_vec) =
975 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
976 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
977 let identity = *poly::public::<V>(&polynomial);
978
979 let (oracle, validators, pks, mut registrations) = initialize_simulation(
980 context.with_label("simulation"),
981 num_validators,
982 &mut shares_vec,
983 RELIABLE_LINK,
984 )
985 .await;
986 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
987 let mut reporters =
988 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
989
990 let namespace = b"my testing namespace";
992 for (validator, _scheme, share) in validators.iter().take(2) {
993 let context = context.with_label(&validator.to_string());
994 let monitor = mocks::Monitor::new(111);
995 let supervisor = {
996 let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
997 s.add_epoch(
998 111,
999 share.clone(),
1000 polynomial.clone(),
1001 pks.to_vec(),
1002 );
1003 s
1004 };
1005
1006 let blocker = oracle.control(validator.clone());
1007
1008 let automaton = mocks::Application::new(Strategy::Correct);
1009 automatons.lock().unwrap().insert(validator.clone(), automaton.clone());
1010
1011 let (reporter, reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
1012 namespace,
1013 pks.len() as u32,
1014 polynomial.clone(),
1015 );
1016 context.with_label("reporter").spawn(|_| reporter.run());
1017 reporters.insert(validator.clone(), reporter_mailbox);
1018
1019 let engine = Engine::new(
1020 context.with_label("engine"),
1021 Config {
1022 monitor,
1023 validators: supervisor,
1024 automaton: automaton.clone(),
1025 reporter: reporters.get(validator).unwrap().clone(),
1026 blocker,
1027 namespace: namespace.to_vec(),
1028 priority_acks: false,
1029 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_secs(3)),
1030 epoch_bounds: (1, 1),
1031 window: std::num::NonZeroU64::new(10).unwrap(),
1032 activity_timeout: 100,
1033 journal_partition: format!("aggregation/{validator}"),
1034 journal_write_buffer: NZUsize!(4096),
1035 journal_replay_buffer: NZUsize!(4096),
1036 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
1037 journal_compression: Some(3),
1038 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1039 },
1040 );
1041
1042 let (sender, receiver) = registrations.remove(validator).unwrap();
1043 engine.start((sender, receiver));
1044 }
1045
1046 context.sleep(Duration::from_secs(12)).await;
1049
1050 let mut any_consensus = false;
1052 for (validator_pk, mut reporter_mailbox) in reporters {
1053 let (tip, _) = reporter_mailbox.get_tip().await.unwrap_or((0, 0));
1054 if tip > 0 {
1055 any_consensus = true;
1056 tracing::warn!(
1057 ?validator_pk,
1058 tip,
1059 "Unexpected threshold signature consensus with insufficient validators"
1060 );
1061 }
1062 }
1063
1064 assert!(
1066 !any_consensus,
1067 "Consensus should not be achieved with insufficient validator participation (below quorum)"
1068 );
1069 });
1070 }
1071
1072 #[test_traced("INFO")]
1073 fn test_insufficient_validators() {
1074 insufficient_validators::<MinPk>();
1075 insufficient_validators::<MinSig>();
1076 }
1077}