1pub mod scheme;
51pub mod types;
52
53cfg_if::cfg_if! {
54 if #[cfg(not(target_arch = "wasm32"))] {
55 mod ack_manager;
56 use ack_manager::AckManager;
57 mod config;
58 pub use config::Config;
59 mod engine;
60 pub use engine::Engine;
61 mod metrics;
62 mod tip_manager;
63 use tip_manager::TipManager;
64 }
65}
66
67#[cfg(test)]
68pub mod mocks;
69
70#[cfg(test)]
71mod tests {
72 use super::{mocks, Config, Engine};
73 use crate::{
74 ordered_broadcast::scheme::{bls12381_multisig, bls12381_threshold, ed25519, Scheme},
75 types::{Epoch, EpochDelta},
76 };
77 use commonware_cryptography::{
78 bls12381::primitives::variant::{MinPk, MinSig},
79 certificate::{self, mocks::Fixture},
80 ed25519::{PrivateKey, PublicKey},
81 sha256::Digest as Sha256Digest,
82 Signer as _,
83 };
84 use commonware_macros::{select, test_group, test_traced};
85 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
86 use commonware_runtime::{
87 buffer::PoolRef,
88 deterministic::{self, Context},
89 Clock, Metrics, Quota, Runner, Spawner,
90 };
91 use commonware_utils::NZUsize;
92 use futures::{channel::oneshot, future::join_all};
93 use std::{
94 collections::{BTreeMap, HashMap},
95 num::{NonZeroU32, NonZeroUsize},
96 time::Duration,
97 };
98 use tracing::debug;
99
    /// First argument to `PoolRef::new` for every engine's `journal_buffer_pool` in these tests.
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    /// Second argument to `PoolRef::new` for every engine's `journal_buffer_pool` in these tests.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
    /// Quota used for every channel registration: `NonZeroU32::MAX` per second.
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
103
    /// Channels registered for each participant, keyed by participant.
    ///
    /// As built by `register_participants`, the first `(Sender, Receiver)` pair is
    /// the one registered on channel `0` and the second the one registered on
    /// channel `1`.
    type Registrations<P> = BTreeMap<
        P,
        (
            (Sender<P, deterministic::Context>, Receiver<P>),
            (Sender<P, deterministic::Context>, Receiver<P>),
        ),
    >;
111
112 async fn register_participants(
113 oracle: &mut Oracle<PublicKey, deterministic::Context>,
114 participants: &[PublicKey],
115 ) -> Registrations<PublicKey> {
116 let mut registrations = BTreeMap::new();
117 for participant in participants.iter() {
118 let mut control = oracle.control(participant.clone());
119 let (a1, a2) = control.register(0, TEST_QUOTA).await.unwrap();
120 let (b1, b2) = control.register(1, TEST_QUOTA).await.unwrap();
121 registrations.insert(participant.clone(), ((a1, a2), (b1, b2)));
122 }
123 registrations
124 }
125
    /// Operation that `link_participants` applies to each ordered pair of
    /// distinct participants.
    enum Action {
        /// Add a link with the given parameters.
        Link(Link),
        /// Remove the existing link, then add one with the given parameters.
        Update(Link),
        /// Remove the existing link.
        Unlink,
    }
131
132 async fn link_participants(
133 oracle: &mut Oracle<PublicKey, deterministic::Context>,
134 participants: &[PublicKey],
135 action: Action,
136 restrict_to: Option<fn(usize, usize, usize) -> bool>,
137 ) {
138 for (i1, v1) in participants.iter().enumerate() {
139 for (i2, v2) in participants.iter().enumerate() {
140 if v2 == v1 {
141 continue;
142 }
143 if let Some(f) = restrict_to {
144 if !f(participants.len(), i1, i2) {
145 continue;
146 }
147 }
148 if matches!(action, Action::Update(_) | Action::Unlink) {
149 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
150 }
151 if let Action::Link(ref link) | Action::Update(ref link) = action {
152 oracle
153 .add_link(v1.clone(), v2.clone(), link.clone())
154 .await
155 .unwrap();
156 }
157 }
158 }
159 }
160
    /// Link used whenever a test wants a lossless connection: 10ms latency, 1ms
    /// jitter, and a success rate of `1.0`.
    const RELIABLE_LINK: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
166
167 async fn initialize_simulation<S: certificate::Scheme>(
168 context: Context,
169 fixture: &Fixture<S>,
170 link: Link,
171 ) -> (
172 Oracle<PublicKey, deterministic::Context>,
173 Registrations<PublicKey>,
174 ) {
175 let (network, mut oracle) = Network::new(
176 context.with_label("network"),
177 commonware_p2p::simulated::Config {
178 max_size: 1024 * 1024,
179 disconnect_on_block: true,
180 tracked_peer_sets: None,
181 },
182 );
183 network.start();
184
185 let registrations = register_participants(&mut oracle, &fixture.participants).await;
186 link_participants(&mut oracle, &fixture.participants, Action::Link(link), None).await;
187 (oracle, registrations)
188 }
189
190 #[allow(clippy::too_many_arguments)]
191 fn spawn_validator_engines<S>(
192 context: Context,
193 fixture: &Fixture<S>,
194 sequencer_pks: &[PublicKey],
195 registrations: &mut Registrations<PublicKey>,
196 rebroadcast_timeout: Duration,
197 invalid_when: fn(u64) -> bool,
198 misses_allowed: Option<usize>,
199 epoch: Epoch,
200 ) -> BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, S, Sha256Digest>>
201 where
202 S: Scheme<PublicKey, Sha256Digest>,
203 {
204 let mut reporters = BTreeMap::new();
205 let namespace = b"my testing namespace";
206
207 for (idx, validator) in fixture.participants.iter().enumerate() {
208 let context = context.with_label(&format!("validator_{validator}"));
209 let monitor = mocks::Monitor::new(epoch);
210 let sequencers = mocks::Sequencers::<PublicKey>::new(sequencer_pks.to_vec());
211
212 let validators_provider = mocks::Provider::new();
214 assert!(validators_provider.register(epoch, fixture.schemes[idx].clone()));
215
216 let automaton = mocks::Automaton::<PublicKey>::new(invalid_when);
217 let (reporter, reporter_mailbox) = mocks::Reporter::new(
218 context.clone(),
219 namespace,
220 fixture.verifier.clone(),
221 misses_allowed,
222 );
223 context.with_label("reporter").spawn(|_| reporter.run());
224 reporters.insert(validator.clone(), reporter_mailbox);
225
226 let engine = Engine::new(
227 context.with_label("engine"),
228 Config {
229 sequencer_signer: Some(fixture.private_keys[idx].clone()),
230 sequencers_provider: sequencers,
231 validators_provider,
232 automaton: automaton.clone(),
233 relay: automaton.clone(),
234 reporter: reporters.get(validator).unwrap().clone(),
235 monitor,
236 namespace: namespace.to_vec(),
237 priority_proposals: false,
238 priority_acks: false,
239 rebroadcast_timeout,
240 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
241 height_bound: 2,
242 journal_heights_per_section: 10,
243 journal_replay_buffer: NZUsize!(4096),
244 journal_write_buffer: NZUsize!(4096),
245 journal_name_prefix: format!("ordered-broadcast-seq-{validator}-"),
246 journal_compression: Some(3),
247 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
248 },
249 );
250
251 let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
252 engine.start((a1, a2), (b1, b2));
253 }
254 reporters
255 }
256
257 async fn await_reporters<S>(
258 context: Context,
259 sequencers: Vec<PublicKey>,
260 reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, S, Sha256Digest>>,
261 threshold: (u64, Epoch, bool),
262 ) where
263 S: certificate::Scheme,
264 {
265 let (threshold_height, threshold_epoch, require_contiguous) =
266 (threshold.0, threshold.1, threshold.2);
267 let mut receivers = Vec::new();
268 for (reporter, mailbox) in reporters.iter() {
269 for sequencer in sequencers.iter() {
271 let (tx, rx) = oneshot::channel();
273 receivers.push(rx);
274
275 context.with_label("reporter_watcher").spawn({
276 let reporter = reporter.clone();
277 let sequencer = sequencer.clone();
278 let mut mailbox = mailbox.clone();
279 move |context| async move {
280 loop {
281 let (height, epoch) = mailbox
282 .get_tip(sequencer.clone())
283 .await
284 .unwrap_or((0, Epoch::zero()));
285 debug!(height, epoch = %epoch, ?sequencer, ?reporter, "reporter");
286 let contiguous_height = mailbox
287 .get_contiguous_tip(sequencer.clone())
288 .await
289 .unwrap_or(0);
290 if height >= threshold_height
291 && epoch >= threshold_epoch
292 && (!require_contiguous || contiguous_height >= threshold_height)
293 {
294 let _ = tx.send(sequencer.clone());
295 break;
296 }
297 context.sleep(Duration::from_millis(100)).await;
298 }
299 }
300 });
301 }
302 }
303
304 let results = join_all(receivers).await;
306 assert_eq!(results.len(), sequencers.len() * reporters.len());
307
308 for result in results {
310 assert!(result.is_ok(), "reporter was cancelled");
311 }
312 }
313
314 async fn get_max_height<S: certificate::Scheme>(
315 reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, S, Sha256Digest>>,
316 ) -> u64 {
317 let mut max_height = 0;
318 for (sequencer, mailbox) in reporters.iter_mut() {
319 let (height, _) = mailbox
320 .get_tip(sequencer.clone())
321 .await
322 .unwrap_or((0, Epoch::zero()));
323 if height > max_height {
324 max_height = height;
325 }
326 }
327 max_height
328 }
329
330 fn all_online<S, F>(fixture: F)
331 where
332 S: Scheme<PublicKey, Sha256Digest>,
333 F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
334 {
335 let runner = deterministic::Runner::timed(Duration::from_secs(120));
336
337 runner.start(|mut context| async move {
338 let epoch = Epoch::new(111);
339 let num_validators = 4;
340 let fixture = fixture(&mut context, num_validators);
341
342 let (_oracle, mut registrations) =
343 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
344 .await;
345
346 let reporters = spawn_validator_engines(
347 context.with_label("validator"),
348 &fixture,
349 &fixture.participants,
350 &mut registrations,
351 Duration::from_secs(5),
352 |_| false,
353 Some(5),
354 epoch,
355 );
356
357 await_reporters(
358 context.with_label("reporter"),
359 reporters.keys().cloned().collect::<Vec<_>>(),
360 &reporters,
361 (100, epoch, true),
362 )
363 .await;
364 });
365 }
366
    // Runs `all_online` once per scheme fixture: BLS12-381 threshold and multisig
    // (each with the `MinPk` and `MinSig` variants), then ed25519.
    #[test_traced]
    fn test_all_online() {
        all_online(bls12381_threshold::fixture::<MinPk, _>);
        all_online(bls12381_threshold::fixture::<MinSig, _>);
        all_online(bls12381_multisig::fixture::<MinPk, _>);
        all_online(bls12381_multisig::fixture::<MinSig, _>);
        all_online(ed25519::fixture);
    }
375
376 fn unclean_shutdown<S, F>(fixture: F)
377 where
378 S: Scheme<PublicKey, Sha256Digest>,
379 F: Fn(&mut deterministic::Context, u32) -> Fixture<S> + Clone,
380 {
381 let mut prev_checkpoint = None;
382 let epoch = Epoch::new(111);
383 let num_validators = 4;
384 let crash_after = Duration::from_secs(5);
385 let target_height = 30;
386
387 loop {
388 let fixture = fixture.clone();
389 let f = |mut context: deterministic::Context| async move {
390 let fixture = fixture(&mut context, num_validators);
391
392 let (network, mut oracle) = Network::new(
393 context.with_label("network"),
394 commonware_p2p::simulated::Config {
395 max_size: 1024 * 1024,
396 disconnect_on_block: true,
397 tracked_peer_sets: None,
398 },
399 );
400 network.start();
401
402 let mut registrations =
403 register_participants(&mut oracle, &fixture.participants).await;
404 link_participants(
405 &mut oracle,
406 &fixture.participants,
407 Action::Link(RELIABLE_LINK),
408 None,
409 )
410 .await;
411
412 let reporters = spawn_validator_engines(
413 context.with_label("validator"),
414 &fixture,
415 &fixture.participants,
416 &mut registrations,
417 Duration::from_secs(5),
418 |_| false,
419 None,
420 epoch,
421 );
422
423 let crash = context.sleep(crash_after);
425 let run = await_reporters(
426 context.with_label("reporter"),
427 reporters.keys().cloned().collect::<Vec<_>>(),
428 &reporters,
429 (target_height, epoch, true),
430 );
431
432 select! {
433 _ = crash => { false },
434 _ = run => { true },
435 }
436 };
437
438 let (complete, checkpoint) = prev_checkpoint
439 .map_or_else(
440 || deterministic::Runner::timed(Duration::from_secs(180)),
441 deterministic::Runner::from,
442 )
443 .start_and_recover(f);
444
445 if complete {
446 break;
447 }
448
449 prev_checkpoint = Some(checkpoint);
450 }
451 }
452
    // Runs `unclean_shutdown` once per scheme fixture: BLS12-381 threshold and
    // multisig (each with the `MinPk` and `MinSig` variants), then ed25519.
    #[test_traced]
    fn test_unclean_shutdown() {
        unclean_shutdown(bls12381_threshold::fixture::<MinPk, _>);
        unclean_shutdown(bls12381_threshold::fixture::<MinSig, _>);
        unclean_shutdown(bls12381_multisig::fixture::<MinPk, _>);
        unclean_shutdown(bls12381_multisig::fixture::<MinSig, _>);
        unclean_shutdown(ed25519::fixture);
    }
461
462 fn network_partition<S, F>(fixture: F)
463 where
464 S: Scheme<PublicKey, Sha256Digest>,
465 F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
466 {
467 let runner = deterministic::Runner::timed(Duration::from_secs(60));
468
469 runner.start(|mut context| async move {
470 let epoch = Epoch::new(111);
471 let num_validators = 4;
472 let fixture = fixture(&mut context, num_validators);
473
474 let (mut oracle, mut registrations) =
476 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
477 .await;
478 let mut reporters = spawn_validator_engines(
479 context.with_label("validator"),
480 &fixture,
481 &fixture.participants,
482 &mut registrations,
483 Duration::from_secs(1),
484 |_| false,
485 None,
486 epoch,
487 );
488
489 link_participants(&mut oracle, &fixture.participants, Action::Unlink, None).await;
491 context.sleep(Duration::from_secs(30)).await;
492
493 let max_height = get_max_height(&mut reporters).await;
495
496 link_participants(
498 &mut oracle,
499 &fixture.participants,
500 Action::Link(RELIABLE_LINK),
501 None,
502 )
503 .await;
504 await_reporters(
505 context.with_label("reporter"),
506 reporters.keys().cloned().collect::<Vec<_>>(),
507 &reporters,
508 (max_height + 100, epoch, false),
509 )
510 .await;
511 });
512 }
513
    // Runs `network_partition` once per scheme fixture: BLS12-381 threshold and
    // multisig (each with the `MinPk` and `MinSig` variants), then ed25519.
    #[test_group("slow")]
    #[test_traced]
    fn test_network_partition() {
        network_partition(bls12381_threshold::fixture::<MinPk, _>);
        network_partition(bls12381_threshold::fixture::<MinSig, _>);
        network_partition(bls12381_multisig::fixture::<MinPk, _>);
        network_partition(bls12381_multisig::fixture::<MinSig, _>);
        network_partition(ed25519::fixture);
    }
523
524 fn slow_and_lossy_links<S, F>(fixture: F, seed: u64) -> String
525 where
526 S: Scheme<PublicKey, Sha256Digest>,
527 F: Fn(&mut deterministic::Context, u32) -> Fixture<S>,
528 {
529 let cfg = deterministic::Config::new()
530 .with_seed(seed)
531 .with_timeout(Some(Duration::from_secs(40)));
532 let runner = deterministic::Runner::new(cfg);
533
534 runner.start(|mut context| async move {
535 let epoch = Epoch::new(111);
536 let num_validators = 4;
537 let fixture = fixture(&mut context, num_validators);
538
539 let (mut oracle, mut registrations) =
540 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
541 .await;
542 let delayed_link = Link {
543 latency: Duration::from_millis(50),
544 jitter: Duration::from_millis(40),
545 success_rate: 0.5,
546 };
547 link_participants(
548 &mut oracle,
549 &fixture.participants,
550 Action::Update(delayed_link),
551 None,
552 )
553 .await;
554
555 let reporters = spawn_validator_engines(
556 context.with_label("validator"),
557 &fixture,
558 &fixture.participants,
559 &mut registrations,
560 Duration::from_millis(150),
561 |_| false,
562 None,
563 epoch,
564 );
565
566 await_reporters(
567 context.with_label("reporter"),
568 reporters.keys().cloned().collect::<Vec<_>>(),
569 &reporters,
570 (40, epoch, false),
571 )
572 .await;
573
574 context.auditor().state()
575 })
576 }
577
    // Runs `slow_and_lossy_links` with seed 0 once per scheme fixture; the returned
    // auditor state is discarded here.
    #[test_traced]
    fn test_slow_and_lossy_links() {
        slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, 0);
        slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, 0);
        slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, 0);
        slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, 0);
        slow_and_lossy_links(ed25519::fixture, 0);
    }
