1pub mod types;
45
46cfg_if::cfg_if! {
47 if #[cfg(not(target_arch = "wasm32"))] {
48 mod config;
49 pub use config::Config;
50 mod engine;
51 pub use engine::Engine;
52 mod metrics;
53 mod safe_tip;
54
55 #[cfg(test)]
56 pub mod mocks;
57 }
58}
59
60#[cfg(test)]
61mod tests {
62 use super::{mocks, types::Epoch, Config, Engine};
63 use commonware_codec::Encode;
64 use commonware_cryptography::{
65 bls12381::{
66 dkg::ops,
67 primitives::{
68 group::Share,
69 ops as bls_ops, poly,
70 variant::{MinPk, MinSig, Variant},
71 },
72 },
73 ed25519::{PrivateKey, PublicKey},
74 sha256::Digest as Sha256Digest,
75 PrivateKeyExt as _, Signer as _,
76 };
77 use commonware_macros::{select, test_traced};
78 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
79 use commonware_runtime::{
80 deterministic::{self, Context},
81 Clock, Metrics, Runner, Spawner,
82 };
83 use commonware_utils::NonZeroDuration;
84 use futures::{channel::oneshot, future::join_all};
85 use rand::{rngs::StdRng, Rng, SeedableRng};
86 use std::{
87 collections::{BTreeMap, HashMap},
88 sync::{Arc, Mutex},
89 time::Duration,
90 };
91 use tracing::debug;
92
    /// Network channels for every registered participant, keyed by participant `P`.
    ///
    /// Each value is the `(Sender, Receiver)` pair handed back by the simulated
    /// network oracle when that participant is registered.
    type Registrations<P> = BTreeMap<P, (Sender<P>, Receiver<P>)>;
94
    /// Link configuration shared by the tests that do not exercise network faults:
    /// a success rate of `1.0` with `latency: 10.0` and `jitter: 1.0`.
    // NOTE(review): latency/jitter are presumably expressed in milliseconds — confirm
    // against `commonware_p2p::simulated::Link`.
    const RELIABLE_LINK: Link = Link {
        latency: 10.0,
        jitter: 1.0,
        success_rate: 1.0,
    };
101
102 async fn register_participants(
104 oracle: &mut Oracle<PublicKey>,
105 participants: &[PublicKey],
106 ) -> Registrations<PublicKey> {
107 let mut registrations = BTreeMap::new();
108 for participant in participants.iter() {
109 let (sender, receiver) = oracle.register(participant.clone(), 0).await.unwrap();
110 registrations.insert(participant.clone(), (sender, receiver));
111 }
112 registrations
113 }
114
115 async fn link_participants(
117 oracle: &mut Oracle<PublicKey>,
118 participants: &[PublicKey],
119 link: Link,
120 ) {
121 for v1 in participants.iter() {
122 for v2 in participants.iter() {
123 if v2 == v1 {
124 continue;
125 }
126 oracle
127 .add_link(v1.clone(), v2.clone(), link.clone())
128 .await
129 .unwrap();
130 }
131 }
132 }
133
134 async fn initialize_simulation(
136 context: Context,
137 num_validators: u32,
138 shares_vec: &mut [Share],
139 link: Link,
140 ) -> (
141 Oracle<PublicKey>,
142 Vec<(PublicKey, PrivateKey, Share)>,
143 Vec<PublicKey>,
144 Registrations<PublicKey>,
145 ) {
146 let (network, mut oracle) = Network::new(
147 context.with_label("network"),
148 commonware_p2p::simulated::Config {
149 max_size: 1024 * 1024,
150 },
151 );
152 network.start();
153
154 let mut schemes = (0..num_validators)
155 .map(|i| PrivateKey::from_seed(i as u64))
156 .collect::<Vec<_>>();
157 schemes.sort_by_key(|s| s.public_key());
158 let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
159 .iter()
160 .enumerate()
161 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
162 .collect();
163 let pks = validators
164 .iter()
165 .map(|(pk, _, _)| pk.clone())
166 .collect::<Vec<_>>();
167
168 let registrations = register_participants(&mut oracle, &pks).await;
169 link_participants(&mut oracle, &pks, link).await;
170 (oracle, validators, pks, registrations)
171 }
172
173 #[allow(clippy::too_many_arguments)]
175 fn spawn_validator_engines<V: Variant>(
176 context: Context,
177 polynomial: poly::Public<V>,
178 validator_pks: &[PublicKey],
179 validators: &[(PublicKey, PrivateKey, Share)],
180 registrations: &mut Registrations<PublicKey>,
181 automatons: &mut BTreeMap<PublicKey, mocks::Application>,
182 reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
183 oracle: &mut Oracle<PublicKey>,
184 rebroadcast_timeout: Duration,
185 invalid_when: fn(u64) -> bool,
186 ) -> HashMap<PublicKey, mocks::Monitor> {
187 let mut monitors = HashMap::new();
188 let namespace = b"my testing namespace";
189 for (validator, _scheme, share) in validators.iter() {
190 let context = context.with_label(&validator.to_string());
191 let monitor = mocks::Monitor::new(111);
192 monitors.insert(validator.clone(), monitor.clone());
193 let supervisor = {
194 let mut s = mocks::Supervisor::<PublicKey, V>::default();
195 s.add_epoch(
196 111,
197 share.clone(),
198 polynomial.clone(),
199 validator_pks.to_vec(),
200 );
201 s
202 };
203
204 let blocker = oracle.control(validator.clone());
205
206 let automaton = mocks::Application::new(invalid_when);
207 automatons.insert(validator.clone(), automaton.clone());
208
209 let (reporter, reporter_mailbox) =
210 mocks::Reporter::<V, Sha256Digest>::new(namespace, polynomial.clone());
211 context.with_label("reporter").spawn(|_| reporter.run());
212 reporters.insert(validator.clone(), reporter_mailbox);
213
214 let engine = Engine::new(
215 context.with_label("engine"),
216 Config {
217 monitor,
218 validators: supervisor,
219 automaton: automaton.clone(),
220 reporter: reporters.get(validator).unwrap().clone(),
221 blocker,
222 namespace: namespace.to_vec(),
223 priority_acks: false,
224 rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
225 epoch_bounds: (1, 1),
226 window: std::num::NonZeroU64::new(10).unwrap(),
227 journal_partition: format!("aggregation/{validator}"),
228 journal_write_buffer: std::num::NonZeroUsize::new(4096).unwrap(),
229 journal_replay_buffer: std::num::NonZeroUsize::new(4096).unwrap(),
230 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
231 journal_compression: Some(3),
232 },
233 );
234
235 let (sender, receiver) = registrations.remove(validator).unwrap();
236 engine.start((sender, receiver));
237 }
238 monitors
239 }
240
241 async fn await_reporters<V: Variant>(
243 context: Context,
244 reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
245 threshold_index: u64,
246 threshold_epoch: Epoch,
247 ) {
248 let mut receivers = Vec::new();
249 for (reporter, mailbox) in reporters.iter() {
250 let (tx, rx) = oneshot::channel();
252 receivers.push(rx);
253
254 context.with_label("reporter_watcher").spawn({
255 let reporter = reporter.clone();
256 let mut mailbox = mailbox.clone();
257 move |context| async move {
258 loop {
259 let (index, epoch) = mailbox.get_tip().await.unwrap_or((0, 0));
260 let contiguous_index = mailbox.get_contiguous_tip().await.unwrap_or(0);
261 debug!(
262 index,
263 epoch,
264 contiguous_index,
265 threshold_index,
266 threshold_epoch,
267 ?reporter,
268 "reporter status"
269 );
270 if contiguous_index >= threshold_index && epoch >= threshold_epoch {
271 debug!(
272 ?reporter,
273 "reporter reached threshold, signaling completion"
274 );
275 let _ = tx.send(reporter.clone());
276 break;
277 }
278 context.sleep(Duration::from_millis(100)).await;
279 }
280 }
281 });
282 }
283
284 let results = join_all(receivers).await;
286 assert_eq!(results.len(), reporters.len());
287
288 for result in results {
290 assert!(result.is_ok(), "reporter was cancelled");
291 }
292 }
293
294 fn all_online<V: Variant>() {
296 let num_validators: u32 = 4;
297 let quorum: u32 = 3;
298 let runner = deterministic::Runner::timed(Duration::from_secs(30));
299
300 runner.start(|mut context| async move {
301 let (polynomial, mut shares_vec) =
302 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
303 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
304
305 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
306 context.with_label("simulation"),
307 num_validators,
308 &mut shares_vec,
309 RELIABLE_LINK,
310 )
311 .await;
312 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
313 let mut reporters =
314 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
315 spawn_validator_engines::<V>(
316 context.with_label("validator"),
317 polynomial.clone(),
318 &pks,
319 &validators,
320 &mut registrations,
321 &mut automatons.lock().unwrap(),
322 &mut reporters,
323 &mut oracle,
324 Duration::from_secs(5),
325 |_| false,
326 );
327 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
328 });
329 }
330
    /// Runs [`all_online`] for both the `MinPk` and `MinSig` variants.
    #[test_traced]
    fn test_all_online() {
        all_online::<MinPk>();
        all_online::<MinSig>();
    }
