commonware_consensus/ordered_broadcast/
mod.rs

1//! Ordered, reliable broadcast across reconfigurable participants.
2//!
3//! # Concepts
4//!
5//! The system has two types of network participants: `sequencers` and `validators`. Their sets may
6//! overlap and are defined by the current `epoch`, a monotonically increasing integer. This module
7//! can handle reconfiguration of these sets across different epochs.
8//!
9//! Sequencers broadcast data. The smallest unit of data is a `chunk`. Sequencers broadcast `node`s
10//! that contain a chunk and a threshold signature over the previous chunk, forming a linked chain
11//! of nodes from each sequencer.
12//!
13//! Validators verify and sign chunks using partial signatures. These can be combined to recover a
14//! threshold signature, ensuring a quorum verifies each chunk. The threshold signature allows
15//! external parties to confirm that the chunk was reliably broadcast.
16//!
17//! Network participants persist any new nodes to a journal. This enables recovery from crashes and
18//! ensures that sequencers do not broadcast conflicting chunks and that validators do not sign
19//! them. "Conflicting" chunks are chunks from the same sequencer at the same height with different
20//! payloads.
21//!
22//! # Design
23//!
24//! The core of the module is the [Engine]. It is responsible for:
25//! - Broadcasting nodes (if a sequencer)
26//! - Signing chunks (if a validator)
27//! - Tracking the latest chunk in each sequencer’s chain
28//! - Recovering threshold signatures from partial signatures for each chunk
29//! - Notifying other actors of new chunks and threshold signatures
30//!
31//! # Acknowledgements
32//!
33//! [Autobahn](https://arxiv.org/abs/2401.10369) provided the insight that a succinct
34//! proof-of-availability could be produced by linking sequencer broadcasts.
35
36pub mod types;
37
38cfg_if::cfg_if! {
39    if #[cfg(not(target_arch = "wasm32"))] {
40        mod ack_manager;
41        use ack_manager::AckManager;
42        mod config;
43        pub use config::Config;
44        mod engine;
45        pub use engine::Engine;
46        mod metrics;
47        mod tip_manager;
48        use tip_manager::TipManager;
49    }
50}
51
52#[cfg(test)]
53pub mod mocks;
54
55#[cfg(test)]
56mod tests {
57    use super::{mocks, types::Epoch, Config, Engine};
58    use commonware_cryptography::{
59        bls12381::{
60            dkg::ops,
61            primitives::{
62                group::Share,
63                poly,
64                variant::{MinPk, MinSig, Variant},
65            },
66        },
67        ed25519::{PrivateKey, PublicKey},
68        sha256::Digest as Sha256Digest,
69        PrivateKeyExt as _, Signer as _,
70    };
71    use commonware_macros::test_traced;
72    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
73    use commonware_runtime::{
74        buffer::PoolRef,
75        deterministic::{self, Context},
76        Clock, Metrics, Runner, Spawner,
77    };
78    use commonware_utils::{quorum, NZUsize};
79    use futures::{channel::oneshot, future::join_all};
80    use rand::{rngs::StdRng, SeedableRng as _};
81    use std::{
82        collections::{BTreeMap, HashMap, HashSet},
83        num::NonZeroUsize,
84        sync::{Arc, Mutex},
85        time::Duration,
86    };
87    use tracing::debug;
88
89    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
90    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
91
92    type Registrations<P> = BTreeMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))>;
93
94    async fn register_participants(
95        oracle: &mut Oracle<PublicKey>,
96        participants: &[PublicKey],
97    ) -> Registrations<PublicKey> {
98        let mut registrations = BTreeMap::new();
99        for participant in participants.iter() {
100            let (a1, a2) = oracle.register(participant.clone(), 0).await.unwrap();
101            let (b1, b2) = oracle.register(participant.clone(), 1).await.unwrap();
102            registrations.insert(participant.clone(), ((a1, a2), (b1, b2)));
103        }
104        registrations
105    }
106
107    #[allow(dead_code)]
108    enum Action {
109        Link(Link),
110        Update(Link),
111        Unlink,
112    }
113
114    async fn link_participants(
115        oracle: &mut Oracle<PublicKey>,
116        participants: &[PublicKey],
117        action: Action,
118        restrict_to: Option<fn(usize, usize, usize) -> bool>,
119    ) {
120        for (i1, v1) in participants.iter().enumerate() {
121            for (i2, v2) in participants.iter().enumerate() {
122                if v2 == v1 {
123                    continue;
124                }
125                if let Some(f) = restrict_to {
126                    if !f(participants.len(), i1, i2) {
127                        continue;
128                    }
129                }
130                if matches!(action, Action::Update(_) | Action::Unlink) {
131                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
132                }
133                if let Action::Link(ref link) | Action::Update(ref link) = action {
134                    oracle
135                        .add_link(v1.clone(), v2.clone(), link.clone())
136                        .await
137                        .unwrap();
138                }
139            }
140        }
141    }
142
143    async fn initialize_simulation(
144        context: Context,
145        num_validators: u32,
146        shares_vec: &mut [Share],
147    ) -> (
148        Oracle<PublicKey>,
149        Vec<(PublicKey, PrivateKey, Share)>,
150        Vec<PublicKey>,
151        Registrations<PublicKey>,
152    ) {
153        let (network, mut oracle) = Network::new(
154            context.with_label("network"),
155            commonware_p2p::simulated::Config {
156                max_size: 1024 * 1024,
157            },
158        );
159        network.start();
160
161        let mut schemes = (0..num_validators)
162            .map(|i| PrivateKey::from_seed(i as u64))
163            .collect::<Vec<_>>();
164        schemes.sort_by_key(|s| s.public_key());
165        let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
166            .iter()
167            .enumerate()
168            .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
169            .collect();
170        let pks = validators
171            .iter()
172            .map(|(pk, _, _)| pk.clone())
173            .collect::<Vec<_>>();
174
175        let registrations = register_participants(&mut oracle, &pks).await;
176        let link = Link {
177            latency: Duration::from_millis(10),
178            jitter: Duration::from_millis(1),
179            success_rate: 1.0,
180        };
181        link_participants(&mut oracle, &pks, Action::Link(link), None).await;
182        (oracle, validators, pks, registrations)
183    }
184
185    #[allow(clippy::too_many_arguments)]
186    fn spawn_validator_engines<V: Variant>(
187        context: Context,
188        polynomial: poly::Public<V>,
189        sequencer_pks: &[PublicKey],
190        validator_pks: &[PublicKey],
191        validators: &[(PublicKey, PrivateKey, Share)],
192        registrations: &mut Registrations<PublicKey>,
193        automatons: &mut BTreeMap<PublicKey, mocks::Automaton<PublicKey>>,
194        reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
195        rebroadcast_timeout: Duration,
196        invalid_when: fn(u64) -> bool,
197        misses_allowed: Option<usize>,
198    ) -> HashMap<PublicKey, mocks::Monitor> {
199        let mut monitors = HashMap::new();
200        let namespace = b"my testing namespace";
201        for (validator, scheme, share) in validators.iter() {
202            let context = context.with_label(&validator.to_string());
203            let monitor = mocks::Monitor::new(111);
204            monitors.insert(validator.clone(), monitor.clone());
205            let sequencers = mocks::Sequencers::<PublicKey>::new(sequencer_pks.to_vec());
206            let validators = mocks::Validators::<PublicKey, V>::new(
207                polynomial.clone(),
208                validator_pks.to_vec(),
209                Some(share.clone()),
210            );
211
212            let automaton = mocks::Automaton::<PublicKey>::new(invalid_when);
213            automatons.insert(validator.clone(), automaton.clone());
214
215            let (reporter, reporter_mailbox) = mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
216                namespace,
217                *poly::public::<V>(&polynomial),
218                misses_allowed,
219            );
220            context.with_label("reporter").spawn(|_| reporter.run());
221            reporters.insert(validator.clone(), reporter_mailbox);
222
223            let engine = Engine::new(
224                context.with_label("engine"),
225                Config {
226                    crypto: scheme.clone(),
227                    relay: automaton.clone(),
228                    automaton: automaton.clone(),
229                    reporter: reporters.get(validator).unwrap().clone(),
230                    monitor,
231                    sequencers,
232                    validators,
233                    namespace: namespace.to_vec(),
234                    epoch_bounds: (1, 1),
235                    height_bound: 2,
236                    rebroadcast_timeout,
237                    priority_acks: false,
238                    priority_proposals: false,
239                    journal_heights_per_section: 10,
240                    journal_replay_buffer: NZUsize!(4096),
241                    journal_write_buffer: NZUsize!(4096),
242                    journal_name_prefix: format!("ordered-broadcast-seq/{validator}/"),
243                    journal_compression: Some(3),
244                    journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
245                },
246            );
247
248            let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
249            engine.start((a1, a2), (b1, b2));
250        }
251        monitors
252    }
253
254    async fn await_reporters<V: Variant>(
255        context: Context,
256        sequencers: Vec<PublicKey>,
257        reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
258        threshold: (u64, Epoch, bool),
259    ) {
260        let mut receivers = Vec::new();
261        for (reporter, mailbox) in reporters.iter() {
262            // Spawn a watcher for the reporter.
263            for sequencer in sequencers.iter() {
264                // Create a oneshot channel to signal when the reporter has reached the threshold.
265                let (tx, rx) = oneshot::channel();
266                receivers.push(rx);
267
268                context.with_label("reporter_watcher").spawn({
269                    let reporter = reporter.clone();
270                    let sequencer = sequencer.clone();
271                    let mut mailbox = mailbox.clone();
272                    move |context| async move {
273                        loop {
274                            let (height, epoch) =
275                                mailbox.get_tip(sequencer.clone()).await.unwrap_or((0, 0));
276                            debug!(height, epoch, ?sequencer, ?reporter, "reporter");
277                            let contiguous_height = mailbox
278                                .get_contiguous_tip(sequencer.clone())
279                                .await
280                                .unwrap_or(0);
281                            if height >= threshold.0
282                                && epoch >= threshold.1
283                                && (!threshold.2 || contiguous_height >= threshold.0)
284                            {
285                                let _ = tx.send(sequencer.clone());
286                                break;
287                            }
288                            context.sleep(Duration::from_millis(100)).await;
289                        }
290                    }
291                });
292            }
293        }
294
295        // Wait for all oneshot receivers to complete.
296        let results = join_all(receivers).await;
297        assert_eq!(results.len(), sequencers.len() * reporters.len());
298
299        // Check that none were cancelled.
300        for result in results {
301            assert!(result.is_ok(), "reporter was cancelled");
302        }
303    }
304
305    async fn get_max_height<V: Variant>(
306        reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>,
307    ) -> u64 {
308        let mut max_height = 0;
309        for (sequencer, mailbox) in reporters.iter_mut() {
310            let (height, _) = mailbox.get_tip(sequencer.clone()).await.unwrap_or((0, 0));
311            if height > max_height {
312                max_height = height;
313            }
314        }
315        max_height
316    }
317
318    fn all_online<V: Variant>() {
319        let num_validators: u32 = 4;
320        let quorum: u32 = 3;
321        let runner = deterministic::Runner::timed(Duration::from_secs(30));
322
323        runner.start(|mut context| async move {
324            let (polynomial, mut shares_vec) =
325                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
326            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
327
328            let (_oracle, validators, pks, mut registrations) = initialize_simulation(
329                context.with_label("simulation"),
330                num_validators,
331                &mut shares_vec,
332            )
333            .await;
334            let automatons = Arc::new(Mutex::new(
335                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
336            ));
337            let mut reporters =
338                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
339            spawn_validator_engines::<V>(
340                context.with_label("validator"),
341                polynomial.clone(),
342                &pks,
343                &pks,
344                &validators,
345                &mut registrations,
346                &mut automatons.lock().unwrap(),
347                &mut reporters,
348                Duration::from_secs(5),
349                |_| false,
350                Some(5),
351            );
352            await_reporters(
353                context.with_label("reporter"),
354                reporters.keys().cloned().collect::<Vec<_>>(),
355                &reporters,
356                (100, 111, true),
357            )
358            .await;
359        });
360    }
361
362    #[test_traced]
363    fn test_all_online() {
364        all_online::<MinPk>();
365        all_online::<MinSig>();
366    }
367
368    fn unclean_shutdown<V: Variant>() {
369        let num_validators: u32 = 4;
370        let quorum: u32 = 3;
371        let mut rng = StdRng::seed_from_u64(0);
372        let (polynomial, mut shares_vec) =
373            ops::generate_shares::<_, V>(&mut rng, None, num_validators, quorum);
374        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
375        let completed = Arc::new(Mutex::new(HashSet::new()));
376        let shutdowns = Arc::new(Mutex::new(0u64));
377        let mut prev_ctx = None;
378
379        while completed.lock().unwrap().len() != num_validators as usize {
380            let completed = completed.clone();
381            let shares_vec = shares_vec.clone();
382            let shutdowns = shutdowns.clone();
383            let polynomial = polynomial.clone();
384
385            let f = |context: deterministic::Context| async move {
386                let (network, mut oracle) = Network::new(
387                    context.with_label("network"),
388                    commonware_p2p::simulated::Config {
389                        max_size: 1024 * 1024,
390                    },
391                );
392                network.start();
393
394                let mut schemes = (0..num_validators)
395                    .map(|i| PrivateKey::from_seed(i as u64))
396                    .collect::<Vec<_>>();
397                schemes.sort_by_key(|s| s.public_key());
398                let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
399                    .iter()
400                    .enumerate()
401                    .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i].clone()))
402                    .collect();
403                let pks = validators
404                    .iter()
405                    .map(|(pk, _, _)| pk.clone())
406                    .collect::<Vec<_>>();
407
408                let mut registrations = register_participants(&mut oracle, &pks).await;
409                let link = commonware_p2p::simulated::Link {
410                    latency: Duration::from_millis(10),
411                    jitter: Duration::from_millis(1),
412                    success_rate: 1.0,
413                };
414                link_participants(&mut oracle, &pks, Action::Link(link), None).await;
415
416                let automatons = Arc::new(Mutex::new(BTreeMap::<
417                    PublicKey,
418                    mocks::Automaton<PublicKey>,
419                >::new()));
420                let mut reporters =
421                    BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new(
422                    );
423                spawn_validator_engines(
424                    context.with_label("validator"),
425                    polynomial.clone(),
426                    &pks,
427                    &pks,
428                    &validators,
429                    &mut registrations,
430                    &mut automatons.lock().unwrap(),
431                    &mut reporters,
432                    Duration::from_secs(5),
433                    |_| false,
434                    None,
435                );
436
437                let reporter_pairs: Vec<(
438                    PublicKey,
439                    mocks::ReporterMailbox<PublicKey, V, Sha256Digest>,
440                )> = reporters
441                    .iter()
442                    .map(|(v, m)| (v.clone(), m.clone()))
443                    .collect();
444                for (validator, mut mailbox) in reporter_pairs {
445                    let completed_clone = completed.clone();
446                    context
447                        .with_label("reporter_unclean")
448                        .spawn(|context| async move {
449                            loop {
450                                let (height, _) =
451                                    mailbox.get_tip(validator.clone()).await.unwrap_or((0, 0));
452                                if height >= 100 {
453                                    completed_clone.lock().unwrap().insert(validator.clone());
454                                    break;
455                                }
456                                context.sleep(Duration::from_millis(100)).await;
457                            }
458                        });
459                }
460                context.sleep(Duration::from_millis(1000)).await;
461                *shutdowns.lock().unwrap() += 1;
462
463                context
464            };
465
466            let context = if let Some(prev_ctx) = prev_ctx {
467                deterministic::Runner::from(prev_ctx)
468            } else {
469                deterministic::Runner::timed(Duration::from_secs(45))
470            }
471            .start(f);
472
473            prev_ctx = Some(context.recover());
474        }
475    }
476
477    #[test_traced]
478    fn test_unclean_shutdown() {
479        unclean_shutdown::<MinPk>();
480        unclean_shutdown::<MinSig>();
481    }
482
483    fn network_partition<V: Variant>() {
484        let num_validators: u32 = 4;
485        let quorum: u32 = 3;
486        let runner = deterministic::Runner::timed(Duration::from_secs(60));
487
488        runner.start(|mut context| async move {
489            let (polynomial, mut shares_vec) =
490                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
491            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
492
493            // Configure the network
494            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
495                context.with_label("simulation"),
496                num_validators,
497                &mut shares_vec,
498            )
499            .await;
500            let automatons = Arc::new(Mutex::new(
501                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
502            ));
503            let mut reporters =
504                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
505            spawn_validator_engines(
506                context.with_label("validator"),
507                polynomial.clone(),
508                &pks,
509                &pks,
510                &validators,
511                &mut registrations,
512                &mut automatons.lock().unwrap(),
513                &mut reporters,
514                Duration::from_secs(1),
515                |_| false,
516                None,
517            );
518
519            // Simulate partition by removing all links.
520            link_participants(&mut oracle, &pks, Action::Unlink, None).await;
521            context.sleep(Duration::from_secs(30)).await;
522
523            // Get the maximum height from all reporters.
524            let max_height = get_max_height(&mut reporters).await;
525
526            // Heal the partition by re-adding links.
527            let link = Link {
528                latency: Duration::from_millis(10),
529                jitter: Duration::from_millis(1),
530                success_rate: 1.0,
531            };
532            link_participants(&mut oracle, &pks, Action::Link(link), None).await;
533            await_reporters(
534                context.with_label("reporter"),
535                reporters.keys().cloned().collect::<Vec<_>>(),
536                &reporters,
537                (max_height + 100, 111, false),
538            )
539            .await;
540        });
541    }
542
543    #[test_traced]
544    #[ignore]
545    fn test_network_partition() {
546        network_partition::<MinPk>();
547        network_partition::<MinSig>();
548    }
549
550    fn slow_and_lossy_links<V: Variant>(seed: u64) -> String {
551        let num_validators: u32 = 4;
552        let quorum: u32 = 3;
553        let cfg = deterministic::Config::new()
554            .with_seed(seed)
555            .with_timeout(Some(Duration::from_secs(40)));
556        let runner = deterministic::Runner::new(cfg);
557
558        runner.start(|mut context| async move {
559            let (polynomial, mut shares_vec) =
560                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
561            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
562
563            let (oracle, validators, pks, mut registrations) = initialize_simulation(
564                context.with_label("simulation"),
565                num_validators,
566                &mut shares_vec,
567            )
568            .await;
569            let delayed_link = Link {
570                latency: Duration::from_millis(50),
571                jitter: Duration::from_millis(40),
572                success_rate: 0.5,
573            };
574            let mut oracle_clone = oracle.clone();
575            link_participants(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
576
577            let automatons = Arc::new(Mutex::new(
578                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
579            ));
580            let mut reporters =
581                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
582            spawn_validator_engines(
583                context.with_label("validator"),
584                polynomial.clone(),
585                &pks,
586                &pks,
587                &validators,
588                &mut registrations,
589                &mut automatons.lock().unwrap(),
590                &mut reporters,
591                Duration::from_millis(150),
592                |_| false,
593                None,
594            );
595
596            await_reporters(
597                context.with_label("reporter"),
598                reporters.keys().cloned().collect::<Vec<_>>(),
599                &reporters,
600                (40, 111, false),
601            )
602            .await;
603
604            context.auditor().state()
605        })
606    }
607
608    #[test_traced]
609    fn test_slow_and_lossy_links() {
610        slow_and_lossy_links::<MinPk>(0);
611        slow_and_lossy_links::<MinSig>(0);
612    }
613
614    #[test_traced]
615    #[ignore]
616    fn test_determinism() {
617        // We use slow and lossy links as the deterministic test
618        // because it is the most complex test.
619        for seed in 1..6 {
620            let pk_state_1 = slow_and_lossy_links::<MinPk>(seed);
621            let pk_state_2 = slow_and_lossy_links::<MinPk>(seed);
622            assert_eq!(pk_state_1, pk_state_2);
623
624            let sig_state_1 = slow_and_lossy_links::<MinSig>(seed);
625            let sig_state_2 = slow_and_lossy_links::<MinSig>(seed);
626            assert_eq!(sig_state_1, sig_state_2);
627
628            // Sanity check that different types can't be identical.
629            assert_ne!(pk_state_1, sig_state_1);
630        }
631    }
632
633    fn invalid_signature_injection<V: Variant>() {
634        let num_validators: u32 = 4;
635        let quorum: u32 = 3;
636        let runner = deterministic::Runner::timed(Duration::from_secs(30));
637
638        runner.start(|mut context| async move {
639            let (polynomial, mut shares_vec) =
640                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
641            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
642
643            let (_oracle, validators, pks, mut registrations) = initialize_simulation(
644                context.with_label("simulation"),
645                num_validators,
646                &mut shares_vec,
647            )
648            .await;
649            let automatons = Arc::new(Mutex::new(
650                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
651            ));
652            let mut reporters =
653                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
654            spawn_validator_engines::<V>(
655                context.with_label("validator"),
656                polynomial.clone(),
657                &pks,
658                &pks,
659                &validators,
660                &mut registrations,
661                &mut automatons.lock().unwrap(),
662                &mut reporters,
663                Duration::from_secs(5),
664                |i| i % 10 == 0,
665                None,
666            );
667
668            await_reporters(
669                context.with_label("reporter"),
670                reporters.keys().cloned().collect::<Vec<_>>(),
671                &reporters,
672                (100, 111, true),
673            )
674            .await;
675        });
676    }
677
678    #[test_traced]
679    fn test_invalid_signature_injection() {
680        invalid_signature_injection::<MinPk>();
681        invalid_signature_injection::<MinSig>();
682    }
683
684    fn updated_epoch<V: Variant>() {
685        let num_validators: u32 = 4;
686        let quorum: u32 = 3;
687        let runner = deterministic::Runner::timed(Duration::from_secs(60));
688
689        runner.start(|mut context| async move {
690            let (polynomial, mut shares_vec) =
691                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
692            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
693
694            // Setup network
695            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
696                context.with_label("simulation"),
697                num_validators,
698                &mut shares_vec,
699            )
700            .await;
701            let automatons = Arc::new(Mutex::new(
702                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
703            ));
704            let mut reporters =
705                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
706            let monitors = spawn_validator_engines::<V>(
707                context.with_label("validator"),
708                polynomial.clone(),
709                &pks,
710                &pks,
711                &validators,
712                &mut registrations,
713                &mut automatons.lock().unwrap(),
714                &mut reporters,
715                Duration::from_secs(1),
716                |_| false,
717                Some(5),
718            );
719
720            // Perform some work
721            await_reporters(
722                context.with_label("reporter"),
723                reporters.keys().cloned().collect::<Vec<_>>(),
724                &reporters,
725                (100, 111, true),
726            )
727            .await;
728
729            // Simulate partition by removing all links.
730            link_participants(&mut oracle, &pks, Action::Unlink, None).await;
731            context.sleep(Duration::from_secs(30)).await;
732
733            // Get the maximum height from all reporters.
734            let max_height = get_max_height(&mut reporters).await;
735
736            // Update the epoch
737            for monitor in monitors.values() {
738                monitor.update(112);
739            }
740
741            // Heal the partition by re-adding links.
742            let link = Link {
743                latency: Duration::from_millis(10),
744                jitter: Duration::from_millis(1),
745                success_rate: 1.0,
746            };
747            link_participants(&mut oracle, &pks, Action::Link(link), None).await;
748            await_reporters(
749                context.with_label("reporter"),
750                reporters.keys().cloned().collect::<Vec<_>>(),
751                &reporters,
752                (max_height + 100, 112, true),
753            )
754            .await;
755        });
756    }
757
758    #[test_traced]
759    fn test_updated_epoch() {
760        updated_epoch::<MinPk>();
761        updated_epoch::<MinSig>();
762    }
763
764    fn external_sequencer<V: Variant>() {
765        let num_validators: u32 = 4;
766        let quorum: u32 = quorum(3);
767        let runner = deterministic::Runner::timed(Duration::from_secs(60));
768        runner.start(|mut context| async move {
769            // Generate validator shares
770            let (polynomial, shares) =
771                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
772
773            // Generate validator schemes
774            let mut schemes = (0..num_validators)
775                .map(|i| PrivateKey::from_seed(i as u64))
776                .collect::<Vec<_>>();
777            schemes.sort_by_key(|s| s.public_key());
778
779            // Generate validators
780            let validators: Vec<(PublicKey, PrivateKey, Share)> = schemes
781                .iter()
782                .enumerate()
783                .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares[i].clone()))
784                .collect();
785            let validator_pks = validators
786                .iter()
787                .map(|(pk, _, _)| pk.clone())
788                .collect::<Vec<_>>();
789
790            // Generate sequencer
791            let sequencer = PrivateKey::from_seed(u64::MAX);
792
793            // Generate network participants
794            let mut participants = validators
795                .iter()
796                .map(|(pk, _, _)| pk.clone())
797                .collect::<Vec<_>>();
798            participants.push(sequencer.public_key()); // as long as external participants are in same position for all, it is safe
799
800            // Create network
801            let (network, mut oracle) = Network::new(
802                context.with_label("network"),
803                commonware_p2p::simulated::Config {
804                    max_size: 1024 * 1024,
805                },
806            );
807            network.start();
808
809            // Register all participants
810            let mut registrations = register_participants(&mut oracle, &participants).await;
811            let link = commonware_p2p::simulated::Link {
812                latency: Duration::from_millis(10),
813                jitter: Duration::from_millis(1),
814                success_rate: 1.0,
815            };
816            link_participants(&mut oracle, &participants, Action::Link(link), None).await;
817
818            // Setup engines
819            let automatons = Arc::new(Mutex::new(
820                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
821            ));
822            let mut reporters =
823                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
824            let mut monitors = HashMap::new();
825            let namespace = b"my testing namespace";
826
827            // Spawn validator engines
828            for (validator, scheme, share) in validators.iter() {
829                let context = context.with_label(&validator.to_string());
830                let monitor = mocks::Monitor::new(111);
831                monitors.insert(validator.clone(), monitor.clone());
832                let sequencers = mocks::Sequencers::<PublicKey>::new(vec![sequencer.public_key()]);
833                let validators = mocks::Validators::<PublicKey, V>::new(
834                    polynomial.clone(),
835                    validator_pks.clone(),
836                    Some(share.clone()),
837                );
838
839                let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
840                automatons
841                    .lock()
842                    .unwrap()
843                    .insert(validator.clone(), automaton.clone());
844
845                let (reporter, reporter_mailbox) =
846                    mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
847                        namespace,
848                        *poly::public::<V>(&polynomial),
849                        Some(5),
850                    );
851                context.with_label("reporter").spawn(|_| reporter.run());
852                reporters.insert(validator.clone(), reporter_mailbox);
853
854                let engine = Engine::new(
855                    context.with_label("engine"),
856                    Config {
857                        crypto: scheme.clone(),
858                        relay: automaton.clone(),
859                        automaton: automaton.clone(),
860                        reporter: reporters.get(validator).unwrap().clone(),
861                        monitor,
862                        sequencers,
863                        validators,
864                        namespace: namespace.to_vec(),
865                        epoch_bounds: (1, 1),
866                        height_bound: 2,
867                        rebroadcast_timeout: Duration::from_secs(5),
868                        priority_acks: false,
869                        priority_proposals: false,
870                        journal_heights_per_section: 10,
871                        journal_replay_buffer: NZUsize!(4096),
872                        journal_write_buffer: NZUsize!(4096),
873                        journal_name_prefix: format!("ordered-broadcast-seq/{validator}/"),
874                        journal_compression: Some(3),
875                        journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
876                    },
877                );
878
879                let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
880                engine.start((a1, a2), (b1, b2));
881            }
882
883            // Spawn sequencer engine
884            {
885                let context = context.with_label("sequencer");
886                let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
887                automatons
888                    .lock()
889                    .unwrap()
890                    .insert(sequencer.public_key(), automaton.clone());
891                let (reporter, reporter_mailbox) =
892                    mocks::Reporter::<PublicKey, V, Sha256Digest>::new(
893                        namespace,
894                        *poly::public::<V>(&polynomial),
895                        Some(5),
896                    );
897                context.with_label("reporter").spawn(|_| reporter.run());
898                reporters.insert(sequencer.public_key(), reporter_mailbox);
899                let engine = Engine::new(
900                    context.with_label("engine"),
901                    Config {
902                        crypto: sequencer.clone(),
903                        relay: automaton.clone(),
904                        automaton: automaton.clone(),
905                        reporter: reporters.get(&sequencer.public_key()).unwrap().clone(),
906                        monitor: mocks::Monitor::new(111),
907                        sequencers: mocks::Sequencers::<PublicKey>::new(vec![
908                            sequencer.public_key()
909                        ]),
910                        validators: mocks::Validators::<PublicKey, V>::new(
911                            polynomial.clone(),
912                            validator_pks,
913                            None,
914                        ),
915                        namespace: namespace.to_vec(),
916                        epoch_bounds: (1, 1),
917                        height_bound: 2,
918                        rebroadcast_timeout: Duration::from_secs(5),
919                        priority_acks: false,
920                        priority_proposals: false,
921                        journal_heights_per_section: 10,
922                        journal_replay_buffer: NZUsize!(4096),
923                        journal_write_buffer: NZUsize!(4096),
924                        journal_name_prefix: format!(
925                            "ordered-broadcast-seq/{}/",
926                            sequencer.public_key()
927                        ),
928                        journal_compression: Some(3),
929                        journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
930                    },
931                );
932
933                let ((a1, a2), (b1, b2)) = registrations.remove(&sequencer.public_key()).unwrap();
934                engine.start((a1, a2), (b1, b2));
935            }
936
937            // Await reporters
938            await_reporters(
939                context.with_label("reporter"),
940                vec![sequencer.public_key()],
941                &reporters,
942                (100, 111, true),
943            )
944            .await;
945        });
946    }
947
948    #[test_traced]
949    fn test_external_sequencer() {
950        external_sequencer::<MinPk>();
951        external_sequencer::<MinSig>();
952    }
953
954    fn run_1k<V: Variant>() {
955        let num_validators: u32 = 10;
956        let quorum: u32 = 3;
957        let cfg = deterministic::Config::new();
958        let runner = deterministic::Runner::new(cfg);
959
960        runner.start(|mut context| async move {
961            let (polynomial, mut shares_vec) =
962                ops::generate_shares::<_, V>(&mut context, None, num_validators, quorum);
963            shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
964
965            let (oracle, validators, pks, mut registrations) = initialize_simulation(
966                context.with_label("simulation"),
967                num_validators,
968                &mut shares_vec,
969            )
970            .await;
971            let delayed_link = Link {
972                latency: Duration::from_millis(80),
973                jitter: Duration::from_millis(10),
974                success_rate: 0.98,
975            };
976            let mut oracle_clone = oracle.clone();
977            link_participants(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
978
979            let automatons = Arc::new(Mutex::new(
980                BTreeMap::<PublicKey, mocks::Automaton<PublicKey>>::new(),
981            ));
982            let mut reporters =
983                BTreeMap::<PublicKey, mocks::ReporterMailbox<PublicKey, V, Sha256Digest>>::new();
984            let sequencers = &pks[0..pks.len() / 2];
985            spawn_validator_engines::<V>(
986                context.with_label("validator"),
987                polynomial.clone(),
988                sequencers,
989                &pks,
990                &validators,
991                &mut registrations,
992                &mut automatons.lock().unwrap(),
993                &mut reporters,
994                Duration::from_millis(150),
995                |_| false,
996                None,
997            );
998
999            await_reporters(
1000                context.with_label("reporter"),
1001                sequencers.to_vec(),
1002                &reporters,
1003                (1_000, 111, false),
1004            )
1005            .await;
1006        })
1007    }
1008
1009    #[test_traced]
1010    #[ignore]
1011    fn test_1k() {
1012        run_1k::<MinPk>();
1013        run_1k::<MinSig>();
1014    }
1015}