586
587 #[test_group("slow")]
588 #[test_traced]
589 fn test_determinism() {
590 for seed in 1..6 {
593 let ts_pk_state_1 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
595 let ts_pk_state_2 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
596 assert_eq!(ts_pk_state_1, ts_pk_state_2);
597
598 let ts_sig_state_1 =
600 slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
601 let ts_sig_state_2 =
602 slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
603 assert_eq!(ts_sig_state_1, ts_sig_state_2);
604
605 let ed_state_1 = slow_and_lossy_links(ed25519::fixture, seed);
607 let ed_state_2 = slow_and_lossy_links(ed25519::fixture, seed);
608 assert_eq!(ed_state_1, ed_state_2);
609
610 let ms_pk_state_1 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
612 let ms_pk_state_2 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
613 assert_eq!(ms_pk_state_1, ms_pk_state_2);
614
615 let ms_sig_state_1 =
617 slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
618 let ms_sig_state_2 =
619 slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
620 assert_eq!(ms_sig_state_1, ms_sig_state_2);
621
622 let states = [
623 ("threshold-minpk", ts_pk_state_1),
624 ("threshold-minsig", ts_sig_state_1),
625 ("multisig-minpk", ms_pk_state_1),
626 ("multisig-minsig", ms_sig_state_1),
627 ("ed25519", ed_state_1),
628 ];
629
630 for pair in states.windows(2) {
632 assert_ne!(
633 pair[0].1, pair[1].1,
634 "state {} equals state {}",
635 pair[0].0, pair[1].0
636 );
637 }
638 }
639 }
640
641 fn invalid_signature_injection<S, F>(fixture: F)
642 where
643 S: Scheme<PublicKey, Sha256Digest>,
644 F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
645 {
646 let runner = deterministic::Runner::timed(Duration::from_secs(30));
647
648 runner.start(|mut context| async move {
649 let epoch = Epoch::new(111);
650 let num_validators = 4;
651 let fixture = fixture(&mut context, num_validators);
652
653 let (_oracle, mut registrations) =
654 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
655 .await;
656
657 let reporters = spawn_validator_engines(
658 context.with_label("validator"),
659 &fixture,
660 &fixture.participants,
661 &mut registrations,
662 Duration::from_secs(5),
663 |i| i % 10 == 0,
664 None,
665 epoch,
666 );
667
668 await_reporters(
669 context.with_label("reporter"),
670 reporters.keys().cloned().collect::<Vec<_>>(),
671 &reporters,
672 (100, epoch, true),
673 )
674 .await;
675 });
676 }
677
    // Runs `invalid_signature_injection` once per scheme fixture: BLS12-381
    // threshold and multisig (each with the `MinPk` and `MinSig` variants), then
    // ed25519.
    #[test_traced]
    fn test_invalid_signature_injection() {
        invalid_signature_injection(bls12381_threshold::fixture::<MinPk, _>);
        invalid_signature_injection(bls12381_threshold::fixture::<MinSig, _>);
        invalid_signature_injection(bls12381_multisig::fixture::<MinPk, _>);
        invalid_signature_injection(bls12381_multisig::fixture::<MinSig, _>);
        invalid_signature_injection(ed25519::fixture);
    }