336
    /// Checks that validators still reach `target_index` when the runtime is repeatedly
    /// abandoned mid-run and restarted from the recovered context.
    ///
    /// Each loop iteration starts a fresh network and fresh engines inside a deterministic
    /// runner, then races a randomly chosen shutdown delay against watcher tasks that wait
    /// for every reporter's tip to reach `target_index`. If the delay wins, the run is
    /// dropped and the next iteration starts from `context.recover()`.
    ///
    /// # Panics
    ///
    /// Panics if not every validator completed, or if any validator was shut down fewer
    /// than `min_shutdowns` times.
    fn unclean_shutdown<V: Variant>() {
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        // Tip index a validator's reporter must reach to count as completed.
        let target_index = 200;
        // The restart loop stops once any validator's shutdown count reaches this value.
        let max_shutdowns = 10;
        // Lower bound on per-validator shutdowns asserted after the loop.
        let min_shutdowns = 4;
        // Each run is abandoned after a delay drawn from this half-open range.
        let shutdown_range_min = Duration::from_millis(100);
        let shutdown_range_max = Duration::from_millis(1_000);

        let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));

        // State that outlives individual runs.
        let mut prev_ctx = None;
        let shutdown_counts = Arc::new(Mutex::new(HashMap::<PublicKey, u32>::new()));
        let completed_validators = Arc::new(Mutex::new(std::collections::HashSet::new()));
        let all_validators = Arc::new(Mutex::new(Vec::new()));

        // Shares are generated once, outside the runtime, so every run uses the same key material.
        let mut rng = StdRng::seed_from_u64(0);
        let (polynomial, mut shares_vec) =
            ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));

        // Keep restarting until everyone completed or some validator hit the shutdown cap.
        while completed_validators.lock().unwrap().len() < num_validators as usize
            && shutdown_counts.lock().unwrap().values().max().unwrap_or(&0) < &max_shutdowns
        {
            // Per-run clones that are moved into the closure below.
            let completed_clone = completed_validators.clone();
            let shutdown_counts_clone = shutdown_counts.clone();
            let all_validators_clone = all_validators.clone();
            let shares_vec_clone = shares_vec.clone();
            let polynomial_clone = polynomial.clone();

            // Body of a single run; evaluates to `(completed, context)`.
            let f = move |mut context: Context| {
                let completed = completed_clone;
                let shutdown_counts = shutdown_counts_clone;
                let all_validators = all_validators_clone;
                let mut shares_vec = shares_vec_clone;
                let polynomial = polynomial_clone;
                async move {
                    let (oracle, validators, pks, mut registrations) = initialize_simulation(
                        context.with_label("simulation"),
                        num_validators,
                        &mut shares_vec,
                        RELIABLE_LINK,
                    )
                    .await;
                    // Remember the validator set the first time through, for the final assertions.
                    if all_validators.lock().unwrap().is_empty() {
                        let mut pks_lock = all_validators.lock().unwrap();
                        *pks_lock = pks.clone();
                    }
                    let automatons =
                        Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
                    let mut reporters =
                        BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();

                    // Start a reporter and an engine for every validator.
                    let mut engine_monitors = HashMap::new();
                    let namespace = b"my testing namespace";
                    for (validator, _scheme, share) in validators.iter() {
                        let validator_context = context.with_label(&validator.to_string());
                        let monitor = mocks::Monitor::new(111);
                        engine_monitors.insert(validator.clone(), monitor.clone());
                        let supervisor = {
                            let mut s = mocks::Supervisor::<PublicKey, V>::default();
                            s.add_epoch(111, share.clone(), polynomial.clone(), pks.to_vec());
                            s
                        };

                        let blocker = oracle.control(validator.clone());
                        let automaton = mocks::Application::new(|_| false);
                        automatons
                            .lock()
                            .unwrap()
                            .insert(validator.clone(), automaton.clone());

                        let (reporter, reporter_mailbox) =
                            mocks::Reporter::<V, Sha256Digest>::new(namespace, polynomial.clone());
                        validator_context
                            .with_label("reporter")
                            .spawn(|_| reporter.run());
                        reporters.insert(validator.clone(), reporter_mailbox);

                        let engine = Engine::new(
                            validator_context.with_label("engine"),
                            Config {
                                monitor,
                                validators: supervisor,
                                automaton,
                                reporter: reporters.get(validator).unwrap().clone(),
                                blocker,
                                namespace: namespace.to_vec(),
                                priority_acks: false,
                                rebroadcast_timeout,
                                epoch_bounds: (1, 1),
                                window: std::num::NonZeroU64::new(10).unwrap(),
                                // The partition name is the same on every run.
                                // NOTE(review): presumably this is what lets a restarted engine
                                // find its previous journal — confirm against the engine's storage.
                                journal_partition: format!("unclean_shutdown_test/{validator}"),
                                journal_write_buffer: std::num::NonZeroUsize::new(4096).unwrap(),
                                journal_replay_buffer: std::num::NonZeroUsize::new(4096).unwrap(),
                                journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                                journal_compression: Some(3),
                            },
                        );

                        let (sender, receiver) = registrations.remove(validator).unwrap();
                        engine.start((sender, receiver));
                    }

                    // One watcher per validator: finishes once that reporter's tip reaches the target.
                    let mut completion_tasks = Vec::new();
                    for (validator_pk, mut reporter_mailbox) in reporters.clone() {
                        let validator = validator_pk.clone();
                        let completed_ref = completed.clone();
                        let task = context.with_label("completion_watcher").spawn(
                            move |context| async move {
                                loop {
                                    if let Some((tip_index, _epoch)) =
                                        reporter_mailbox.get_tip().await
                                    {
                                        if tip_index >= target_index {
                                            // Every item the reporter holds up to the tip must be at epoch 111;
                                            // indices the reporter does not hold are skipped.
                                            for check_index in 0..=tip_index {
                                                if let Some((digest, epoch)) =
                                                    reporter_mailbox.get(check_index).await
                                                {
                                                    assert_eq!(
                                                        epoch, 111,
                                                        "Epoch should be consistent"
                                                    );
                                                    debug!(
                                                        ?validator,
                                                        check_index,
                                                        ?digest,
                                                        "Verified validator signed message"
                                                    );
                                                }
                                            }
                                            completed_ref.lock().unwrap().insert(validator.clone());
                                            debug!(
                                                ?validator,
                                                tip_index, "Validator completed signing target"
                                            );
                                            break;
                                        }
                                    }
                                    context.sleep(Duration::from_millis(50)).await;
                                }
                            },
                        );
                        completion_tasks.push(task);
                    }

                    // Pick how long this run is allowed to live.
                    let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);

                    // Whichever finishes first decides whether this run counts as a shutdown.
                    select! {
                        _ = context.sleep(shutdown_wait) => {
                            debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
                            let mut counts = shutdown_counts.lock().unwrap();
                            // Only validators that have not completed yet are charged with a shutdown.
                            for (pk, _) in reporters {
                                if !completed.lock().unwrap().contains(&pk) {
                                    *counts.entry(pk).or_insert(0) += 1;
                                }
                            }
                            (false, context) },
                        _ = join_all(completion_tasks) => {
                            debug!("All validators completed normally");
                            (true, context) }
                    }
                }
            };

            // The first run uses a fresh timed runner; later runs resume from the recovered context.
            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
                let shutdown_count = shutdown_counts.lock().unwrap().values().sum::<u32>();
                debug!(shutdown_count, "Restarting from previous context");
                deterministic::Runner::from(prev_ctx)
            } else {
                debug!("Starting initial run");
                deterministic::Runner::timed(Duration::from_secs(45))
            }
            .start(f);

            // NOTE(review): `recover()` presumably preserves persisted state for the next run —
            // confirm against `deterministic::Context::recover`.
            prev_ctx = Some(context.recover());

            if complete {
                debug!("Test completed successfully");
                break;
            }

            let shutdown_count = shutdown_counts.lock().unwrap().values().sum::<u32>();
            debug!(
                shutdown_count,
                completed = completed_validators.lock().unwrap().len(),
                "Shutdown occurred, restarting"
            );
        }

        // Every validator must have reached the target.
        let final_completed = completed_validators.lock().unwrap().len();
        let total_shutdowns = shutdown_counts.lock().unwrap().values().sum::<u32>();
        assert_eq!(
            final_completed, num_validators as usize,
            "All validators should reach target index {target_index} despite unclean shutdowns. Only {final_completed} completed after {total_shutdowns} shutdowns"
        );

        // Every validator must also have been interrupted at least `min_shutdowns` times.
        let counts = shutdown_counts.lock().unwrap();
        for pk in all_validators.lock().unwrap().iter() {
            let count = counts.get(pk).copied().unwrap_or(0);
            assert!(
                count >= min_shutdowns,
                "Validator {pk:?} should have at least {min_shutdowns} shutdowns, but had {count}"
            );
        }

        debug!(
            total_shutdowns,
            target_index, "Unclean shutdown test completed successfully"
        );
    }
