1pub mod types;
37
38cfg_if::cfg_if! {
39 if #[cfg(not(target_arch = "wasm32"))] {
40 mod ack_manager;
41 use ack_manager::AckManager;
42 mod config;
43 pub use config::Config;
44 mod engine;
45 pub use engine::Engine;
46 mod metrics;
47 mod tip_manager;
48 use tip_manager::TipManager;
49 }
50}
51
52#[cfg(test)]
53pub mod mocks;
54
55#[cfg(test)]
56mod tests {
57 use super::{mocks, Config, Engine};
58 use crate::types::Epoch;
59 use commonware_cryptography::{
60 bls12381::{
61 dkg::ops,
62 primitives::{
63 group::Share,
64 poly,
65 variant::{MinPk, MinSig, Variant},
66 },
67 },
68 ed25519::{PrivateKey, PublicKey},
69 sha256::Digest as Sha256Digest,
70 PrivateKeyExt as _, Signer as _,
71 };
72 use commonware_macros::test_traced;
73 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
74 use commonware_runtime::{
75 buffer::PoolRef,
76 deterministic::{self, Context},
77 Clock, Metrics, Runner, Spawner,
78 };
79 use commonware_utils::{quorum, NZUsize};
80 use futures::{channel::oneshot, future::join_all};
81 use rand::{rngs::StdRng, SeedableRng as _};
82 use std::{
83 collections::{BTreeMap, HashMap, HashSet},
84 num::NonZeroUsize,
85 sync::{Arc, Mutex},
86 time::Duration,
87 };
88 use tracing::debug;
89
90 const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
91 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
92
93 type Registrations<P> = BTreeMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))>;
94
95 async fn register_participants(
96 oracle: &mut Oracle<PublicKey>,
97 participants: &[PublicKey],
98 ) -> Registrations<PublicKey> {
99 let mut registrations = BTreeMap::new();
100 for participant in participants.iter() {
101 let mut control = oracle.control(participant.clone());
102 let (a1, a2) = control.register(0).await.unwrap();
103 let (b1, b2) = control.register(1).await.unwrap();
104 registrations.insert(participant.clone(), ((a1, a2), (b1, b2)));
105 }
106 registrations
107 }
108
109 enum Action {
110 Link(Link),
111 Update(Link),
112 Unlink,
113 }
114
115 async fn link_participants(
116 oracle: &mut Oracle<PublicKey>,
117 participants: &[PublicKey],
118 action: Action,
119 restrict_to: Option<fn(usize, usize, usize) -> bool>,
120 ) {
121 for (i1, v1) in participants.iter().enumerate() {
122 for (i2, v2) in participants.iter().enumerate() {
123 if v2 == v1 {
124 continue;
125 }
126 if let Some(f) = restrict_to {
127 if !f(participants.len(), i1, i2) {
128 continue;
129 }
130 }
131 if matches!(action, Action::Update(_) | Action::Unlink) {
132 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
133 }
134 if let Action::Link(ref link) | Action::Update(ref link) = action {
135 oracle
136 .add_link(v1.clone(), v2.clone(), link.clone())
137 .await
138 .unwrap();
139 }
140 }
141 }
142 }
143
144 async fn initialize_simulation(
145 context: Context,
146 num_validators: u32,
147 shares_vec: &mut [Share],
148 ) -> (
149 Oracle<PublicKey>,
150 Vec<(PublicKey, PrivateKey, Share)>,
151 Vec<PublicKey>,
152 Registrations<PublicKey>,
153 ) {
154 let (network, mut oracle) = Network::new(
155 context.with_label("network"),
156 commonware_p2p::simulated::Config {
157 max_size: 1024 * 1024,
158 disconnect_on_block: true,
159 tracked_peer_sets: None,
160 },
161 );
162 network.start();
163
164 let mut schemes = (0..num_validators)
165 .map(|i| PrivateKey::from_seed(i as u64))
166 .collect::<Vec<_>>();
167 schemes.sort_by_key(|s| s.public_key());
168 let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
169 .iter()
170 .enumerate()
171 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
172 .collect();
173 let pks = validators
174 .iter()
175 .map(|(pk, _, _)| pk.clone())
176 .collect::<Vec<_>>();
177
178 let registrations = register_participants(&mut oracle, &pks).await;
179 let link = Link {
180 latency: Duration::from_millis(10),
181 jitter: Duration::from_millis(1),
182 success_rate: 1.0,
183 };
184 link_participants(&mut oracle, &pks, Action::Link(link), None).await;
185 (oracle, validators, pks, registrations)
186 }
187
188 #[allow(clippy::too_many_arguments)]
189 fn spawn_validator_engines<V: Variant>(
190 context: Context,
191 polynomial: poly::Public<V>,
192 sequencer_pks: &[PublicKey],
193 validator_pks: &[PublicKey],
194 validators: &[(PublicKey, PrivateKey, Share)],
195 registrations: &mut Registrations<PublicKey>,
196 automatons: &mut BTreeMap<PublicKey, mocks::Automaton<PublicKey>>,
197 reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
198 rebroadcast_timeout: Duration,
199 invalid_when: fn(u64) -> bool,
200 misses_allowed: Option<usize>,
201 ) -> HashMap<PublicKey, mocks::Monitor> {
202 let mut monitors = HashMap::new();
203 let namespace = b"my testing namespace";
204 for (validator, scheme, share) in validators.iter() {
205 let context = context.with_label(&validator.to_string());
206 let monitor = mocks::Monitor::new(111);
207 monitors.insert(validator.clone(), monitor.clone());
208 let sequencers = mocks::Sequencers::<PublicKey>::new(sequencer_pks.to_vec());
209 let validators = mocks::Validators::<PublicKey, V>::new(
210 polynomial.clone(),
211 validator_pks.to_vec(),
212 Some(share.clone()),
213 );
214
215 let automaton = mocks::Automaton::<PublicKey>::new(invalid_when);
216 automatons.insert(validator.clone(), automaton.clone());
217
218 let (reporter, reporter_mailbox) = mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
219 namespace,
220 *poly::public::<V>(&polynomial),
221 misses_allowed,
222 );
223 context.with_label("reporter").spawn(|_| reporter.run());
224 reporters.insert(validator.clone(), reporter_mailbox);
225
226 let engine = Engine::new(
227 context.with_label("engine"),
228 Config {
229 crypto: scheme.clone(),
230 relay: automaton.clone(),
231 automaton: automaton.clone(),
232 reporter: reporters.get(validator).unwrap().clone(),
233 monitor,
234 sequencers,
235 validators,
236 namespace: namespace.to_vec(),
237 epoch_bounds: (1, 1),
238 height_bound: 2,
239 rebroadcast_timeout,
240 priority_acks: false,
241 priority_proposals: false,
242 journal_heights_per_section: 10,
243 journal_replay_buffer: NZUsize!(4096),
244 journal_write_buffer: NZUsize!(4096),
245 journal_name_prefix: format!("ordered-broadcast-seq/{validator}/"),
246 journal_compression: Some(3),
247 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
248 },
249 );
250
251 let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
252 engine.start((a1, a2), (b1, b2));
253 }
254 monitors
255 }
256
257 async fn await_reporters<V: Variant>(
258 context: Context,
259 sequencers: Vec<PublicKey>,
260 reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
261 threshold: (u64, Epoch, bool),
262 ) {
263 let mut receivers = Vec::new();
264 for (reporter, mailbox) in reporters.iter() {
265 for sequencer in sequencers.iter() {
267 let (tx, rx) = oneshot::channel();
269 receivers.push(rx);
270
271 context.with_label("reporter_watcher").spawn({
272 let reporter = reporter.clone();
273 let sequencer = sequencer.clone();
274 let mut mailbox = mailbox.clone();
275 move |context| async move {
276 loop {
277 let (height, epoch) =
278 mailbox.get_tip(sequencer.clone()).await.unwrap_or((0, 0));
279 debug!(height, epoch, ?sequencer, ?reporter, "reporter");
280 let contiguous_height = mailbox
281 .get_contiguous_tip(sequencer.clone())
282 .await
283 .unwrap_or(0);
284 if height >= threshold.0
285 && epoch >= threshold.1
286 && (!threshold.2 || contiguous_height >= threshold.0)
287 {
288 let _ = tx.send(sequencer.clone());
289 break;
290 }
291 context.sleep(Duration::from_millis(100)).await;
292 }
293 }
294 });
295 }
296 }
297
298 let results = join_all(receivers).await;
300 assert_eq!(results.len(), sequencers.len() * reporters.len());
301
302 for result in results {
304 assert!(result.is_ok(), "reporter was cancelled");
305 }
306 }
307
308 async fn get_max_height<V: Variant>(
309 reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
310 ) -> u64 {
311 let mut max_height = 0;
312 for (sequencer, mailbox) in reporters.iter_mut() {
313 let (height, _) = mailbox.get_tip(sequencer.clone()).await.unwrap_or((0, 0));
314 if height > max_height {
315 max_height = height;
316 }
317 }
318 max_height
319 }
320
321 fn all_online<V: Variant>() {
322 let num_validators: u32 = 4;
323 let quorum: u32 = 3;
324 let runner = deterministic::Runner::timed(Duration::from_secs(30));
325
326 runner.start(|mut context| async move {
327 let (polynomial, mut shares_vec) =
328 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
329 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
330
331 let (_oracle, validators, pks, mut registrations) = initialize_simulation(
332 context.with_label("simulation"),
333 num_validators,
334 &mut shares_vec,
335 )
336 .await;
337 let automatons = Arc::new(Mutex::new(
338 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
339 ));
340 let mut reporters =
341 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
342 spawn_validator_engines::<V>(
343 context.with_label("validator"),
344 polynomial.clone(),
345 &pks,
346 &pks,
347 &validators,
348 &mut registrations,
349 &mut automatons.lock().unwrap(),
350 &mut reporters,
351 Duration::from_secs(5),
352 |_| false,
353 Some(5),
354 );
355 await_reporters(
356 context.with_label("reporter"),
357 reporters.keys().cloned().collect::<Vec<_>>(),
358 &reporters,
359 (100, 111, true),
360 )
361 .await;
362 });
363 }
364
365 #[test_traced]
366 fn test_all_online() {
367 all_online::<MinPk>();
368 all_online::<MinSig>();
369 }
370
371 fn unclean_shutdown<V: Variant>() {
372 let num_validators: u32 = 4;
373 let quorum: u32 = 3;
374 let mut rng = StdRng::seed_from_u64(0);
375 let (polynomial, mut shares_vec) =
376 ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
377 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
378 let completed = Arc::new(Mutex::new(HashSet::new()));
379 let shutdowns = Arc::new(Mutex::new(0u64));
380 let mut prev_checkpoint = None;
381
382 while completed.lock().unwrap().len() != num_validators as usize {
383 let completed = completed.clone();
384 let shares_vec = shares_vec.clone();
385 let shutdowns = shutdowns.clone();
386 let polynomial = polynomial.clone();
387
388 let f = |context: deterministic::Context| async move {
389 let (network, mut oracle) = Network::new(
390 context.with_label("network"),
391 commonware_p2p::simulated::Config {
392 max_size: 1024 * 1024,
393 disconnect_on_block: true,
394 tracked_peer_sets: None,
395 },
396 );
397 network.start();
398
399 let mut schemes = (0..num_validators)
400 .map(|i| PrivateKey::from_seed(i as u64))
401 .collect::<Vec<_>>();
402 schemes.sort_by_key(|s| s.public_key());
403 let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
404 .iter()
405 .enumerate()
406 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
407 .collect();
408 let pks = validators
409 .iter()
410 .map(|(pk, _, _)| pk.clone())
411 .collect::<Vec<_>>();
412
413 let mut registrations = register_participants(&mut oracle, &pks).await;
414 let link = commonware_p2p::simulated::Link {
415 latency: Duration::from_millis(10),
416 jitter: Duration::from_millis(1),
417 success_rate: 1.0,
418 };
419 link_participants(&mut oracle, &pks, Action::Link(link), None).await;
420
421 let automatons = Arc::new(Mutex::new(BTreeMap::<
422 PublicKey,
423 mocks::Automaton<PublicKey>,
424 >::new()));
425 let mut reporters =
426 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new(
427 );
428 spawn_validator_engines(
429 context.with_label("validator"),
430 polynomial.clone(),
431 &pks,
432 &pks,
433 &validators,
434 &mut registrations,
435 &mut automatons.lock().unwrap(),
436 &mut reporters,
437 Duration::from_secs(5),
438 |_| false,
439 None,
440 );
441
442 let reporter_pairs: Vec<(
443 PublicKey,
444 mocks::ReporterMailbox<PublicKey, V, Sha256Digest>,
445 )> = reporters
446 .iter()
447 .map(|(v, m)| (v.clone(), m.clone()))
448 .collect();
449 for (validator, mut mailbox) in reporter_pairs {
450 let completed_clone = completed.clone();
451 context
452 .with_label("reporter_unclean")
453 .spawn(|context| async move {
454 loop {
455 let (height, _) =
456 mailbox.get_tip(validator.clone()).await.unwrap_or((0, 0));
457 if height >= 100 {
458 completed_clone.lock().unwrap().insert(validator.clone());
459 break;
460 }
461 context.sleep(Duration::from_millis(100)).await;
462 }
463 });
464 }
465 context.sleep(Duration::from_millis(1000)).await;
466 *shutdowns.lock().unwrap() += 1;
467 };
468
469 let (_, checkpoint) = if let Some(prev_checkpoint) = prev_checkpoint {
470 deterministic::Runner::from(prev_checkpoint)
471 } else {
472 deterministic::Runner::timed(Duration::from_secs(45))
473 }
474 .start_and_recover(f);
475
476 prev_checkpoint = Some(checkpoint);
477 }
478 }
479
480 #[test_traced]
481 fn test_unclean_shutdown() {
482 unclean_shutdown::<MinPk>();
483 unclean_shutdown::<MinSig>();
484 }
485
486 fn network_partition<V: Variant>() {
487 let num_validators: u32 = 4;
488 let quorum: u32 = 3;
489 let runner = deterministic::Runner::timed(Duration::from_secs(60));
490
491 runner.start(|mut context| async move {
492 let (polynomial, mut shares_vec) =
493 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
494 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
495
496 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
498 context.with_label("simulation"),
499 num_validators,
500 &mut shares_vec,
501 )
502 .await;
503 let automatons = Arc::new(Mutex::new(
504 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
505 ));
506 let mut reporters =
507 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
508 spawn_validator_engines(
509 context.with_label("validator"),
510 polynomial.clone(),
511 &pks,
512 &pks,
513 &validators,
514 &mut registrations,
515 &mut automatons.lock().unwrap(),
516 &mut reporters,
517 Duration::from_secs(1),
518 |_| false,
519 None,
520 );
521
522 link_participants(&mut oracle, &pks, Action::Unlink, None).await;
524 context.sleep(Duration::from_secs(30)).await;
525
526 let max_height = get_max_height(&mut reporters).await;
528
529 let link = Link {
531 latency: Duration::from_millis(10),
532 jitter: Duration::from_millis(1),
533 success_rate: 1.0,
534 };
535 link_participants(&mut oracle, &pks, Action::Link(link), None).await;
536 await_reporters(
537 context.with_label("reporter"),
538 reporters.keys().cloned().collect::<Vec<_>>(),
539 &reporters,
540 (max_height + 100, 111, false),
541 )
542 .await;
543 });
544 }
545
546 #[test_traced]
547 #[ignore]
548 fn test_network_partition() {
549 network_partition::<MinPk>();
550 network_partition::<MinSig>();
551 }
552
553 fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
554 let num_validators: u32 = 4;
555 let quorum: u32 = 3;
556 let cfg = deterministic::Config::new()
557 .with_seed(seed)
558 .with_timeout(Some(Duration::from_secs(40)));
559 let runner = deterministic::Runner::new(cfg);
560
561 runner.start(|mut context| async move {
562 let (polynomial, mut shares_vec) =
563 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
564 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
565
566 let (oracle, validators, pks, mut registrations) = initialize_simulation(
567 context.with_label("simulation"),
568 num_validators,
569 &mut shares_vec,
570 )
571 .await;
572 let delayed_link = Link {
573 latency: Duration::from_millis(50),
574 jitter: Duration::from_millis(40),
575 success_rate: 0.5,
576 };
577 let mut oracle_clone = oracle.clone();
578 link_participants(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
579
580 let automatons = Arc::new(Mutex::new(
581 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
582 ));
583 let mut reporters =
584 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
585 spawn_validator_engines(
586 context.with_label("validator"),
587 polynomial.clone(),
588 &pks,
589 &pks,
590 &validators,
591 &mut registrations,
592 &mut automatons.lock().unwrap(),
593 &mut reporters,
594 Duration::from_millis(150),
595 |_| false,
596 None,
597 );
598
599 await_reporters(
600 context.with_label("reporter"),
601 reporters.keys().cloned().collect::<Vec<_>>(),
602 &reporters,
603 (40, 111, false),
604 )
605 .await;
606
607 context.auditor().state()
608 })
609 }
610
611 #[test_traced]
612 fn test_slow_and_lossy_links() {
613 slow_and_lossy_links::<MinPk>(0);
614 slow_and_lossy_links::<MinSig>(0);
615 }
616
617 #[test_traced]
618 #[ignore]
619 fn test_determinism() {
620 for seed in 1..6 {
623 let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
624 let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
625 assert_eq!(pk_state_1, pk_state_2);
626
627 let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
628 let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
629 assert_eq!(sig_state_1, sig_state_2);
630
631 assert_ne!(pk_state_1, sig_state_1);
633 }
634 }
635
636 fn invalid_signature_injection<V: Variant>() {
637 let num_validators: u32 = 4;
638 let quorum: u32 = 3;
639 let runner = deterministic::Runner::timed(Duration::from_secs(30));
640
641 runner.start(|mut context| async move {
642 let (polynomial, mut shares_vec) =
643 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
644 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
645
646 let (_oracle, validators, pks, mut registrations) = initialize_simulation(
647 context.with_label("simulation"),
648 num_validators,
649 &mut shares_vec,
650 )
651 .await;
652 let automatons = Arc::new(Mutex::new(
653 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
654 ));
655 let mut reporters =
656 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
657 spawn_validator_engines::<V>(
658 context.with_label("validator"),
659 polynomial.clone(),
660 &pks,
661 &pks,
662 &validators,
663 &mut registrations,
664 &mut automatons.lock().unwrap(),
665 &mut reporters,
666 Duration::from_secs(5),
667 |i| i % 10 == 0,
668 None,
669 );
670
671 await_reporters(
672 context.with_label("reporter"),
673 reporters.keys().cloned().collect::<Vec<_>>(),
674 &reporters,
675 (100, 111, true),
676 )
677 .await;
678 });
679 }
680
681 #[test_traced]
682 fn test_invalid_signature_injection() {
683 invalid_signature_injection::<MinPk>();
684 invalid_signature_injection::<MinSig>();
685 }
686
687 fn updated_epoch<V: Variant>() {
688 let num_validators: u32 = 4;
689 let quorum: u32 = 3;
690 let runner = deterministic::Runner::timed(Duration::from_secs(60));
691
692 runner.start(|mut context| async move {
693 let (polynomial, mut shares_vec) =
694 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
695 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
696
697 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
699 context.with_label("simulation"),
700 num_validators,
701 &mut shares_vec,
702 )
703 .await;
704 let automatons = Arc::new(Mutex::new(
705 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
706 ));
707 let mut reporters =
708 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
709 let monitors = spawn_validator_engines::<V>(
710 context.with_label("validator"),
711 polynomial.clone(),
712 &pks,
713 &pks,
714 &validators,
715 &mut registrations,
716 &mut automatons.lock().unwrap(),
717 &mut reporters,
718 Duration::from_secs(1),
719 |_| false,
720 Some(5),
721 );
722
723 await_reporters(
725 context.with_label("reporter"),
726 reporters.keys().cloned().collect::<Vec<_>>(),
727 &reporters,
728 (100, 111, true),
729 )
730 .await;
731
732 link_participants(&mut oracle, &pks, Action::Unlink, None).await;
734 context.sleep(Duration::from_secs(30)).await;
735
736 let max_height = get_max_height(&mut reporters).await;
738
739 for monitor in monitors.values() {
741 monitor.update(112);
742 }
743
744 let link = Link {
746 latency: Duration::from_millis(10),
747 jitter: Duration::from_millis(1),
748 success_rate: 1.0,
749 };
750 link_participants(&mut oracle, &pks, Action::Link(link), None).await;
751 await_reporters(
752 context.with_label("reporter"),
753 reporters.keys().cloned().collect::<Vec<_>>(),
754 &reporters,
755 (max_height + 100, 112, true),
756 )
757 .await;
758 });
759 }
760
761 #[test_traced]
762 fn test_updated_epoch() {
763 updated_epoch::<MinPk>();
764 updated_epoch::<MinSig>();
765 }
766
767 fn external_sequencer<V: Variant>() {
768 let num_validators: u32 = 4;
769 let quorum: u32 = quorum(3);
770 let runner = deterministic::Runner::timed(Duration::from_secs(60));
771 runner.start(|mut context| async move {
772 let (polynomial, shares) =
774 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
775
776 let mut schemes = (0..num_validators)
778 .map(|i| PrivateKey::from_seed(i as u64))
779 .collect::<Vec<_>>();
780 schemes.sort_by_key(|s| s.public_key());
781
782 let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
784 .iter()
785 .enumerate()
786 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares[i].clone()))
787 .collect();
788 let validator_pks = validators
789 .iter()
790 .map(|(pk, _, _)| pk.clone())
791 .collect::<Vec<_>>();
792
793 let sequencer = PrivateKey::from_seed(u64::MAX);
795
796 let mut participants = validators
798 .iter()
799 .map(|(pk, _, _)| pk.clone())
800 .collect::<Vec<_>>();
801 participants.push(sequencer.public_key()); let (network, mut oracle) = Network::new(
805 context.with_label("network"),
806 commonware_p2p::simulated::Config {
807 max_size: 1024 * 1024,
808 disconnect_on_block: true,
809 tracked_peer_sets: None,
810 },
811 );
812 network.start();
813
814 let mut registrations = register_participants(&mut oracle, &participants).await;
816 let link = commonware_p2p::simulated::Link {
817 latency: Duration::from_millis(10),
818 jitter: Duration::from_millis(1),
819 success_rate: 1.0,
820 };
821 link_participants(&mut oracle, &participants, Action::Link(link), None).await;
822
823 let automatons = Arc::new(Mutex::new(
825 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
826 ));
827 let mut reporters =
828 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
829 let mut monitors = HashMap::new();
830 let namespace = b"my testing namespace";
831
832 for (validator, scheme, share) in validators.iter() {
834 let context = context.with_label(&validator.to_string());
835 let monitor = mocks::Monitor::new(111);
836 monitors.insert(validator.clone(), monitor.clone());
837 let sequencers = mocks::Sequencers::<PublicKey>::new(vec![sequencer.public_key()]);
838 let validators = mocks::Validators::<PublicKey, V>::new(
839 polynomial.clone(),
840 validator_pks.clone(),
841 Some(share.clone()),
842 );
843
844 let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
845 automatons
846 .lock()
847 .unwrap()
848 .insert(validator.clone(), automaton.clone());
849
850 let (reporter, reporter_mailbox) =
851 mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
852 namespace,
853 *poly::public::<V>(&polynomial),
854 Some(5),
855 );
856 context.with_label("reporter").spawn(|_| reporter.run());
857 reporters.insert(validator.clone(), reporter_mailbox);
858
859 let engine = Engine::new(
860 context.with_label("engine"),
861 Config {
862 crypto: scheme.clone(),
863 relay: automaton.clone(),
864 automaton: automaton.clone(),
865 reporter: reporters.get(validator).unwrap().clone(),
866 monitor,
867 sequencers,
868 validators,
869 namespace: namespace.to_vec(),
870 epoch_bounds: (1, 1),
871 height_bound: 2,
872 rebroadcast_timeout: Duration::from_secs(5),
873 priority_acks: false,
874 priority_proposals: false,
875 journal_heights_per_section: 10,
876 journal_replay_buffer: NZUsize!(4096),
877 journal_write_buffer: NZUsize!(4096),
878 journal_name_prefix: format!("ordered-broadcast-seq/{validator}/"),
879 journal_compression: Some(3),
880 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
881 },
882 );
883
884 let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
885 engine.start((a1, a2), (b1, b2));
886 }
887
888 {
890 let context = context.with_label("sequencer");
891 let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
892 automatons
893 .lock()
894 .unwrap()
895 .insert(sequencer.public_key(), automaton.clone());
896 let (reporter, reporter_mailbox) =
897 mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
898 namespace,
899 *poly::public::<V>(&polynomial),
900 Some(5),
901 );
902 context.with_label("reporter").spawn(|_| reporter.run());
903 reporters.insert(sequencer.public_key(), reporter_mailbox);
904 let engine = Engine::new(
905 context.with_label("engine"),
906 Config {
907 crypto: sequencer.clone(),
908 relay: automaton.clone(),
909 automaton: automaton.clone(),
910 reporter: reporters.get(&sequencer.public_key()).unwrap().clone(),
911 monitor: mocks::Monitor::new(111),
912 sequencers: mocks::Sequencers::<PublicKey>::new(vec![
913 sequencer.public_key()
914 ]),
915 validators: mocks::Validators::<PublicKey, V>::new(
916 polynomial.clone(),
917 validator_pks,
918 None,
919 ),
920 namespace: namespace.to_vec(),
921 epoch_bounds: (1, 1),
922 height_bound: 2,
923 rebroadcast_timeout: Duration::from_secs(5),
924 priority_acks: false,
925 priority_proposals: false,
926 journal_heights_per_section: 10,
927 journal_replay_buffer: NZUsize!(4096),
928 journal_write_buffer: NZUsize!(4096),
929 journal_name_prefix: format!(
930 "ordered-broadcast-seq/{}/",
931 sequencer.public_key()
932 ),
933 journal_compression: Some(3),
934 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
935 },
936 );
937
938 let ((a1, a2), (b1, b2)) = registrations.remove(&sequencer.public_key()).unwrap();
939 engine.start((a1, a2), (b1, b2));
940 }
941
942 await_reporters(
944 context.with_label("reporter"),
945 vec![sequencer.public_key()],
946 &reporters,
947 (100, 111, true),
948 )
949 .await;
950 });
951 }
952
953 #[test_traced]
954 fn test_external_sequencer() {
955 external_sequencer::<MinPk>();
956 external_sequencer::<MinSig>();
957 }
958
959 fn run_1k<V: Variant>() {
960 let num_validators: u32 = 10;
961 let quorum: u32 = 3;
962 let cfg = deterministic::Config::new();
963 let runner = deterministic::Runner::new(cfg);
964
965 runner.start(|mut context| async move {
966 let (polynomial, mut shares_vec) =
967 ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
968 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
969
970 let (oracle, validators, pks, mut registrations) = initialize_simulation(
971 context.with_label("simulation"),
972 num_validators,
973 &mut shares_vec,
974 )
975 .await;
976 let delayed_link = Link {
977 latency: Duration::from_millis(80),
978 jitter: Duration::from_millis(10),
979 success_rate: 0.98,
980 };
981 let mut oracle_clone = oracle.clone();
982 link_participants(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
983
984 let automatons = Arc::new(Mutex::new(
985 BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
986 ));
987 let mut reporters =
988 BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
989 let sequencers = &pks[0..pks.len() / 2];
990 spawn_validator_engines::<V>(
991 context.with_label("validator"),
992 polynomial.clone(),
993 sequencers,
994 &pks,
995 &validators,
996 &mut registrations,
997 &mut automatons.lock().unwrap(),
998 &mut reporters,
999 Duration::from_millis(150),
1000 |_| false,
1001 None,
1002 );
1003
1004 await_reporters(
1005 context.with_label("reporter"),
1006 sequencers.to_vec(),
1007 &reporters,
1008 (1_000, 111, false),
1009 )
1010 .await;
1011 })
1012 }
1013
1014 #[test_traced]
1015 #[ignore]
1016 fn test_1k_min_pk() {
1017 run_1k::<MinPk>();
1018 }
1019
1020 #[test_traced]
1021 #[ignore]
1022 fn test_1k_min_sig() {
1023 run_1k::<MinSig>();
1024 }
1025}