686
687 fn updated_epoch<S, F>(fixture: F)
688 where
689 S: Scheme<PublicKey, Sha256Digest>,
690 F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
691 {
692 let runner = deterministic::Runner::timed(Duration::from_secs(60));
693
694 runner.start(|mut context| async move {
695 let epoch = Epoch::new(111);
696 let num_validators = 4;
697 let fixture = fixture(&mut context, num_validators);
698
699 let (mut oracle, mut registrations) =
701 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
702 .await;
703
704 let mut reporters = BTreeMap::new();
705
706 let mut validators_providers = HashMap::new();
708 let mut monitors = HashMap::new();
709 let namespace = b"my testing namespace";
710
711 for (idx, validator) in fixture.participants.iter().enumerate() {
712 let context = context.with_label(&format!("validator_{validator}"));
713 let monitor = mocks::Monitor::new(epoch);
714 monitors.insert(validator.clone(), monitor.clone());
715 let sequencers = mocks::Sequencers::<PublicKey>::new(fixture.participants.clone());
716
717 let validators_provider = mocks::Provider::new();
719 assert!(validators_provider.register(epoch, fixture.schemes[idx].clone()));
720 validators_providers.insert(validator.clone(), validators_provider.clone());
721
722 let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
723 let (reporter, reporter_mailbox) = mocks::Reporter::new(
724 context.clone(),
725 namespace,
726 fixture.verifier.clone(),
727 Some(5),
728 );
729 context.with_label("reporter").spawn(|_| reporter.run());
730 reporters.insert(validator.clone(), reporter_mailbox);
731
732 let engine = Engine::new(
733 context.with_label("engine"),
734 Config {
735 sequencer_signer: Some(fixture.private_keys[idx].clone()),
736 sequencers_provider: sequencers,
737 validators_provider,
738 relay: automaton.clone(),
739 automaton: automaton.clone(),
740 reporter: reporters.get(validator).unwrap().clone(),
741 monitor,
742 namespace: namespace.to_vec(),
743 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
744 height_bound: 2,
745 rebroadcast_timeout: Duration::from_secs(1),
746 priority_acks: false,
747 priority_proposals: false,
748 journal_heights_per_section: 10,
749 journal_replay_buffer: NZUsize!(4096),
750 journal_write_buffer: NZUsize!(4096),
751 journal_name_prefix: format!("ordered-broadcast-seq-{validator}-"),
752 journal_compression: Some(3),
753 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
754 },
755 );
756
757 let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
758 engine.start((a1, a2), (b1, b2));
759 }
760
761 await_reporters(
763 context.with_label("reporter"),
764 reporters.keys().cloned().collect::<Vec<_>>(),
765 &reporters,
766 (100, epoch, true),
767 )
768 .await;
769
770 link_participants(&mut oracle, &fixture.participants, Action::Unlink, None).await;
772 context.sleep(Duration::from_secs(30)).await;
773
774 let max_height = get_max_height(&mut reporters).await;
776
777 let next_epoch = epoch.next();
779 for (validator, monitor) in monitors.iter() {
780 monitor.update(next_epoch);
781 let idx = fixture
783 .participants
784 .iter()
785 .position(|v| v == validator)
786 .unwrap();
787 let validators_provider = validators_providers.get(validator).unwrap();
788 assert!(validators_provider.register(next_epoch, fixture.schemes[idx].clone()));
789 }
790
791 link_participants(
793 &mut oracle,
794 &fixture.participants,
795 Action::Link(RELIABLE_LINK),
796 None,
797 )
798 .await;
799 await_reporters(
800 context.with_label("reporter"),
801 reporters.keys().cloned().collect::<Vec<_>>(),
802 &reporters,
803 (max_height + 100, next_epoch, true),
804 )
805 .await;
806 });
807 }
808
    // Runs `updated_epoch` once per scheme fixture: BLS12-381 threshold and
    // multisig (each with the `MinPk` and `MinSig` variants), then ed25519.
    #[test_group("slow")]
    #[test_traced]
    fn test_updated_epoch() {
        updated_epoch(bls12381_threshold::fixture::<MinPk, _>);
        updated_epoch(bls12381_threshold::fixture::<MinSig, _>);
        updated_epoch(bls12381_multisig::fixture::<MinPk, _>);
        updated_epoch(bls12381_multisig::fixture::<MinSig, _>);
        updated_epoch(ed25519::fixture);
    }