563
    /// Runs [`unclean_shutdown`] for both the `MinPk` and `MinSig` variants.
    #[test_traced]
    fn test_unclean_shutdown() {
        unclean_shutdown::<MinPk>();
        unclean_shutdown::<MinSig>();
    }
569
570 fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
571 let num_validators: u32 = 4;
572 let quorum: u32 = 3;
573 let cfg = deterministic::Config::new()
574 .with_seed(seed)
575 .with_timeout(Some(Duration::from_secs(120)));
576 let runner = deterministic::Runner::new(cfg);
577
578 runner.start(|mut context| async move {
579 let (polynomial, mut shares_vec) =
580 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
581 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
582
583 let degraded_link = Link {
585 latency: 200.0,
586 jitter: 150.0,
587 success_rate: 0.5,
588 };
589
590 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
591 context.with_label("simulation"),
592 num_validators,
593 &mut shares_vec,
594 degraded_link,
595 )
596 .await;
597 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
598 let mut reporters =
599 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
600
601 spawn_validator_engines::<V>(
602 context.with_label("validator"),
603 polynomial.clone(),
604 &pks,
605 &validators,
606 &mut registrations,
607 &mut automatons.lock().unwrap(),
608 &mut reporters,
609 &mut oracle,
610 Duration::from_secs(2),
611 |_| false,
612 );
613
614 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
615
616 context.auditor().state()
617 })
618 }
619
    /// Runs [`slow_and_lossy_links`] with seed `0` for both the `MinPk` and `MinSig`
    /// variants; the returned auditor state is discarded.
    #[test_traced]
    fn test_slow_and_lossy_links() {
        slow_and_lossy_links::<MinPk>(0);
        slow_and_lossy_links::<MinSig>(0);
    }
