commonware_consensus/ordered_broadcast/
mod.rs

1//! Ordered, reliable broadcast across reconfigurable participants.
2//!
3//! # Concepts
4//!
5//! The system has two types of network participants: `sequencers` and `validators`. Their sets may
6//! overlap and are defined by the current `epoch`, a monotonically increasing integer. This module
7//! can handle reconfiguration of these sets across different epochs.
8//!
9//! Sequencers broadcast data. The smallest unit of data is a `chunk`. Sequencers broadcast `node`s
10//! that contain a chunk and a threshold signature over the previous chunk, forming a linked chain
11//! of nodes from each sequencer.
12//!
13//! Validators verify and sign chunks using partial signatures. These can be combined to recover a
14//! threshold signature, ensuring a quorum verifies each chunk. The threshold signature allows
15//! external parties to confirm that the chunk was reliably broadcast.
16//!
17//! Network participants persist any new nodes to a journal. This enables recovery from crashes and
18//! ensures that sequencers do not broadcast conflicting chunks and that validators do not sign
19//! them. "Conflicting" chunks are chunks from the same sequencer at the same height with different
20//! payloads.
21//!
22//! # Design
23//!
24//! The core of the module is the [Engine]. It is responsible for:
25//! - Broadcasting nodes (if a sequencer)
26//! - Signing chunks (if a validator)
27//! - Tracking the latest chunk in each sequencer’s chain
28//! - Recovering threshold signatures from partial signatures for each chunk
29//! - Notifying other actors of new chunks and threshold signatures
30//!
31//! # Acknowledgements
32//!
33//! [Autobahn](https://arxiv.org/abs/2401.10369) provided the insight that a succinct
34//! proof-of-availability could be produced by linking sequencer broadcasts.
35
36pub mod types;
37
38cfg_if::cfg_if! {
39    if #[cfg(not(target_arch = "wasm32"))] {
40        mod ack_manager;
41        use ack_manager::AckManager;
42        mod config;
43        pub use config::Config;
44        mod engine;
45        pub use engine::Engine;
46        mod metrics;
47        mod tip_manager;
48        use tip_manager::TipManager;
49    }
50}
51
52#[cfg(test)]
53pub mod mocks;
54
55#[cfg(test)]
56mod tests {
57    use super::{mocks, types::Epoch, Config, Engine};
58    use commonware_cryptography::{
59        bls12381::{
60            dkg::ops,
61            primitives::{
62                group::Share,
63                poly,
64                variant::{MinPk, MinSig, Variant},
65            },
66        },
67        ed25519::{PrivateKey, PublicKey},
68        sha256::Digest as Sha256Digest,
69        PrivateKeyExt as _, Signer as _,
70    };
71    use commonware_macros::test_traced;
72    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
73    use commonware_runtime::{
74        deterministic::{self, Context},
75        Clock, Metrics, Runner, Spawner,
76    };
77    use commonware_utils::quorum;
78    use futures::{channel::oneshot, future::join_all};
79    use rand::{rngs::StdRng, SeedableRng as _};
80    use std::{
81        collections::{BTreeMap, HashMap, HashSet},
82        sync::{Arc, Mutex},
83        time::Duration,
84    };
85    use tracing::debug;
86
87    type Registrations<P> = BTreeMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))>;
88
89    async fn register_participants(
90        oracle: &mut Oracle<PublicKey>,
91        participants: &[PublicKey],
92    ) -> Registrations<PublicKey> {
93        let mut registrations = BTreeMap::new();
94        for participant in participants.iter() {
95            let (a1, a2) = oracle.register(participant.clone(), 0).await.unwrap();
96            let (b1, b2) = oracle.register(participant.clone(), 1).await.unwrap();
97            registrations.insert(participant.clone(), ((a1, a2), (b1, b2)));
98        }
99        registrations
100    }
101
102    #[allow(dead_code)]
103    enum Action {
104        Link(Link),
105        Update(Link),
106        Unlink,
107    }
108
109    async fn link_participants(
110        oracle: &mut Oracle<PublicKey>,
111        participants: &[PublicKey],
112        action: Action,
113        restrict_to: Option<fn(usize, usize, usize) -> bool>,
114    ) {
115        for (i1, v1) in participants.iter().enumerate() {
116            for (i2, v2) in participants.iter().enumerate() {
117                if v2 == v1 {
118                    continue;
119                }
120                if let Some(f) = restrict_to {
121                    if !f(participants.len(), i1, i2) {
122                        continue;
123                    }
124                }
125                if matches!(action, Action::Update(_) | Action::Unlink) {
126                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
127                }
128                if let Action::Link(ref link) | Action::Update(ref link) = action {
129                    oracle
130                        .add_link(v1.clone(), v2.clone(), link.clone())
131                        .await
132                        .unwrap();
133                }
134            }
135        }
136    }
137
138    async fn initialize_simulation(
139        context: Context,
140        num_validators: u32,
141        shares_vec: &mut [Share],
142    ) -> (
143        Oracle<PublicKey>,
144        Vec<(PublicKey, PrivateKey, Share)>,
145        Vec<PublicKey>,
146        Registrations<PublicKey>,
147    ) {
148        let (network, mut oracle) = Network::new(
149            context.with_label("network"),
150            commonware_p2p::simulated::Config {
151                max_size: 1024 * 1024,
152            },
153        );
154        network.start();
155
156        let mut schemes = (0..num_validators)
157            .map(|i| PrivateKey::from_seed(i as u64))
158            .collect::<Vec<_>>();
159        schemes.sort_by_key(|s| s.public_key());
160        let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
161            .iter()
162            .enumerate()
163            .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
164            .collect();
165        let pks = validators
166            .iter()
167            .map(|(pk, _, _)| pk.clone())
168            .collect::<Vec<_>>();
169
170        let registrations = register_participants(&mut oracle, &pks).await;
171        let link = Link {
172            latency: 10.0,
173            jitter: 1.0,
174            success_rate: 1.0,
175        };
176        link_participants(&mut oracle, &pks, Action::Link(link), None).await;
177        (oracle, validators, pks, registrations)
178    }
179
180    #[allow(clippy::too_many_arguments)]
181    fn spawn_validator_engines<V: Variant>(
182        context: Context,
183        polynomial: poly::Public<V>,
184        sequencer_pks: &[PublicKey],
185        validator_pks: &[PublicKey],
186        validators: &[(PublicKey, PrivateKey, Share)],
187        registrations: &mut Registrations<PublicKey>,
188        automatons: &mut BTreeMap<PublicKey, mocks::Automaton<PublicKey>>,
189        reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
190        rebroadcast_timeout: Duration,
191        invalid_when: fn(u64) -> bool,
192        misses_allowed: Option<usize>,
193    ) -> HashMap<PublicKey, mocks::Monitor> {
194        let mut monitors = HashMap::new();
195        let namespace = b"my testing namespace";
196        for (validator, scheme, share) in validators.iter() {
197            let context = context.with_label(&validator.to_string());
198            let monitor = mocks::Monitor::new(111);
199            monitors.insert(validator.clone(), monitor.clone());
200            let sequencers = mocks::Sequencers::<PublicKey>::new(sequencer_pks.to_vec());
201            let validators = mocks::Validators::<PublicKey, V>::new(
202                polynomial.clone(),
203                validator_pks.to_vec(),
204                Some(share.clone()),
205            );
206
207            let automaton = mocks::Automaton::<PublicKey>::new(invalid_when);
208            automatons.insert(validator.clone(), automaton.clone());
209
210            let (reporter, reporter_mailbox) = mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
211                namespace,
212                *poly::public::<V>(&polynomial),
213                misses_allowed,
214            );
215            context.with_label("reporter").spawn(|_| reporter.run());
216            reporters.insert(validator.clone(), reporter_mailbox);
217
218            let engine = Engine::new(
219                context.with_label("engine"),
220                Config {
221                    crypto: scheme.clone(),
222                    relay: automaton.clone(),
223                    automaton: automaton.clone(),
224                    reporter: reporters.get(validator).unwrap().clone(),
225                    monitor,
226                    sequencers,
227                    validators,
228                    namespace: namespace.to_vec(),
229                    epoch_bounds: (1, 1),
230                    height_bound: 2,
231                    rebroadcast_timeout,
232                    priority_acks: false,
233                    priority_proposals: false,
234                    journal_heights_per_section: 10,
235                    journal_replay_buffer: 4096,
236                    journal_write_buffer: 4096,
237                    journal_name_prefix: format!("ordered-broadcast-seq/{validator}/"),
238                    journal_compression: Some(3),
239                },
240            );
241
242            let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
243            engine.start((a1, a2), (b1, b2));
244        }
245        monitors
246    }
247
248    async fn await_reporters<V: Variant>(
249        context: Context,
250        sequencers: Vec<PublicKey>,
251        reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
252        threshold: (u64, Epoch, bool),
253    ) {
254        let mut receivers = Vec::new();
255        for (reporter, mailbox) in reporters.iter() {
256            // Spawn a watcher for the reporter.
257            for sequencer in sequencers.iter() {
258                // Create a oneshot channel to signal when the reporter has reached the threshold.
259                let (tx, rx) = oneshot::channel();
260                receivers.push(rx);
261
262                context.with_label("reporter_watcher").spawn({
263                    let reporter = reporter.clone();
264                    let sequencer = sequencer.clone();
265                    let mut mailbox = mailbox.clone();
266                    move |context| async move {
267                        loop {
268                            let (height, epoch) =
269                                mailbox.get_tip(sequencer.clone()).await.unwrap_or((0, 0));
270                            debug!(height, epoch, ?sequencer, ?reporter, "reporter");
271                            let contiguous_height = mailbox
272                                .get_contiguous_tip(sequencer.clone())
273                                .await
274                                .unwrap_or(0);
275                            if height >= threshold.0
276                                && epoch >= threshold.1
277                                && (!threshold.2 || contiguous_height >= threshold.0)
278                            {
279                                let _ = tx.send(sequencer.clone());
280                                break;
281                            }
282                            context.sleep(Duration::from_millis(100)).await;
283                        }
284                    }
285                });
286            }
287        }
288
289        // Wait for all oneshot receivers to complete.
290        let results = join_all(receivers).await;
291        assert_eq!(results.len(), sequencers.len() * reporters.len());
292
293        // Check that none were cancelled.
294        for result in results {
295            assert!(result.is_ok(), "reporter was cancelled");
296        }
297    }
298
299    async fn get_max_height<V: Variant>(
300        reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
301    ) -> u64 {
302        let mut max_height = 0;
303        for (sequencer, mailbox) in reporters.iter_mut() {
304            let (height, _) = mailbox.get_tip(sequencer.clone()).await.unwrap_or((0, 0));
305            if height > max_height {
306                max_height = height;
307            }
308        }
309        max_height
310    }
311
312    fn all_online<V: Variant>() {
313        let num_validators: u32 = 4;
314        let quorum: u32 = 3;
315        let runner = deterministic::Runner::timed(Duration::from_secs(30));
316
317        runner.start(|mut context| async move {
318            let (polynomial, mut shares_vec) =
319                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
320            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
321
322            let (_oracle, validators, pks, mut registrations) = initialize_simulation(
323                context.with_label("simulation"),
324                num_validators,
325                &mut shares_vec,
326            )
327            .await;
328            let automatons = Arc::new(Mutex::new(
329                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
330            ));
331            let mut reporters =
332                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
333            spawn_validator_engines::<V>(
334                context.with_label("validator"),
335                polynomial.clone(),
336                &pks,
337                &pks,
338                &validators,
339                &mut registrations,
340                &mut automatons.lock().unwrap(),
341                &mut reporters,
342                Duration::from_secs(5),
343                |_| false,
344                Some(5),
345            );
346            await_reporters(
347                context.with_label("reporter"),
348                reporters.keys().cloned().collect::<Vec<_>>(),
349                &reporters,
350                (100, 111, true),
351            )
352            .await;
353        });
354    }
355
356    #[test_traced]
357    fn test_all_online() {
358        all_online::<MinPk>();
359        all_online::<MinSig>();
360    }
361
362    fn unclean_shutdown<V: Variant>() {
363        let num_validators: u32 = 4;
364        let quorum: u32 = 3;
365        let mut rng = StdRng::seed_from_u64(0);
366        let (polynomial, mut shares_vec) =
367            ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
368        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
369        let completed = Arc::new(Mutex::new(HashSet::new()));
370        let shutdowns = Arc::new(Mutex::new(0u64));
371        let mut prev_ctx = None;
372
373        while completed.lock().unwrap().len() != num_validators as usize {
374            let completed = completed.clone();
375            let shares_vec = shares_vec.clone();
376            let shutdowns = shutdowns.clone();
377            let polynomial = polynomial.clone();
378
379            let f = |context: deterministic::Context| async move {
380                let (network, mut oracle) = Network::new(
381                    context.with_label("network"),
382                    commonware_p2p::simulated::Config {
383                        max_size: 1024 * 1024,
384                    },
385                );
386                network.start();
387
388                let mut schemes = (0..num_validators)
389                    .map(|i| PrivateKey::from_seed(i as u64))
390                    .collect::<Vec<_>>();
391                schemes.sort_by_key(|s| s.public_key());
392                let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
393                    .iter()
394                    .enumerate()
395                    .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
396                    .collect();
397                let pks = validators
398                    .iter()
399                    .map(|(pk, _, _)| pk.clone())
400                    .collect::<Vec<_>>();
401
402                let mut registrations = register_participants(&mut oracle, &pks).await;
403                let link = commonware_p2p::simulated::Link {
404                    latency: 10.0,
405                    jitter: 1.0,
406                    success_rate: 1.0,
407                };
408                link_participants(&mut oracle, &pks, Action::Link(link), None).await;
409
410                let automatons = Arc::new(Mutex::new(BTreeMap::<
411                    PublicKey,
412                    mocks::Automaton<PublicKey>,
413                >::new()));
414                let mut reporters =
415                    BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new(
416                    );
417                spawn_validator_engines(
418                    context.with_label("validator"),
419                    polynomial.clone(),
420                    &pks,
421                    &pks,
422                    &validators,
423                    &mut registrations,
424                    &mut automatons.lock().unwrap(),
425                    &mut reporters,
426                    Duration::from_secs(5),
427                    |_| false,
428                    None,
429                );
430
431                let reporter_pairs: Vec<(
432                    PublicKey,
433                    mocks::ReporterMailbox<PublicKey, V, Sha256Digest>,
434                )> = reporters
435                    .iter()
436                    .map(|(v, m)| (v.clone(), m.clone()))
437                    .collect();
438                for (validator, mut mailbox) in reporter_pairs {
439                    let completed_clone = completed.clone();
440                    context
441                        .with_label("reporter_unclean")
442                        .spawn(|context| async move {
443                            loop {
444                                let (height, _) =
445                                    mailbox.get_tip(validator.clone()).await.unwrap_or((0, 0));
446                                if height >= 100 {
447                                    completed_clone.lock().unwrap().insert(validator.clone());
448                                    break;
449                                }
450                                context.sleep(Duration::from_millis(100)).await;
451                            }
452                        });
453                }
454                context.sleep(Duration::from_millis(1000)).await;
455                *shutdowns.lock().unwrap() += 1;
456
457                context
458            };
459
460            let context = if let Some(prev_ctx) = prev_ctx {
461                deterministic::Runner::from(prev_ctx)
462            } else {
463                deterministic::Runner::timed(Duration::from_secs(45))
464            }
465            .start(f);
466
467            prev_ctx = Some(context.recover());
468        }
469    }
470
471    #[test_traced]
472    fn test_unclean_shutdown() {
473        unclean_shutdown::<MinPk>();
474        unclean_shutdown::<MinSig>();
475    }
476
477    fn network_partition<V: Variant>() {
478        let num_validators: u32 = 4;
479        let quorum: u32 = 3;
480        let runner = deterministic::Runner::timed(Duration::from_secs(60));
481
482        runner.start(|mut context| async move {
483            let (polynomial, mut shares_vec) =
484                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
485            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
486
487            // Configure the network
488            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
489                context.with_label("simulation"),
490                num_validators,
491                &mut shares_vec,
492            )
493            .await;
494            let automatons = Arc::new(Mutex::new(
495                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
496            ));
497            let mut reporters =
498                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
499            spawn_validator_engines(
500                context.with_label("validator"),
501                polynomial.clone(),
502                &pks,
503                &pks,
504                &validators,
505                &mut registrations,
506                &mut automatons.lock().unwrap(),
507                &mut reporters,
508                Duration::from_secs(1),
509                |_| false,
510                None,
511            );
512
513            // Simulate partition by removing all links.
514            link_participants(&mut oracle, &pks, Action::Unlink, None).await;
515            context.sleep(Duration::from_secs(30)).await;
516
517            // Get the maximum height from all reporters.
518            let max_height = get_max_height(&mut reporters).await;
519
520            // Heal the partition by re-adding links.
521            let link = Link {
522                latency: 10.0,
523                jitter: 1.0,
524                success_rate: 1.0,
525            };
526            link_participants(&mut oracle, &pks, Action::Link(link), None).await;
527            await_reporters(
528                context.with_label("reporter"),
529                reporters.keys().cloned().collect::<Vec<_>>(),
530                &reporters,
531                (max_height + 100, 111, false),
532            )
533            .await;
534        });
535    }
536
537    #[test_traced]
538    #[ignore]
539    fn test_network_partition() {
540        network_partition::<MinPk>();
541        network_partition::<MinSig>();
542    }
543
544    fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
545        let num_validators: u32 = 4;
546        let quorum: u32 = 3;
547        let cfg = deterministic::Config::new()
548            .with_seed(seed)
549            .with_timeout(Some(Duration::from_secs(40)));
550        let runner = deterministic::Runner::new(cfg);
551
552        runner.start(|mut context| async move {
553            let (polynomial, mut shares_vec) =
554                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
555            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
556
557            let (oracle, validators, pks, mut registrations) = initialize_simulation(
558                context.with_label("simulation"),
559                num_validators,
560                &mut shares_vec,
561            )
562            .await;
563            let delayed_link = Link {
564                latency: 50.0,
565                jitter: 40.0,
566                success_rate: 0.5,
567            };
568            let mut oracle_clone = oracle.clone();
569            link_participants(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
570
571            let automatons = Arc::new(Mutex::new(
572                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
573            ));
574            let mut reporters =
575                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
576            spawn_validator_engines(
577                context.with_label("validator"),
578                polynomial.clone(),
579                &pks,
580                &pks,
581                &validators,
582                &mut registrations,
583                &mut automatons.lock().unwrap(),
584                &mut reporters,
585                Duration::from_millis(150),
586                |_| false,
587                None,
588            );
589
590            await_reporters(
591                context.with_label("reporter"),
592                reporters.keys().cloned().collect::<Vec<_>>(),
593                &reporters,
594                (40, 111, false),
595            )
596            .await;
597
598            context.auditor().state()
599        })
600    }
601
602    #[test_traced]
603    fn test_slow_and_lossy_links() {
604        slow_and_lossy_links::<MinPk>(0);
605        slow_and_lossy_links::<MinSig>(0);
606    }
607
608    #[test_traced]
609    #[ignore]
610    fn test_determinism() {
611        // We use slow and lossy links as the deterministic test
612        // because it is the most complex test.
613        for seed in 1..6 {
614            let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
615            let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
616            assert_eq!(pk_state_1, pk_state_2);
617
618            let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
619            let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
620            assert_eq!(sig_state_1, sig_state_2);
621
622            // Sanity check that different types can't be identical.
623            assert_ne!(pk_state_1, sig_state_1);
624        }
625    }
626
627    fn invalid_signature_injection<V: Variant>() {
628        let num_validators: u32 = 4;
629        let quorum: u32 = 3;
630        let runner = deterministic::Runner::timed(Duration::from_secs(30));
631
632        runner.start(|mut context| async move {
633            let (polynomial, mut shares_vec) =
634                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
635            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
636
637            let (_oracle, validators, pks, mut registrations) = initialize_simulation(
638                context.with_label("simulation"),
639                num_validators,
640                &mut shares_vec,
641            )
642            .await;
643            let automatons = Arc::new(Mutex::new(
644                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
645            ));
646            let mut reporters =
647                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
648            spawn_validator_engines::<V>(
649                context.with_label("validator"),
650                polynomial.clone(),
651                &pks,
652                &pks,
653                &validators,
654                &mut registrations,
655                &mut automatons.lock().unwrap(),
656                &mut reporters,
657                Duration::from_secs(5),
658                |i| i % 10 == 0,
659                None,
660            );
661
662            await_reporters(
663                context.with_label("reporter"),
664                reporters.keys().cloned().collect::<Vec<_>>(),
665                &reporters,
666                (100, 111, true),
667            )
668            .await;
669        });
670    }
671
672    #[test_traced]
673    fn test_invalid_signature_injection() {
674        invalid_signature_injection::<MinPk>();
675        invalid_signature_injection::<MinSig>();
676    }
677
678    fn updated_epoch<V: Variant>() {
679        let num_validators: u32 = 4;
680        let quorum: u32 = 3;
681        let runner = deterministic::Runner::timed(Duration::from_secs(60));
682
683        runner.start(|mut context| async move {
684            let (polynomial, mut shares_vec) =
685                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
686            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
687
688            // Setup network
689            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
690                context.with_label("simulation"),
691                num_validators,
692                &mut shares_vec,
693            )
694            .await;
695            let automatons = Arc::new(Mutex::new(
696                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
697            ));
698            let mut reporters =
699                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
700            let monitors = spawn_validator_engines::<V>(
701                context.with_label("validator"),
702                polynomial.clone(),
703                &pks,
704                &pks,
705                &validators,
706                &mut registrations,
707                &mut automatons.lock().unwrap(),
708                &mut reporters,
709                Duration::from_secs(1),
710                |_| false,
711                Some(5),
712            );
713
714            // Perform some work
715            await_reporters(
716                context.with_label("reporter"),
717                reporters.keys().cloned().collect::<Vec<_>>(),
718                &reporters,
719                (100, 111, true),
720            )
721            .await;
722
723            // Simulate partition by removing all links.
724            link_participants(&mut oracle, &pks, Action::Unlink, None).await;
725            context.sleep(Duration::from_secs(30)).await;
726
727            // Get the maximum height from all reporters.
728            let max_height = get_max_height(&mut reporters).await;
729
730            // Update the epoch
731            for monitor in monitors.values() {
732                monitor.update(112);
733            }
734
735            // Heal the partition by re-adding links.
736            let link = Link {
737                latency: 10.0,
738                jitter: 1.0,
739                success_rate: 1.0,
740            };
741            link_participants(&mut oracle, &pks, Action::Link(link), None).await;
742            await_reporters(
743                context.with_label("reporter"),
744                reporters.keys().cloned().collect::<Vec<_>>(),
745                &reporters,
746                (max_height + 100, 112, true),
747            )
748            .await;
749        });
750    }
751
752    #[test_traced]
753    fn test_updated_epoch() {
754        updated_epoch::<MinPk>();
755        updated_epoch::<MinSig>();
756    }
757
758    fn external_sequencer<V: Variant>() {
759        let num_validators: u32 = 4;
760        let quorum: u32 = quorum(3);
761        let runner = deterministic::Runner::timed(Duration::from_secs(60));
762        runner.start(|mut context| async move {
763            // Generate validator shares
764            let (polynomial, shares) =
765                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
766
767            // Generate validator schemes
768            let mut schemes = (0..num_validators)
769                .map(|i| PrivateKey::from_seed(i as u64))
770                .collect::<Vec<_>>();
771            schemes.sort_by_key(|s| s.public_key());
772
773            // Generate validators
774            let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
775                .iter()
776                .enumerate()
777                .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares[i].clone()))
778                .collect();
779            let validator_pks = validators
780                .iter()
781                .map(|(pk, _, _)| pk.clone())
782                .collect::<Vec<_>>();
783
784            // Generate sequencer
785            let sequencer = PrivateKey::from_seed(u64::MAX);
786
787            // Generate network participants
788            let mut participants = validators
789                .iter()
790                .map(|(pk, _, _)| pk.clone())
791                .collect::<Vec<_>>();
792            participants.push(sequencer.public_key()); // as long as external participants are in same position for all, it is safe
793
794            // Create network
795            let (network, mut oracle) = Network::new(
796                context.with_label("network"),
797                commonware_p2p::simulated::Config {
798                    max_size: 1024 * 1024,
799                },
800            );
801            network.start();
802
803            // Register all participants
804            let mut registrations = register_participants(&mut oracle, &participants).await;
805            let link = commonware_p2p::simulated::Link {
806                latency: 10.0,
807                jitter: 1.0,
808                success_rate: 1.0,
809            };
810            link_participants(&mut oracle, &participants, Action::Link(link), None).await;
811
812            // Setup engines
813            let automatons = Arc::new(Mutex::new(
814                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
815            ));
816            let mut reporters =
817                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
818            let mut monitors = HashMap::new();
819            let namespace = b"my testing namespace";
820
821            // Spawn validator engines
822            for (validator, scheme, share) in validators.iter() {
823                let context = context.with_label(&validator.to_string());
824                let monitor = mocks::Monitor::new(111);
825                monitors.insert(validator.clone(), monitor.clone());
826                let sequencers = mocks::Sequencers::<PublicKey>::new(vec![sequencer.public_key()]);
827                let validators = mocks::Validators::<PublicKey, V>::new(
828                    polynomial.clone(),
829                    validator_pks.clone(),
830                    Some(share.clone()),
831                );
832
833                let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
834                automatons
835                    .lock()
836                    .unwrap()
837                    .insert(validator.clone(), automaton.clone());
838
839                let (reporter, reporter_mailbox) =
840                    mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
841                        namespace,
842                        *poly::public::<V>(&polynomial),
843                        Some(5),
844                    );
845                context.with_label("reporter").spawn(|_| reporter.run());
846                reporters.insert(validator.clone(), reporter_mailbox);
847
848                let engine = Engine::new(
849                    context.with_label("engine"),
850                    Config {
851                        crypto: scheme.clone(),
852                        relay: automaton.clone(),
853                        automaton: automaton.clone(),
854                        reporter: reporters.get(validator).unwrap().clone(),
855                        monitor,
856                        sequencers,
857                        validators,
858                        namespace: namespace.to_vec(),
859                        epoch_bounds: (1, 1),
860                        height_bound: 2,
861                        rebroadcast_timeout: Duration::from_secs(5),
862                        priority_acks: false,
863                        priority_proposals: false,
864                        journal_heights_per_section: 10,
865                        journal_replay_buffer: 4096,
866                        journal_write_buffer: 4096,
867                        journal_name_prefix: format!("ordered-broadcast-seq/{validator}/"),
868                        journal_compression: Some(3),
869                    },
870                );
871
872                let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
873                engine.start((a1, a2), (b1, b2));
874            }
875
876            // Spawn sequencer engine
877            {
878                let context = context.with_label("sequencer");
879                let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
880                automatons
881                    .lock()
882                    .unwrap()
883                    .insert(sequencer.public_key(), automaton.clone());
884                let (reporter, reporter_mailbox) =
885                    mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
886                        namespace,
887                        *poly::public::<V>(&polynomial),
888                        Some(5),
889                    );
890                context.with_label("reporter").spawn(|_| reporter.run());
891                reporters.insert(sequencer.public_key(), reporter_mailbox);
892                let engine = Engine::new(
893                    context.with_label("engine"),
894                    Config {
895                        crypto: sequencer.clone(),
896                        relay: automaton.clone(),
897                        automaton: automaton.clone(),
898                        reporter: reporters.get(&sequencer.public_key()).unwrap().clone(),
899                        monitor: mocks::Monitor::new(111),
900                        sequencers: mocks::Sequencers::<PublicKey>::new(vec![
901                            sequencer.public_key()
902                        ]),
903                        validators: mocks::Validators::<PublicKey, V>::new(
904                            polynomial.clone(),
905                            validator_pks,
906                            None,
907                        ),
908                        namespace: namespace.to_vec(),
909                        epoch_bounds: (1, 1),
910                        height_bound: 2,
911                        rebroadcast_timeout: Duration::from_secs(5),
912                        priority_acks: false,
913                        priority_proposals: false,
914                        journal_heights_per_section: 10,
915                        journal_replay_buffer: 4096,
916                        journal_write_buffer: 4096,
917                        journal_name_prefix: format!(
918                            "ordered-broadcast-seq/{}/",
919                            sequencer.public_key()
920                        ),
921                        journal_compression: Some(3),
922                    },
923                );
924
925                let ((a1, a2), (b1, b2)) = registrations.remove(&sequencer.public_key()).unwrap();
926                engine.start((a1, a2), (b1, b2));
927            }
928
929            // Await reporters
930            await_reporters(
931                context.with_label("reporter"),
932                vec![sequencer.public_key()],
933                &reporters,
934                (100, 111, true),
935            )
936            .await;
937        });
938    }
939
940    #[test_traced]
941    fn test_external_sequencer() {
942        external_sequencer::<MinPk>();
943        external_sequencer::<MinSig>();
944    }
945
946    fn run_1k<V: Variant>() {
947        let num_validators: u32 = 10;
948        let quorum: u32 = 3;
949        let cfg = deterministic::Config::new();
950        let runner = deterministic::Runner::new(cfg);
951
952        runner.start(|mut context| async move {
953            let (polynomial, mut shares_vec) =
954                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
955            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
956
957            let (oracle, validators, pks, mut registrations) = initialize_simulation(
958                context.with_label("simulation"),
959                num_validators,
960                &mut shares_vec,
961            )
962            .await;
963            let delayed_link = Link {
964                latency: 80.0,
965                jitter: 10.0,
966                success_rate: 0.98,
967            };
968            let mut oracle_clone = oracle.clone();
969            link_participants(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
970
971            let automatons = Arc::new(Mutex::new(
972                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
973            ));
974            let mut reporters =
975                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
976            let sequencers = &pks[0..pks.len() / 2];
977            spawn_validator_engines::<V>(
978                context.with_label("validator"),
979                polynomial.clone(),
980                sequencers,
981                &pks,
982                &validators,
983                &mut registrations,
984                &mut automatons.lock().unwrap(),
985                &mut reporters,
986                Duration::from_millis(150),
987                |_| false,
988                None,
989            );
990
991            await_reporters(
992                context.with_label("reporter"),
993                sequencers.to_vec(),
994                &reporters,
995                (1_000, 111, false),
996            )
997            .await;
998        })
999    }
1000
1001    #[test_traced]
1002    #[ignore]
1003    fn test_1k() {
1004        run_1k::<MinPk>();
1005        run_1k::<MinSig>();
1006    }
1007}