commonware_consensus/ordered_broadcast/
mod.rs

1//! Ordered, reliable broadcast across reconfigurable participants.
2//!
3//! # Concepts
4//!
5//! The system has two types of network participants: `sequencers` and `validators`. Their sets may
6//! overlap and are defined by the current `epoch`, a monotonically increasing integer. This module
7//! can handle reconfiguration of these sets across different epochs.
8//!
9//! Sequencers broadcast data. The smallest unit of data is a `chunk`. Sequencers broadcast `node`s
10//! that contain a chunk and a threshold signature over the previous chunk, forming a linked chain
11//! of nodes from each sequencer.
12//!
13//! Validators verify and sign chunks using partial signatures. These can be combined to recover a
14//! threshold signature, ensuring a quorum verifies each chunk. The threshold signature allows
15//! external parties to confirm that the chunk was reliably broadcast.
16//!
17//! Network participants persist any new nodes to a journal. This enables recovery from crashes and
18//! ensures that sequencers do not broadcast conflicting chunks and that validators do not sign
19//! them. "Conflicting" chunks are chunks from the same sequencer at the same height with different
20//! payloads.
21//!
22//! # Design
23//!
24//! The core of the module is the [Engine]. It is responsible for:
25//! - Broadcasting nodes (if a sequencer)
26//! - Signing chunks (if a validator)
27//! - Tracking the latest chunk in each sequencer’s chain
28//! - Recovering threshold signatures from partial signatures for each chunk
29//! - Notifying other actors of new chunks and threshold signatures
30//!
31//! # Acknowledgements
32//!
33//! [Autobahn](https://arxiv.org/abs/2401.10369) provided the insight that a succinct
34//! proof-of-availability could be produced by linking sequencer broadcasts.
35
36pub mod types;
37
38cfg_if::cfg_if! {
39    if #[cfg(not(target_arch = "wasm32"))] {
40        mod ack_manager;
41        use ack_manager::AckManager;
42        mod config;
43        pub use config::Config;
44        mod engine;
45        pub use engine::Engine;
46        mod metrics;
47        mod tip_manager;
48        use tip_manager::TipManager;
49    }
50}
51
52#[cfg(test)]
53pub mod mocks;
54
55#[cfg(test)]
56mod tests {
57    use super::{mocks, Config, Engine};
58    use crate::types::Epoch;
59    use commonware_cryptography::{
60        bls12381::{
61            dkg::ops,
62            primitives::{
63                group::Share,
64                poly,
65                variant::{MinPk, MinSig, Variant},
66            },
67        },
68        ed25519::{PrivateKey, PublicKey},
69        sha256::Digest as Sha256Digest,
70        PrivateKeyExt as _, Signer as _,
71    };
72    use commonware_macros::test_traced;
73    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
74    use commonware_runtime::{
75        buffer::PoolRef,
76        deterministic::{self, Context},
77        Clock, Metrics, Runner, Spawner,
78    };
79    use commonware_utils::{quorum, NZUsize};
80    use futures::{channel::oneshot, future::join_all};
81    use rand::{rngs::StdRng, SeedableRng as _};
82    use std::{
83        collections::{BTreeMap, HashMap, HashSet},
84        num::NonZeroUsize,
85        sync::{Arc, Mutex},
86        time::Duration,
87    };
88    use tracing::debug;
89
90    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
91    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
92
93    type Registrations<P> = BTreeMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))>;
94
95    async fn register_participants(
96        oracle: &mut Oracle<PublicKey>,
97        participants: &[PublicKey],
98    ) -> Registrations<PublicKey> {
99        let mut registrations = BTreeMap::new();
100        for participant in participants.iter() {
101            let mut control = oracle.control(participant.clone());
102            let (a1, a2) = control.register(0).await.unwrap();
103            let (b1, b2) = control.register(1).await.unwrap();
104            registrations.insert(participant.clone(), ((a1, a2), (b1, b2)));
105        }
106        registrations
107    }
108
109    enum Action {
110        Link(Link),
111        Update(Link),
112        Unlink,
113    }
114
115    async fn link_participants(
116        oracle: &mut Oracle<PublicKey>,
117        participants: &[PublicKey],
118        action: Action,
119        restrict_to: Option<fn(usize, usize, usize) -> bool>,
120    ) {
121        for (i1, v1) in participants.iter().enumerate() {
122            for (i2, v2) in participants.iter().enumerate() {
123                if v2 == v1 {
124                    continue;
125                }
126                if let Some(f) = restrict_to {
127                    if !f(participants.len(), i1, i2) {
128                        continue;
129                    }
130                }
131                if matches!(action, Action::Update(_) | Action::Unlink) {
132                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
133                }
134                if let Action::Link(ref link) | Action::Update(ref link) = action {
135                    oracle
136                        .add_link(v1.clone(), v2.clone(), link.clone())
137                        .await
138                        .unwrap();
139                }
140            }
141        }
142    }
143
144    async fn initialize_simulation(
145        context: Context,
146        num_validators: u32,
147        shares_vec: &mut [Share],
148    ) -> (
149        Oracle<PublicKey>,
150        Vec<(PublicKey, PrivateKey, Share)>,
151        Vec<PublicKey>,
152        Registrations<PublicKey>,
153    ) {
154        let (network, mut oracle) = Network::new(
155            context.with_label("network"),
156            commonware_p2p::simulated::Config {
157                max_size: 1024 * 1024,
158                disconnect_on_block: true,
159                tracked_peer_sets: None,
160            },
161        );
162        network.start();
163
164        let mut schemes = (0..num_validators)
165            .map(|i| PrivateKey::from_seed(i as u64))
166            .collect::<Vec<_>>();
167        schemes.sort_by_key(|s| s.public_key());
168        let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
169            .iter()
170            .enumerate()
171            .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
172            .collect();
173        let pks = validators
174            .iter()
175            .map(|(pk, _, _)| pk.clone())
176            .collect::<Vec<_>>();
177
178        let registrations = register_participants(&mut oracle, &pks).await;
179        let link = Link {
180            latency: Duration::from_millis(10),
181            jitter: Duration::from_millis(1),
182            success_rate: 1.0,
183        };
184        link_participants(&mut oracle, &pks, Action::Link(link), None).await;
185        (oracle, validators, pks, registrations)
186    }
187
188    #[allow(clippy::too_many_arguments)]
189    fn spawn_validator_engines<V: Variant>(
190        context: Context,
191        polynomial: poly::Public<V>,
192        sequencer_pks: &[PublicKey],
193        validator_pks: &[PublicKey],
194        validators: &[(PublicKey, PrivateKey, Share)],
195        registrations: &mut Registrations<PublicKey>,
196        automatons: &mut BTreeMap<PublicKey, mocks::Automaton<PublicKey>>,
197        reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
198        rebroadcast_timeout: Duration,
199        invalid_when: fn(u64) -> bool,
200        misses_allowed: Option<usize>,
201    ) -> HashMap<PublicKey, mocks::Monitor> {
202        let mut monitors = HashMap::new();
203        let namespace = b"my testing namespace";
204        for (validator, scheme, share) in validators.iter() {
205            let context = context.with_label(&validator.to_string());
206            let monitor = mocks::Monitor::new(111);
207            monitors.insert(validator.clone(), monitor.clone());
208            let sequencers = mocks::Sequencers::<PublicKey>::new(sequencer_pks.to_vec());
209            let validators = mocks::Validators::<PublicKey, V>::new(
210                polynomial.clone(),
211                validator_pks.to_vec(),
212                Some(share.clone()),
213            );
214
215            let automaton = mocks::Automaton::<PublicKey>::new(invalid_when);
216            automatons.insert(validator.clone(), automaton.clone());
217
218            let (reporter, reporter_mailbox) = mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
219                namespace,
220                *poly::public::<V>(&polynomial),
221                misses_allowed,
222            );
223            context.with_label("reporter").spawn(|_| reporter.run());
224            reporters.insert(validator.clone(), reporter_mailbox);
225
226            let engine = Engine::new(
227                context.with_label("engine"),
228                Config {
229                    crypto: scheme.clone(),
230                    relay: automaton.clone(),
231                    automaton: automaton.clone(),
232                    reporter: reporters.get(validator).unwrap().clone(),
233                    monitor,
234                    sequencers,
235                    validators,
236                    namespace: namespace.to_vec(),
237                    epoch_bounds: (1, 1),
238                    height_bound: 2,
239                    rebroadcast_timeout,
240                    priority_acks: false,
241                    priority_proposals: false,
242                    journal_heights_per_section: 10,
243                    journal_replay_buffer: NZUsize!(4096),
244                    journal_write_buffer: NZUsize!(4096),
245                    journal_name_prefix: format!("ordered-broadcast-seq/{validator}/"),
246                    journal_compression: Some(3),
247                    journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
248                },
249            );
250
251            let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
252            engine.start((a1, a2), (b1, b2));
253        }
254        monitors
255    }
256
257    async fn await_reporters<V: Variant>(
258        context: Context,
259        sequencers: Vec<PublicKey>,
260        reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
261        threshold: (u64, Epoch, bool),
262    ) {
263        let mut receivers = Vec::new();
264        for (reporter, mailbox) in reporters.iter() {
265            // Spawn a watcher for the reporter.
266            for sequencer in sequencers.iter() {
267                // Create a oneshot channel to signal when the reporter has reached the threshold.
268                let (tx, rx) = oneshot::channel();
269                receivers.push(rx);
270
271                context.with_label("reporter_watcher").spawn({
272                    let reporter = reporter.clone();
273                    let sequencer = sequencer.clone();
274                    let mut mailbox = mailbox.clone();
275                    move |context| async move {
276                        loop {
277                            let (height, epoch) =
278                                mailbox.get_tip(sequencer.clone()).await.unwrap_or((0, 0));
279                            debug!(height, epoch, ?sequencer, ?reporter, "reporter");
280                            let contiguous_height = mailbox
281                                .get_contiguous_tip(sequencer.clone())
282                                .await
283                                .unwrap_or(0);
284                            if height >= threshold.0
285                                && epoch >= threshold.1
286                                && (!threshold.2 || contiguous_height >= threshold.0)
287                            {
288                                let _ = tx.send(sequencer.clone());
289                                break;
290                            }
291                            context.sleep(Duration::from_millis(100)).await;
292                        }
293                    }
294                });
295            }
296        }
297
298        // Wait for all oneshot receivers to complete.
299        let results = join_all(receivers).await;
300        assert_eq!(results.len(), sequencers.len() * reporters.len());
301
302        // Check that none were cancelled.
303        for result in results {
304            assert!(result.is_ok(), "reporter was cancelled");
305        }
306    }
307
308    async fn get_max_height<V: Variant>(
309        reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
310    ) -> u64 {
311        let mut max_height = 0;
312        for (sequencer, mailbox) in reporters.iter_mut() {
313            let (height, _) = mailbox.get_tip(sequencer.clone()).await.unwrap_or((0, 0));
314            if height > max_height {
315                max_height = height;
316            }
317        }
318        max_height
319    }
320
321    fn all_online<V: Variant>() {
322        let num_validators: u32 = 4;
323        let quorum: u32 = 3;
324        let runner = deterministic::Runner::timed(Duration::from_secs(30));
325
326        runner.start(|mut context| async move {
327            let (polynomial, mut shares_vec) =
328                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
329            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
330
331            let (_oracle, validators, pks, mut registrations) = initialize_simulation(
332                context.with_label("simulation"),
333                num_validators,
334                &mut shares_vec,
335            )
336            .await;
337            let automatons = Arc::new(Mutex::new(
338                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
339            ));
340            let mut reporters =
341                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
342            spawn_validator_engines::<V>(
343                context.with_label("validator"),
344                polynomial.clone(),
345                &pks,
346                &pks,
347                &validators,
348                &mut registrations,
349                &mut automatons.lock().unwrap(),
350                &mut reporters,
351                Duration::from_secs(5),
352                |_| false,
353                Some(5),
354            );
355            await_reporters(
356                context.with_label("reporter"),
357                reporters.keys().cloned().collect::<Vec<_>>(),
358                &reporters,
359                (100, 111, true),
360            )
361            .await;
362        });
363    }
364
365    #[test_traced]
366    fn test_all_online() {
367        all_online::<MinPk>();
368        all_online::<MinSig>();
369    }
370
371    fn unclean_shutdown<V: Variant>() {
372        let num_validators: u32 = 4;
373        let quorum: u32 = 3;
374        let mut rng = StdRng::seed_from_u64(0);
375        let (polynomial, mut shares_vec) =
376            ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
377        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
378        let completed = Arc::new(Mutex::new(HashSet::new()));
379        let shutdowns = Arc::new(Mutex::new(0u64));
380        let mut prev_checkpoint = None;
381
382        while completed.lock().unwrap().len() != num_validators as usize {
383            let completed = completed.clone();
384            let shares_vec = shares_vec.clone();
385            let shutdowns = shutdowns.clone();
386            let polynomial = polynomial.clone();
387
388            let f = |context: deterministic::Context| async move {
389                let (network, mut oracle) = Network::new(
390                    context.with_label("network"),
391                    commonware_p2p::simulated::Config {
392                        max_size: 1024 * 1024,
393                        disconnect_on_block: true,
394                        tracked_peer_sets: None,
395                    },
396                );
397                network.start();
398
399                let mut schemes = (0..num_validators)
400                    .map(|i| PrivateKey::from_seed(i as u64))
401                    .collect::<Vec<_>>();
402                schemes.sort_by_key(|s| s.public_key());
403                let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
404                    .iter()
405                    .enumerate()
406                    .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
407                    .collect();
408                let pks = validators
409                    .iter()
410                    .map(|(pk, _, _)| pk.clone())
411                    .collect::<Vec<_>>();
412
413                let mut registrations = register_participants(&mut oracle, &pks).await;
414                let link = commonware_p2p::simulated::Link {
415                    latency: Duration::from_millis(10),
416                    jitter: Duration::from_millis(1),
417                    success_rate: 1.0,
418                };
419                link_participants(&mut oracle, &pks, Action::Link(link), None).await;
420
421                let automatons = Arc::new(Mutex::new(BTreeMap::<
422                    PublicKey,
423                    mocks::Automaton<PublicKey>,
424                >::new()));
425                let mut reporters =
426                    BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new(
427                    );
428                spawn_validator_engines(
429                    context.with_label("validator"),
430                    polynomial.clone(),
431                    &pks,
432                    &pks,
433                    &validators,
434                    &mut registrations,
435                    &mut automatons.lock().unwrap(),
436                    &mut reporters,
437                    Duration::from_secs(5),
438                    |_| false,
439                    None,
440                );
441
442                let reporter_pairs: Vec<(
443                    PublicKey,
444                    mocks::ReporterMailbox<PublicKey, V, Sha256Digest>,
445                )> = reporters
446                    .iter()
447                    .map(|(v, m)| (v.clone(), m.clone()))
448                    .collect();
449                for (validator, mut mailbox) in reporter_pairs {
450                    let completed_clone = completed.clone();
451                    context
452                        .with_label("reporter_unclean")
453                        .spawn(|context| async move {
454                            loop {
455                                let (height, _) =
456                                    mailbox.get_tip(validator.clone()).await.unwrap_or((0, 0));
457                                if height >= 100 {
458                                    completed_clone.lock().unwrap().insert(validator.clone());
459                                    break;
460                                }
461                                context.sleep(Duration::from_millis(100)).await;
462                            }
463                        });
464                }
465                context.sleep(Duration::from_millis(1000)).await;
466                *shutdowns.lock().unwrap() += 1;
467            };
468
469            let (_, checkpoint) = if let Some(prev_checkpoint) = prev_checkpoint {
470                deterministic::Runner::from(prev_checkpoint)
471            } else {
472                deterministic::Runner::timed(Duration::from_secs(45))
473            }
474            .start_and_recover(f);
475
476            prev_checkpoint = Some(checkpoint);
477        }
478    }
479
480    #[test_traced]
481    fn test_unclean_shutdown() {
482        unclean_shutdown::<MinPk>();
483        unclean_shutdown::<MinSig>();
484    }
485
486    fn network_partition<V: Variant>() {
487        let num_validators: u32 = 4;
488        let quorum: u32 = 3;
489        let runner = deterministic::Runner::timed(Duration::from_secs(60));
490
491        runner.start(|mut context| async move {
492            let (polynomial, mut shares_vec) =
493                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
494            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
495
496            // Configure the network
497            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
498                context.with_label("simulation"),
499                num_validators,
500                &mut shares_vec,
501            )
502            .await;
503            let automatons = Arc::new(Mutex::new(
504                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
505            ));
506            let mut reporters =
507                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
508            spawn_validator_engines(
509                context.with_label("validator"),
510                polynomial.clone(),
511                &pks,
512                &pks,
513                &validators,
514                &mut registrations,
515                &mut automatons.lock().unwrap(),
516                &mut reporters,
517                Duration::from_secs(1),
518                |_| false,
519                None,
520            );
521
522            // Simulate partition by removing all links.
523            link_participants(&mut oracle, &pks, Action::Unlink, None).await;
524            context.sleep(Duration::from_secs(30)).await;
525
526            // Get the maximum height from all reporters.
527            let max_height = get_max_height(&mut reporters).await;
528
529            // Heal the partition by re-adding links.
530            let link = Link {
531                latency: Duration::from_millis(10),
532                jitter: Duration::from_millis(1),
533                success_rate: 1.0,
534            };
535            link_participants(&mut oracle, &pks, Action::Link(link), None).await;
536            await_reporters(
537                context.with_label("reporter"),
538                reporters.keys().cloned().collect::<Vec<_>>(),
539                &reporters,
540                (max_height + 100, 111, false),
541            )
542            .await;
543        });
544    }
545
546    #[test_traced]
547    #[ignore]
548    fn test_network_partition() {
549        network_partition::<MinPk>();
550        network_partition::<MinSig>();
551    }
552
553    fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
554        let num_validators: u32 = 4;
555        let quorum: u32 = 3;
556        let cfg = deterministic::Config::new()
557            .with_seed(seed)
558            .with_timeout(Some(Duration::from_secs(40)));
559        let runner = deterministic::Runner::new(cfg);
560
561        runner.start(|mut context| async move {
562            let (polynomial, mut shares_vec) =
563                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
564            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
565
566            let (oracle, validators, pks, mut registrations) = initialize_simulation(
567                context.with_label("simulation"),
568                num_validators,
569                &mut shares_vec,
570            )
571            .await;
572            let delayed_link = Link {
573                latency: Duration::from_millis(50),
574                jitter: Duration::from_millis(40),
575                success_rate: 0.5,
576            };
577            let mut oracle_clone = oracle.clone();
578            link_participants(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
579
580            let automatons = Arc::new(Mutex::new(
581                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
582            ));
583            let mut reporters =
584                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
585            spawn_validator_engines(
586                context.with_label("validator"),
587                polynomial.clone(),
588                &pks,
589                &pks,
590                &validators,
591                &mut registrations,
592                &mut automatons.lock().unwrap(),
593                &mut reporters,
594                Duration::from_millis(150),
595                |_| false,
596                None,
597            );
598
599            await_reporters(
600                context.with_label("reporter"),
601                reporters.keys().cloned().collect::<Vec<_>>(),
602                &reporters,
603                (40, 111, false),
604            )
605            .await;
606
607            context.auditor().state()
608        })
609    }
610
611    #[test_traced]
612    fn test_slow_and_lossy_links() {
613        slow_and_lossy_links::<MinPk>(0);
614        slow_and_lossy_links::<MinSig>(0);
615    }
616
617    #[test_traced]
618    #[ignore]
619    fn test_determinism() {
620        // We use slow and lossy links as the deterministic test
621        // because it is the most complex test.
622        for seed in 1..6 {
623            let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
624            let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
625            assert_eq!(pk_state_1, pk_state_2);
626
627            let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
628            let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
629            assert_eq!(sig_state_1, sig_state_2);
630
631            // Sanity check that different types can't be identical.
632            assert_ne!(pk_state_1, sig_state_1);
633        }
634    }
635
636    fn invalid_signature_injection<V: Variant>() {
637        let num_validators: u32 = 4;
638        let quorum: u32 = 3;
639        let runner = deterministic::Runner::timed(Duration::from_secs(30));
640
641        runner.start(|mut context| async move {
642            let (polynomial, mut shares_vec) =
643                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
644            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
645
646            let (_oracle, validators, pks, mut registrations) = initialize_simulation(
647                context.with_label("simulation"),
648                num_validators,
649                &mut shares_vec,
650            )
651            .await;
652            let automatons = Arc::new(Mutex::new(
653                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
654            ));
655            let mut reporters =
656                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
657            spawn_validator_engines::<V>(
658                context.with_label("validator"),
659                polynomial.clone(),
660                &pks,
661                &pks,
662                &validators,
663                &mut registrations,
664                &mut automatons.lock().unwrap(),
665                &mut reporters,
666                Duration::from_secs(5),
667                |i| i % 10 == 0,
668                None,
669            );
670
671            await_reporters(
672                context.with_label("reporter"),
673                reporters.keys().cloned().collect::<Vec<_>>(),
674                &reporters,
675                (100, 111, true),
676            )
677            .await;
678        });
679    }
680
681    #[test_traced]
682    fn test_invalid_signature_injection() {
683        invalid_signature_injection::<MinPk>();
684        invalid_signature_injection::<MinSig>();
685    }
686
687    fn updated_epoch<V: Variant>() {
688        let num_validators: u32 = 4;
689        let quorum: u32 = 3;
690        let runner = deterministic::Runner::timed(Duration::from_secs(60));
691
692        runner.start(|mut context| async move {
693            let (polynomial, mut shares_vec) =
694                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
695            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
696
697            // Setup network
698            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
699                context.with_label("simulation"),
700                num_validators,
701                &mut shares_vec,
702            )
703            .await;
704            let automatons = Arc::new(Mutex::new(
705                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
706            ));
707            let mut reporters =
708                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
709            let monitors = spawn_validator_engines::<V>(
710                context.with_label("validator"),
711                polynomial.clone(),
712                &pks,
713                &pks,
714                &validators,
715                &mut registrations,
716                &mut automatons.lock().unwrap(),
717                &mut reporters,
718                Duration::from_secs(1),
719                |_| false,
720                Some(5),
721            );
722
723            // Perform some work
724            await_reporters(
725                context.with_label("reporter"),
726                reporters.keys().cloned().collect::<Vec<_>>(),
727                &reporters,
728                (100, 111, true),
729            )
730            .await;
731
732            // Simulate partition by removing all links.
733            link_participants(&mut oracle, &pks, Action::Unlink, None).await;
734            context.sleep(Duration::from_secs(30)).await;
735
736            // Get the maximum height from all reporters.
737            let max_height = get_max_height(&mut reporters).await;
738
739            // Update the epoch
740            for monitor in monitors.values() {
741                monitor.update(112);
742            }
743
744            // Heal the partition by re-adding links.
745            let link = Link {
746                latency: Duration::from_millis(10),
747                jitter: Duration::from_millis(1),
748                success_rate: 1.0,
749            };
750            link_participants(&mut oracle, &pks, Action::Link(link), None).await;
751            await_reporters(
752                context.with_label("reporter"),
753                reporters.keys().cloned().collect::<Vec<_>>(),
754                &reporters,
755                (max_height + 100, 112, true),
756            )
757            .await;
758        });
759    }
760
761    #[test_traced]
762    fn test_updated_epoch() {
763        updated_epoch::<MinPk>();
764        updated_epoch::<MinSig>();
765    }
766
767    fn external_sequencer<V: Variant>() {
768        let num_validators: u32 = 4;
769        let quorum: u32 = quorum(3);
770        let runner = deterministic::Runner::timed(Duration::from_secs(60));
771        runner.start(|mut context| async move {
772            // Generate validator shares
773            let (polynomial, shares) =
774                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
775
776            // Generate validator schemes
777            let mut schemes = (0..num_validators)
778                .map(|i| PrivateKey::from_seed(i as u64))
779                .collect::<Vec<_>>();
780            schemes.sort_by_key(|s| s.public_key());
781
782            // Generate validators
783            let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
784                .iter()
785                .enumerate()
786                .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares[i].clone()))
787                .collect();
788            let validator_pks = validators
789                .iter()
790                .map(|(pk, _, _)| pk.clone())
791                .collect::<Vec<_>>();
792
793            // Generate sequencer
794            let sequencer = PrivateKey::from_seed(u64::MAX);
795
796            // Generate network participants
797            let mut participants = validators
798                .iter()
799                .map(|(pk, _, _)| pk.clone())
800                .collect::<Vec<_>>();
801            participants.push(sequencer.public_key()); // as long as external participants are in same position for all, it is safe
802
803            // Create network
804            let (network, mut oracle) = Network::new(
805                context.with_label("network"),
806                commonware_p2p::simulated::Config {
807                    max_size: 1024 * 1024,
808                    disconnect_on_block: true,
809                    tracked_peer_sets: None,
810                },
811            );
812            network.start();
813
814            // Register all participants
815            let mut registrations = register_participants(&mut oracle, &participants).await;
816            let link = commonware_p2p::simulated::Link {
817                latency: Duration::from_millis(10),
818                jitter: Duration::from_millis(1),
819                success_rate: 1.0,
820            };
821            link_participants(&mut oracle, &participants, Action::Link(link), None).await;
822
823            // Setup engines
824            let automatons = Arc::new(Mutex::new(
825                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
826            ));
827            let mut reporters =
828                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
829            let mut monitors = HashMap::new();
830            let namespace = b"my testing namespace";
831
832            // Spawn validator engines
833            for (validator, scheme, share) in validators.iter() {
834                let context = context.with_label(&validator.to_string());
835                let monitor = mocks::Monitor::new(111);
836                monitors.insert(validator.clone(), monitor.clone());
837                let sequencers = mocks::Sequencers::<PublicKey>::new(vec![sequencer.public_key()]);
838                let validators = mocks::Validators::<PublicKey, V>::new(
839                    polynomial.clone(),
840                    validator_pks.clone(),
841                    Some(share.clone()),
842                );
843
844                let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
845                automatons
846                    .lock()
847                    .unwrap()
848                    .insert(validator.clone(), automaton.clone());
849
850                let (reporter, reporter_mailbox) =
851                    mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
852                        namespace,
853                        *poly::public::<V>(&polynomial),
854                        Some(5),
855                    );
856                context.with_label("reporter").spawn(|_| reporter.run());
857                reporters.insert(validator.clone(), reporter_mailbox);
858
859                let engine = Engine::new(
860                    context.with_label("engine"),
861                    Config {
862                        crypto: scheme.clone(),
863                        relay: automaton.clone(),
864                        automaton: automaton.clone(),
865                        reporter: reporters.get(validator).unwrap().clone(),
866                        monitor,
867                        sequencers,
868                        validators,
869                        namespace: namespace.to_vec(),
870                        epoch_bounds: (1, 1),
871                        height_bound: 2,
872                        rebroadcast_timeout: Duration::from_secs(5),
873                        priority_acks: false,
874                        priority_proposals: false,
875                        journal_heights_per_section: 10,
876                        journal_replay_buffer: NZUsize!(4096),
877                        journal_write_buffer: NZUsize!(4096),
878                        journal_name_prefix: format!("ordered-broadcast-seq/{validator}/"),
879                        journal_compression: Some(3),
880                        journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
881                    },
882                );
883
884                let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
885                engine.start((a1, a2), (b1, b2));
886            }
887
888            // Spawn sequencer engine
889            {
890                let context = context.with_label("sequencer");
891                let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
892                automatons
893                    .lock()
894                    .unwrap()
895                    .insert(sequencer.public_key(), automaton.clone());
896                let (reporter, reporter_mailbox) =
897                    mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
898                        namespace,
899                        *poly::public::<V>(&polynomial),
900                        Some(5),
901                    );
902                context.with_label("reporter").spawn(|_| reporter.run());
903                reporters.insert(sequencer.public_key(), reporter_mailbox);
904                let engine = Engine::new(
905                    context.with_label("engine"),
906                    Config {
907                        crypto: sequencer.clone(),
908                        relay: automaton.clone(),
909                        automaton: automaton.clone(),
910                        reporter: reporters.get(&sequencer.public_key()).unwrap().clone(),
911                        monitor: mocks::Monitor::new(111),
912                        sequencers: mocks::Sequencers::<PublicKey>::new(vec![
913                            sequencer.public_key()
914                        ]),
915                        validators: mocks::Validators::<PublicKey, V>::new(
916                            polynomial.clone(),
917                            validator_pks,
918                            None,
919                        ),
920                        namespace: namespace.to_vec(),
921                        epoch_bounds: (1, 1),
922                        height_bound: 2,
923                        rebroadcast_timeout: Duration::from_secs(5),
924                        priority_acks: false,
925                        priority_proposals: false,
926                        journal_heights_per_section: 10,
927                        journal_replay_buffer: NZUsize!(4096),
928                        journal_write_buffer: NZUsize!(4096),
929                        journal_name_prefix: format!(
930                            "ordered-broadcast-seq/{}/",
931                            sequencer.public_key()
932                        ),
933                        journal_compression: Some(3),
934                        journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
935                    },
936                );
937
938                let ((a1, a2), (b1, b2)) = registrations.remove(&sequencer.public_key()).unwrap();
939                engine.start((a1, a2), (b1, b2));
940            }
941
942            // Await reporters
943            await_reporters(
944                context.with_label("reporter"),
945                vec![sequencer.public_key()],
946                &reporters,
947                (100, 111, true),
948            )
949            .await;
950        });
951    }
952
953    #[test_traced]
954    fn test_external_sequencer() {
955        external_sequencer::<MinPk>();
956        external_sequencer::<MinSig>();
957    }
958
959    fn run_1k<V: Variant>() {
960        let num_validators: u32 = 10;
961        let quorum: u32 = 3;
962        let cfg = deterministic::Config::new();
963        let runner = deterministic::Runner::new(cfg);
964
965        runner.start(|mut context| async move {
966            let (polynomial, mut shares_vec) =
967                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
968            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
969
970            let (oracle, validators, pks, mut registrations) = initialize_simulation(
971                context.with_label("simulation"),
972                num_validators,
973                &mut shares_vec,
974            )
975            .await;
976            let delayed_link = Link {
977                latency: Duration::from_millis(80),
978                jitter: Duration::from_millis(10),
979                success_rate: 0.98,
980            };
981            let mut oracle_clone = oracle.clone();
982            link_participants(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
983
984            let automatons = Arc::new(Mutex::new(
985                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
986            ));
987            let mut reporters =
988                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
989            let sequencers = &pks[0..pks.len() / 2];
990            spawn_validator_engines::<V>(
991                context.with_label("validator"),
992                polynomial.clone(),
993                sequencers,
994                &pks,
995                &validators,
996                &mut registrations,
997                &mut automatons.lock().unwrap(),
998                &mut reporters,
999                Duration::from_millis(150),
1000                |_| false,
1001                None,
1002            );
1003
1004            await_reporters(
1005                context.with_label("reporter"),
1006                sequencers.to_vec(),
1007                &reporters,
1008                (1_000, 111, false),
1009            )
1010            .await;
1011        })
1012    }
1013
1014    #[test_traced]
1015    #[ignore]
1016    fn test_1k_min_pk() {
1017        run_1k::<MinPk>();
1018    }
1019
1020    #[test_traced]
1021    #[ignore]
1022    fn test_1k_min_sig() {
1023        run_1k::<MinSig>();
1024    }
1025}