625
626 #[test_traced]
627 fn test_determinism() {
628 for seed in 1..6 {
631 let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
632 let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
633 assert_eq!(pk_state_1, pk_state_2);
634
635 let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
636 let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
637 assert_eq!(sig_state_1, sig_state_2);
638
639 assert_ne!(pk_state_1, sig_state_1);
641 }
642 }
643
644 fn one_offline<V: Variant>() {
645 let num_validators: u32 = 5;
646 let quorum: u32 = 3;
647 let runner = deterministic::Runner::timed(Duration::from_secs(30));
648
649 runner.start(|mut context| async move {
650 let (polynomial, mut shares_vec) =
651 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
652 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
653
654 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
655 context.with_label("simulation"),
656 num_validators,
657 &mut shares_vec,
658 RELIABLE_LINK,
659 )
660 .await;
661 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
662 let mut reporters =
663 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
664
665 let online_validators: Vec<_> = validators.iter().take(4).cloned().collect();
667 let online_pks: Vec<_> = pks.iter().take(4).cloned().collect();
668
669 spawn_validator_engines::<V>(
670 context.with_label("validator"),
671 polynomial.clone(),
672 &online_pks,
673 &online_validators,
674 &mut registrations,
675 &mut automatons.lock().unwrap(),
676 &mut reporters,
677 &mut oracle,
678 Duration::from_secs(5),
679 |_| false,
680 );
681 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
682 });
683 }
684
    /// Runs [`one_offline`] for both the `MinPk` and `MinSig` variants.
    #[test_traced]
    fn test_one_offline() {
        one_offline::<MinPk>();
        one_offline::<MinSig>();
    }
