1pub mod types;
37
38cfg_if::cfg_if! {
39 if #[cfg(not(target_arch = "wasm32"))] {
40 mod ack_manager;
41 use ack_manager::AckManager;
42 mod config;
43 pub use config::Config;
44 mod engine;
45 pub use engine::Engine;
46 mod metrics;
47 mod tip_manager;
48 use tip_manager::TipManager;
49 }
50}
51
52#[cfg(test)]
53pub mod mocks;
54
55#[cfg(test)]
56mod tests {
57 use super::{mocks, types::Epoch, Config, Engine};
58 use commonware_cryptography::{
59 bls12381::{
60 dkg::ops,
61 primitives::{
62 group::Share,
63 poly,
64 variant::{MinPk, MinSig, Variant},
65 },
66 },
67 ed25519::{PrivateKey, PublicKey},
68 sha256::Digest as Sha256Digest,
69 PrivateKeyExt as _, Signer as _,
70 };
71 use commonware_macros::test_traced;
72 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
73 use commonware_runtime::{
74 deterministic::{self, Context},
75 Clock, Metrics, Runner, Spawner,
76 };
77 use commonware_utils::quorum;
78 use futures::{channel::oneshot, future::join_all};
79 use rand::{rngs::StdRng, SeedableRng as _};
80 use std::{
81 collections::{BTreeMap, HashMap, HashSet},
82 sync::{Arc, Mutex},
83 time::Duration,
84 };
85 use tracing::debug;
86
87 type Registrations<P> = BTreeMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))>;
88
89 async fn register_participants(
90 oracle: &mut Oracle<PublicKey>,
91 participants: &[PublicKey],
92 ) -> Registrations<PublicKey> {
93 let mut registrations = BTreeMap::new();
94 for participant in participants.iter() {
95 let (a1, a2) = oracle.register(participant.clone(), 0).await.unwrap();
96 let (b1, b2) = oracle.register(participant.clone(), 1).await.unwrap();
97 registrations.insert(participant.clone(), ((a1, a2), (b1, b2)));
98 }
99 registrations
100 }
101
102 #[allow(dead_code)]
103 enum Action {
104 Link(Link),
105 Update(Link),
106 Unlink,
107 }
108
109 async fn link_participants(
110 oracle: &mut Oracle<PublicKey>,
111 participants: &[PublicKey],
112 action: Action,
113 restrict_to: Option<fn(usize, usize, usize) -> bool>,
114 ) {
115 for (i1, v1) in participants.iter().enumerate() {
116 for (i2, v2) in participants.iter().enumerate() {
117 if v2 == v1 {
118 continue;
119 }
120 if let Some(f) = restrict_to {
121 if !f(participants.len(), i1, i2) {
122 continue;
123 }
124 }
125 if matches!(action, Action::Update(_) | Action::Unlink) {
126 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
127 }
128 if let Action::Link(ref link) | Action::Update(ref link) = action {
129 oracle
130 .add_link(v1.clone(), v2.clone(), link.clone())
131 .await
132 .unwrap();
133 }
134 }
135 }
136 }
137
138 async fn initialize_simulation(
139 context: Context,
140 num_validators: u32,
141 shares_vec: &mut [Share],
142 ) -> (
143 Oracle<PublicKey>,
144 Vec<(PublicKey, PrivateKey, Share)>,
145 Vec<PublicKey>,
146 Registrations<PublicKey>,
147 ) {
148 let (network, mut oracle) = Network::new(
149 context.with_label("network"),
150 commonware_p2p::simulated::Config {
151 max_size: 1024 * 1024,
152 },
153 );
154 network.start();
155
156 let mut schemes = (0..num_validators)
157 .map(|i| PrivateKey::from_seed(i as u64))
158 .collect::<Vec<_>>();
159 schemes.sort_by_key(|s| s.public_key());
160 let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
161 .iter()
162 .enumerate()
163 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
164 .collect();
165 let pks = validators
166 .iter()
167 .map(|(pk, _, _)| pk.clone())
168 .collect::<Vec<_>>();
169
170 let registrations = register_participants(&mut oracle, &pks).await;
171 let link = Link {
172 latency: 10.0,
173 jitter: 1.0,
174 success_rate: 1.0,
175 };
176 link_participants(&mut oracle, &pks, Action::Link(link), None).await;
177 (oracle, validators, pks, registrations)
178 }
179
180 #[allow(clippy::too_many_arguments)]
181 fn spawn_validator_engines<V: Variant>(
182 context: Context,
183 polynomial: poly::Public<V>,
184 sequencer_pks: &[PublicKey],
185 validator_pks: &[PublicKey],
186 validators: &[(PublicKey, PrivateKey, Share)],
187 registrations: &mut Registrations<PublicKey>,
188 automatons: &mut BTreeMap<PublicKey, mocks::Automaton<PublicKey>>,
189 reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
190 rebroadcast_timeout: Duration,
191 invalid_when: fn(u64) -> bool,
192 misses_allowed: Option<usize>,
193 ) -> HashMap<PublicKey, mocks::Monitor> {
194 let mut monitors = HashMap::new();
195 let namespace = b"my testing namespace";
196 for (validator, scheme, share) in validators.iter() {
197 let context = context.with_label(&validator.to_string());
198 let monitor = mocks::Monitor::new(111);
199 monitors.insert(validator.clone(), monitor.clone());
200 let sequencers = mocks::Sequencers::<PublicKey>::new(sequencer_pks.to_vec());
201 let validators = mocks::Validators::<PublicKey, V>::new(
202 polynomial.clone(),
203 validator_pks.to_vec(),
204 Some(share.clone()),
205 );
206
207 let automaton = mocks::Automaton::<PublicKey>::new(invalid_when);
208 automatons.insert(validator.clone(), automaton.clone());
209
210 let (reporter, reporter_mailbox) = mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
211 namespace,
212 *poly::public::<V>(&polynomial),
213 misses_allowed,
214 );
215 context.with_label("reporter").spawn(|_| reporter.run());
216 reporters.insert(validator.clone(), reporter_mailbox);
217
218 let engine = Engine::new(
219 context.with_label("engine"),
220 Config {
221 crypto: scheme.clone(),
222 relay: automaton.clone(),
223 automaton: automaton.clone(),
224 reporter: reporters.get(validator).unwrap().clone(),
225 monitor,
226 sequencers,
227 validators,
228 namespace: namespace.to_vec(),
229 epoch_bounds: (1, 1),
230 height_bound: 2,
231 rebroadcast_timeout,
232 priority_acks: false,
233 priority_proposals: false,
234 journal_heights_per_section: 10,
235 journal_replay_buffer: 4096,
236 journal_write_buffer: 4096,
237 journal_name_prefix: format!("ordered-broadcast-seq/{validator}/"),
238 journal_compression: Some(3),
239 },
240 );
241
242 let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
243 engine.start((a1, a2), (b1, b2));
244 }
245 monitors
246 }
247
248 async fn await_reporters<V: Variant>(
249 context: Context,
250 sequencers: Vec<PublicKey>,
251 reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
252 threshold: (u64, Epoch, bool),
253 ) {
254 let mut receivers = Vec::new();
255 for (reporter, mailbox) in reporters.iter() {
256 for sequencer in sequencers.iter() {
258 let (tx, rx) = oneshot::channel();
260 receivers.push(rx);
261
262 context.with_label("reporter_watcher").spawn({
263 let reporter = reporter.clone();
264 let sequencer = sequencer.clone();
265 let mut mailbox = mailbox.clone();
266 move |context| async move {
267 loop {
268 let (height, epoch) =
269 mailbox.get_tip(sequencer.clone()).await.unwrap_or((0, 0));
270 debug!(height, epoch, ?sequencer, ?reporter, "reporter");
271 let contiguous_height = mailbox
272 .get_contiguous_tip(sequencer.clone())
273 .await
274 .unwrap_or(0);
275 if height >= threshold.0
276 && epoch >= threshold.1
277 && (!threshold.2 || contiguous_height >= threshold.0)
278 {
279 let _ = tx.send(sequencer.clone());
280 break;
281 }
282 context.sleep(Duration::from_millis(100)).await;
283 }
284 }
285 });
286 }
287 }
288
289 let results = join_all(receivers).await;
291 assert_eq!(results.len(), sequencers.len() * reporters.len());
292
293 for result in results {
295 assert!(result.is_ok(), "reporter was cancelled");
296 }
297 }
298
299 async fn get_max_height<V: Variant>(
300 reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
301 ) -> u64 {
302 let mut max_height = 0;
303 for (sequencer, mailbox) in reporters.iter_mut() {
304 let (height, _) = mailbox.get_tip(sequencer.clone()).await.unwrap_or((0, 0));
305 if height > max_height {
306 max_height = height;
307 }
308 }
309 max_height
310 }
311
312 fn all_online<V: Variant>() {
313 let num_validators: u32 = 4;
314 let quorum: u32 = 3;
315 let runner = deterministic::Runner::timed(Duration::from_secs(30));
316
317 runner.start(|mut context| async move {
318 let (polynomial, mut shares_vec) =
319 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
320 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
321
322 let (_oracle, validators, pks, mut registrations) = initialize_simulation(
323 context.with_label("simulation"),
324 num_validators,
325 &mut shares_vec,
326 )
327 .await;
328 let automatons = Arc::new(Mutex::new(
329 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
330 ));
331 let mut reporters =
332 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
333 spawn_validator_engines::<V>(
334 context.with_label("validator"),
335 polynomial.clone(),
336 &pks,
337 &pks,
338 &validators,
339 &mut registrations,
340 &mut automatons.lock().unwrap(),
341 &mut reporters,
342 Duration::from_secs(5),
343 |_| false,
344 Some(5),
345 );
346 await_reporters(
347 context.with_label("reporter"),
348 reporters.keys().cloned().collect::<Vec<_>>(),
349 &reporters,
350 (100, 111, true),
351 )
352 .await;
353 });
354 }
355
356 #[test_traced]
357 fn test_all_online() {
358 all_online::<MinPk>();
359 all_online::<MinSig>();
360 }
361
362 fn unclean_shutdown<V: Variant>() {
363 let num_validators: u32 = 4;
364 let quorum: u32 = 3;
365 let mut rng = StdRng::seed_from_u64(0);
366 let (polynomial, mut shares_vec) =
367 ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
368 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
369 let completed = Arc::new(Mutex::new(HashSet::new()));
370 let shutdowns = Arc::new(Mutex::new(0u64));
371 let mut prev_ctx = None;
372
373 while completed.lock().unwrap().len() != num_validators as usize {
374 let completed = completed.clone();
375 let shares_vec = shares_vec.clone();
376 let shutdowns = shutdowns.clone();
377 let polynomial = polynomial.clone();
378
379 let f = |context: deterministic::Context| async move {
380 let (network, mut oracle) = Network::new(
381 context.with_label("network"),
382 commonware_p2p::simulated::Config {
383 max_size: 1024 * 1024,
384 },
385 );
386 network.start();
387
388 let mut schemes = (0..num_validators)
389 .map(|i| PrivateKey::from_seed(i as u64))
390 .collect::<Vec<_>>();
391 schemes.sort_by_key(|s| s.public_key());
392 let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
393 .iter()
394 .enumerate()
395 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
396 .collect();
397 let pks = validators
398 .iter()
399 .map(|(pk, _, _)| pk.clone())
400 .collect::<Vec<_>>();
401
402 let mut registrations = register_participants(&mut oracle, &pks).await;
403 let link = commonware_p2p::simulated::Link {
404 latency: 10.0,
405 jitter: 1.0,
406 success_rate: 1.0,
407 };
408 link_participants(&mut oracle, &pks, Action::Link(link), None).await;
409
410 let automatons = Arc::new(Mutex::new(BTreeMap::<
411 PublicKey,
412 mocks::Automaton<PublicKey>,
413 >::new()));
414 let mut reporters =
415 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new(
416 );
417 spawn_validator_engines(
418 context.with_label("validator"),
419 polynomial.clone(),
420 &pks,
421 &pks,
422 &validators,
423 &mut registrations,
424 &mut automatons.lock().unwrap(),
425 &mut reporters,
426 Duration::from_secs(5),
427 |_| false,
428 None,
429 );
430
431 let reporter_pairs: Vec<(
432 PublicKey,
433 mocks::ReporterMailbox<PublicKey, V, Sha256Digest>,
434 )> = reporters
435 .iter()
436 .map(|(v, m)| (v.clone(), m.clone()))
437 .collect();
438 for (validator, mut mailbox) in reporter_pairs {
439 let completed_clone = completed.clone();
440 context
441 .with_label("reporter_unclean")
442 .spawn(|context| async move {
443 loop {
444 let (height, _) =
445 mailbox.get_tip(validator.clone()).await.unwrap_or((0, 0));
446 if height >= 100 {
447 completed_clone.lock().unwrap().insert(validator.clone());
448 break;
449 }
450 context.sleep(Duration::from_millis(100)).await;
451 }
452 });
453 }
454 context.sleep(Duration::from_millis(1000)).await;
455 *shutdowns.lock().unwrap() += 1;
456
457 context
458 };
459
460 let context = if let Some(prev_ctx) = prev_ctx {
461 deterministic::Runner::from(prev_ctx)
462 } else {
463 deterministic::Runner::timed(Duration::from_secs(45))
464 }
465 .start(f);
466
467 prev_ctx = Some(context.recover());
468 }
469 }
470
471 #[test_traced]
472 fn test_unclean_shutdown() {
473 unclean_shutdown::<MinPk>();
474 unclean_shutdown::<MinSig>();
475 }
476
477 fn network_partition<V: Variant>() {
478 let num_validators: u32 = 4;
479 let quorum: u32 = 3;
480 let runner = deterministic::Runner::timed(Duration::from_secs(60));
481
482 runner.start(|mut context| async move {
483 let (polynomial, mut shares_vec) =
484 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
485 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
486
487 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
489 context.with_label("simulation"),
490 num_validators,
491 &mut shares_vec,
492 )
493 .await;
494 let automatons = Arc::new(Mutex::new(
495 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
496 ));
497 let mut reporters =
498 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
499 spawn_validator_engines(
500 context.with_label("validator"),
501 polynomial.clone(),
502 &pks,
503 &pks,
504 &validators,
505 &mut registrations,
506 &mut automatons.lock().unwrap(),
507 &mut reporters,
508 Duration::from_secs(1),
509 |_| false,
510 None,
511 );
512
513 link_participants(&mut oracle, &pks, Action::Unlink, None).await;
515 context.sleep(Duration::from_secs(30)).await;
516
517 let max_height = get_max_height(&mut reporters).await;
519
520 let link = Link {
522 latency: 10.0,
523 jitter: 1.0,
524 success_rate: 1.0,
525 };
526 link_participants(&mut oracle, &pks, Action::Link(link), None).await;
527 await_reporters(
528 context.with_label("reporter"),
529 reporters.keys().cloned().collect::<Vec<_>>(),
530 &reporters,
531 (max_height + 100, 111, false),
532 )
533 .await;
534 });
535 }
536
537 #[test_traced]
538 #[ignore]
539 fn test_network_partition() {
540 network_partition::<MinPk>();
541 network_partition::<MinSig>();
542 }
543
544 fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
545 let num_validators: u32 = 4;
546 let quorum: u32 = 3;
547 let cfg = deterministic::Config::new()
548 .with_seed(seed)
549 .with_timeout(Some(Duration::from_secs(40)));
550 let runner = deterministic::Runner::new(cfg);
551
552 runner.start(|mut context| async move {
553 let (polynomial, mut shares_vec) =
554 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
555 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
556
557 let (oracle, validators, pks, mut registrations) = initialize_simulation(
558 context.with_label("simulation"),
559 num_validators,
560 &mut shares_vec,
561 )
562 .await;
563 let delayed_link = Link {
564 latency: 50.0,
565 jitter: 40.0,
566 success_rate: 0.5,
567 };
568 let mut oracle_clone = oracle.clone();
569 link_participants(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
570
571 let automatons = Arc::new(Mutex::new(
572 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
573 ));
574 let mut reporters =
575 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
576 spawn_validator_engines(
577 context.with_label("validator"),
578 polynomial.clone(),
579 &pks,
580 &pks,
581 &validators,
582 &mut registrations,
583 &mut automatons.lock().unwrap(),
584 &mut reporters,
585 Duration::from_millis(150),
586 |_| false,
587 None,
588 );
589
590 await_reporters(
591 context.with_label("reporter"),
592 reporters.keys().cloned().collect::<Vec<_>>(),
593 &reporters,
594 (40, 111, false),
595 )
596 .await;
597
598 context.auditor().state()
599 })
600 }
601
602 #[test_traced]
603 fn test_slow_and_lossy_links() {
604 slow_and_lossy_links::<MinPk>(0);
605 slow_and_lossy_links::<MinSig>(0);
606 }
607
608 #[test_traced]
609 #[ignore]
610 fn test_determinism() {
611 for seed in 1..6 {
614 let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
615 let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
616 assert_eq!(pk_state_1, pk_state_2);
617
618 let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
619 let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
620 assert_eq!(sig_state_1, sig_state_2);
621
622 assert_ne!(pk_state_1, sig_state_1);
624 }
625 }
626
627 fn invalid_signature_injection<V: Variant>() {
628 let num_validators: u32 = 4;
629 let quorum: u32 = 3;
630 let runner = deterministic::Runner::timed(Duration::from_secs(30));
631
632 runner.start(|mut context| async move {
633 let (polynomial, mut shares_vec) =
634 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
635 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
636
637 let (_oracle, validators, pks, mut registrations) = initialize_simulation(
638 context.with_label("simulation"),
639 num_validators,
640 &mut shares_vec,
641 )
642 .await;
643 let automatons = Arc::new(Mutex::new(
644 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
645 ));
646 let mut reporters =
647 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
648 spawn_validator_engines::<V>(
649 context.with_label("validator"),
650 polynomial.clone(),
651 &pks,
652 &pks,
653 &validators,
654 &mut registrations,
655 &mut automatons.lock().unwrap(),
656 &mut reporters,
657 Duration::from_secs(5),
658 |i| i % 10 == 0,
659 None,
660 );
661
662 await_reporters(
663 context.with_label("reporter"),
664 reporters.keys().cloned().collect::<Vec<_>>(),
665 &reporters,
666 (100, 111, true),
667 )
668 .await;
669 });
670 }
671
672 #[test_traced]
673 fn test_invalid_signature_injection() {
674 invalid_signature_injection::<MinPk>();
675 invalid_signature_injection::<MinSig>();
676 }
677
678 fn updated_epoch<V: Variant>() {
679 let num_validators: u32 = 4;
680 let quorum: u32 = 3;
681 let runner = deterministic::Runner::timed(Duration::from_secs(60));
682
683 runner.start(|mut context| async move {
684 let (polynomial, mut shares_vec) =
685 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
686 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
687
688 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
690 context.with_label("simulation"),
691 num_validators,
692 &mut shares_vec,
693 )
694 .await;
695 let automatons = Arc::new(Mutex::new(
696 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
697 ));
698 let mut reporters =
699 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
700 let monitors = spawn_validator_engines::<V>(
701 context.with_label("validator"),
702 polynomial.clone(),
703 &pks,
704 &pks,
705 &validators,
706 &mut registrations,
707 &mut automatons.lock().unwrap(),
708 &mut reporters,
709 Duration::from_secs(1),
710 |_| false,
711 Some(5),
712 );
713
714 await_reporters(
716 context.with_label("reporter"),
717 reporters.keys().cloned().collect::<Vec<_>>(),
718 &reporters,
719 (100, 111, true),
720 )
721 .await;
722
723 link_participants(&mut oracle, &pks, Action::Unlink, None).await;
725 context.sleep(Duration::from_secs(30)).await;
726
727 let max_height = get_max_height(&mut reporters).await;
729
730 for monitor in monitors.values() {
732 monitor.update(112);
733 }
734
735 let link = Link {
737 latency: 10.0,
738 jitter: 1.0,
739 success_rate: 1.0,
740 };
741 link_participants(&mut oracle, &pks, Action::Link(link), None).await;
742 await_reporters(
743 context.with_label("reporter"),
744 reporters.keys().cloned().collect::<Vec<_>>(),
745 &reporters,
746 (max_height + 100, 112, true),
747 )
748 .await;
749 });
750 }
751
752 #[test_traced]
753 fn test_updated_epoch() {
754 updated_epoch::<MinPk>();
755 updated_epoch::<MinSig>();
756 }
757
758 fn external_sequencer<V: Variant>() {
759 let num_validators: u32 = 4;
760 let quorum: u32 = quorum(3);
761 let runner = deterministic::Runner::timed(Duration::from_secs(60));
762 runner.start(|mut context| async move {
763 let (polynomial, shares) =
765 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
766
767 let mut schemes = (0..num_validators)
769 .map(|i| PrivateKey::from_seed(i as u64))
770 .collect::<Vec<_>>();
771 schemes.sort_by_key(|s| s.public_key());
772
773 let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
775 .iter()
776 .enumerate()
777 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares[i].clone()))
778 .collect();
779 let validator_pks = validators
780 .iter()
781 .map(|(pk, _, _)| pk.clone())
782 .collect::<Vec<_>>();
783
784 let sequencer = PrivateKey::from_seed(u64::MAX);
786
787 let mut participants = validators
789 .iter()
790 .map(|(pk, _, _)| pk.clone())
791 .collect::<Vec<_>>();
792 participants.push(sequencer.public_key()); let (network, mut oracle) = Network::new(
796 context.with_label("network"),
797 commonware_p2p::simulated::Config {
798 max_size: 1024 * 1024,
799 },
800 );
801 network.start();
802
803 let mut registrations = register_participants(&mut oracle, &participants).await;
805 let link = commonware_p2p::simulated::Link {
806 latency: 10.0,
807 jitter: 1.0,
808 success_rate: 1.0,
809 };
810 link_participants(&mut oracle, &participants, Action::Link(link), None).await;
811
812 let automatons = Arc::new(Mutex::new(
814 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
815 ));
816 let mut reporters =
817 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
818 let mut monitors = HashMap::new();
819 let namespace = b"my testing namespace";
820
821 for (validator, scheme, share) in validators.iter() {
823 let context = context.with_label(&validator.to_string());
824 let monitor = mocks::Monitor::new(111);
825 monitors.insert(validator.clone(), monitor.clone());
826 let sequencers = mocks::Sequencers::<PublicKey>::new(vec![sequencer.public_key()]);
827 let validators = mocks::Validators::<PublicKey, V>::new(
828 polynomial.clone(),
829 validator_pks.clone(),
830 Some(share.clone()),
831 );
832
833 let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
834 automatons
835 .lock()
836 .unwrap()
837 .insert(validator.clone(), automaton.clone());
838
839 let (reporter, reporter_mailbox) =
840 mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
841 namespace,
842 *poly::public::<V>(&polynomial),
843 Some(5),
844 );
845 context.with_label("reporter").spawn(|_| reporter.run());
846 reporters.insert(validator.clone(), reporter_mailbox);
847
848 let engine = Engine::new(
849 context.with_label("engine"),
850 Config {
851 crypto: scheme.clone(),
852 relay: automaton.clone(),
853 automaton: automaton.clone(),
854 reporter: reporters.get(validator).unwrap().clone(),
855 monitor,
856 sequencers,
857 validators,
858 namespace: namespace.to_vec(),
859 epoch_bounds: (1, 1),
860 height_bound: 2,
861 rebroadcast_timeout: Duration::from_secs(5),
862 priority_acks: false,
863 priority_proposals: false,
864 journal_heights_per_section: 10,
865 journal_replay_buffer: 4096,
866 journal_write_buffer: 4096,
867 journal_name_prefix: format!("ordered-broadcast-seq/{validator}/"),
868 journal_compression: Some(3),
869 },
870 );
871
872 let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
873 engine.start((a1, a2), (b1, b2));
874 }
875
876 {
878 let context = context.with_label("sequencer");
879 let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
880 automatons
881 .lock()
882 .unwrap()
883 .insert(sequencer.public_key(), automaton.clone());
884 let (reporter, reporter_mailbox) =
885 mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
886 namespace,
887 *poly::public::<V>(&polynomial),
888 Some(5),
889 );
890 context.with_label("reporter").spawn(|_| reporter.run());
891 reporters.insert(sequencer.public_key(), reporter_mailbox);
892 let engine = Engine::new(
893 context.with_label("engine"),
894 Config {
895 crypto: sequencer.clone(),
896 relay: automaton.clone(),
897 automaton: automaton.clone(),
898 reporter: reporters.get(&sequencer.public_key()).unwrap().clone(),
899 monitor: mocks::Monitor::new(111),
900 sequencers: mocks::Sequencers::<PublicKey>::new(vec![
901 sequencer.public_key()
902 ]),
903 validators: mocks::Validators::<PublicKey, V>::new(
904 polynomial.clone(),
905 validator_pks,
906 None,
907 ),
908 namespace: namespace.to_vec(),
909 epoch_bounds: (1, 1),
910 height_bound: 2,
911 rebroadcast_timeout: Duration::from_secs(5),
912 priority_acks: false,
913 priority_proposals: false,
914 journal_heights_per_section: 10,
915 journal_replay_buffer: 4096,
916 journal_write_buffer: 4096,
917 journal_name_prefix: format!(
918 "ordered-broadcast-seq/{}/",
919 sequencer.public_key()
920 ),
921 journal_compression: Some(3),
922 },
923 );
924
925 let ((a1, a2), (b1, b2)) = registrations.remove(&sequencer.public_key()).unwrap();
926 engine.start((a1, a2), (b1, b2));
927 }
928
929 await_reporters(
931 context.with_label("reporter"),
932 vec![sequencer.public_key()],
933 &reporters,
934 (100, 111, true),
935 )
936 .await;
937 });
938 }
939
940 #[test_traced]
941 fn test_external_sequencer() {
942 external_sequencer::<MinPk>();
943 external_sequencer::<MinSig>();
944 }
945
946 fn run_1k<V: Variant>() {
947 let num_validators: u32 = 10;
948 let quorum: u32 = 3;
949 let cfg = deterministic::Config::new();
950 let runner = deterministic::Runner::new(cfg);
951
952 runner.start(|mut context| async move {
953 let (polynomial, mut shares_vec) =
954 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
955 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
956
957 let (oracle, validators, pks, mut registrations) = initialize_simulation(
958 context.with_label("simulation"),
959 num_validators,
960 &mut shares_vec,
961 )
962 .await;
963 let delayed_link = Link {
964 latency: 80.0,
965 jitter: 10.0,
966 success_rate: 0.98,
967 };
968 let mut oracle_clone = oracle.clone();
969 link_participants(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
970
971 let automatons = Arc::new(Mutex::new(
972 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
973 ));
974 let mut reporters =
975 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
976 let sequencers = &pks[0..pks.len() / 2];
977 spawn_validator_engines::<V>(
978 context.with_label("validator"),
979 polynomial.clone(),
980 sequencers,
981 &pks,
982 &validators,
983 &mut registrations,
984 &mut automatons.lock().unwrap(),
985 &mut reporters,
986 Duration::from_millis(150),
987 |_| false,
988 None,
989 );
990
991 await_reporters(
992 context.with_label("reporter"),
993 sequencers.to_vec(),
994 &reporters,
995 (1_000, 111, false),
996 )
997 .await;
998 })
999 }
1000
1001 #[test_traced]
1002 #[ignore]
1003 fn test_1k() {
1004 run_1k::<MinPk>();
1005 run_1k::<MinSig>();
1006 }
1007}