818
819 fn external_sequencer<S, F>(fixture: F)
820 where
821 S: Scheme<PublicKey, Sha256Digest>,
822 F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
823 {
824 let runner = deterministic::Runner::timed(Duration::from_secs(60));
825 runner.start(|mut context| async move {
826 let epoch = Epoch::new(111);
827 let num_validators = 4;
828 let fixture = fixture(&mut context, num_validators);
829
830 let sequencer = PrivateKey::from_seed(u64::MAX);
832
833 let mut participants = fixture.participants.clone();
835 participants.push(sequencer.public_key());
836
837 let (network, mut oracle) = Network::new(
839 context.with_label("network"),
840 commonware_p2p::simulated::Config {
841 max_size: 1024 * 1024,
842 disconnect_on_block: true,
843 tracked_peer_sets: None,
844 },
845 );
846 network.start();
847
848 let mut registrations = register_participants(&mut oracle, &participants).await;
850 link_participants(
851 &mut oracle,
852 &participants,
853 Action::Link(RELIABLE_LINK),
854 None,
855 )
856 .await;
857
858 let mut reporters = BTreeMap::new();
860 let namespace = b"my testing namespace";
861
862 for (idx, validator) in fixture.participants.iter().enumerate() {
864 let context = context.with_label(&format!("validator_{validator}"));
865 let monitor = mocks::Monitor::new(epoch);
866 let sequencers = mocks::Sequencers::<PublicKey>::new(vec![sequencer.public_key()]);
867
868 let validators_provider = mocks::Provider::new();
870 assert!(validators_provider.register(epoch, fixture.schemes[idx].clone()));
871
872 let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
873
874 let (reporter, reporter_mailbox) = mocks::Reporter::new(
875 context.clone(),
876 namespace,
877 fixture.verifier.clone(),
878 Some(5),
879 );
880 context.with_label("reporter").spawn(|_| reporter.run());
881 reporters.insert(validator.clone(), reporter_mailbox);
882
883 let engine = Engine::new(
884 context.with_label("engine"),
885 Config {
886 sequencer_signer: None::<PrivateKey>, sequencers_provider: sequencers,
888 validators_provider,
889 relay: automaton.clone(),
890 automaton: automaton.clone(),
891 reporter: reporters.get(validator).unwrap().clone(),
892 monitor,
893 namespace: namespace.to_vec(),
894 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
895 height_bound: 2,
896 rebroadcast_timeout: Duration::from_secs(5),
897 priority_acks: false,
898 priority_proposals: false,
899 journal_heights_per_section: 10,
900 journal_replay_buffer: NZUsize!(4096),
901 journal_write_buffer: NZUsize!(4096),
902 journal_name_prefix: format!("ordered-broadcast-seq-{validator}-"),
903 journal_compression: Some(3),
904 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
905 },
906 );
907
908 let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
909 engine.start((a1, a2), (b1, b2));
910 }
911
912 {
914 let context = context.with_label("sequencer");
915 let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
916 let (reporter, reporter_mailbox) = mocks::Reporter::new(
917 context.clone(),
918 namespace,
919 fixture.verifier.clone(),
920 Some(5),
921 );
922 context.with_label("reporter").spawn(|_| reporter.run());
923 reporters.insert(sequencer.public_key(), reporter_mailbox);
924
925 let validators_provider = mocks::Provider::new();
928 assert!(validators_provider.register(epoch, fixture.verifier.clone()));
929
930 let engine = Engine::new(
931 context.with_label("engine"),
932 Config {
933 sequencer_signer: Some(sequencer.clone()),
934 sequencers_provider: mocks::Sequencers::<PublicKey>::new(vec![
935 sequencer.public_key()
936 ]),
937 validators_provider,
938 relay: automaton.clone(),
939 automaton,
940 reporter: reporters.get(&sequencer.public_key()).unwrap().clone(),
941 monitor: mocks::Monitor::new(epoch),
942 namespace: namespace.to_vec(),
943 epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
944 height_bound: 2,
945 rebroadcast_timeout: Duration::from_secs(5),
946 priority_acks: false,
947 priority_proposals: false,
948 journal_heights_per_section: 10,
949 journal_replay_buffer: NZUsize!(4096),
950 journal_write_buffer: NZUsize!(4096),
951 journal_name_prefix: format!(
952 "ordered-broadcast-seq-{}-",
953 sequencer.public_key()
954 ),
955 journal_compression: Some(3),
956 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
957 },
958 );
959
960 let ((a1, a2), (b1, b2)) = registrations.remove(&sequencer.public_key()).unwrap();
961 engine.start((a1, a2), (b1, b2));
962 }
963
964 await_reporters(
966 context.with_label("reporter"),
967 vec![sequencer.public_key()],
968 &reporters,
969 (100, epoch, true),
970 )
971 .await;
972 });
973 }
974
    // Runs `external_sequencer` once per scheme fixture: BLS12-381 threshold and
    // multisig (each with the `MinPk` and `MinSig` variants), then ed25519.
    #[test_traced]
    fn test_external_sequencer() {
        external_sequencer(bls12381_threshold::fixture::<MinPk, _>);
        external_sequencer(bls12381_threshold::fixture::<MinSig, _>);
        external_sequencer(bls12381_multisig::fixture::<MinPk, _>);
        external_sequencer(bls12381_multisig::fixture::<MinSig, _>);
        external_sequencer(ed25519::fixture);
    }