690
691 fn consensus_from_index_zero<V: Variant>() {
693 let num_validators: u32 = 4;
694 let quorum: u32 = 3;
695 let runner = deterministic::Runner::timed(Duration::from_secs(30));
696
697 runner.start(|mut context| async move {
698 let (polynomial, mut shares_vec) =
699 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
700 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
701
702 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
703 context.with_label("simulation"),
704 num_validators,
705 &mut shares_vec,
706 RELIABLE_LINK,
707 )
708 .await;
709 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
710 let mut reporters =
711 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
712
713 spawn_validator_engines::<V>(
714 context.with_label("validator"),
715 polynomial.clone(),
716 &pks,
717 &validators,
718 &mut registrations,
719 &mut automatons.lock().unwrap(),
720 &mut reporters,
721 &mut oracle,
722 Duration::from_secs(5),
723 |_| false,
724 );
725
726 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
727 });
728 }
729
    /// Runs [`consensus_from_index_zero`] for both the `MinPk` and `MinSig` variants.
    #[test_traced]
    fn test_consensus_from_index_zero() {
        consensus_from_index_zero::<MinPk>();
        consensus_from_index_zero::<MinSig>();
    }
735
736 fn network_partition<V: Variant>() {
738 let num_validators: u32 = 4;
739 let quorum: u32 = 3;
740 let runner = deterministic::Runner::timed(Duration::from_secs(60));
741
742 runner.start(|mut context| async move {
743 let (polynomial, mut shares_vec) =
744 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
745 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
746
747 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
748 context.with_label("simulation"),
749 num_validators,
750 &mut shares_vec,
751 RELIABLE_LINK,
752 )
753 .await;
754 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
755 let mut reporters =
756 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
757
758 spawn_validator_engines::<V>(
759 context.with_label("validator"),
760 polynomial.clone(),
761 &pks,
762 &validators,
763 &mut registrations,
764 &mut automatons.lock().unwrap(),
765 &mut reporters,
766 &mut oracle,
767 Duration::from_secs(5),
768 |_| false,
769 );
770
771 for v1 in pks.iter() {
772 for v2 in pks.iter() {
773 if v2 == v1 {
774 continue;
775 }
776 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
777 }
778 }
779 context.sleep(Duration::from_secs(20)).await;
780
781 let link = Link {
782 latency: 10.0,
783 jitter: 1.0,
784 success_rate: 1.0,
785 };
786 for v1 in pks.iter() {
787 for v2 in pks.iter() {
788 if v2 == v1 {
789 continue;
790 }
791 oracle
792 .add_link(v1.clone(), v2.clone(), link.clone())
793 .await
794 .unwrap();
795 }
796 }
797
798 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
799 });
800 }
801
    /// Runs [`network_partition`] for both the `MinPk` and `MinSig` variants.
    #[test_traced]
    fn test_network_partition() {
        network_partition::<MinPk>();
        network_partition::<MinSig>();
    }
