commonware_consensus/ordered_broadcast/
mod.rs

1//! Ordered, reliable broadcast across reconfigurable participants.
2//!
3//! # Concepts
4//!
5//! The system has two types of network participants: `sequencers` and `validators`. Their sets may
6//! overlap and are defined by the current `epoch`, a monotonically increasing integer. This module
7//! can handle reconfiguration of these sets across different epochs.
8//!
9//! Sequencers broadcast data. The smallest unit of data is a `chunk`. Sequencers broadcast `node`s
10//! that contain a chunk and a certificate over the previous chunk, forming a linked chain
11//! of nodes from each sequencer.
12//!
13//! Validators verify and sign chunks. These signatures can be combined to form a quorum
14//! certificate, ensuring a quorum verifies each chunk. The certificate allows external parties
15//! to confirm that the chunk was reliably broadcast.
16//!
17//! Network participants persist any new nodes to a journal. This enables recovery from crashes and
18//! ensures that sequencers do not broadcast conflicting chunks and that validators do not sign
19//! them. "Conflicting" chunks are chunks from the same sequencer at the same height with different
20//! payloads.
21//!
22//! # Pluggable Cryptography
23//!
24//! The ordered broadcast module is generic over the signing scheme, allowing users to choose the
25//! cryptographic scheme best suited for their requirements:
26//!
27//! - [`ed25519`][scheme::ed25519]: Attributable signatures with individual verification.
28//!   HSM-friendly, no trusted setup required. Certificates contain individual signatures.
29//!
30//! - [`bls12381_multisig`][scheme::bls12381_multisig]: Attributable signatures with aggregated
31//!   verification. Produces compact certificates while preserving signer attribution.
32//!
33//! - [`bls12381_threshold`][scheme::bls12381_threshold]: Non-attributable threshold signatures.
34//!   Produces succinct constant-size certificates. Requires trusted setup (DKG).
35//!
36//! # Design
37//!
38//! The core of the module is the [Engine]. It is responsible for:
39//! - Broadcasting nodes (if a sequencer)
40//! - Signing chunks (if a validator)
41//! - Tracking the latest chunk in each sequencer's chain
42//! - Assembling certificates from a quorum of signatures
43//! - Notifying other actors of new chunks and certificates
44//!
45//! # Acknowledgements
46//!
47//! [Autobahn](https://arxiv.org/abs/2401.10369) provided the insight that a succinct
48//! proof-of-availability could be produced by linking sequencer broadcasts.
49
50pub mod scheme;
51pub mod types;
52
53cfg_if::cfg_if! {
54    if #[cfg(not(target_arch = "wasm32"))] {
55        mod ack_manager;
56        use ack_manager::AckManager;
57        mod config;
58        pub use config::Config;
59        mod engine;
60        pub use engine::Engine;
61        mod metrics;
62        mod tip_manager;
63        use tip_manager::TipManager;
64    }
65}
66
67#[cfg(test)]
68pub mod mocks;
69
70#[cfg(test)]
71mod tests {
72    use super::{mocks, Config, Engine};
73    use crate::{
74        ordered_broadcast::scheme::{bls12381_multisig, bls12381_threshold, ed25519, Scheme},
75        types::{Epoch, EpochDelta},
76    };
77    use commonware_cryptography::{
78        bls12381::primitives::variant::{MinPk, MinSig},
79        certificate::{self, mocks::Fixture},
80        ed25519::{PrivateKey, PublicKey},
81        sha256::Digest as Sha256Digest,
82        Signer as _,
83    };
84    use commonware_macros::{select, test_group, test_traced};
85    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
86    use commonware_runtime::{
87        buffer::PoolRef,
88        deterministic::{self, Context},
89        Clock, Metrics, Quota, Runner, Spawner,
90    };
91    use commonware_utils::NZUsize;
92    use futures::{channel::oneshot, future::join_all};
93    use std::{
94        collections::{BTreeMap, HashMap},
95        num::{NonZeroU32, NonZeroUsize},
96        time::Duration,
97    };
98    use tracing::debug;
99
100    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
101    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
102    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
103
104    type Registrations<P> = BTreeMap<
105        P,
106        (
107            (Sender<P, deterministic::Context>, Receiver<P>),
108            (Sender<P, deterministic::Context>, Receiver<P>),
109        ),
110    >;
111
112    async fn register_participants(
113        oracle: &mut Oracle<PublicKey, deterministic::Context>,
114        participants: &[PublicKey],
115    ) -> Registrations<PublicKey> {
116        let mut registrations = BTreeMap::new();
117        for participant in participants.iter() {
118            let mut control = oracle.control(participant.clone());
119            let (a1, a2) = control.register(0, TEST_QUOTA).await.unwrap();
120            let (b1, b2) = control.register(1, TEST_QUOTA).await.unwrap();
121            registrations.insert(participant.clone(), ((a1, a2), (b1, b2)));
122        }
123        registrations
124    }
125
126    enum Action {
127        Link(Link),
128        Update(Link),
129        Unlink,
130    }
131
132    async fn link_participants(
133        oracle: &mut Oracle<PublicKey, deterministic::Context>,
134        participants: &[PublicKey],
135        action: Action,
136        restrict_to: Option<fn(usize, usize, usize) -> bool>,
137    ) {
138        for (i1, v1) in participants.iter().enumerate() {
139            for (i2, v2) in participants.iter().enumerate() {
140                if v2 == v1 {
141                    continue;
142                }
143                if let Some(f) = restrict_to {
144                    if !f(participants.len(), i1, i2) {
145                        continue;
146                    }
147                }
148                if matches!(action, Action::Update(_) | Action::Unlink) {
149                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
150                }
151                if let Action::Link(ref link) | Action::Update(ref link) = action {
152                    oracle
153                        .add_link(v1.clone(), v2.clone(), link.clone())
154                        .await
155                        .unwrap();
156                }
157            }
158        }
159    }
160
161    const RELIABLE_LINK: Link = Link {
162        latency: Duration::from_millis(10),
163        jitter: Duration::from_millis(1),
164        success_rate: 1.0,
165    };
166
167    async fn initialize_simulation<S: certificate::Scheme>(
168        context: Context,
169        fixture: &Fixture<S>,
170        link: Link,
171    ) -> (
172        Oracle<PublicKey, deterministic::Context>,
173        Registrations<PublicKey>,
174    ) {
175        let (network, mut oracle) = Network::new(
176            context.with_label("network"),
177            commonware_p2p::simulated::Config {
178                max_size: 1024 * 1024,
179                disconnect_on_block: true,
180                tracked_peer_sets: None,
181            },
182        );
183        network.start();
184
185        let registrations = register_participants(&mut oracle, &fixture.participants).await;
186        link_participants(&mut oracle, &fixture.participants, Action::Link(link), None).await;
187        (oracle, registrations)
188    }
189
190    #[allow(clippy::too_many_arguments)]
191    fn spawn_validator_engines<S>(
192        context: Context,
193        fixture: &Fixture<S>,
194        sequencer_pks: &[PublicKey],
195        registrations: &mut Registrations<PublicKey>,
196        rebroadcast_timeout: Duration,
197        invalid_when: fn(u64) -> bool,
198        misses_allowed: Option<usize>,
199        epoch: Epoch,
200    ) -> BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, S, Sha256Digest>>
201    where
202        S: Scheme<PublicKey, Sha256Digest>,
203    {
204        let mut reporters = BTreeMap::new();
205        let namespace = b"my testing namespace";
206
207        for (idx, validator) in fixture.participants.iter().enumerate() {
208            let context = context.with_label(&format!("validator_{validator}"));
209            let monitor = mocks::Monitor::new(epoch);
210            let sequencers = mocks::Sequencers::<PublicKey>::new(sequencer_pks.to_vec());
211
212            // Create Provider and register only this validator's scheme for the epoch
213            let validators_provider = mocks::Provider::new();
214            assert!(validators_provider.register(epoch, fixture.schemes[idx].clone()));
215
216            let automaton = mocks::Automaton::<PublicKey>::new(invalid_when);
217            let (reporter, reporter_mailbox) = mocks::Reporter::new(
218                context.clone(),
219                namespace,
220                fixture.verifier.clone(),
221                misses_allowed,
222            );
223            context.with_label("reporter").spawn(|_| reporter.run());
224            reporters.insert(validator.clone(), reporter_mailbox);
225
226            let engine = Engine::new(
227                context.with_label("engine"),
228                Config {
229                    sequencer_signer: Some(fixture.private_keys[idx].clone()),
230                    sequencers_provider: sequencers,
231                    validators_provider,
232                    automaton: automaton.clone(),
233                    relay: automaton.clone(),
234                    reporter: reporters.get(validator).unwrap().clone(),
235                    monitor,
236                    namespace: namespace.to_vec(),
237                    priority_proposals: false,
238                    priority_acks: false,
239                    rebroadcast_timeout,
240                    epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
241                    height_bound: 2,
242                    journal_heights_per_section: 10,
243                    journal_replay_buffer: NZUsize!(4096),
244                    journal_write_buffer: NZUsize!(4096),
245                    journal_name_prefix: format!("ordered-broadcast-seq-{validator}-"),
246                    journal_compression: Some(3),
247                    journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
248                },
249            );
250
251            let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
252            engine.start((a1, a2), (b1, b2));
253        }
254        reporters
255    }
256
257    async fn await_reporters<S>(
258        context: Context,
259        sequencers: Vec<PublicKey>,
260        reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, S, Sha256Digest>>,
261        threshold: (u64, Epoch, bool),
262    ) where
263        S: certificate::Scheme,
264    {
265        let (threshold_height, threshold_epoch, require_contiguous) =
266            (threshold.0, threshold.1, threshold.2);
267        let mut receivers = Vec::new();
268        for (reporter, mailbox) in reporters.iter() {
269            // Spawn a watcher for the reporter.
270            for sequencer in sequencers.iter() {
271                // Create a oneshot channel to signal when the reporter has reached the threshold.
272                let (tx, rx) = oneshot::channel();
273                receivers.push(rx);
274
275                context.with_label("reporter_watcher").spawn({
276                    let reporter = reporter.clone();
277                    let sequencer = sequencer.clone();
278                    let mut mailbox = mailbox.clone();
279                    move |context| async move {
280                        loop {
281                            let (height, epoch) = mailbox
282                                .get_tip(sequencer.clone())
283                                .await
284                                .unwrap_or((0, Epoch::zero()));
285                            debug!(height, epoch = %epoch, ?sequencer, ?reporter, "reporter");
286                            let contiguous_height = mailbox
287                                .get_contiguous_tip(sequencer.clone())
288                                .await
289                                .unwrap_or(0);
290                            if height >= threshold_height
291                                && epoch >= threshold_epoch
292                                && (!require_contiguous || contiguous_height >= threshold_height)
293                            {
294                                let _ = tx.send(sequencer.clone());
295                                break;
296                            }
297                            context.sleep(Duration::from_millis(100)).await;
298                        }
299                    }
300                });
301            }
302        }
303
304        // Wait for all oneshot receivers to complete.
305        let results = join_all(receivers).await;
306        assert_eq!(results.len(), sequencers.len() * reporters.len());
307
308        // Check that none were cancelled.
309        for result in results {
310            assert!(result.is_ok(), "reporter was cancelled");
311        }
312    }
313
314    async fn get_max_height<S: certificate::Scheme>(
315        reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, S, Sha256Digest>>,
316    ) -> u64 {
317        let mut max_height = 0;
318        for (sequencer, mailbox) in reporters.iter_mut() {
319            let (height, _) = mailbox
320                .get_tip(sequencer.clone())
321                .await
322                .unwrap_or((0, Epoch::zero()));
323            if height > max_height {
324                max_height = height;
325            }
326        }
327        max_height
328    }
329
330    fn all_online<S, F>(fixture: F)
331    where
332        S: Scheme<PublicKey, Sha256Digest>,
333        F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
334    {
335        let runner = deterministic::Runner::timed(Duration::from_secs(120));
336
337        runner.start(|mut context| async move {
338            let epoch = Epoch::new(111);
339            let num_validators = 4;
340            let fixture = fixture(&mut context, num_validators);
341
342            let (_oracle, mut registrations) =
343                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
344                    .await;
345
346            let reporters = spawn_validator_engines(
347                context.with_label("validator"),
348                &fixture,
349                &fixture.participants,
350                &mut registrations,
351                Duration::from_secs(5),
352                |_| false,
353                Some(5),
354                epoch,
355            );
356
357            await_reporters(
358                context.with_label("reporter"),
359                reporters.keys().cloned().collect::<Vec<_>>(),
360                &reporters,
361                (100, epoch, true),
362            )
363            .await;
364        });
365    }
366
367    #[test_traced]
368    fn test_all_online() {
369        all_online(bls12381_threshold::fixture::<MinPk, _>);
370        all_online(bls12381_threshold::fixture::<MinSig, _>);
371        all_online(bls12381_multisig::fixture::<MinPk, _>);
372        all_online(bls12381_multisig::fixture::<MinSig, _>);
373        all_online(ed25519::fixture);
374    }
375
376    fn unclean_shutdown<S, F>(fixture: F)
377    where
378        S: Scheme<PublicKey, Sha256Digest>,
379        F: Fn(&mut deterministic::Context, u32) -> Fixture<S> + Clone,
380    {
381        let mut prev_checkpoint = None;
382        let epoch = Epoch::new(111);
383        let num_validators = 4;
384        let crash_after = Duration::from_secs(5);
385        let target_height = 30;
386
387        loop {
388            let fixture = fixture.clone();
389            let f = |mut context: deterministic::Context| async move {
390                let fixture = fixture(&mut context, num_validators);
391
392                let (network, mut oracle) = Network::new(
393                    context.with_label("network"),
394                    commonware_p2p::simulated::Config {
395                        max_size: 1024 * 1024,
396                        disconnect_on_block: true,
397                        tracked_peer_sets: None,
398                    },
399                );
400                network.start();
401
402                let mut registrations =
403                    register_participants(&mut oracle, &fixture.participants).await;
404                link_participants(
405                    &mut oracle,
406                    &fixture.participants,
407                    Action::Link(RELIABLE_LINK),
408                    None,
409                )
410                .await;
411
412                let reporters = spawn_validator_engines(
413                    context.with_label("validator"),
414                    &fixture,
415                    &fixture.participants,
416                    &mut registrations,
417                    Duration::from_secs(5),
418                    |_| false,
419                    None,
420                    epoch,
421                );
422
423                // Either crash after `crash_after` or succeed once everyone reaches `target_height`.
424                let crash = context.sleep(crash_after);
425                let run = await_reporters(
426                    context.with_label("reporter"),
427                    reporters.keys().cloned().collect::<Vec<_>>(),
428                    &reporters,
429                    (target_height, epoch, true),
430                );
431
432                select! {
433                    _ = crash => { false },
434                    _ = run => { true },
435                }
436            };
437
438            let (complete, checkpoint) = prev_checkpoint
439                .map_or_else(
440                    || deterministic::Runner::timed(Duration::from_secs(180)),
441                    deterministic::Runner::from,
442                )
443                .start_and_recover(f);
444
445            if complete {
446                break;
447            }
448
449            prev_checkpoint = Some(checkpoint);
450        }
451    }
452
453    #[test_traced]
454    fn test_unclean_shutdown() {
455        unclean_shutdown(bls12381_threshold::fixture::<MinPk, _>);
456        unclean_shutdown(bls12381_threshold::fixture::<MinSig, _>);
457        unclean_shutdown(bls12381_multisig::fixture::<MinPk, _>);
458        unclean_shutdown(bls12381_multisig::fixture::<MinSig, _>);
459        unclean_shutdown(ed25519::fixture);
460    }
461
462    fn network_partition<S, F>(fixture: F)
463    where
464        S: Scheme<PublicKey, Sha256Digest>,
465        F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
466    {
467        let runner = deterministic::Runner::timed(Duration::from_secs(60));
468
469        runner.start(|mut context| async move {
470            let epoch = Epoch::new(111);
471            let num_validators = 4;
472            let fixture = fixture(&mut context, num_validators);
473
474            // Configure the network
475            let (mut oracle, mut registrations) =
476                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
477                    .await;
478            let mut reporters = spawn_validator_engines(
479                context.with_label("validator"),
480                &fixture,
481                &fixture.participants,
482                &mut registrations,
483                Duration::from_secs(1),
484                |_| false,
485                None,
486                epoch,
487            );
488
489            // Simulate partition by removing all links.
490            link_participants(&mut oracle, &fixture.participants, Action::Unlink, None).await;
491            context.sleep(Duration::from_secs(30)).await;
492
493            // Get the maximum height from all reporters.
494            let max_height = get_max_height(&mut reporters).await;
495
496            // Heal the partition by re-adding links.
497            link_participants(
498                &mut oracle,
499                &fixture.participants,
500                Action::Link(RELIABLE_LINK),
501                None,
502            )
503            .await;
504            await_reporters(
505                context.with_label("reporter"),
506                reporters.keys().cloned().collect::<Vec<_>>(),
507                &reporters,
508                (max_height + 100, epoch, false),
509            )
510            .await;
511        });
512    }
513
514    #[test_group("slow")]
515    #[test_traced]
516    fn test_network_partition() {
517        network_partition(bls12381_threshold::fixture::<MinPk, _>);
518        network_partition(bls12381_threshold::fixture::<MinSig, _>);
519        network_partition(bls12381_multisig::fixture::<MinPk, _>);
520        network_partition(bls12381_multisig::fixture::<MinSig, _>);
521        network_partition(ed25519::fixture);
522    }
523
524    fn slow_and_lossy_links<S, F>(fixture: F, seed: u64) -> String
525    where
526        S: Scheme<PublicKey, Sha256Digest>,
527        F: Fn(&mut deterministic::Context, u32) -> Fixture<S>,
528    {
529        let cfg = deterministic::Config::new()
530            .with_seed(seed)
531            .with_timeout(Some(Duration::from_secs(40)));
532        let runner = deterministic::Runner::new(cfg);
533
534        runner.start(|mut context| async move {
535            let epoch = Epoch::new(111);
536            let num_validators = 4;
537            let fixture = fixture(&mut context, num_validators);
538
539            let (mut oracle, mut registrations) =
540                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
541                    .await;
542            let delayed_link = Link {
543                latency: Duration::from_millis(50),
544                jitter: Duration::from_millis(40),
545                success_rate: 0.5,
546            };
547            link_participants(
548                &mut oracle,
549                &fixture.participants,
550                Action::Update(delayed_link),
551                None,
552            )
553            .await;
554
555            let reporters = spawn_validator_engines(
556                context.with_label("validator"),
557                &fixture,
558                &fixture.participants,
559                &mut registrations,
560                Duration::from_millis(150),
561                |_| false,
562                None,
563                epoch,
564            );
565
566            await_reporters(
567                context.with_label("reporter"),
568                reporters.keys().cloned().collect::<Vec<_>>(),
569                &reporters,
570                (40, epoch, false),
571            )
572            .await;
573
574            context.auditor().state()
575        })
576    }
577
578    #[test_traced]
579    fn test_slow_and_lossy_links() {
580        slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, 0);
581        slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, 0);
582        slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, 0);
583        slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, 0);
584        slow_and_lossy_links(ed25519::fixture, 0);
585    }
586
587    #[test_group("slow")]
588    #[test_traced]
589    fn test_determinism() {
590        // We use slow and lossy links as the deterministic test
591        // because it is the most complex test.
592        for seed in 1..6 {
593            // Test BLS threshold MinPk
594            let ts_pk_state_1 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
595            let ts_pk_state_2 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
596            assert_eq!(ts_pk_state_1, ts_pk_state_2);
597
598            // Test BLS threshold MinSig
599            let ts_sig_state_1 =
600                slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
601            let ts_sig_state_2 =
602                slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
603            assert_eq!(ts_sig_state_1, ts_sig_state_2);
604
605            // Test ed25519
606            let ed_state_1 = slow_and_lossy_links(ed25519::fixture, seed);
607            let ed_state_2 = slow_and_lossy_links(ed25519::fixture, seed);
608            assert_eq!(ed_state_1, ed_state_2);
609
610            // Test BLS multisig MinPk
611            let ms_pk_state_1 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
612            let ms_pk_state_2 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
613            assert_eq!(ms_pk_state_1, ms_pk_state_2);
614
615            // Test BLS multisig MinSig
616            let ms_sig_state_1 =
617                slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
618            let ms_sig_state_2 =
619                slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
620            assert_eq!(ms_sig_state_1, ms_sig_state_2);
621
622            let states = [
623                ("threshold-minpk", ts_pk_state_1),
624                ("threshold-minsig", ts_sig_state_1),
625                ("multisig-minpk", ms_pk_state_1),
626                ("multisig-minsig", ms_sig_state_1),
627                ("ed25519", ed_state_1),
628            ];
629
630            // Sanity check that different schemes produce different states
631            for pair in states.windows(2) {
632                assert_ne!(
633                    pair[0].1, pair[1].1,
634                    "state {} equals state {}",
635                    pair[0].0, pair[1].0
636                );
637            }
638        }
639    }
640
641    fn invalid_signature_injection<S, F>(fixture: F)
642    where
643        S: Scheme<PublicKey, Sha256Digest>,
644        F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
645    {
646        let runner = deterministic::Runner::timed(Duration::from_secs(30));
647
648        runner.start(|mut context| async move {
649            let epoch = Epoch::new(111);
650            let num_validators = 4;
651            let fixture = fixture(&mut context, num_validators);
652
653            let (_oracle, mut registrations) =
654                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
655                    .await;
656
657            let reporters = spawn_validator_engines(
658                context.with_label("validator"),
659                &fixture,
660                &fixture.participants,
661                &mut registrations,
662                Duration::from_secs(5),
663                |i| i % 10 == 0,
664                None,
665                epoch,
666            );
667
668            await_reporters(
669                context.with_label("reporter"),
670                reporters.keys().cloned().collect::<Vec<_>>(),
671                &reporters,
672                (100, epoch, true),
673            )
674            .await;
675        });
676    }
677
678    #[test_traced]
679    fn test_invalid_signature_injection() {
680        invalid_signature_injection(bls12381_threshold::fixture::<MinPk, _>);
681        invalid_signature_injection(bls12381_threshold::fixture::<MinSig, _>);
682        invalid_signature_injection(bls12381_multisig::fixture::<MinPk, _>);
683        invalid_signature_injection(bls12381_multisig::fixture::<MinSig, _>);
684        invalid_signature_injection(ed25519::fixture);
685    }
686
687    fn updated_epoch<S, F>(fixture: F)
688    where
689        S: Scheme<PublicKey, Sha256Digest>,
690        F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
691    {
692        let runner = deterministic::Runner::timed(Duration::from_secs(60));
693
694        runner.start(|mut context| async move {
695            let epoch = Epoch::new(111);
696            let num_validators = 4;
697            let fixture = fixture(&mut context, num_validators);
698
699            // Setup network
700            let (mut oracle, mut registrations) =
701                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
702                    .await;
703
704            let mut reporters = BTreeMap::new();
705
706            // Create validators instances that we can update later for epoch changes
707            let mut validators_providers = HashMap::new();
708            let mut monitors = HashMap::new();
709            let namespace = b"my testing namespace";
710
711            for (idx, validator) in fixture.participants.iter().enumerate() {
712                let context = context.with_label(&format!("validator_{validator}"));
713                let monitor = mocks::Monitor::new(epoch);
714                monitors.insert(validator.clone(), monitor.clone());
715                let sequencers = mocks::Sequencers::<PublicKey>::new(fixture.participants.clone());
716
717                // Create and store Provider so we can register new epochs later
718                let validators_provider = mocks::Provider::new();
719                assert!(validators_provider.register(epoch, fixture.schemes[idx].clone()));
720                validators_providers.insert(validator.clone(), validators_provider.clone());
721
722                let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
723                let (reporter, reporter_mailbox) = mocks::Reporter::new(
724                    context.clone(),
725                    namespace,
726                    fixture.verifier.clone(),
727                    Some(5),
728                );
729                context.with_label("reporter").spawn(|_| reporter.run());
730                reporters.insert(validator.clone(), reporter_mailbox);
731
732                let engine = Engine::new(
733                    context.with_label("engine"),
734                    Config {
735                        sequencer_signer: Some(fixture.private_keys[idx].clone()),
736                        sequencers_provider: sequencers,
737                        validators_provider,
738                        relay: automaton.clone(),
739                        automaton: automaton.clone(),
740                        reporter: reporters.get(validator).unwrap().clone(),
741                        monitor,
742                        namespace: namespace.to_vec(),
743                        epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
744                        height_bound: 2,
745                        rebroadcast_timeout: Duration::from_secs(1),
746                        priority_acks: false,
747                        priority_proposals: false,
748                        journal_heights_per_section: 10,
749                        journal_replay_buffer: NZUsize!(4096),
750                        journal_write_buffer: NZUsize!(4096),
751                        journal_name_prefix: format!("ordered-broadcast-seq-{validator}-"),
752                        journal_compression: Some(3),
753                        journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
754                    },
755                );
756
757                let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
758                engine.start((a1, a2), (b1, b2));
759            }
760
761            // Perform some work
762            await_reporters(
763                context.with_label("reporter"),
764                reporters.keys().cloned().collect::<Vec<_>>(),
765                &reporters,
766                (100, epoch, true),
767            )
768            .await;
769
770            // Simulate partition by removing all links.
771            link_participants(&mut oracle, &fixture.participants, Action::Unlink, None).await;
772            context.sleep(Duration::from_secs(30)).await;
773
774            // Get the maximum height from all reporters.
775            let max_height = get_max_height(&mut reporters).await;
776
777            // Update the epoch and register schemes for new epoch
778            let next_epoch = epoch.next();
779            for (validator, monitor) in monitors.iter() {
780                monitor.update(next_epoch);
781                // Register the scheme for the new epoch
782                let idx = fixture
783                    .participants
784                    .iter()
785                    .position(|v| v == validator)
786                    .unwrap();
787                let validators_provider = validators_providers.get(validator).unwrap();
788                assert!(validators_provider.register(next_epoch, fixture.schemes[idx].clone()));
789            }
790
791            // Heal the partition by re-adding links.
792            link_participants(
793                &mut oracle,
794                &fixture.participants,
795                Action::Link(RELIABLE_LINK),
796                None,
797            )
798            .await;
799            await_reporters(
800                context.with_label("reporter"),
801                reporters.keys().cloned().collect::<Vec<_>>(),
802                &reporters,
803                (max_height + 100, next_epoch, true),
804            )
805            .await;
806        });
807    }
808
809    #[test_group("slow")]
810    #[test_traced]
811    fn test_updated_epoch() {
812        updated_epoch(bls12381_threshold::fixture::<MinPk, _>);
813        updated_epoch(bls12381_threshold::fixture::<MinSig, _>);
814        updated_epoch(bls12381_multisig::fixture::<MinPk, _>);
815        updated_epoch(bls12381_multisig::fixture::<MinSig, _>);
816        updated_epoch(ed25519::fixture);
817    }
818
819    fn external_sequencer<S, F>(fixture: F)
820    where
821        S: Scheme<PublicKey, Sha256Digest>,
822        F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
823    {
824        let runner = deterministic::Runner::timed(Duration::from_secs(60));
825        runner.start(|mut context| async move {
826            let epoch = Epoch::new(111);
827            let num_validators = 4;
828            let fixture = fixture(&mut context, num_validators);
829
830            // Generate sequencer (external, not a validator)
831            let sequencer = PrivateKey::from_seed(u64::MAX);
832
833            // Generate network participants (validators + sequencer)
834            let mut participants = fixture.participants.clone();
835            participants.push(sequencer.public_key());
836
837            // Create network
838            let (network, mut oracle) = Network::new(
839                context.with_label("network"),
840                commonware_p2p::simulated::Config {
841                    max_size: 1024 * 1024,
842                    disconnect_on_block: true,
843                    tracked_peer_sets: None,
844                },
845            );
846            network.start();
847
848            // Register all participants
849            let mut registrations = register_participants(&mut oracle, &participants).await;
850            link_participants(
851                &mut oracle,
852                &participants,
853                Action::Link(RELIABLE_LINK),
854                None,
855            )
856            .await;
857
858            // Setup engines
859            let mut reporters = BTreeMap::new();
860            let namespace = b"my testing namespace";
861
862            // Spawn validator engines (no signing key, only validate)
863            for (idx, validator) in fixture.participants.iter().enumerate() {
864                let context = context.with_label(&format!("validator_{validator}"));
865                let monitor = mocks::Monitor::new(epoch);
866                let sequencers = mocks::Sequencers::<PublicKey>::new(vec![sequencer.public_key()]);
867
868                // Create Provider and register this validator's scheme
869                let validators_provider = mocks::Provider::new();
870                assert!(validators_provider.register(epoch, fixture.schemes[idx].clone()));
871
872                let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
873
874                let (reporter, reporter_mailbox) = mocks::Reporter::new(
875                    context.clone(),
876                    namespace,
877                    fixture.verifier.clone(),
878                    Some(5),
879                );
880                context.with_label("reporter").spawn(|_| reporter.run());
881                reporters.insert(validator.clone(), reporter_mailbox);
882
883                let engine = Engine::new(
884                    context.with_label("engine"),
885                    Config {
886                        sequencer_signer: None::<PrivateKey>, // Validators don't propose in this test
887                        sequencers_provider: sequencers,
888                        validators_provider,
889                        relay: automaton.clone(),
890                        automaton: automaton.clone(),
891                        reporter: reporters.get(validator).unwrap().clone(),
892                        monitor,
893                        namespace: namespace.to_vec(),
894                        epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
895                        height_bound: 2,
896                        rebroadcast_timeout: Duration::from_secs(5),
897                        priority_acks: false,
898                        priority_proposals: false,
899                        journal_heights_per_section: 10,
900                        journal_replay_buffer: NZUsize!(4096),
901                        journal_write_buffer: NZUsize!(4096),
902                        journal_name_prefix: format!("ordered-broadcast-seq-{validator}-"),
903                        journal_compression: Some(3),
904                        journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
905                    },
906                );
907
908                let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
909                engine.start((a1, a2), (b1, b2));
910            }
911
912            // Spawn sequencer engine
913            {
914                let context = context.with_label("sequencer");
915                let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
916                let (reporter, reporter_mailbox) = mocks::Reporter::new(
917                    context.clone(),
918                    namespace,
919                    fixture.verifier.clone(),
920                    Some(5),
921                );
922                context.with_label("reporter").spawn(|_| reporter.run());
923                reporters.insert(sequencer.public_key(), reporter_mailbox);
924
925                // Sequencer doesn't need a scheme (it uses ed25519 signing directly)
926                // But it needs the verifier to validate acks from validators
927                let validators_provider = mocks::Provider::new();
928                assert!(validators_provider.register(epoch, fixture.verifier.clone()));
929
930                let engine = Engine::new(
931                    context.with_label("engine"),
932                    Config {
933                        sequencer_signer: Some(sequencer.clone()),
934                        sequencers_provider: mocks::Sequencers::<PublicKey>::new(vec![
935                            sequencer.public_key()
936                        ]),
937                        validators_provider,
938                        relay: automaton.clone(),
939                        automaton,
940                        reporter: reporters.get(&sequencer.public_key()).unwrap().clone(),
941                        monitor: mocks::Monitor::new(epoch),
942                        namespace: namespace.to_vec(),
943                        epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
944                        height_bound: 2,
945                        rebroadcast_timeout: Duration::from_secs(5),
946                        priority_acks: false,
947                        priority_proposals: false,
948                        journal_heights_per_section: 10,
949                        journal_replay_buffer: NZUsize!(4096),
950                        journal_write_buffer: NZUsize!(4096),
951                        journal_name_prefix: format!(
952                            "ordered-broadcast-seq-{}-",
953                            sequencer.public_key()
954                        ),
955                        journal_compression: Some(3),
956                        journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
957                    },
958                );
959
960                let ((a1, a2), (b1, b2)) = registrations.remove(&sequencer.public_key()).unwrap();
961                engine.start((a1, a2), (b1, b2));
962            }
963
964            // Await reporters
965            await_reporters(
966                context.with_label("reporter"),
967                vec![sequencer.public_key()],
968                &reporters,
969                (100, epoch, true),
970            )
971            .await;
972        });
973    }
974
975    #[test_traced]
976    fn test_external_sequencer() {
977        external_sequencer(bls12381_threshold::fixture::<MinPk, _>);
978        external_sequencer(bls12381_threshold::fixture::<MinSig, _>);
979        external_sequencer(bls12381_multisig::fixture::<MinPk, _>);
980        external_sequencer(bls12381_multisig::fixture::<MinSig, _>);
981        external_sequencer(ed25519::fixture);
982    }
983
984    fn run_1k<S, F>(fixture: F)
985    where
986        S: Scheme<PublicKey, Sha256Digest>,
987        F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
988    {
989        let cfg = deterministic::Config::new();
990        let runner = deterministic::Runner::new(cfg);
991
992        runner.start(|mut context| async move {
993            let epoch = Epoch::new(111);
994            let num_validators = 10;
995            let fixture = fixture(&mut context, num_validators);
996
997            let delayed_link = Link {
998                latency: Duration::from_millis(80),
999                jitter: Duration::from_millis(10),
1000                success_rate: 0.98,
1001            };
1002
1003            let (mut oracle, mut registrations) =
1004                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
1005                    .await;
1006
1007            // Update to delayed links
1008            link_participants(
1009                &mut oracle,
1010                &fixture.participants,
1011                Action::Update(delayed_link),
1012                None,
1013            )
1014            .await;
1015
1016            // Use first half of validators as sequencers
1017            let sequencers: Vec<PublicKey> =
1018                fixture.participants[0..num_validators as usize / 2].to_vec();
1019
1020            let reporters = spawn_validator_engines(
1021                context.with_label("validator"),
1022                &fixture,
1023                &sequencers,
1024                &mut registrations,
1025                Duration::from_millis(150),
1026                |_| false,
1027                None,
1028                epoch,
1029            );
1030
1031            await_reporters(
1032                context.with_label("reporter"),
1033                sequencers,
1034                &reporters,
1035                (1_000, epoch, false),
1036            )
1037            .await;
1038        })
1039    }
1040
1041    #[test_group("slow")]
1042    #[test_traced]
1043    fn test_1k_bls12381_threshold_min_pk() {
1044        run_1k(bls12381_threshold::fixture::<MinPk, _>);
1045    }
1046
1047    #[test_group("slow")]
1048    #[test_traced]
1049    fn test_1k_bls12381_threshold_min_sig() {
1050        run_1k(bls12381_threshold::fixture::<MinSig, _>);
1051    }
1052
1053    #[test_group("slow")]
1054    #[test_traced]
1055    fn test_1k_bls12381_multisig_min_pk() {
1056        run_1k(bls12381_multisig::fixture::<MinPk, _>);
1057    }
1058
1059    #[test_group("slow")]
1060    #[test_traced]
1061    fn test_1k_bls12381_multisig_min_sig() {
1062        run_1k(bls12381_multisig::fixture::<MinSig, _>);
1063    }
1064
1065    #[test_group("slow")]
1066    #[test_traced]
1067    fn test_1k_ed25519() {
1068        run_1k(ed25519::fixture);
1069    }
1070}