1pub mod types;
45
46cfg_if::cfg_if! {
47 if #[cfg(not(target_arch = "wasm32"))] {
48 mod config;
49 pub use config::Config;
50 mod engine;
51 pub use engine::Engine;
52 mod metrics;
53 mod safe_tip;
54
55 #[cfg(test)]
56 pub mod mocks;
57 }
58}
59
60#[cfg(test)]
61mod tests {
62 use super::{mocks, types::Epoch, Config, Engine};
63 use commonware_codec::Encode;
64 use commonware_cryptography::{
65 bls12381::{
66 dkg::ops,
67 primitives::{
68 group::Share,
69 ops as bls_ops, poly,
70 variant::{MinPk, MinSig, Variant},
71 },
72 },
73 ed25519::{PrivateKey, PublicKey},
74 sha256::Digest as Sha256Digest,
75 PrivateKeyExt as _, Signer as _,
76 };
77 use commonware_macros::{select, test_traced};
78 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
79 use commonware_runtime::{
80 buffer::PoolRef,
81 deterministic::{self, Context},
82 Clock, Metrics, Runner, Spawner,
83 };
84 use commonware_utils::{NZUsize, NonZeroDuration};
85 use futures::{channel::oneshot, future::join_all};
86 use rand::{rngs::StdRng, Rng, SeedableRng};
87 use std::{
88 collections::{BTreeMap, HashMap},
89 num::NonZeroUsize,
90 sync::{Arc, Mutex},
91 time::Duration,
92 };
93 use tracing::debug;
94
95 type Registrations<P> = BTreeMap<P, (Sender<P>, Receiver<P>)>;
96
97 const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
98 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
99
100 const RELIABLE_LINK: Link = Link {
102 latency: 10.0,
103 jitter: 1.0,
104 success_rate: 1.0,
105 };
106
107 async fn register_participants(
109 oracle: &mut Oracle<PublicKey>,
110 participants: &[PublicKey],
111 ) -> Registrations<PublicKey> {
112 let mut registrations = BTreeMap::new();
113 for participant in participants.iter() {
114 let (sender, receiver) = oracle.register(participant.clone(), 0).await.unwrap();
115 registrations.insert(participant.clone(), (sender, receiver));
116 }
117 registrations
118 }
119
120 async fn link_participants(
122 oracle: &mut Oracle<PublicKey>,
123 participants: &[PublicKey],
124 link: Link,
125 ) {
126 for v1 in participants.iter() {
127 for v2 in participants.iter() {
128 if v2 == v1 {
129 continue;
130 }
131 oracle
132 .add_link(v1.clone(), v2.clone(), link.clone())
133 .await
134 .unwrap();
135 }
136 }
137 }
138
139 async fn initialize_simulation(
141 context: Context,
142 num_validators: u32,
143 shares_vec: &mut [Share],
144 link: Link,
145 ) -> (
146 Oracle<PublicKey>,
147 Vec<(PublicKey, PrivateKey, Share)>,
148 Vec<PublicKey>,
149 Registrations<PublicKey>,
150 ) {
151 let (network, mut oracle) = Network::new(
152 context.with_label("network"),
153 commonware_p2p::simulated::Config {
154 max_size: 1024 * 1024,
155 },
156 );
157 network.start();
158
159 let mut schemes = (0..num_validators)
160 .map(|i| PrivateKey::from_seed(i as u64))
161 .collect::<Vec<_>>();
162 schemes.sort_by_key(|s| s.public_key());
163 let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
164 .iter()
165 .enumerate()
166 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
167 .collect();
168 let pks = validators
169 .iter()
170 .map(|(pk, _, _)| pk.clone())
171 .collect::<Vec<_>>();
172
173 let registrations = register_participants(&mut oracle, &pks).await;
174 link_participants(&mut oracle, &pks, link).await;
175 (oracle, validators, pks, registrations)
176 }
177
178 #[allow(clippy::too_many_arguments)]
180 fn spawn_validator_engines<V: Variant>(
181 context: Context,
182 polynomial: poly::Public<V>,
183 validator_pks: &[PublicKey],
184 validators: &[(PublicKey, PrivateKey, Share)],
185 registrations: &mut Registrations<PublicKey>,
186 automatons: &mut BTreeMap<PublicKey, mocks::Application>,
187 reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
188 oracle: &mut Oracle<PublicKey>,
189 rebroadcast_timeout: Duration,
190 invalid_when: fn(u64) -> bool,
191 ) -> HashMap<PublicKey, mocks::Monitor> {
192 let mut monitors = HashMap::new();
193 let namespace = b"my testing namespace";
194 for (validator, _scheme, share) in validators.iter() {
195 let context = context.with_label(&validator.to_string());
196 let monitor = mocks::Monitor::new(111);
197 monitors.insert(validator.clone(), monitor.clone());
198 let supervisor = {
199 let mut s = mocks::Supervisor::<PublicKey, V>::default();
200 s.add_epoch(
201 111,
202 share.clone(),
203 polynomial.clone(),
204 validator_pks.to_vec(),
205 );
206 s
207 };
208
209 let blocker = oracle.control(validator.clone());
210
211 let automaton = mocks::Application::new(invalid_when);
212 automatons.insert(validator.clone(), automaton.clone());
213
214 let (reporter, reporter_mailbox) =
215 mocks::Reporter::<V, Sha256Digest>::new(namespace, polynomial.clone());
216 context.with_label("reporter").spawn(|_| reporter.run());
217 reporters.insert(validator.clone(), reporter_mailbox);
218
219 let engine = Engine::new(
220 context.with_label("engine"),
221 Config {
222 monitor,
223 validators: supervisor,
224 automaton: automaton.clone(),
225 reporter: reporters.get(validator).unwrap().clone(),
226 blocker,
227 namespace: namespace.to_vec(),
228 priority_acks: false,
229 rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
230 epoch_bounds: (1, 1),
231 window: std::num::NonZeroU64::new(10).unwrap(),
232 journal_partition: format!("aggregation/{validator}"),
233 journal_write_buffer: NZUsize!(4096),
234 journal_replay_buffer: NZUsize!(4096),
235 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
236 journal_compression: Some(3),
237 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
238 },
239 );
240
241 let (sender, receiver) = registrations.remove(validator).unwrap();
242 engine.start((sender, receiver));
243 }
244 monitors
245 }
246
247 async fn await_reporters<V: Variant>(
249 context: Context,
250 reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>,
251 threshold_index: u64,
252 threshold_epoch: Epoch,
253 ) {
254 let mut receivers = Vec::new();
255 for (reporter, mailbox) in reporters.iter() {
256 let (tx, rx) = oneshot::channel();
258 receivers.push(rx);
259
260 context.with_label("reporter_watcher").spawn({
261 let reporter = reporter.clone();
262 let mut mailbox = mailbox.clone();
263 move |context| async move {
264 loop {
265 let (index, epoch) = mailbox.get_tip().await.unwrap_or((0, 0));
266 let contiguous_index = mailbox.get_contiguous_tip().await.unwrap_or(0);
267 debug!(
268 index,
269 epoch,
270 contiguous_index,
271 threshold_index,
272 threshold_epoch,
273 ?reporter,
274 "reporter status"
275 );
276 if contiguous_index >= threshold_index && epoch >= threshold_epoch {
277 debug!(
278 ?reporter,
279 "reporter reached threshold, signaling completion"
280 );
281 let _ = tx.send(reporter.clone());
282 break;
283 }
284 context.sleep(Duration::from_millis(100)).await;
285 }
286 }
287 });
288 }
289
290 let results = join_all(receivers).await;
292 assert_eq!(results.len(), reporters.len());
293
294 for result in results {
296 assert!(result.is_ok(), "reporter was cancelled");
297 }
298 }
299
300 fn all_online<V: Variant>() {
302 let num_validators: u32 = 4;
303 let quorum: u32 = 3;
304 let runner = deterministic::Runner::timed(Duration::from_secs(30));
305
306 runner.start(|mut context| async move {
307 let (polynomial, mut shares_vec) =
308 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
309 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
310
311 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
312 context.with_label("simulation"),
313 num_validators,
314 &mut shares_vec,
315 RELIABLE_LINK,
316 )
317 .await;
318 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
319 let mut reporters =
320 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
321 spawn_validator_engines::<V>(
322 context.with_label("validator"),
323 polynomial.clone(),
324 &pks,
325 &validators,
326 &mut registrations,
327 &mut automatons.lock().unwrap(),
328 &mut reporters,
329 &mut oracle,
330 Duration::from_secs(5),
331 |_| false,
332 );
333 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
334 });
335 }
336
337 #[test_traced]
338 fn test_all_online() {
339 all_online::<MinPk>();
340 all_online::<MinSig>();
341 }
342
343 fn unclean_shutdown<V: Variant>() {
344 let num_validators: u32 = 4;
346 let quorum: u32 = 3;
347 let target_index = 200; let max_shutdowns = 10; let min_shutdowns = 4; let shutdown_range_min = Duration::from_millis(100);
351 let shutdown_range_max = Duration::from_millis(1_000);
352
353 let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));
355
356 let mut prev_ctx = None;
357 let shutdown_counts = Arc::new(Mutex::new(HashMap::<PublicKey, u32>::new()));
358 let completed_validators = Arc::new(Mutex::new(std::collections::HashSet::new()));
359 let all_validators = Arc::new(Mutex::new(Vec::new()));
360
361 let mut rng = StdRng::seed_from_u64(0);
363 let (polynomial, mut shares_vec) =
364 ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
365 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
366
367 while completed_validators.lock().unwrap().len() < num_validators as usize
369 && shutdown_counts.lock().unwrap().values().max().unwrap_or(&0) < &max_shutdowns
370 {
371 let completed_clone = completed_validators.clone();
372 let shutdown_counts_clone = shutdown_counts.clone();
373 let all_validators_clone = all_validators.clone();
374 let shares_vec_clone = shares_vec.clone();
375 let polynomial_clone = polynomial.clone();
376
377 let f = move |mut context: Context| {
378 let completed = completed_clone;
379 let shutdown_counts = shutdown_counts_clone;
380 let all_validators = all_validators_clone;
381 let mut shares_vec = shares_vec_clone;
382 let polynomial = polynomial_clone;
383 async move {
384 let (oracle, validators, pks, mut registrations) = initialize_simulation(
385 context.with_label("simulation"),
386 num_validators,
387 &mut shares_vec,
388 RELIABLE_LINK,
389 )
390 .await;
391 if all_validators.lock().unwrap().is_empty() {
393 let mut pks_lock = all_validators.lock().unwrap();
394 *pks_lock = pks.clone();
395 }
396 let automatons =
397 Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
398 let mut reporters =
399 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
400
401 let mut engine_monitors = HashMap::new();
403 let namespace = b"my testing namespace";
404 for (validator, _scheme, share) in validators.iter() {
405 let validator_context = context.with_label(&validator.to_string());
406 let monitor = mocks::Monitor::new(111);
407 engine_monitors.insert(validator.clone(), monitor.clone());
408 let supervisor = {
409 let mut s = mocks::Supervisor::<PublicKey, V>::default();
410 s.add_epoch(111, share.clone(), polynomial.clone(), pks.to_vec());
411 s
412 };
413
414 let blocker = oracle.control(validator.clone());
415 let automaton = mocks::Application::new(|_| false);
416 automatons
417 .lock()
418 .unwrap()
419 .insert(validator.clone(), automaton.clone());
420
421 let (reporter, reporter_mailbox) =
422 mocks::Reporter::<V, Sha256Digest>::new(namespace, polynomial.clone());
423 validator_context
424 .with_label("reporter")
425 .spawn(|_| reporter.run());
426 reporters.insert(validator.clone(), reporter_mailbox);
427
428 let engine = Engine::new(
429 validator_context.with_label("engine"),
430 Config {
431 monitor,
432 validators: supervisor,
433 automaton,
434 reporter: reporters.get(validator).unwrap().clone(),
435 blocker,
436 namespace: namespace.to_vec(),
437 priority_acks: false,
438 rebroadcast_timeout,
439 epoch_bounds: (1, 1),
440 window: std::num::NonZeroU64::new(10).unwrap(),
441 journal_partition: format!("unclean_shutdown_test/{validator}"),
443 journal_write_buffer: NZUsize!(4096),
444 journal_replay_buffer: NZUsize!(4096),
445 journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
446 journal_compression: Some(3),
447 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
448 },
449 );
450
451 let (sender, receiver) = registrations.remove(validator).unwrap();
452 engine.start((sender, receiver));
453 }
454
455 let mut completion_tasks = Vec::new();
457 for (validator_pk, mut reporter_mailbox) in reporters.clone() {
458 let validator = validator_pk.clone();
459 let completed_ref = completed.clone();
460 let task = context.with_label("completion_watcher").spawn(
461 move |context| async move {
462 loop {
463 if let Some((tip_index, _epoch)) =
464 reporter_mailbox.get_tip().await
465 {
466 if tip_index >= target_index {
467 for check_index in 0..=tip_index {
469 if let Some((digest, epoch)) =
470 reporter_mailbox.get(check_index).await
471 {
472 assert_eq!(
473 epoch, 111,
474 "Epoch should be consistent"
475 );
476 debug!(
477 ?validator,
478 check_index,
479 ?digest,
480 "Verified validator signed message"
481 );
482 }
483 }
484 completed_ref.lock().unwrap().insert(validator.clone());
485 debug!(
486 ?validator,
487 tip_index, "Validator completed signing target"
488 );
489 break;
490 }
491 }
492 context.sleep(Duration::from_millis(50)).await;
493 }
494 },
495 );
496 completion_tasks.push(task);
497 }
498
499 let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
501
502 select! {
503 _ = context.sleep(shutdown_wait) => {
504 debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
505 let mut counts = shutdown_counts.lock().unwrap();
507 for (pk, _) in reporters {
508 if !completed.lock().unwrap().contains(&pk) {
509 *counts.entry(pk).or_insert(0) += 1;
510 }
511 }
512 (false, context) },
514 _ = join_all(completion_tasks) => {
515 debug!("All validators completed normally");
516 (true, context) }
518 }
519 }
520 };
521
522 let (complete, context) = if let Some(prev_ctx) = prev_ctx {
523 let shutdown_count = shutdown_counts.lock().unwrap().values().sum::<u32>();
524 debug!(shutdown_count, "Restarting from previous context");
525 deterministic::Runner::from(prev_ctx)
526 } else {
527 debug!("Starting initial run");
528 deterministic::Runner::timed(Duration::from_secs(45))
529 }
530 .start(f);
531
532 prev_ctx = Some(context.recover());
533
534 if complete {
535 debug!("Test completed successfully");
536 break;
537 }
538
539 let shutdown_count = shutdown_counts.lock().unwrap().values().sum::<u32>();
540 debug!(
541 shutdown_count,
542 completed = completed_validators.lock().unwrap().len(),
543 "Shutdown occurred, restarting"
544 );
545 }
546
547 let final_completed = completed_validators.lock().unwrap().len();
549 let total_shutdowns = shutdown_counts.lock().unwrap().values().sum::<u32>();
550 assert_eq!(
551 final_completed, num_validators as usize,
552 "All validators should reach target index {target_index} despite unclean shutdowns. Only {final_completed} completed after {total_shutdowns} shutdowns"
553 );
554
555 let counts = shutdown_counts.lock().unwrap();
557 for pk in all_validators.lock().unwrap().iter() {
558 let count = counts.get(pk).copied().unwrap_or(0);
559 assert!(
560 count >= min_shutdowns,
561 "Validator {pk:?} should have at least {min_shutdowns} shutdowns, but had {count}"
562 );
563 }
564
565 debug!(
566 total_shutdowns,
567 target_index, "Unclean shutdown test completed successfully"
568 );
569 }
570
571 #[test_traced]
572 fn test_unclean_shutdown() {
573 unclean_shutdown::<MinPk>();
574 unclean_shutdown::<MinSig>();
575 }
576
577 fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
578 let num_validators: u32 = 4;
579 let quorum: u32 = 3;
580 let cfg = deterministic::Config::new()
581 .with_seed(seed)
582 .with_timeout(Some(Duration::from_secs(120)));
583 let runner = deterministic::Runner::new(cfg);
584
585 runner.start(|mut context| async move {
586 let (polynomial, mut shares_vec) =
587 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
588 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
589
590 let degraded_link = Link {
592 latency: 200.0,
593 jitter: 150.0,
594 success_rate: 0.5,
595 };
596
597 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
598 context.with_label("simulation"),
599 num_validators,
600 &mut shares_vec,
601 degraded_link,
602 )
603 .await;
604 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
605 let mut reporters =
606 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
607
608 spawn_validator_engines::<V>(
609 context.with_label("validator"),
610 polynomial.clone(),
611 &pks,
612 &validators,
613 &mut registrations,
614 &mut automatons.lock().unwrap(),
615 &mut reporters,
616 &mut oracle,
617 Duration::from_secs(2),
618 |_| false,
619 );
620
621 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
622
623 context.auditor().state()
624 })
625 }
626
627 #[test_traced]
628 fn test_slow_and_lossy_links() {
629 slow_and_lossy_links::<MinPk>(0);
630 slow_and_lossy_links::<MinSig>(0);
631 }
632
633 #[test_traced]
634 fn test_determinism() {
635 for seed in 1..6 {
638 let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
639 let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
640 assert_eq!(pk_state_1, pk_state_2);
641
642 let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
643 let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
644 assert_eq!(sig_state_1, sig_state_2);
645
646 assert_ne!(pk_state_1, sig_state_1);
648 }
649 }
650
651 fn one_offline<V: Variant>() {
652 let num_validators: u32 = 5;
653 let quorum: u32 = 3;
654 let runner = deterministic::Runner::timed(Duration::from_secs(30));
655
656 runner.start(|mut context| async move {
657 let (polynomial, mut shares_vec) =
658 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
659 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
660
661 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
662 context.with_label("simulation"),
663 num_validators,
664 &mut shares_vec,
665 RELIABLE_LINK,
666 )
667 .await;
668 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
669 let mut reporters =
670 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
671
672 let online_validators: Vec<_> = validators.iter().take(4).cloned().collect();
674 let online_pks: Vec<_> = pks.iter().take(4).cloned().collect();
675
676 spawn_validator_engines::<V>(
677 context.with_label("validator"),
678 polynomial.clone(),
679 &online_pks,
680 &online_validators,
681 &mut registrations,
682 &mut automatons.lock().unwrap(),
683 &mut reporters,
684 &mut oracle,
685 Duration::from_secs(5),
686 |_| false,
687 );
688 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
689 });
690 }
691
692 #[test_traced]
693 fn test_one_offline() {
694 one_offline::<MinPk>();
695 one_offline::<MinSig>();
696 }
697
698 fn consensus_from_index_zero<V: Variant>() {
700 let num_validators: u32 = 4;
701 let quorum: u32 = 3;
702 let runner = deterministic::Runner::timed(Duration::from_secs(30));
703
704 runner.start(|mut context| async move {
705 let (polynomial, mut shares_vec) =
706 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
707 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
708
709 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
710 context.with_label("simulation"),
711 num_validators,
712 &mut shares_vec,
713 RELIABLE_LINK,
714 )
715 .await;
716 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
717 let mut reporters =
718 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
719
720 spawn_validator_engines::<V>(
721 context.with_label("validator"),
722 polynomial.clone(),
723 &pks,
724 &validators,
725 &mut registrations,
726 &mut automatons.lock().unwrap(),
727 &mut reporters,
728 &mut oracle,
729 Duration::from_secs(5),
730 |_| false,
731 );
732
733 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
734 });
735 }
736
737 #[test_traced]
738 fn test_consensus_from_index_zero() {
739 consensus_from_index_zero::<MinPk>();
740 consensus_from_index_zero::<MinSig>();
741 }
742
743 fn network_partition<V: Variant>() {
745 let num_validators: u32 = 4;
746 let quorum: u32 = 3;
747 let runner = deterministic::Runner::timed(Duration::from_secs(60));
748
749 runner.start(|mut context| async move {
750 let (polynomial, mut shares_vec) =
751 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
752 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
753
754 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
755 context.with_label("simulation"),
756 num_validators,
757 &mut shares_vec,
758 RELIABLE_LINK,
759 )
760 .await;
761 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
762 let mut reporters =
763 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
764
765 spawn_validator_engines::<V>(
766 context.with_label("validator"),
767 polynomial.clone(),
768 &pks,
769 &validators,
770 &mut registrations,
771 &mut automatons.lock().unwrap(),
772 &mut reporters,
773 &mut oracle,
774 Duration::from_secs(5),
775 |_| false,
776 );
777
778 for v1 in pks.iter() {
779 for v2 in pks.iter() {
780 if v2 == v1 {
781 continue;
782 }
783 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
784 }
785 }
786 context.sleep(Duration::from_secs(20)).await;
787
788 let link = Link {
789 latency: 10.0,
790 jitter: 1.0,
791 success_rate: 1.0,
792 };
793 for v1 in pks.iter() {
794 for v2 in pks.iter() {
795 if v2 == v1 {
796 continue;
797 }
798 oracle
799 .add_link(v1.clone(), v2.clone(), link.clone())
800 .await
801 .unwrap();
802 }
803 }
804
805 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
806 });
807 }
808
809 #[test_traced]
810 fn test_network_partition() {
811 network_partition::<MinPk>();
812 network_partition::<MinSig>();
813 }
814
815 fn invalid_signature_injection<V: Variant>() {
817 let num_validators: u32 = 4;
818 let quorum: u32 = 3;
819 let runner = deterministic::Runner::timed(Duration::from_secs(30));
820
821 runner.start(|mut context| async move {
822 let (polynomial, mut shares_vec) =
823 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
824 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
825
826 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
827 context.with_label("simulation"),
828 num_validators,
829 &mut shares_vec,
830 RELIABLE_LINK,
831 )
832 .await;
833 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
834 let mut reporters =
835 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
836
837 let byzantine_fault_fn = |index: u64| -> bool {
840 use std::{
841 collections::hash_map::DefaultHasher,
842 hash::{Hash, Hasher},
843 };
844
845 let mut hasher = DefaultHasher::new();
846 index.hash(&mut hasher);
847 let hash_value = hasher.finish();
848
849 (hash_value % 100) < 5
852 };
853
854 spawn_validator_engines::<V>(
855 context.with_label("validator"),
856 polynomial.clone(),
857 &pks,
858 &validators,
859 &mut registrations,
860 &mut automatons.lock().unwrap(),
861 &mut reporters,
862 &mut oracle,
863 Duration::from_secs(5),
864 byzantine_fault_fn,
865 );
866
867 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
868 });
869 }
870
871 #[test_traced]
872 fn test_invalid_signature_injection() {
873 invalid_signature_injection::<MinPk>();
874 invalid_signature_injection::<MinSig>();
875 }
876
877 fn cryptographic_validation<V: Variant>() {
879 let num_validators: u32 = 4;
880 let quorum: u32 = 3;
881 let runner = deterministic::Runner::timed(Duration::from_secs(30));
882
883 runner.start(|mut context| async move {
884 let (polynomial, mut shares_vec) =
885 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
886 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
887
888 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
889 context.with_label("simulation"),
890 num_validators,
891 &mut shares_vec,
892 RELIABLE_LINK,
893 )
894 .await;
895 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
896 let mut reporters =
897 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
898
899 spawn_validator_engines::<V>(
900 context.with_label("validator"),
901 polynomial.clone(),
902 &pks,
903 &validators,
904 &mut registrations,
905 &mut automatons.lock().unwrap(),
906 &mut reporters,
907 &mut oracle,
908 Duration::from_secs(5),
909 |_| false,
910 );
911
912 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
913
914 for (validator_pk, mut reporter_mailbox) in reporters {
917 let tip_result = reporter_mailbox.get_tip().await;
918 assert!(
919 tip_result.is_some(),
920 "Reporter for validator {validator_pk:?} should have a tip"
921 );
922
923 let (tip_index, tip_epoch) = tip_result.unwrap();
924 assert!(
925 tip_index >= 1,
926 "Tip should have progressed beyond initial state for validator {validator_pk:?}"
927 );
928 assert_eq!(
929 tip_epoch, 111,
930 "Tip epoch should match expected epoch for validator {validator_pk:?}"
931 );
932
933 if tip_index > 0 {
935 let item_result = reporter_mailbox.get(tip_index - 1).await;
936 assert!(
937 item_result.is_some(),
938 "Should be able to retrieve consensus item for validator {validator_pk:?}"
939 );
940 }
941 }
942 });
943 }
944
945 #[test_traced]
946 fn test_cryptographic_validation() {
947 cryptographic_validation::<MinPk>();
948 cryptographic_validation::<MinSig>();
949 }
950
951 fn advanced_byzantine_faults<V: Variant>() {
953 let num_validators: u32 = 7; let quorum: u32 = 5; let runner = deterministic::Runner::timed(Duration::from_secs(45));
956
957 runner.start(|mut context| async move {
958 let (polynomial, mut shares_vec) =
959 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
960 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
961
962 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
963 context.with_label("simulation"),
964 num_validators,
965 &mut shares_vec,
966 RELIABLE_LINK,
967 )
968 .await;
969 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
970 let mut reporters =
971 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
972
973 let advanced_byzantine_fn = |index: u64| -> bool {
975 use std::{
976 collections::hash_map::DefaultHasher,
977 hash::{Hash, Hasher},
978 };
979
980 let mut hasher = DefaultHasher::new();
981 index.hash(&mut hasher);
982 let hash_value = hasher.finish();
983
984 match index % 11 {
985 0..=2 if (hash_value % 100) < 8 => true,
988 3..=5 if index > 10 && index < 15 => true,
990 7 if (hash_value % 13) == 0 => true,
992 _ => false,
993 }
994 };
995
996 spawn_validator_engines::<V>(
997 context.with_label("validator"),
998 polynomial.clone(),
999 &pks,
1000 &validators,
1001 &mut registrations,
1002 &mut automatons.lock().unwrap(),
1003 &mut reporters,
1004 &mut oracle,
1005 Duration::from_secs(8), advanced_byzantine_fn,
1007 );
1008
1009 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
1010 });
1011 }
1012
1013 #[test_traced]
1014 fn test_advanced_byzantine_faults() {
1015 advanced_byzantine_faults::<MinPk>();
1016 advanced_byzantine_faults::<MinSig>();
1017 }
1018
1019 fn insufficient_validators<V: Variant>() {
1021 let num_validators: u32 = 5;
1022 let quorum: u32 = 3;
1023 let runner = deterministic::Runner::timed(Duration::from_secs(15));
1024
1025 runner.start(|mut context| async move {
1026 let (polynomial, mut shares_vec) =
1027 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
1028 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
1029
1030 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
1031 context.with_label("simulation"),
1032 num_validators,
1033 &mut shares_vec,
1034 RELIABLE_LINK,
1035 )
1036 .await;
1037 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
1038 let mut reporters =
1039 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
1040
1041 let insufficient_validators: Vec<_> = validators.iter().take(2).cloned().collect();
1043 let insufficient_pks: Vec<_> = pks.iter().take(2).cloned().collect();
1044
1045 spawn_validator_engines::<V>(
1046 context.with_label("validator"),
1047 polynomial.clone(),
1048 &insufficient_pks,
1049 &insufficient_validators,
1050 &mut registrations,
1051 &mut automatons.lock().unwrap(),
1052 &mut reporters,
1053 &mut oracle,
1054 Duration::from_secs(3),
1055 |_| false,
1056 );
1057
1058 context.sleep(Duration::from_secs(12)).await;
1061
1062 let mut any_consensus = false;
1064 for (validator_pk, mut reporter_mailbox) in reporters {
1065 let contiguous_tip = reporter_mailbox.get_contiguous_tip().await.unwrap_or(0);
1070 if contiguous_tip > 0 {
1071 any_consensus = true;
1072 tracing::warn!(
1073 ?validator_pk,
1074 contiguous_tip,
1075 "Unexpected threshold signature consensus with insufficient validators"
1076 );
1077 }
1078 }
1079
1080 assert!(
1082 !any_consensus,
1083 "Consensus should not be achieved with insufficient validator participation (below quorum)"
1084 );
1085 });
1086 }
1087
1088 #[test_traced]
1089 fn test_insufficient_validators() {
1090 insufficient_validators::<MinPk>();
1091 insufficient_validators::<MinSig>();
1092 }
1093
1094 fn threshold_signature_correctness<V: Variant>() {
1096 let num_validators: u32 = 4;
1097 let quorum: u32 = 3;
1098 let runner = deterministic::Runner::timed(Duration::from_secs(30));
1099
1100 runner.start(|mut context| async move {
1101 let (polynomial, mut shares_vec) =
1102 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
1103 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
1104
1105 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
1106 context.with_label("simulation"),
1107 num_validators,
1108 &mut shares_vec,
1109 RELIABLE_LINK,
1110 )
1111 .await;
1112 let automatons = Arc::new(Mutex::new(BTreeMap::<PublicKey, mocks::Application>::new()));
1113 let mut reporters =
1114 BTreeMap::<PublicKey, mocks::ReporterMailbox<V, Sha256Digest>>::new();
1115
1116 spawn_validator_engines::<V>(
1117 context.with_label("validator"),
1118 polynomial.clone(),
1119 &pks,
1120 &validators,
1121 &mut registrations,
1122 &mut automatons.lock().unwrap(),
1123 &mut reporters,
1124 &mut oracle,
1125 Duration::from_secs(5),
1126 |_| false, );
1128
1129 await_reporters(context.with_label("reporter"), &reporters, 100, 111).await;
1130
1131 for (validator_pk, mut reporter_mailbox) in reporters {
1133 let tip_result = reporter_mailbox.get_tip().await;
1134 assert!(
1135 tip_result.is_some(),
1136 "Reporter should have achieved consensus"
1137 );
1138
1139 let (tip_index, _) = tip_result.unwrap();
1140
1141 for index in 1..=tip_index {
1143 let item_result = reporter_mailbox.get(index).await;
1144 assert!(
1145 item_result.is_some(),
1146 "Should have consensus item at index {index}"
1147 );
1148
1149 let (digest, epoch) = item_result.unwrap();
1150
1151 let _item = super::types::Item { index, digest };
1154 let mut ack_namespace = b"my testing namespace".to_vec();
1155 ack_namespace.extend_from_slice(b"_AGG_ACK");
1156
1157 tracing::debug!(
1163 ?validator_pk,
1164 index,
1165 epoch,
1166 "Verified valid threshold signature for consensus item"
1167 );
1168 }
1169 }
1170 });
1171 }
1172
1173 #[test_traced]
1174 fn test_threshold_signature_correctness() {
1175 threshold_signature_correctness::<MinPk>();
1176 threshold_signature_correctness::<MinSig>();
1177 }
1178
1179 fn manual_threshold_verification<V: Variant>() {
1181 let runner = deterministic::Runner::timed(Duration::from_secs(10));
1182
1183 runner.start(|mut context| async move {
1184 let num_validators = 4u32;
1185 let quorum = 3u32;
1186
1187 let (polynomial, shares_vec) =
1189 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
1190
1191 let test_item = super::types::Item {
1193 index: 42,
1194 digest: Sha256Digest::from([1u8; 32]),
1195 };
1196
1197 let namespace = b"test_namespace";
1198 let ack_namespace = [namespace.as_slice(), b"_AGG_ACK"].concat();
1199
1200 let mut partial_sigs = Vec::new();
1202 for share in shares_vec.iter().take(quorum as usize) {
1203 let partial_sig = bls_ops::partial_sign_message::<V>(
1204 share,
1205 Some(&ack_namespace),
1206 &test_item.encode(),
1207 );
1208 partial_sigs.push(partial_sig);
1209 }
1210
1211 let threshold_sig = poly::Signature::<V>::recover(quorum, &partial_sigs).expect(
1213 "Should be able to recover threshold signature from sufficient partial signatures",
1214 );
1215
1216 let threshold_public = poly::public::<V>(&polynomial);
1218 let verification_result = bls_ops::verify_message::<V>(
1219 threshold_public,
1220 Some(&ack_namespace),
1221 &test_item.encode(),
1222 &threshold_sig,
1223 );
1224
1225 assert!(
1226 verification_result.is_ok(),
1227 "Manually constructed threshold signature should be valid: {:?}",
1228 verification_result.err()
1229 );
1230
1231 let insufficient_partial_sigs: Vec<_> = partial_sigs
1233 .iter()
1234 .take(quorum as usize - 1)
1235 .cloned()
1236 .collect();
1237 let insufficient_result =
1238 poly::Signature::<V>::recover(quorum, &insufficient_partial_sigs);
1239
1240 assert!(
1241 insufficient_result.is_err(),
1242 "Should not be able to recover threshold signature with insufficient partial signatures"
1243 );
1244
1245 tracing::debug!("Manual threshold signature verification completed successfully");
1246 });
1247 }
1248
1249 #[test_traced]
1250 fn test_manual_threshold_verification() {
1251 manual_threshold_verification::<MinPk>();
1252 manual_threshold_verification::<MinSig>();
1253 }
1254}