807
808 fn invalid_signature_injection<V: Variant>() {
810 let num_validators: u32 = 4;
811 let quorum: u32 = 3;
812 let runner = deterministic::Runner::timed(Duration::from_secs(30));
813
814 runner.start(|mut context| async move {
815 let (polynomial, mut shares_vec) =
816 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
817 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
818
819 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
820 context.with_label("simulation"),
821 num_validators,
822 &mut shares_vec,
823 RELIABLE_LINK,
824 )
825 .await;
826 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
827 let mut reporters =
828 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
829
830 let byzantine_fault_fn = |index: u64| -> bool {
833 use std::{
834 collections::hash_map::DefaultHasher,
835 hash::{Hash, Hasher},
836 };
837
838 let mut hasher = DefaultHasher::new();
839 index.hash(&mut hasher);
840 let hash_value = hasher.finish();
841
842 (hash_value % 100) < 5
845 };
846
847 spawn_validator_engines::<V>(
848 context.with_label("validator"),
849 polynomial.clone(),
850 &pks,
851 &validators,
852 &mut registrations,
853 &mut automatons.lock().unwrap(),
854 &mut reporters,
855 &mut oracle,
856 Duration::from_secs(5),
857 byzantine_fault_fn,
858 );
859
860 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
861 });
862 }
863
    /// Runs [`invalid_signature_injection`] for both the `MinPk` and `MinSig` variants.
    #[test_traced]
    fn test_invalid_signature_injection() {
        invalid_signature_injection::<MinPk>();
        invalid_signature_injection::<MinSig>();
    }
869
870 fn cryptographic_validation<V: Variant>() {
872 let num_validators: u32 = 4;
873 let quorum: u32 = 3;
874 let runner = deterministic::Runner::timed(Duration::from_secs(30));
875
876 runner.start(|mut context| async move {
877 let (polynomial, mut shares_vec) =
878 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
879 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
880
881 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
882 context.with_label("simulation"),
883 num_validators,
884 &mut shares_vec,
885 RELIABLE_LINK,
886 )
887 .await;
888 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
889 let mut reporters =
890 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
891
892 spawn_validator_engines::<V>(
893 context.with_label("validator"),
894 polynomial.clone(),
895 &pks,
896 &validators,
897 &mut registrations,
898 &mut automatons.lock().unwrap(),
899 &mut reporters,
900 &mut oracle,
901 Duration::from_secs(5),
902 |_| false,
903 );
904
905 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
906
907 for (validator_pk, mut reporter_mailbox) in reporters {
910 let tip_result = reporter_mailbox.get_tip().await;
911 assert!(
912 tip_result.is_some(),
913 "Reporter for validator {validator_pk:?} should have a tip"
914 );
915
916 let (tip_index, tip_epoch) = tip_result.unwrap();
917 assert!(
918 tip_index >= 1,
919 "Tip should have progressed beyond initial state for validator {validator_pk:?}"
920 );
921 assert_eq!(
922 tip_epoch, 111,
923 "Tip epoch should match expected epoch for validator {validator_pk:?}"
924 );
925
926 if tip_index > 0 {
928 let item_result = reporter_mailbox.get(tip_index - 1).await;
929 assert!(
930 item_result.is_some(),
931 "Should be able to retrieve consensus item for validator {validator_pk:?}"
932 );
933 }
934 }
935 });
936 }
937
    /// Runs [`cryptographic_validation`] for both the `MinPk` and `MinSig` variants.
    #[test_traced]
    fn test_cryptographic_validation() {
        cryptographic_validation::<MinPk>();
        cryptographic_validation::<MinSig>();
    }