983
984 fn run_1k<S, F>(fixture: F)
985 where
986 S: Scheme<PublicKey, Sha256Digest>,
987 F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
988 {
989 let cfg = deterministic::Config::new();
990 let runner = deterministic::Runner::new(cfg);
991
992 runner.start(|mut context| async move {
993 let epoch = Epoch::new(111);
994 let num_validators = 10;
995 let fixture = fixture(&mut context, num_validators);
996
997 let delayed_link = Link {
998 latency: Duration::from_millis(80),
999 jitter: Duration::from_millis(10),
1000 success_rate: 0.98,
1001 };
1002
1003 let (mut oracle, mut registrations) =
1004 initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
1005 .await;
1006
1007 link_participants(
1009 &mut oracle,
1010 &fixture.participants,
1011 Action::Update(delayed_link),
1012 None,
1013 )
1014 .await;
1015
1016 let sequencers: Vec<PublicKey> =
1018 fixture.participants[0..num_validators as usize / 2].to_vec();
1019
1020 let reporters = spawn_validator_engines(
1021 context.with_label("validator"),
1022 &fixture,
1023 &sequencers,
1024 &mut registrations,
1025 Duration::from_millis(150),
1026 |_| false,
1027 None,
1028 epoch,
1029 );
1030
1031 await_reporters(
1032 context.with_label("reporter"),
1033 sequencers,
1034 &reporters,
1035 (1_000, epoch, false),
1036 )
1037 .await;
1038 })
1039 }
1040
    // Runs `run_1k` with the BLS12-381 threshold fixture using the `MinPk` variant.
    #[test_group("slow")]
    #[test_traced]
    fn test_1k_bls12381_threshold_min_pk() {
        run_1k(bls12381_threshold::fixture::<MinPk, _>);
    }
1046
    // Runs `run_1k` with the BLS12-381 threshold fixture using the `MinSig` variant.
    #[test_group("slow")]
    #[test_traced]
    fn test_1k_bls12381_threshold_min_sig() {
        run_1k(bls12381_threshold::fixture::<MinSig, _>);
    }
1052
    // Runs `run_1k` with the BLS12-381 multisig fixture using the `MinPk` variant.
    #[test_group("slow")]
    #[test_traced]
    fn test_1k_bls12381_multisig_min_pk() {
        run_1k(bls12381_multisig::fixture::<MinPk, _>);
    }
1058
    // Runs `run_1k` with the BLS12-381 multisig fixture using the `MinSig` variant.
    #[test_group("slow")]
    #[test_traced]
    fn test_1k_bls12381_multisig_min_sig() {
        run_1k(bls12381_multisig::fixture::<MinSig, _>);
    }
1064
    // Runs `run_1k` with the ed25519 fixture.
    #[test_group("slow")]
    #[test_traced]
    fn test_1k_ed25519() {
        run_1k(ed25519::fixture);
    }
1070}