1pub mod types;
37
// Everything declared inside this block is compiled only when the target architecture is not
// `wasm32`.
cfg_if::cfg_if! {
    if #[cfg(not(target_arch = "wasm32"))] {
        mod ack_manager;
        use ack_manager::AckManager;
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        mod metrics;
        mod tip_manager;
        use tip_manager::TipManager;
    }
}
51
// Mock implementations, compiled for test builds only; the `tests` module in this file uses them.
#[cfg(test)]
pub mod mocks;
54
55#[cfg(test)]
56mod tests {
57 use super::{mocks, types::Epoch, Config, Engine};
58 use commonware_cryptography::{
59 bls12381::{
60 dkg::ops,
61 primitives::{
62 group::Share,
63 poly,
64 variant::{MinPk, MinSig, Variant},
65 },
66 },
67 ed25519::{PrivateKey, PublicKey},
68 sha256::Digest as Sha256Digest,
69 PrivateKeyExt as _, Signer as _,
70 };
71 use commonware_macros::test_traced;
72 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
73 use commonware_runtime::{
74 buffer::PoolRef,
75 deterministic::{self, Context},
76 Clock, Metrics, Runner, Spawner,
77 };
78 use commonware_utils::{quorum, NZUsize};
79 use futures::{channel::oneshot, future::join_all};
80 use rand::{rngs::StdRng, SeedableRng as _};
81 use std::{
82 collections::{BTreeMap, HashMap, HashSet},
83 num::NonZeroUsize,
84 sync::{Arc, Mutex},
85 time::Duration,
86 };
87 use tracing::debug;
88
/// First argument to `PoolRef::new` when building each engine's `journal_buffer_pool`.
const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
/// Second argument to `PoolRef::new`; presumably the number of cached pages — confirm against
/// `PoolRef`.
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

/// Per-participant network handles: one `(Sender, Receiver)` pair for each of the two channels
/// (`0` and `1`) registered in `register_participants`.
type Registrations<P> = BTreeMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))>;
93
94 async fn register_participants(
95 oracle: &mut Oracle<PublicKey>,
96 participants: &[PublicKey],
97 ) -> Registrations<PublicKey> {
98 let mut registrations = BTreeMap::new();
99 for participant in participants.iter() {
100 let (a1, a2) = oracle.register(participant.clone(), 0).await.unwrap();
101 let (b1, b2) = oracle.register(participant.clone(), 1).await.unwrap();
102 registrations.insert(participant.clone(), ((a1, a2), (b1, b2)));
103 }
104 registrations
105 }
106
/// How `link_participants` should modify the link between each ordered pair of participants.
// NOTE(review): all three variants appear to be constructed in this file; confirm whether this
// `allow` is still needed.
#[allow(dead_code)]
enum Action {
    /// Add a link with the given parameters.
    Link(Link),
    /// Remove the existing link, then add one with the given parameters.
    Update(Link),
    /// Remove the existing link.
    Unlink,
}
113
114 async fn link_participants(
115 oracle: &mut Oracle<PublicKey>,
116 participants: &[PublicKey],
117 action: Action,
118 restrict_to: Option<fn(usize, usize, usize) -> bool>,
119 ) {
120 for (i1, v1) in participants.iter().enumerate() {
121 for (i2, v2) in participants.iter().enumerate() {
122 if v2 == v1 {
123 continue;
124 }
125 if let Some(f) = restrict_to {
126 if !f(participants.len(), i1, i2) {
127 continue;
128 }
129 }
130 if matches!(action, Action::Update(_) | Action::Unlink) {
131 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
132 }
133 if let Action::Link(ref link) | Action::Update(ref link) = action {
134 oracle
135 .add_link(v1.clone(), v2.clone(), link.clone())
136 .await
137 .unwrap();
138 }
139 }
140 }
141 }
142
143 async fn initialize_simulation(
144 context: Context,
145 num_validators: u32,
146 shares_vec: &mut [Share],
147 ) -> (
148 Oracle<PublicKey>,
149 Vec<(PublicKey, PrivateKey, Share)>,
150 Vec<PublicKey>,
151 Registrations<PublicKey>,
152 ) {
153 let (network, mut oracle) = Network::new(
154 context.with_label("network"),
155 commonware_p2p::simulated::Config {
156 max_size: 1024 * 1024,
157 },
158 );
159 network.start();
160
161 let mut schemes = (0..num_validators)
162 .map(|i| PrivateKey::from_seed(i as u64))
163 .collect::<Vec<_>>();
164 schemes.sort_by_key(|s| s.public_key());
165 let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
166 .iter()
167 .enumerate()
168 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
169 .collect();
170 let pks = validators
171 .iter()
172 .map(|(pk, _, _)| pk.clone())
173 .collect::<Vec<_>>();
174
175 let registrations = register_participants(&mut oracle, &pks).await;
176 let link = Link {
177 latency: Duration::from_millis(10),
178 jitter: Duration::from_millis(1),
179 success_rate: 1.0,
180 };
181 link_participants(&mut oracle, &pks, Action::Link(link), None).await;
182 (oracle, validators, pks, registrations)
183 }
184
185 #[allow(clippy::too_many_arguments)]
186 fn spawn_validator_engines<V: Variant>(
187 context: Context,
188 polynomial: poly::Public<V>,
189 sequencer_pks: &[PublicKey],
190 validator_pks: &[PublicKey],
191 validators: &[(PublicKey, PrivateKey, Share)],
192 registrations: &mut Registrations<PublicKey>,
193 automatons: &mut BTreeMap<PublicKey, mocks::Automaton<PublicKey>>,
194 reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
195 rebroadcast_timeout: Duration,
196 invalid_when: fn(u64) -> bool,
197 misses_allowed: Option<usize>,
198 ) -> HashMap<PublicKey, mocks::Monitor> {
199 let mut monitors = HashMap::new();
200 let namespace = b"my testing namespace";
201 for (validator, scheme, share) in validators.iter() {
202 let context = context.with_label(&validator.to_string());
203 let monitor = mocks::Monitor::new(111);
204 monitors.insert(validator.clone(), monitor.clone());
205 let sequencers = mocks::Sequencers::<PublicKey>::new(sequencer_pks.to_vec());
206 let validators = mocks::Validators::<PublicKey, V>::new(
207 polynomial.clone(),
208 validator_pks.to_vec(),
209 Some(share.clone()),
210 );
211
212 let automaton = mocks::Automaton::<PublicKey>::new(invalid_when);
213 automatons.insert(validator.clone(), automaton.clone());
214
215 let (reporter, reporter_mailbox) = mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
216 namespace,
217 *poly::public::<V>(&polynomial),
218 misses_allowed,
219 );
220 context.with_label("reporter").spawn(|_| reporter.run());
221 reporters.insert(validator.clone(), reporter_mailbox);
222
223 let engine = Engine::new(
224 context.with_label("engine"),
225 Config {
226 crypto: scheme.clone(),
227 relay: automaton.clone(),
228 automaton: automaton.clone(),
229 reporter: reporters.get(validator).unwrap().clone(),
230 monitor,
231 sequencers,
232 validators,
233 namespace: namespace.to_vec(),
234 epoch_bounds: (1, 1),
235 height_bound: 2,
236 rebroadcast_timeout,
237 priority_acks: false,
238 priority_proposals: false,
239 journal_heights_per_section: 10,
240 journal_replay_buffer: NZUsize!(4096),
241 journal_write_buffer: NZUsize!(4096),
242 journal_name_prefix: format!("ordered-broadcast-seq/{validator}/"),
243 journal_compression: Some(3),
244 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
245 },
246 );
247
248 let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
249 engine.start((a1, a2), (b1, b2));
250 }
251 monitors
252 }
253
254 async fn await_reporters<V: Variant>(
255 context: Context,
256 sequencers: Vec<PublicKey>,
257 reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
258 threshold: (u64, Epoch, bool),
259 ) {
260 let mut receivers = Vec::new();
261 for (reporter, mailbox) in reporters.iter() {
262 for sequencer in sequencers.iter() {
264 let (tx, rx) = oneshot::channel();
266 receivers.push(rx);
267
268 context.with_label("reporter_watcher").spawn({
269 let reporter = reporter.clone();
270 let sequencer = sequencer.clone();
271 let mut mailbox = mailbox.clone();
272 move |context| async move {
273 loop {
274 let (height, epoch) =
275 mailbox.get_tip(sequencer.clone()).await.unwrap_or((0, 0));
276 debug!(height, epoch, ?sequencer, ?reporter, "reporter");
277 let contiguous_height = mailbox
278 .get_contiguous_tip(sequencer.clone())
279 .await
280 .unwrap_or(0);
281 if height >= threshold.0
282 && epoch >= threshold.1
283 && (!threshold.2 || contiguous_height >= threshold.0)
284 {
285 let _ = tx.send(sequencer.clone());
286 break;
287 }
288 context.sleep(Duration::from_millis(100)).await;
289 }
290 }
291 });
292 }
293 }
294
295 let results = join_all(receivers).await;
297 assert_eq!(results.len(), sequencers.len() * reporters.len());
298
299 for result in results {
301 assert!(result.is_ok(), "reporter was cancelled");
302 }
303 }
304
305 async fn get_max_height<V: Variant>(
306 reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
307 ) -> u64 {
308 let mut max_height = 0;
309 for (sequencer, mailbox) in reporters.iter_mut() {
310 let (height, _) = mailbox.get_tip(sequencer.clone()).await.unwrap_or((0, 0));
311 if height > max_height {
312 max_height = height;
313 }
314 }
315 max_height
316 }
317
318 fn all_online<V: Variant>() {
319 let num_validators: u32 = 4;
320 let quorum: u32 = 3;
321 let runner = deterministic::Runner::timed(Duration::from_secs(30));
322
323 runner.start(|mut context| async move {
324 let (polynomial, mut shares_vec) =
325 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
326 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
327
328 let (_oracle, validators, pks, mut registrations) = initialize_simulation(
329 context.with_label("simulation"),
330 num_validators,
331 &mut shares_vec,
332 )
333 .await;
334 let automatons = Arc::new(Mutex::new(
335 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
336 ));
337 let mut reporters =
338 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
339 spawn_validator_engines::<V>(
340 context.with_label("validator"),
341 polynomial.clone(),
342 &pks,
343 &pks,
344 &validators,
345 &mut registrations,
346 &mut automatons.lock().unwrap(),
347 &mut reporters,
348 Duration::from_secs(5),
349 |_| false,
350 Some(5),
351 );
352 await_reporters(
353 context.with_label("reporter"),
354 reporters.keys().cloned().collect::<Vec<_>>(),
355 &reporters,
356 (100, 111, true),
357 )
358 .await;
359 });
360 }
361
/// Runs `all_online` for both the `MinPk` and `MinSig` variants.
#[test_traced]
fn test_all_online() {
    all_online::<MinPk>();
    all_online::<MinSig>();
}
367
368 fn unclean_shutdown<V: Variant>() {
369 let num_validators: u32 = 4;
370 let quorum: u32 = 3;
371 let mut rng = StdRng::seed_from_u64(0);
372 let (polynomial, mut shares_vec) =
373 ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
374 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
375 let completed = Arc::new(Mutex::new(HashSet::new()));
376 let shutdowns = Arc::new(Mutex::new(0u64));
377 let mut prev_ctx = None;
378
379 while completed.lock().unwrap().len() != num_validators as usize {
380 let completed = completed.clone();
381 let shares_vec = shares_vec.clone();
382 let shutdowns = shutdowns.clone();
383 let polynomial = polynomial.clone();
384
385 let f = |context: deterministic::Context| async move {
386 let (network, mut oracle) = Network::new(
387 context.with_label("network"),
388 commonware_p2p::simulated::Config {
389 max_size: 1024 * 1024,
390 },
391 );
392 network.start();
393
394 let mut schemes = (0..num_validators)
395 .map(|i| PrivateKey::from_seed(i as u64))
396 .collect::<Vec<_>>();
397 schemes.sort_by_key(|s| s.public_key());
398 let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
399 .iter()
400 .enumerate()
401 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
402 .collect();
403 let pks = validators
404 .iter()
405 .map(|(pk, _, _)| pk.clone())
406 .collect::<Vec<_>>();
407
408 let mut registrations = register_participants(&mut oracle, &pks).await;
409 let link = commonware_p2p::simulated::Link {
410 latency: Duration::from_millis(10),
411 jitter: Duration::from_millis(1),
412 success_rate: 1.0,
413 };
414 link_participants(&mut oracle, &pks, Action::Link(link), None).await;
415
416 let automatons = Arc::new(Mutex::new(BTreeMap::<
417 PublicKey,
418 mocks::Automaton<PublicKey>,
419 >::new()));
420 let mut reporters =
421 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new(
422 );
423 spawn_validator_engines(
424 context.with_label("validator"),
425 polynomial.clone(),
426 &pks,
427 &pks,
428 &validators,
429 &mut registrations,
430 &mut automatons.lock().unwrap(),
431 &mut reporters,
432 Duration::from_secs(5),
433 |_| false,
434 None,
435 );
436
437 let reporter_pairs: Vec<(
438 PublicKey,
439 mocks::ReporterMailbox<PublicKey, V, Sha256Digest>,
440 )> = reporters
441 .iter()
442 .map(|(v, m)| (v.clone(), m.clone()))
443 .collect();
444 for (validator, mut mailbox) in reporter_pairs {
445 let completed_clone = completed.clone();
446 context
447 .with_label("reporter_unclean")
448 .spawn(|context| async move {
449 loop {
450 let (height, _) =
451 mailbox.get_tip(validator.clone()).await.unwrap_or((0, 0));
452 if height >= 100 {
453 completed_clone.lock().unwrap().insert(validator.clone());
454 break;
455 }
456 context.sleep(Duration::from_millis(100)).await;
457 }
458 });
459 }
460 context.sleep(Duration::from_millis(1000)).await;
461 *shutdowns.lock().unwrap() += 1;
462
463 context
464 };
465
466 let context = if let Some(prev_ctx) = prev_ctx {
467 deterministic::Runner::from(prev_ctx)
468 } else {
469 deterministic::Runner::timed(Duration::from_secs(45))
470 }
471 .start(f);
472
473 prev_ctx = Some(context.recover());
474 }
475 }
476
/// Runs `unclean_shutdown` for both the `MinPk` and `MinSig` variants.
#[test_traced]
fn test_unclean_shutdown() {
    unclean_shutdown::<MinPk>();
    unclean_shutdown::<MinSig>();
}
482
483 fn network_partition<V: Variant>() {
484 let num_validators: u32 = 4;
485 let quorum: u32 = 3;
486 let runner = deterministic::Runner::timed(Duration::from_secs(60));
487
488 runner.start(|mut context| async move {
489 let (polynomial, mut shares_vec) =
490 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
491 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
492
493 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
495 context.with_label("simulation"),
496 num_validators,
497 &mut shares_vec,
498 )
499 .await;
500 let automatons = Arc::new(Mutex::new(
501 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
502 ));
503 let mut reporters =
504 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
505 spawn_validator_engines(
506 context.with_label("validator"),
507 polynomial.clone(),
508 &pks,
509 &pks,
510 &validators,
511 &mut registrations,
512 &mut automatons.lock().unwrap(),
513 &mut reporters,
514 Duration::from_secs(1),
515 |_| false,
516 None,
517 );
518
519 link_participants(&mut oracle, &pks, Action::Unlink, None).await;
521 context.sleep(Duration::from_secs(30)).await;
522
523 let max_height = get_max_height(&mut reporters).await;
525
526 let link = Link {
528 latency: Duration::from_millis(10),
529 jitter: Duration::from_millis(1),
530 success_rate: 1.0,
531 };
532 link_participants(&mut oracle, &pks, Action::Link(link), None).await;
533 await_reporters(
534 context.with_label("reporter"),
535 reporters.keys().cloned().collect::<Vec<_>>(),
536 &reporters,
537 (max_height + 100, 111, false),
538 )
539 .await;
540 });
541 }
542
/// Runs `network_partition` for both the `MinPk` and `MinSig` variants.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[test_traced]
#[ignore]
fn test_network_partition() {
    network_partition::<MinPk>();
    network_partition::<MinSig>();
}
549
550 fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
551 let num_validators: u32 = 4;
552 let quorum: u32 = 3;
553 let cfg = deterministic::Config::new()
554 .with_seed(seed)
555 .with_timeout(Some(Duration::from_secs(40)));
556 let runner = deterministic::Runner::new(cfg);
557
558 runner.start(|mut context| async move {
559 let (polynomial, mut shares_vec) =
560 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
561 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
562
563 let (oracle, validators, pks, mut registrations) = initialize_simulation(
564 context.with_label("simulation"),
565 num_validators,
566 &mut shares_vec,
567 )
568 .await;
569 let delayed_link = Link {
570 latency: Duration::from_millis(50),
571 jitter: Duration::from_millis(40),
572 success_rate: 0.5,
573 };
574 let mut oracle_clone = oracle.clone();
575 link_participants(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
576
577 let automatons = Arc::new(Mutex::new(
578 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
579 ));
580 let mut reporters =
581 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
582 spawn_validator_engines(
583 context.with_label("validator"),
584 polynomial.clone(),
585 &pks,
586 &pks,
587 &validators,
588 &mut registrations,
589 &mut automatons.lock().unwrap(),
590 &mut reporters,
591 Duration::from_millis(150),
592 |_| false,
593 None,
594 );
595
596 await_reporters(
597 context.with_label("reporter"),
598 reporters.keys().cloned().collect::<Vec<_>>(),
599 &reporters,
600 (40, 111, false),
601 )
602 .await;
603
604 context.auditor().state()
605 })
606 }
607
/// Runs `slow_and_lossy_links` with seed `0` for both the `MinPk` and `MinSig` variants; the
/// returned state strings are discarded.
#[test_traced]
fn test_slow_and_lossy_links() {
    slow_and_lossy_links::<MinPk>(0);
    slow_and_lossy_links::<MinSig>(0);
}
613
614 #[test_traced]
615 #[ignore]
616 fn test_determinism() {
617 for seed in 1..6 {
620 let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
621 let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
622 assert_eq!(pk_state_1, pk_state_2);
623
624 let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
625 let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
626 assert_eq!(sig_state_1, sig_state_2);
627
628 assert_ne!(pk_state_1, sig_state_1);
630 }
631 }
632
633 fn invalid_signature_injection<V: Variant>() {
634 let num_validators: u32 = 4;
635 let quorum: u32 = 3;
636 let runner = deterministic::Runner::timed(Duration::from_secs(30));
637
638 runner.start(|mut context| async move {
639 let (polynomial, mut shares_vec) =
640 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
641 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
642
643 let (_oracle, validators, pks, mut registrations) = initialize_simulation(
644 context.with_label("simulation"),
645 num_validators,
646 &mut shares_vec,
647 )
648 .await;
649 let automatons = Arc::new(Mutex::new(
650 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
651 ));
652 let mut reporters =
653 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
654 spawn_validator_engines::<V>(
655 context.with_label("validator"),
656 polynomial.clone(),
657 &pks,
658 &pks,
659 &validators,
660 &mut registrations,
661 &mut automatons.lock().unwrap(),
662 &mut reporters,
663 Duration::from_secs(5),
664 |i| i % 10 == 0,
665 None,
666 );
667
668 await_reporters(
669 context.with_label("reporter"),
670 reporters.keys().cloned().collect::<Vec<_>>(),
671 &reporters,
672 (100, 111, true),
673 )
674 .await;
675 });
676 }
677
/// Runs `invalid_signature_injection` for both the `MinPk` and `MinSig` variants.
#[test_traced]
fn test_invalid_signature_injection() {
    invalid_signature_injection::<MinPk>();
    invalid_signature_injection::<MinSig>();
}
683
684 fn updated_epoch<V: Variant>() {
685 let num_validators: u32 = 4;
686 let quorum: u32 = 3;
687 let runner = deterministic::Runner::timed(Duration::from_secs(60));
688
689 runner.start(|mut context| async move {
690 let (polynomial, mut shares_vec) =
691 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
692 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
693
694 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
696 context.with_label("simulation"),
697 num_validators,
698 &mut shares_vec,
699 )
700 .await;
701 let automatons = Arc::new(Mutex::new(
702 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
703 ));
704 let mut reporters =
705 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
706 let monitors = spawn_validator_engines::<V>(
707 context.with_label("validator"),
708 polynomial.clone(),
709 &pks,
710 &pks,
711 &validators,
712 &mut registrations,
713 &mut automatons.lock().unwrap(),
714 &mut reporters,
715 Duration::from_secs(1),
716 |_| false,
717 Some(5),
718 );
719
720 await_reporters(
722 context.with_label("reporter"),
723 reporters.keys().cloned().collect::<Vec<_>>(),
724 &reporters,
725 (100, 111, true),
726 )
727 .await;
728
729 link_participants(&mut oracle, &pks, Action::Unlink, None).await;
731 context.sleep(Duration::from_secs(30)).await;
732
733 let max_height = get_max_height(&mut reporters).await;
735
736 for monitor in monitors.values() {
738 monitor.update(112);
739 }
740
741 let link = Link {
743 latency: Duration::from_millis(10),
744 jitter: Duration::from_millis(1),
745 success_rate: 1.0,
746 };
747 link_participants(&mut oracle, &pks, Action::Link(link), None).await;
748 await_reporters(
749 context.with_label("reporter"),
750 reporters.keys().cloned().collect::<Vec<_>>(),
751 &reporters,
752 (max_height + 100, 112, true),
753 )
754 .await;
755 });
756 }
757
/// Runs `updated_epoch` for both the `MinPk` and `MinSig` variants.
#[test_traced]
fn test_updated_epoch() {
    updated_epoch::<MinPk>();
    updated_epoch::<MinSig>();
}
763
764 fn external_sequencer<V: Variant>() {
765 let num_validators: u32 = 4;
766 let quorum: u32 = quorum(3);
767 let runner = deterministic::Runner::timed(Duration::from_secs(60));
768 runner.start(|mut context| async move {
769 let (polynomial, shares) =
771 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
772
773 let mut schemes = (0..num_validators)
775 .map(|i| PrivateKey::from_seed(i as u64))
776 .collect::<Vec<_>>();
777 schemes.sort_by_key(|s| s.public_key());
778
779 let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
781 .iter()
782 .enumerate()
783 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares[i].clone()))
784 .collect();
785 let validator_pks = validators
786 .iter()
787 .map(|(pk, _, _)| pk.clone())
788 .collect::<Vec<_>>();
789
790 let sequencer = PrivateKey::from_seed(u64::MAX);
792
793 let mut participants = validators
795 .iter()
796 .map(|(pk, _, _)| pk.clone())
797 .collect::<Vec<_>>();
798 participants.push(sequencer.public_key()); let (network, mut oracle) = Network::new(
802 context.with_label("network"),
803 commonware_p2p::simulated::Config {
804 max_size: 1024 * 1024,
805 },
806 );
807 network.start();
808
809 let mut registrations = register_participants(&mut oracle, &participants).await;
811 let link = commonware_p2p::simulated::Link {
812 latency: Duration::from_millis(10),
813 jitter: Duration::from_millis(1),
814 success_rate: 1.0,
815 };
816 link_participants(&mut oracle, &participants, Action::Link(link), None).await;
817
818 let automatons = Arc::new(Mutex::new(
820 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
821 ));
822 let mut reporters =
823 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
824 let mut monitors = HashMap::new();
825 let namespace = b"my testing namespace";
826
827 for (validator, scheme, share) in validators.iter() {
829 let context = context.with_label(&validator.to_string());
830 let monitor = mocks::Monitor::new(111);
831 monitors.insert(validator.clone(), monitor.clone());
832 let sequencers = mocks::Sequencers::<PublicKey>::new(vec![sequencer.public_key()]);
833 let validators = mocks::Validators::<PublicKey, V>::new(
834 polynomial.clone(),
835 validator_pks.clone(),
836 Some(share.clone()),
837 );
838
839 let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
840 automatons
841 .lock()
842 .unwrap()
843 .insert(validator.clone(), automaton.clone());
844
845 let (reporter, reporter_mailbox) =
846 mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
847 namespace,
848 *poly::public::<V>(&polynomial),
849 Some(5),
850 );
851 context.with_label("reporter").spawn(|_| reporter.run());
852 reporters.insert(validator.clone(), reporter_mailbox);
853
854 let engine = Engine::new(
855 context.with_label("engine"),
856 Config {
857 crypto: scheme.clone(),
858 relay: automaton.clone(),
859 automaton: automaton.clone(),
860 reporter: reporters.get(validator).unwrap().clone(),
861 monitor,
862 sequencers,
863 validators,
864 namespace: namespace.to_vec(),
865 epoch_bounds: (1, 1),
866 height_bound: 2,
867 rebroadcast_timeout: Duration::from_secs(5),
868 priority_acks: false,
869 priority_proposals: false,
870 journal_heights_per_section: 10,
871 journal_replay_buffer: NZUsize!(4096),
872 journal_write_buffer: NZUsize!(4096),
873 journal_name_prefix: format!("ordered-broadcast-seq/{validator}/"),
874 journal_compression: Some(3),
875 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
876 },
877 );
878
879 let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
880 engine.start((a1, a2), (b1, b2));
881 }
882
883 {
885 let context = context.with_label("sequencer");
886 let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
887 automatons
888 .lock()
889 .unwrap()
890 .insert(sequencer.public_key(), automaton.clone());
891 let (reporter, reporter_mailbox) =
892 mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
893 namespace,
894 *poly::public::<V>(&polynomial),
895 Some(5),
896 );
897 context.with_label("reporter").spawn(|_| reporter.run());
898 reporters.insert(sequencer.public_key(), reporter_mailbox);
899 let engine = Engine::new(
900 context.with_label("engine"),
901 Config {
902 crypto: sequencer.clone(),
903 relay: automaton.clone(),
904 automaton: automaton.clone(),
905 reporter: reporters.get(&sequencer.public_key()).unwrap().clone(),
906 monitor: mocks::Monitor::new(111),
907 sequencers: mocks::Sequencers::<PublicKey>::new(vec![
908 sequencer.public_key()
909 ]),
910 validators: mocks::Validators::<PublicKey, V>::new(
911 polynomial.clone(),
912 validator_pks,
913 None,
914 ),
915 namespace: namespace.to_vec(),
916 epoch_bounds: (1, 1),
917 height_bound: 2,
918 rebroadcast_timeout: Duration::from_secs(5),
919 priority_acks: false,
920 priority_proposals: false,
921 journal_heights_per_section: 10,
922 journal_replay_buffer: NZUsize!(4096),
923 journal_write_buffer: NZUsize!(4096),
924 journal_name_prefix: format!(
925 "ordered-broadcast-seq/{}/",
926 sequencer.public_key()
927 ),
928 journal_compression: Some(3),
929 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
930 },
931 );
932
933 let ((a1, a2), (b1, b2)) = registrations.remove(&sequencer.public_key()).unwrap();
934 engine.start((a1, a2), (b1, b2));
935 }
936
937 await_reporters(
939 context.with_label("reporter"),
940 vec![sequencer.public_key()],
941 &reporters,
942 (100, 111, true),
943 )
944 .await;
945 });
946 }
947
/// Runs `external_sequencer` for both the `MinPk` and `MinSig` variants.
#[test_traced]
fn test_external_sequencer() {
    external_sequencer::<MinPk>();
    external_sequencer::<MinSig>();
}
953
954 fn run_1k<V: Variant>() {
955 let num_validators: u32 = 10;
956 let quorum: u32 = 3;
957 let cfg = deterministic::Config::new();
958 let runner = deterministic::Runner::new(cfg);
959
960 runner.start(|mut context| async move {
961 let (polynomial, mut shares_vec) =
962 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
963 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
964
965 let (oracle, validators, pks, mut registrations) = initialize_simulation(
966 context.with_label("simulation"),
967 num_validators,
968 &mut shares_vec,
969 )
970 .await;
971 let delayed_link = Link {
972 latency: Duration::from_millis(80),
973 jitter: Duration::from_millis(10),
974 success_rate: 0.98,
975 };
976 let mut oracle_clone = oracle.clone();
977 link_participants(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
978
979 let automatons = Arc::new(Mutex::new(
980 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
981 ));
982 let mut reporters =
983 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
984 let sequencers = &pks[0..pks.len() / 2];
985 spawn_validator_engines::<V>(
986 context.with_label("validator"),
987 polynomial.clone(),
988 sequencers,
989 &pks,
990 &validators,
991 &mut registrations,
992 &mut automatons.lock().unwrap(),
993 &mut reporters,
994 Duration::from_millis(150),
995 |_| false,
996 None,
997 );
998
999 await_reporters(
1000 context.with_label("reporter"),
1001 sequencers.to_vec(),
1002 &reporters,
1003 (1_000, 111, false),
1004 )
1005 .await;
1006 })
1007 }
1008
/// Runs `run_1k` for both the `MinPk` and `MinSig` variants.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[test_traced]
#[ignore]
fn test_1k() {
    run_1k::<MinPk>();
    run_1k::<MinSig>();
}
1015}