943
944 fn advanced_byzantine_faults<V: Variant>() {
946 let num_validators: u32 = 7; let quorum: u32 = 5; let runner = deterministic::Runner::timed(Duration::from_secs(45));
949
950 runner.start(|mut context| async move {
951 let (polynomial, mut shares_vec) =
952 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
953 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
954
955 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
956 context.with_label("simulation"),
957 num_validators,
958 &mut shares_vec,
959 RELIABLE_LINK,
960 )
961 .await;
962 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
963 let mut reporters =
964 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
965
966 let advanced_byzantine_fn = |index: u64| -> bool {
968 use std::{
969 collections::hash_map::DefaultHasher,
970 hash::{Hash, Hasher},
971 };
972
973 let mut hasher = DefaultHasher::new();
974 index.hash(&mut hasher);
975 let hash_value = hasher.finish();
976
977 match index % 11 {
978 0..=2 if (hash_value % 100) < 8 => true,
981 3..=5 if index > 10 && index < 15 => true,
983 7 if (hash_value % 13) == 0 => true,
985 _ => false,
986 }
987 };
988
989 spawn_validator_engines::<V>(
990 context.with_label("validator"),
991 polynomial.clone(),
992 &pks,
993 &validators,
994 &mut registrations,
995 &mut automatons.lock().unwrap(),
996 &mut reporters,
997 &mut oracle,
998 Duration::from_secs(8), advanced_byzantine_fn,
1000 );
1001
1002 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
1003 });
1004 }
1005
    /// Runs [`advanced_byzantine_faults`] for both the `MinPk` and `MinSig` variants.
    #[test_traced]
    fn test_advanced_byzantine_faults() {
        advanced_byzantine_faults::<MinPk>();
        advanced_byzantine_faults::<MinSig>();
    }
1011
1012 fn insufficient_validators<V: Variant>() {
1014 let num_validators: u32 = 5;
1015 let quorum: u32 = 3;
1016 let runner = deterministic::Runner::timed(Duration::from_secs(15));
1017
1018 runner.start(|mut context| async move {
1019 let (polynomial, mut shares_vec) =
1020 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
1021 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
1022
1023 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
1024 context.with_label("simulation"),
1025 num_validators,
1026 &mut shares_vec,
1027 RELIABLE_LINK,
1028 )
1029 .await;
1030 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
1031 let mut reporters =
1032 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
1033
1034 let insufficient_validators: Vec<_> = validators.iter().take(2).cloned().collect();
1036 let insufficient_pks: Vec<_> = pks.iter().take(2).cloned().collect();
1037
1038 spawn_validator_engines::<V>(
1039 context.with_label("validator"),
1040 polynomial.clone(),
1041 &insufficient_pks,
1042 &insufficient_validators,
1043 &mut registrations,
1044 &mut automatons.lock().unwrap(),
1045 &mut reporters,
1046 &mut oracle,
1047 Duration::from_secs(3),
1048 |_| false,
1049 );
1050
1051 context.sleep(Duration::from_secs(12)).await;
1054
1055 let mut any_consensus = false;
1057 for (validator_pk, mut reporter_mailbox) in reporters {
1058 let contiguous_tip = reporter_mailbox.get_contiguous_tip().await.unwrap_or(0);
1063 if contiguous_tip > 0 {
1064 any_consensus = true;
1065 tracing::warn!(
1066 ?validator_pk,
1067 contiguous_tip,
1068 "Unexpected threshold signature consensus with insufficient validators"
1069 );
1070 }
1071 }
1072
1073 assert!(
1075 !any_consensus,
1076 "Consensus should not be achieved with insufficient validator participation (below quorum)"
1077 );
1078 });
1079 }
1080
    /// Runs [`insufficient_validators`] for both the `MinPk` and `MinSig` variants.
    #[test_traced]
    fn test_insufficient_validators() {
        insufficient_validators::<MinPk>();
        insufficient_validators::<MinSig>();
    }
