1pub mod types;
53
54cfg_if::cfg_if! {
55 if #[cfg(not(target_arch = "wasm32"))] {
56 mod config;
57 pub use config::Config;
58 mod engine;
59 pub use engine::Engine;
60 mod metrics;
61 mod safe_tip;
62
63 #[cfg(test)]
64 pub mod mocks;
65 }
66}
67
68#[cfg(test)]
69mod tests {
70 use super::{mocks, Config, Engine};
71 use crate::{aggregation::mocks::Strategy, types::Epoch};
72 use commonware_cryptography::{
73 bls12381::{
74 dkg::ops,
75 primitives::{
76 group::Share,
77 poly,
78 variant::{MinPk, MinSig, Variant},
79 },
80 },
81 ed25519::{PrivateKey, PublicKey},
82 sha256::Digest as Sha256Digest,
83 PrivateKeyExt as _, Signer as _,
84 };
85 use commonware_macros::{select, test_traced};
86 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
87 use commonware_runtime::{
88 buffer::PoolRef,
89 deterministic::{self, Context},
90 Clock, Metrics, Runner, Spawner,
91 };
92 use commonware_utils::{NZUsize, NonZeroDuration};
93 use futures::{channel::oneshot, future::join_all};
94 use rand::{rngs::StdRng, Rng, SeedableRng};
95 use std::{
96 collections::{BTreeMap, HashMap},
97 num::NonZeroUsize,
98 sync::{Arc, Mutex},
99 time::Duration,
100 };
101 use tracing::debug;
102
103 type Registrations<P> = BTreeMap<P, (Sender<P>, Receiver<P>)>;
104
105 const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
106 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
107
108 const RELIABLE_LINK: Link = Link {
110 latency: Duration::from_millis(10),
111 jitter: Duration::from_millis(1),
112 success_rate: 1.0,
113 };
114
115 async fn register_participants(
117 oracle: &mut Oracle<PublicKey>,
118 participants: &[PublicKey],
119 ) -> Registrations<PublicKey> {
120 let mut registrations = BTreeMap::new();
121 for participant in participants.iter() {
122 let (sender, receiver) = oracle
123 .control(participant.clone())
124 .register(0)
125 .await
126 .unwrap();
127 registrations.insert(participant.clone(), (sender, receiver));
128 }
129 registrations
130 }
131
132 async fn link_participants(
134 oracle: &mut Oracle<PublicKey>,
135 participants: &[PublicKey],
136 link: Link,
137 ) {
138 for v1 in participants.iter() {
139 for v2 in participants.iter() {
140 if v2 == v1 {
141 continue;
142 }
143 oracle
144 .add_link(v1.clone(), v2.clone(), link.clone())
145 .await
146 .unwrap();
147 }
148 }
149 }
150
151 async fn initialize_simulation(
153 context: Context,
154 num_validators: u32,
155 shares_vec: &mut [Share],
156 link: Link,
157 ) -> (
158 Oracle<PublicKey>,
159 Vec<(PublicKey, PrivateKey, Share)>,
160 Vec<PublicKey>,
161 Registrations<PublicKey>,
162 ) {
163 let (network, mut oracle) = Network::new(
164 context.with_label("network"),
165 commonware_p2p::simulated::Config {
166 max_size: 1024 * 1024,
167 disconnect_on_block: true,
168 tracked_peer_sets: None,
169 },
170 );
171 network.start();
172
173 let mut schemes = (0..num_validators)
174 .map(|i| PrivateKey::from_seed(i as u64))
175 .collect::<Vec<_>>();
176 schemes.sort_by_key(|s| s.public_key());
177 let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
178 .iter()
179 .enumerate()
180 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
181 .collect();
182 let pks = validators
183 .iter()
184 .map(|(pk, _, _)| pk.clone())
185 .collect::<Vec<_>>();
186
187 let registrations = register_participants(&mut oracle, &pks).await;
188 link_participants(&mut oracle, &pks, link).await;
189 (oracle, validators, pks, registrations)
190 }
191
192 #[allow(clippy::too_many_arguments)]
194 fn spawn_validator_engines<V: Variant>(
195 context: Context,
196 polynomial: poly::Public<V>,
197 validator_pks: &[PublicKey],
198 validators: &[(PublicKey, PrivateKey, Share)],
199 registrations: &mut Registrations<PublicKey>,
200 automatons: &mut BTreeMap<PublicKey, mocks::Application>,
201 reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
202 oracle: &mut Oracle<PublicKey>,
203 rebroadcast_timeout: Duration,
204 incorrect: Vec<usize>,
205 ) -> HashMap<PublicKey, mocks::Monitor> {
206 let mut monitors = HashMap::new();
207 let namespace = b"my testing namespace";
208
209 for (i, (validator, _, share)) in validators.iter().enumerate() {
210 let context = context.with_label(&validator.to_string());
211 let monitor = mocks::Monitor::new(111);
212 monitors.insert(validator.clone(), monitor.clone());
213 let supervisor = {
214 let identity = *poly::public::<V>(&polynomial);
215 let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
216 s.add_epoch(
217 111,
218 share.clone(),
219 polynomial.clone(),
220 validator_pks.to_vec(),
221 );
222 s
223 };
224
225 let blocker = oracle.control(validator.clone());
226
227 let automaton = mocks::Application::new(if incorrect.contains(&i) {
228 Strategy::Incorrect
229 } else {
230 Strategy::Correct
231 });
232 automatons.insert(validator.clone(), automaton.clone());
233
234 let (reporter, reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
235 namespace,
236 validator_pks.len() as u32,
237 polynomial.clone(),
238 );
239 context.with_label("reporter").spawn(|_| reporter.run());
240 reporters.insert(validator.clone(), reporter_mailbox);
241
242 let engine = Engine::new(
243 context.with_label("engine"),
244 Config {
245 monitor,
246 validators: supervisor,
247 automaton: automaton.clone(),
248 reporter: reporters.get(validator).unwrap().clone(),
249 blocker,
250 namespace: namespace.to_vec(),
251 priority_acks: false,
252 rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
253 epoch_bounds: (1, 1),
254 window: std::num::NonZeroU64::new(10).unwrap(),
255 activity_timeout: 100,
256 journal_partition: format!("aggregation/{validator}"),
257 journal_write_buffer: NZUsize!(4096),
258 journal_replay_buffer: NZUsize!(4096),
259 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
260 journal_compression: Some(3),
261 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
262 },
263 );
264
265 let (sender, receiver) = registrations.remove(validator).unwrap();
266 engine.start((sender, receiver));
267 }
268 monitors
269 }
270
271 async fn await_reporters<V: Variant>(
273 context: Context,
274 reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
275 threshold_index: u64,
276 threshold_epoch: Epoch,
277 ) {
278 let mut receivers = Vec::new();
279 for (reporter, mailbox) in reporters.iter() {
280 let (tx, rx) = oneshot::channel();
282 receivers.push(rx);
283
284 context.with_label("reporter_watcher").spawn({
285 let reporter = reporter.clone();
286 let mut mailbox = mailbox.clone();
287 move |context| async move {
288 loop {
289 let (index, epoch) = mailbox.get_tip().await.unwrap_or((0, 0));
290 debug!(
291 index,
292 epoch,
293 threshold_index,
294 threshold_epoch,
295 ?reporter,
296 "reporter status"
297 );
298 if index >= threshold_index && epoch >= threshold_epoch {
299 debug!(
300 ?reporter,
301 "reporter reached threshold, signaling completion"
302 );
303 let _ = tx.send(reporter.clone());
304 break;
305 }
306 context.sleep(Duration::from_millis(100)).await;
307 }
308 }
309 });
310 }
311
312 let results = join_all(receivers).await;
314 assert_eq!(results.len(), reporters.len());
315
316 for result in results {
318 assert!(result.is_ok(), "reporter was cancelled");
319 }
320 }
321
322 fn all_online<V: Variant>() {
324 let num_validators: u32 = 4;
325 let quorum: u32 = 3;
326 let runner = deterministic::Runner::timed(Duration::from_secs(30));
327
328 runner.start(|mut context| async move {
329 let (polynomial, mut shares_vec) =
330 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
331 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
332
333 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
334 context.with_label("simulation"),
335 num_validators,
336 &mut shares_vec,
337 RELIABLE_LINK,
338 )
339 .await;
340 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
341 let mut reporters =
342 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
343 spawn_validator_engines::<V>(
344 context.with_label("validator"),
345 polynomial.clone(),
346 &pks,
347 &validators,
348 &mut registrations,
349 &mut automatons.lock().unwrap(),
350 &mut reporters,
351 &mut oracle,
352 Duration::from_secs(5),
353 vec![],
354 );
355 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
356 });
357 }
358
359 #[test_traced("INFO")]
360 fn test_all_online() {
361 all_online::<MinPk>();
362 all_online::<MinSig>();
363 }
364
365 fn byzantine_proposer<V: Variant>() {
367 let num_validators: u32 = 4;
368 let quorum: u32 = 3;
369 let runner = deterministic::Runner::timed(Duration::from_secs(30));
370
371 runner.start(|mut context| async move {
372 let (polynomial, mut shares_vec) =
373 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
374 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
375
376 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
377 context.with_label("simulation"),
378 num_validators,
379 &mut shares_vec,
380 RELIABLE_LINK,
381 )
382 .await;
383 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
384 let mut reporters =
385 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
386
387 spawn_validator_engines::<V>(
388 context.with_label("validator"),
389 polynomial.clone(),
390 &pks,
391 &validators,
392 &mut registrations,
393 &mut automatons.lock().unwrap(),
394 &mut reporters,
395 &mut oracle,
396 Duration::from_secs(5),
397 vec![0],
398 );
399
400 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
401 });
402 }
403
404 #[test_traced("INFO")]
405 fn test_byzantine_proposer() {
406 byzantine_proposer::<MinPk>();
407 byzantine_proposer::<MinSig>();
408 }
409
410 fn unclean_byzantine_shutdown<V: Variant>() {
411 let num_validators: u32 = 4;
413 let quorum: u32 = 3;
414 let target_index = 200; let min_shutdowns = 4; let max_shutdowns = 10; let shutdown_range_min = Duration::from_millis(100);
418 let shutdown_range_max = Duration::from_millis(1_000);
419
420 let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));
422
423 let mut prev_checkpoint = None;
424 let all_validators = Arc::new(Mutex::new(Vec::new()));
425
426 let mut rng = StdRng::seed_from_u64(0);
428 let (polynomial, mut shares_vec) =
429 ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
430 let identity = *poly::public::<V>(&polynomial);
431 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
432
433 let mut shutdown_count = 0;
435 while shutdown_count < max_shutdowns {
436 let all_validators = all_validators.clone();
437 let mut shares_vec = shares_vec.clone();
438 let polynomial = polynomial.clone();
439 let f = move |mut context: Context| {
440 async move {
441 let (oracle, validators, pks, mut registrations) = initialize_simulation(
442 context.with_label("simulation"),
443 num_validators,
444 &mut shares_vec,
445 RELIABLE_LINK,
446 )
447 .await;
448 if all_validators.lock().unwrap().is_empty() {
450 let mut pks_lock = all_validators.lock().unwrap();
451 *pks_lock = pks.clone();
452 }
453 let automatons =
454 Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
455
456 let mut engine_monitors = HashMap::new();
458 let namespace = b"my testing namespace";
459
460 let (reporter, mut reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
464 namespace,
465 num_validators,
466 polynomial.clone(),
467 );
468 context.with_label("reporter").spawn(|_| reporter.run());
469
470 for (i, (validator, _, share)) in validators.iter().enumerate() {
472 let validator_context = context.with_label(&validator.to_string());
473 let monitor = mocks::Monitor::new(111);
474 engine_monitors.insert(validator.clone(), monitor.clone());
475 let supervisor = {
476 let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
477 s.add_epoch(111, share.clone(), polynomial.clone(), pks.to_vec());
478 s
479 };
480
481 let blocker = oracle.control(validator.clone());
482 let automaton = mocks::Application::new(if i == 0 {
483 Strategy::Incorrect
484 } else {
485 Strategy::Correct
486 });
487 automatons
488 .lock()
489 .unwrap()
490 .insert(validator.clone(), automaton.clone());
491
492 let engine = Engine::new(
493 validator_context.with_label("engine"),
494 Config {
495 monitor,
496 validators: supervisor,
497 automaton,
498 reporter: reporter_mailbox.clone(),
499 blocker,
500 namespace: namespace.to_vec(),
501 priority_acks: false,
502 rebroadcast_timeout,
503 epoch_bounds: (1, 1),
504 window: std::num::NonZeroU64::new(10).unwrap(),
505 activity_timeout: 1_024, journal_partition: format!("unclean_shutdown_test/{validator}"),
507 journal_write_buffer: NZUsize!(4096),
508 journal_replay_buffer: NZUsize!(4096),
509 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
510 journal_compression: Some(3),
511 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
512 },
513 );
514
515 let (sender, receiver) = registrations.remove(validator).unwrap();
516 engine.start((sender, receiver));
517 }
518
519 let completion =
521 context
522 .with_label("completion_watcher")
523 .spawn(move |context| async move {
524 loop {
525 if let Some(tip_index) =
526 reporter_mailbox.get_contiguous_tip().await
527 {
528 if tip_index >= target_index {
529 break;
530 }
531 }
532 context.sleep(Duration::from_millis(50)).await;
533 }
534 });
535
536 let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
538 select! {
539 _ = context.sleep(shutdown_wait) => {
540 debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
541 false },
543 _ = completion => {
544 debug!("Shared reporter completed normally");
545 true },
547 }
548 }
549 };
550
551 let (complete, checkpoint) = if let Some(prev_checkpoint) = prev_checkpoint {
552 debug!(shutdown_count, "Restarting from previous context");
553 deterministic::Runner::from(prev_checkpoint)
554 } else {
555 debug!("Starting initial run");
556 deterministic::Runner::timed(Duration::from_secs(45))
557 }
558 .start_and_recover(f);
559 if complete && shutdown_count >= min_shutdowns {
560 debug!("Test completed successfully");
561 break;
562 }
563
564 prev_checkpoint = Some(checkpoint);
565 shutdown_count += 1;
566 }
567 }
568
569 #[test_traced("INFO")]
570 fn test_unclean_byzantine_shutdown() {
571 unclean_byzantine_shutdown::<MinPk>();
572 unclean_byzantine_shutdown::<MinSig>();
573 }
574
575 fn unclean_shutdown_with_unsigned_index<V: Variant>() {
576 let num_validators: u32 = 4;
578 let quorum: u32 = 3;
579 let skip_index = 50u64; let window = 10u64;
581 let target_index = 100u64;
582 let namespace = b"my testing namespace";
583
584 let all_validators = Arc::new(Mutex::new(Vec::new()));
586 let mut rng = StdRng::seed_from_u64(0);
587 let (polynomial, mut shares_vec) =
588 ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
589 let identity = *poly::public::<V>(&polynomial);
590 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
591
592 let f = |context: Context| {
594 let all_validators_clone = all_validators.clone();
595 let mut shares_vec_clone = shares_vec.clone();
596 let polynomial_clone = polynomial.clone();
597 async move {
598 let (oracle, validators, pks, mut registrations) = initialize_simulation(
599 context.with_label("simulation"),
600 num_validators,
601 &mut shares_vec_clone,
602 RELIABLE_LINK,
603 )
604 .await;
605
606 {
608 let mut pks_lock = all_validators_clone.lock().unwrap();
609 *pks_lock = pks.clone();
610 }
611
612 let (reporter, mut reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
614 namespace,
615 num_validators,
616 polynomial_clone.clone(),
617 );
618 context.with_label("reporter").spawn(|_| reporter.run());
619
620 let mut engine_monitors = HashMap::new();
622 let automatons =
623 Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
624 for (validator, _, share) in validators.iter() {
625 let validator_context = context.with_label(&validator.to_string());
626 let monitor = mocks::Monitor::new(111);
627 engine_monitors.insert(validator.clone(), monitor.clone());
628 let supervisor = {
629 let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
630 s.add_epoch(111, share.clone(), polynomial_clone.clone(), pks.to_vec());
631 s
632 };
633 let blocker = oracle.control(validator.clone());
634
635 let automaton = mocks::Application::new(Strategy::Skip { index: skip_index });
637 automatons
638 .lock()
639 .unwrap()
640 .insert(validator.clone(), automaton.clone());
641
642 let engine = Engine::new(
643 validator_context.with_label("engine"),
644 Config {
645 monitor,
646 validators: supervisor,
647 automaton,
648 reporter: reporter_mailbox.clone(),
649 blocker,
650 namespace: namespace.to_vec(),
651 priority_acks: false,
652 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
653 100,
654 )),
655 epoch_bounds: (1, 1),
656 window: std::num::NonZeroU64::new(window).unwrap(),
657 activity_timeout: 100,
658 journal_partition: format!("unsigned_index_test/{validator}"),
659 journal_write_buffer: NZUsize!(4096),
660 journal_replay_buffer: NZUsize!(4096),
661 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
662 journal_compression: Some(3),
663 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
664 },
665 );
666
667 let (sender, receiver) = registrations.remove(validator).unwrap();
668 engine.start((sender, receiver));
669 }
670
671 loop {
673 if let Some((tip_index, _)) = reporter_mailbox.get_tip().await {
674 debug!(tip_index, skip_index, target_index, "reporter status");
675 if tip_index >= skip_index + window - 1 {
676 return;
678 }
679 }
680 context.sleep(Duration::from_millis(50)).await;
681 }
682 }
683 };
684 let (_, checkpoint) =
685 deterministic::Runner::timed(Duration::from_secs(60)).start_and_recover(f);
686
687 let f2 = move |context: Context| {
689 async move {
690 let (oracle, validators, pks, mut registrations) = initialize_simulation(
691 context.with_label("simulation"),
692 num_validators,
693 &mut shares_vec,
694 RELIABLE_LINK,
695 )
696 .await;
697
698 let (reporter, mut reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
700 namespace,
701 num_validators,
702 polynomial.clone(),
703 );
704 context.with_label("reporter").spawn(|_| reporter.run());
705
706 let automatons =
708 Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
709 for (validator, _, share) in validators.iter() {
710 let validator_context = context.with_label(&validator.to_string());
711 let monitor = mocks::Monitor::new(111);
712 let supervisor = {
713 let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
714 s.add_epoch(111, share.clone(), polynomial.clone(), pks.to_vec());
715 s
716 };
717
718 let blocker = oracle.control(validator.clone());
719
720 let automaton = mocks::Application::new(Strategy::Correct);
722 automatons
723 .lock()
724 .unwrap()
725 .insert(validator.clone(), automaton.clone());
726
727 let engine = Engine::new(
728 validator_context.with_label("engine"),
729 Config {
730 monitor,
731 validators: supervisor,
732 automaton,
733 reporter: reporter_mailbox.clone(),
734 blocker,
735 namespace: namespace.to_vec(),
736 priority_acks: false,
737 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
738 100,
739 )),
740 epoch_bounds: (1, 1),
741 window: std::num::NonZeroU64::new(10).unwrap(),
742 activity_timeout: 100,
743 journal_partition: format!("unsigned_index_test/{validator}"),
744 journal_write_buffer: NZUsize!(4096),
745 journal_replay_buffer: NZUsize!(4096),
746 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
747 journal_compression: Some(3),
748 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
749 },
750 );
751
752 let (sender, receiver) = registrations.remove(validator).unwrap();
753 engine.start((sender, receiver));
754 }
755
756 loop {
758 if let Some(tip_index) = reporter_mailbox.get_contiguous_tip().await {
759 debug!(
760 tip_index,
761 skip_index, target_index, "reporter status on restart",
762 );
763 if tip_index >= target_index {
764 break;
765 }
766 }
767 context.sleep(Duration::from_millis(50)).await;
768 }
769 }
770 };
771 deterministic::Runner::from(checkpoint).start(f2);
772 }
773
774 #[test_traced("INFO")]
775 fn test_unclean_shutdown_with_unsigned_index() {
776 unclean_shutdown_with_unsigned_index::<MinPk>();
777 unclean_shutdown_with_unsigned_index::<MinSig>();
778 }
779
780 fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
781 let num_validators: u32 = 4;
782 let quorum: u32 = 3;
783 let cfg = deterministic::Config::new()
784 .with_seed(seed)
785 .with_timeout(Some(Duration::from_secs(120)));
786 let runner = deterministic::Runner::new(cfg);
787
788 runner.start(|mut context| async move {
789 let (polynomial, mut shares_vec) =
790 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
791 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
792
793 let degraded_link = Link {
795 latency: Duration::from_millis(200),
796 jitter: Duration::from_millis(150),
797 success_rate: 0.5,
798 };
799
800 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
801 context.with_label("simulation"),
802 num_validators,
803 &mut shares_vec,
804 degraded_link,
805 )
806 .await;
807 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
808 let mut reporters =
809 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
810
811 spawn_validator_engines::<V>(
812 context.with_label("validator"),
813 polynomial.clone(),
814 &pks,
815 &validators,
816 &mut registrations,
817 &mut automatons.lock().unwrap(),
818 &mut reporters,
819 &mut oracle,
820 Duration::from_secs(2),
821 vec![],
822 );
823
824 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
825
826 context.auditor().state()
827 })
828 }
829
830 #[test_traced("INFO")]
831 fn test_slow_and_lossy_links() {
832 slow_and_lossy_links::<MinPk>(0);
833 slow_and_lossy_links::<MinSig>(0);
834 }
835
836 #[test_traced("INFO")]
837 fn test_determinism() {
838 for seed in 1..6 {
841 let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
842 let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
843 assert_eq!(pk_state_1, pk_state_2);
844
845 let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
846 let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
847 assert_eq!(sig_state_1, sig_state_2);
848
849 assert_ne!(pk_state_1, sig_state_1);
851 }
852 }
853
854 fn one_offline<V: Variant>() {
855 let num_validators: u32 = 5;
856 let quorum: u32 = 3;
857 let runner = deterministic::Runner::timed(Duration::from_secs(30));
858
859 runner.start(|mut context| async move {
860 let (polynomial, mut shares_vec) =
861 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
862 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
863
864 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
865 context.with_label("simulation"),
866 num_validators,
867 &mut shares_vec,
868 RELIABLE_LINK,
869 )
870 .await;
871 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
872 let mut reporters =
873 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
874
875 let online_validators: Vec<_> = validators.iter().take(4).cloned().collect();
877 let online_pks: Vec<_> = pks.iter().take(4).cloned().collect();
878
879 spawn_validator_engines::<V>(
880 context.with_label("validator"),
881 polynomial.clone(),
882 &online_pks,
883 &online_validators,
884 &mut registrations,
885 &mut automatons.lock().unwrap(),
886 &mut reporters,
887 &mut oracle,
888 Duration::from_secs(5),
889 vec![],
890 );
891 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
892 });
893 }
894
895 #[test_traced("INFO")]
896 fn test_one_offline() {
897 one_offline::<MinPk>();
898 one_offline::<MinSig>();
899 }
900
901 fn network_partition<V: Variant>() {
903 let num_validators: u32 = 4;
904 let quorum: u32 = 3;
905 let runner = deterministic::Runner::timed(Duration::from_secs(60));
906
907 runner.start(|mut context| async move {
908 let (polynomial, mut shares_vec) =
909 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
910 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
911
912 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
913 context.with_label("simulation"),
914 num_validators,
915 &mut shares_vec,
916 RELIABLE_LINK,
917 )
918 .await;
919 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
920 let mut reporters =
921 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
922
923 spawn_validator_engines::<V>(
924 context.with_label("validator"),
925 polynomial.clone(),
926 &pks,
927 &validators,
928 &mut registrations,
929 &mut automatons.lock().unwrap(),
930 &mut reporters,
931 &mut oracle,
932 Duration::from_secs(5),
933 vec![],
934 );
935
936 for v1 in pks.iter() {
937 for v2 in pks.iter() {
938 if v2 == v1 {
939 continue;
940 }
941 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
942 }
943 }
944 context.sleep(Duration::from_secs(20)).await;
945
946 let link = Link {
947 latency: Duration::from_millis(10),
948 jitter: Duration::from_millis(1),
949 success_rate: 1.0,
950 };
951 for v1 in pks.iter() {
952 for v2 in pks.iter() {
953 if v2 == v1 {
954 continue;
955 }
956 oracle
957 .add_link(v1.clone(), v2.clone(), link.clone())
958 .await
959 .unwrap();
960 }
961 }
962
963 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
964 });
965 }
966
967 #[test_traced("INFO")]
968 fn test_network_partition() {
969 network_partition::<MinPk>();
970 network_partition::<MinSig>();
971 }
972
973 fn insufficient_validators<V: Variant>() {
975 let num_validators: u32 = 5;
976 let quorum: u32 = 3;
977 let runner = deterministic::Runner::timed(Duration::from_secs(15));
978
979 runner.start(|mut context| async move {
980 let (polynomial, mut shares_vec) =
981 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
982 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
983 let identity = *poly::public::<V>(&polynomial);
984
985 let (oracle, validators, pks, mut registrations) = initialize_simulation(
986 context.with_label("simulation"),
987 num_validators,
988 &mut shares_vec,
989 RELIABLE_LINK,
990 )
991 .await;
992 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
993 let mut reporters =
994 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
995
996 let namespace = b"my testing namespace";
998 for (validator, _scheme, share) in validators.iter().take(2) {
999 let context = context.with_label(&validator.to_string());
1000 let monitor = mocks::Monitor::new(111);
1001 let supervisor = {
1002 let mut s = mocks::Supervisor::<PublicKey, V>::new(identity);
1003 s.add_epoch(
1004 111,
1005 share.clone(),
1006 polynomial.clone(),
1007 pks.to_vec(),
1008 );
1009 s
1010 };
1011
1012 let blocker = oracle.control(validator.clone());
1013
1014 let automaton = mocks::Application::new(Strategy::Correct);
1015 automatons.lock().unwrap().insert(validator.clone(), automaton.clone());
1016
1017 let (reporter, reporter_mailbox) = mocks::Reporter::<V, Sha256Digest>::new(
1018 namespace,
1019 pks.len() as u32,
1020 polynomial.clone(),
1021 );
1022 context.with_label("reporter").spawn(|_| reporter.run());
1023 reporters.insert(validator.clone(), reporter_mailbox);
1024
1025 let engine = Engine::new(
1026 context.with_label("engine"),
1027 Config {
1028 monitor,
1029 validators: supervisor,
1030 automaton: automaton.clone(),
1031 reporter: reporters.get(validator).unwrap().clone(),
1032 blocker,
1033 namespace: namespace.to_vec(),
1034 priority_acks: false,
1035 rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_secs(3)),
1036 epoch_bounds: (1, 1),
1037 window: std::num::NonZeroU64::new(10).unwrap(),
1038 activity_timeout: 100,
1039 journal_partition: format!("aggregation/{validator}"),
1040 journal_write_buffer: NZUsize!(4096),
1041 journal_replay_buffer: NZUsize!(4096),
1042 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
1043 journal_compression: Some(3),
1044 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1045 },
1046 );
1047
1048 let (sender, receiver) = registrations.remove(validator).unwrap();
1049 engine.start((sender, receiver));
1050 }
1051
1052 context.sleep(Duration::from_secs(12)).await;
1055
1056 let mut any_consensus = false;
1058 for (validator_pk, mut reporter_mailbox) in reporters {
1059 let (tip, _) = reporter_mailbox.get_tip().await.unwrap_or((0, 0));
1060 if tip > 0 {
1061 any_consensus = true;
1062 tracing::warn!(
1063 ?validator_pk,
1064 tip,
1065 "Unexpected threshold signature consensus with insufficient validators"
1066 );
1067 }
1068 }
1069
1070 assert!(
1072 !any_consensus,
1073 "Consensus should not be achieved with insufficient validator participation (below quorum)"
1074 );
1075 });
1076 }
1077
1078 #[test_traced("INFO")]
1079 fn test_insufficient_validators() {
1080 insufficient_validators::<MinPk>();
1081 insufficient_validators::<MinSig>();
1082 }
1083}