1086
1087 fn threshold_signature_correctness<V: Variant>() {
1089 let num_validators: u32 = 4;
1090 let quorum: u32 = 3;
1091 let runner = deterministic::Runner::timed(Duration::from_secs(30));
1092
1093 runner.start(|mut context| async move {
1094 let (polynomial, mut shares_vec) =
1095 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
1096 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
1097
1098 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
1099 context.with_label("simulation"),
1100 num_validators,
1101 &mut shares_vec,
1102 RELIABLE_LINK,
1103 )
1104 .await;
1105 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
1106 let mut reporters =
1107 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
1108
1109 spawn_validator_engines::<V>(
1110 context.with_label("validator"),
1111 polynomial.clone(),
1112 &pks,
1113 &validators,
1114 &mut registrations,
1115 &mut automatons.lock().unwrap(),
1116 &mut reporters,
1117 &mut oracle,
1118 Duration::from_secs(5),
1119 |_| false, );
1121
1122 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
1123
1124 for (validator_pk, mut reporter_mailbox) in reporters {
1126 let tip_result = reporter_mailbox.get_tip().await;
1127 assert!(
1128 tip_result.is_some(),
1129 "Reporter should have achieved consensus"
1130 );
1131
1132 let (tip_index, _) = tip_result.unwrap();
1133
1134 for index in 1..=tip_index {
1136 let item_result = reporter_mailbox.get(index).await;
1137 assert!(
1138 item_result.is_some(),
1139 "Should have consensus item at index {index}"
1140 );
1141
1142 let (digest, epoch) = item_result.unwrap();
1143
1144 let _item = super::types::Item { index, digest };
1147 let mut ack_namespace = b"my testing namespace".to_vec();
1148 ack_namespace.extend_from_slice(b"_AGG_ACK");
1149
1150 tracing::debug!(
1156 ?validator_pk,
1157 index,
1158 epoch,
1159 "Verified valid threshold signature for consensus item"
1160 );
1161 }
1162 }
1163 });
1164 }
1165
    /// Runs [`threshold_signature_correctness`] for both the `MinPk` and `MinSig` variants.
    #[test_traced]
    fn test_threshold_signature_correctness() {
        threshold_signature_correctness::<MinPk>();
        threshold_signature_correctness::<MinSig>();
    }
1171
1172 fn manual_threshold_verification<V: Variant>() {
1174 let runner = deterministic::Runner::timed(Duration::from_secs(10));
1175
1176 runner.start(|mut context| async move {
1177 let num_validators = 4u32;
1178 let quorum = 3u32;
1179
1180 let (polynomial, shares_vec) =
1182 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
1183
1184 let test_item = super::types::Item {
1186 index: 42,
1187 digest: Sha256Digest::from([1u8; 32]),
1188 };
1189
1190 let namespace = b"test_namespace";
1191 let ack_namespace = [namespace.as_slice(), b"_AGG_ACK"].concat();
1192
1193 let mut partial_sigs = Vec::new();
1195 for share in shares_vec.iter().take(quorum as usize) {
1196 let partial_sig = bls_ops::partial_sign_message::<V>(
1197 share,
1198 Some(&ack_namespace),
1199 &test_item.encode(),
1200 );
1201 partial_sigs.push(partial_sig);
1202 }
1203
1204 let threshold_sig = poly::Signature::<V>::recover(quorum, &partial_sigs).expect(
1206 "Should be able to recover threshold signature from sufficient partial signatures",
1207 );
1208
1209 let threshold_public = poly::public::<V>(&polynomial);
1211 let verification_result = bls_ops::verify_message::<V>(
1212 threshold_public,
1213 Some(&ack_namespace),
1214 &test_item.encode(),
1215 &threshold_sig,
1216 );
1217
1218 assert!(
1219 verification_result.is_ok(),
1220 "Manually constructed threshold signature should be valid: {:?}",
1221 verification_result.err()
1222 );
1223
1224 let insufficient_partial_sigs: Vec<_> = partial_sigs
1226 .iter()
1227 .take(quorum as usize - 1)
1228 .cloned()
1229 .collect();
1230 let insufficient_result =
1231 poly::Signature::<V>::recover(quorum, &insufficient_partial_sigs);
1232
1233 assert!(
1234 insufficient_result.is_err(),
1235 "Should not be able to recover threshold signature with insufficient partial signatures"
1236 );
1237
1238 tracing::debug!("Manual threshold signature verification completed successfully");
1239 });
1240 }
1241
    /// Runs [`manual_threshold_verification`] for both the `MinPk` and `MinSig` variants.
    #[test_traced]
    fn test_manual_threshold_verification() {
        manual_threshold_verification::<MinPk>();
        manual_threshold_verification::<MinSig>();
    }
1247}