commonware_consensus/aggregation/
mod.rs

1//! Recover quorum certificates over an externally synchronized sequencer of items.
2//!
3//! This module allows a dynamic set of participants to collectively produce quorum certificates
4//! for any ordered sequence of items.
5//!
6//! The primary use case for this primitive is to allow blockchain validators to agree on a series
7//! of state roots emitted from an opaque consensus process. Because some chains may finalize transaction
8//! data but not the output of said transactions during consensus, agreement must be achieved asynchronously
9//! over the output of consensus to support state sync and client balance proofs.
10//!
11//! _For applications that want to collect quorum certificates over concurrent, sequencer-driven broadcast,
12//! check out [crate::ordered_broadcast]._
13//!
14//! # Pluggable Cryptography
15//!
16//! The aggregation module is generic over the signing scheme, allowing users to choose the
17//! cryptographic scheme best suited for their requirements:
18//!
19//! - [`ed25519`][scheme::ed25519]: Attributable signatures with individual verification.
20//!   HSM-friendly, no trusted setup required. Certificates contain individual signatures.
21//!
22//! - [`bls12381_multisig`][scheme::bls12381_multisig]: Attributable signatures with aggregated
23//!   verification. Produces compact certificates while preserving signer attribution.
24//!
25//! - [`bls12381_threshold`][scheme::bls12381_threshold]: Non-attributable threshold signatures.
26//!   Produces succinct constant-size certificates. Requires trusted setup (DKG).
27//!
28//! # Architecture
29//!
30//! The core of the module is the [Engine]. It manages the agreement process by:
31//! - Requesting externally synchronized [commonware_cryptography::Digest]s
32//! - Signing said digests with the configured scheme's signature type
33//! - Multicasting signatures/shares to other validators
34//! - Assembling certificates from a quorum of signatures
35//! - Monitoring recovery progress and notifying the application layer of recoveries
36//!
37//! The engine interacts with four main components:
38//! - [crate::Automaton]: Provides external digests
39//! - [crate::Reporter]: Receives agreement confirmations
40//! - [crate::Monitor]: Tracks epoch transitions
41//! - [commonware_cryptography::certificate::Provider]: Manages validator sets and network identities
42//!
43//! # Design Decisions
44//!
45//! ## Missing Certificate Resolution
46//!
47//! The engine does not try to "fill gaps" when certificates are missing. When validators
48//! fall behind or miss signatures for certain indices, the tip may skip ahead and those
49//! certificates may never be emitted by the local engine. Before skipping ahead, we ensure that
//! at least one honest validator has the certificate for any skipped index.
51//!
52//! Like other consensus primitives, aggregation's design prioritizes doing useful work at tip and
53//! minimal complexity over providing a comprehensive recovery mechanism. As a result, applications that need
54//! to build a complete history of all formed [types::Certificate]s must implement their own mechanism to synchronize
55//! historical results.
56//!
57//! ## Recovering Certificates
58//!
59//! In aggregation, participants never gossip recovered certificates. Rather, they gossip [types::TipAck]s
60//! with signatures over some index and their latest tip. This approach reduces the overhead of running aggregation
61//! concurrently with a consensus mechanism and consistently results in local recovery on stable networks. To increase
62//! the likelihood of local recovery, participants should tune the [Config::activity_timeout] to a value larger than the expected
//! drift of online participants (even if all participants are synchronous, the tip advancement logic will advance to the `f+1`th highest
64//! reported tip and drop all work below that tip minus the [Config::activity_timeout]).
65
pub mod scheme;
pub mod types;

// The engine and its supporting modules are compiled only for non-wasm
// targets; the pure `scheme` and `types` definitions above remain available
// on every target.
cfg_if::cfg_if! {
    if #[cfg(not(target_arch = "wasm32"))] {
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        mod metrics;
        mod safe_tip;

        // Mock implementations (provider, monitor, application, reporter)
        // used by the tests below.
        #[cfg(test)]
        pub mod mocks;
    }
}
82
83#[cfg(test)]
84mod tests {
85    use super::{mocks, Config, Engine};
86    use crate::{
87        aggregation::scheme::{bls12381_multisig, bls12381_threshold, ed25519, Scheme},
88        types::{Epoch, EpochDelta},
89    };
90    use commonware_cryptography::{
91        bls12381::primitives::variant::{MinPk, MinSig},
92        certificate::mocks::Fixture,
93        ed25519::PublicKey,
94        sha256::Digest as Sha256Digest,
95    };
96    use commonware_macros::{select, test_group, test_traced};
97    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
98    use commonware_runtime::{
99        buffer::PoolRef,
100        deterministic::{self, Context},
101        Clock, Metrics, Quota, Runner, Spawner,
102    };
103    use commonware_utils::{NZUsize, NonZeroDuration};
104    use futures::{channel::oneshot, future::join_all};
105    use rand::{rngs::StdRng, Rng, SeedableRng};
106    use std::{
107        collections::BTreeMap,
108        num::{NonZeroU32, NonZeroUsize},
109        time::Duration,
110    };
111    use tracing::debug;
112
    /// Per-participant simulated network channels (sender/receiver pair),
    /// keyed by the participant's public key.
    type Registrations<P> = BTreeMap<P, (Sender<P, deterministic::Context>, Receiver<P>)>;

    // Journal buffer-pool sizing shared by every engine in these tests.
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
    // Effectively unlimited rate limit so throttling never interferes with tests.
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);

    /// Reliable network link configuration for testing: low latency, tiny
    /// jitter, and no message loss.
    const RELIABLE_LINK: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
125
126    /// Register all participants with the network oracle.
127    async fn register_participants(
128        oracle: &mut Oracle<PublicKey, deterministic::Context>,
129        participants: &[PublicKey],
130    ) -> Registrations<PublicKey> {
131        let mut registrations = BTreeMap::new();
132        for participant in participants.iter() {
133            let (sender, receiver) = oracle
134                .control(participant.clone())
135                .register(0, TEST_QUOTA)
136                .await
137                .unwrap();
138            registrations.insert(participant.clone(), (sender, receiver));
139        }
140        registrations
141    }
142
143    /// Establish network links between all participants.
144    async fn link_participants(
145        oracle: &mut Oracle<PublicKey, deterministic::Context>,
146        participants: &[PublicKey],
147        link: Link,
148    ) {
149        for v1 in participants.iter() {
150            for v2 in participants.iter() {
151                if v2 == v1 {
152                    continue;
153                }
154                oracle
155                    .add_link(v1.clone(), v2.clone(), link.clone())
156                    .await
157                    .unwrap();
158            }
159        }
160    }
161
162    /// Initialize a simulated network environment.
163    async fn initialize_simulation<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
164        context: Context,
165        fixture: &Fixture<S>,
166        link: Link,
167    ) -> (
168        Oracle<PublicKey, deterministic::Context>,
169        Registrations<PublicKey>,
170    ) {
171        let (network, mut oracle) = Network::new(
172            context.with_label("network"),
173            commonware_p2p::simulated::Config {
174                max_size: 1024 * 1024,
175                disconnect_on_block: true,
176                tracked_peer_sets: None,
177            },
178        );
179        network.start();
180
181        let registrations = register_participants(&mut oracle, &fixture.participants).await;
182        link_participants(&mut oracle, &fixture.participants, link).await;
183
184        (oracle, registrations)
185    }
186
    /// Spawn an aggregation [Engine] for every participant in the fixture.
    ///
    /// Each engine gets its own mock provider/monitor/automaton/reporter plus
    /// the network channels previously created for it (removed from
    /// `registrations` here).
    ///
    /// - `namespace`: domain-separation namespace for all signatures.
    /// - `epoch`: the single epoch each mock provider/monitor is registered for.
    /// - `rebroadcast_timeout`: how long an engine waits before re-broadcasting.
    /// - `incorrect`: indices (into `fixture.participants`) of validators whose
    ///   automaton returns incorrect digests (byzantine behavior).
    ///
    /// Returns each participant's reporter mailbox, keyed by public key.
    #[allow(clippy::too_many_arguments)]
    fn spawn_validator_engines<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
        context: Context,
        fixture: &Fixture<S>,
        registrations: &mut Registrations<PublicKey>,
        oracle: &mut Oracle<PublicKey, deterministic::Context>,
        namespace: &[u8],
        epoch: Epoch,
        rebroadcast_timeout: Duration,
        incorrect: Vec<usize>,
    ) -> BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>> {
        let mut reporters = BTreeMap::new();

        for (idx, participant) in fixture.participants.iter().enumerate() {
            let context = context.with_label(&format!("participant_{participant}"));

            // Create Provider and register this validator's scheme for the epoch
            let provider = mocks::Provider::new();
            assert!(provider.register(epoch, fixture.schemes[idx].clone()));

            // Create monitor fixed at the test epoch
            let monitor = mocks::Monitor::new(epoch);

            // Create automaton with Incorrect strategy for byzantine validators
            let strategy = if incorrect.contains(&idx) {
                mocks::Strategy::Incorrect
            } else {
                mocks::Strategy::Correct
            };
            let automaton = mocks::Application::new(strategy);

            // Create reporter with the fixture's verifier scheme; keep a clone
            // of its mailbox for the caller
            let (reporter, reporter_mailbox) =
                mocks::Reporter::new(context.clone(), namespace, fixture.verifier.clone());
            context.with_label("reporter").spawn(|_| reporter.run());
            reporters.insert(participant.clone(), reporter_mailbox.clone());

            // Create blocker (oracle control handle for this participant)
            let blocker = oracle.control(participant.clone());

            // Create and start engine (per-participant journal partition so
            // state does not collide across validators)
            let engine = Engine::new(
                context.with_label("engine"),
                Config {
                    monitor,
                    provider,
                    automaton,
                    reporter: reporter_mailbox,
                    blocker,
                    namespace: namespace.to_vec(),
                    priority_acks: false,
                    rebroadcast_timeout: NonZeroDuration::new_panic(rebroadcast_timeout),
                    epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                    window: std::num::NonZeroU64::new(10).unwrap(),
                    activity_timeout: 100,
                    journal_partition: format!("aggregation-{participant}"),
                    journal_write_buffer: NZUsize!(4096),
                    journal_replay_buffer: NZUsize!(4096),
                    journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                    journal_compression: Some(3),
                    journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                },
            );

            // Hand the engine its pre-registered network channels
            let (sender, receiver) = registrations.remove(participant).unwrap();
            engine.start((sender, receiver));
        }

        reporters
    }
258
259    /// Wait for all reporters to reach the specified consensus threshold.
260    async fn await_reporters<S: Scheme<Sha256Digest, PublicKey = PublicKey>>(
261        context: Context,
262        reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>,
263        threshold_index: u64,
264        threshold_epoch: Epoch,
265    ) {
266        let mut receivers = Vec::new();
267        for (reporter, mailbox) in reporters.iter() {
268            // Create a oneshot channel to signal when the reporter has reached the threshold.
269            let (tx, rx) = oneshot::channel();
270            receivers.push(rx);
271
272            context.with_label("reporter_watcher").spawn({
273                let reporter = reporter.clone();
274                let mut mailbox = mailbox.clone();
275                move |context| async move {
276                    loop {
277                        let (index, epoch) = mailbox.get_tip().await.unwrap_or((0, Epoch::zero()));
278                        debug!(
279                            index,
280                            epoch = %epoch,
281                            threshold_index,
282                            threshold_epoch = %threshold_epoch,
283                            ?reporter,
284                            "reporter status"
285                        );
286                        if index >= threshold_index && epoch >= threshold_epoch {
287                            debug!(
288                                ?reporter,
289                                "reporter reached threshold, signaling completion"
290                            );
291                            let _ = tx.send(reporter.clone());
292                            break;
293                        }
294                        context.sleep(Duration::from_millis(100)).await;
295                    }
296                }
297            });
298        }
299
300        // Wait for all oneshot receivers to complete.
301        let results = join_all(receivers).await;
302        assert_eq!(results.len(), reporters.len());
303
304        // Check that none were cancelled.
305        for result in results {
306            assert!(result.is_ok(), "reporter was cancelled");
307        }
308    }
309
310    /// Test aggregation consensus with all validators online.
311    fn all_online<S, F>(fixture: F)
312    where
313        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
314        F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
315    {
316        let runner = deterministic::Runner::timed(Duration::from_secs(30));
317
318        runner.start(|mut context| async move {
319            let num_validators = 4;
320            let fixture = fixture(&mut context, num_validators);
321            let namespace = b"my testing namespace";
322            let epoch = Epoch::new(111);
323
324            let (mut oracle, mut registrations) =
325                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
326                    .await;
327
328            let reporters = spawn_validator_engines(
329                context.with_label("validator"),
330                &fixture,
331                &mut registrations,
332                &mut oracle,
333                namespace,
334                epoch,
335                Duration::from_secs(5),
336                vec![],
337            );
338
339            await_reporters(context.with_label("reporter"), &reporters, 100, epoch).await;
340        });
341    }
342
    #[test_traced("INFO")]
    fn test_all_online() {
        // Exercise the all-online scenario under every supported signing
        // scheme: BLS threshold and multisig (both variants), plus ed25519.
        all_online(bls12381_threshold::fixture::<MinPk, _>);
        all_online(bls12381_threshold::fixture::<MinSig, _>);
        all_online(bls12381_multisig::fixture::<MinPk, _>);
        all_online(bls12381_multisig::fixture::<MinSig, _>);
        all_online(ed25519::fixture);
    }
351
352    /// Test consensus resilience to Byzantine behavior.
353    fn byzantine_proposer<S, F>(fixture: F)
354    where
355        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
356        F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
357    {
358        let runner = deterministic::Runner::timed(Duration::from_secs(30));
359
360        runner.start(|mut context| async move {
361            let num_validators = 4;
362            let fixture = fixture(&mut context, num_validators);
363            let namespace = b"my testing namespace";
364            let epoch = Epoch::new(111);
365
366            let (mut oracle, mut registrations) =
367                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
368                    .await;
369
370            let reporters = spawn_validator_engines(
371                context.with_label("validator"),
372                &fixture,
373                &mut registrations,
374                &mut oracle,
375                namespace,
376                epoch,
377                Duration::from_secs(5),
378                vec![0],
379            );
380
381            await_reporters(context.with_label("reporter"), &reporters, 100, epoch).await;
382        });
383    }
384
    #[test_traced("INFO")]
    fn test_byzantine_proposer() {
        // Exercise the byzantine-proposer scenario under every supported
        // signing scheme: BLS threshold and multisig (both variants), plus ed25519.
        byzantine_proposer(bls12381_threshold::fixture::<MinPk, _>);
        byzantine_proposer(bls12381_threshold::fixture::<MinSig, _>);
        byzantine_proposer(bls12381_multisig::fixture::<MinPk, _>);
        byzantine_proposer(bls12381_multisig::fixture::<MinSig, _>);
        byzantine_proposer(ed25519::fixture);
    }
393
394    fn unclean_byzantine_shutdown<S, F>(fixture: F)
395    where
396        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
397        F: Fn(&mut StdRng, u32) -> Fixture<S>,
398    {
399        // Test parameters
400        let num_validators = 4;
401        let target_index = 200; // Target multiple rounds of signing
402        let min_shutdowns = 4; // Minimum number of shutdowns per validator
403        let max_shutdowns = 10; // Maximum number of shutdowns per validator
404        let shutdown_range_min = Duration::from_millis(100);
405        let shutdown_range_max = Duration::from_millis(1_000);
406        let rebroadcast_timeout = NonZeroDuration::new_panic(Duration::from_millis(20));
407
408        let mut prev_checkpoint = None;
409
410        // Generate fixture once (persists across restarts)
411        let mut rng = StdRng::seed_from_u64(0);
412        let fixture = fixture(&mut rng, num_validators);
413
414        // Continue until shared reporter reaches target or max shutdowns exceeded
415        let mut shutdown_count = 0;
416        while shutdown_count < max_shutdowns {
417            let fixture = fixture.clone();
418            let f = move |mut context: Context| {
419                async move {
420                    let namespace = b"my testing namespace";
421                    let epoch = Epoch::new(111);
422
423                    let (oracle, mut registrations) = initialize_simulation(
424                        context.with_label("simulation"),
425                        &fixture,
426                        RELIABLE_LINK,
427                    )
428                    .await;
429
430                    // Create a shared reporter
431                    //
432                    // We rely on replay to populate this reporter with a contiguous history of certificates.
433                    let (reporter, mut reporter_mailbox) =
434                        mocks::Reporter::new(context.clone(), namespace, fixture.verifier.clone());
435                    context.with_label("reporter").spawn(|_| reporter.run());
436
437                    // Spawn validator engines
438                    for (idx, participant) in fixture.participants.iter().enumerate() {
439                        let validator_context =
440                            context.with_label(&format!("participant_{participant}"));
441
442                        // Create Provider and register scheme for epoch
443                        let provider = mocks::Provider::new();
444                        assert!(provider.register(epoch, fixture.schemes[idx].clone()));
445
446                        // Create monitor
447                        let monitor = mocks::Monitor::new(epoch);
448
449                        // Create automaton (validator 0 is Byzantine)
450                        let strategy = if idx == 0 {
451                            mocks::Strategy::Incorrect
452                        } else {
453                            mocks::Strategy::Correct
454                        };
455                        let automaton = mocks::Application::new(strategy);
456
457                        // Create blocker
458                        let blocker = oracle.control(participant.clone());
459
460                        // Create and start engine
461                        let engine = Engine::new(
462                            validator_context.with_label("engine"),
463                            Config {
464                                monitor,
465                                provider,
466                                automaton,
467                                reporter: reporter_mailbox.clone(),
468                                blocker,
469                                namespace: namespace.to_vec(),
470                                priority_acks: false,
471                                rebroadcast_timeout,
472                                epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
473                                window: std::num::NonZeroU64::new(10).unwrap(),
474                                activity_timeout: 1_024, // ensure we don't drop any certificates
475                                journal_partition: format!("unclean_shutdown_test_{participant}"),
476                                journal_write_buffer: NZUsize!(4096),
477                                journal_replay_buffer: NZUsize!(4096),
478                                journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
479                                journal_compression: Some(3),
480                                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
481                            },
482                        );
483
484                        let (sender, receiver) = registrations.remove(participant).unwrap();
485                        engine.start((sender, receiver));
486                    }
487
488                    // Create a single completion watcher for the shared reporter
489                    let completion =
490                        context
491                            .with_label("completion_watcher")
492                            .spawn(move |context| async move {
493                                loop {
494                                    if let Some(tip_index) =
495                                        reporter_mailbox.get_contiguous_tip().await
496                                    {
497                                        if tip_index >= target_index {
498                                            break;
499                                        }
500                                    }
501                                    context.sleep(Duration::from_millis(50)).await;
502                                }
503                            });
504
505                    // Random shutdown timing to simulate unclean shutdown
506                    let shutdown_wait = context.gen_range(shutdown_range_min..shutdown_range_max);
507                    select! {
508                        _ = context.sleep(shutdown_wait) => {
509                            debug!(shutdown_wait = ?shutdown_wait, "Simulating unclean shutdown");
510                            false // Unclean shutdown
511                        },
512                        _ = completion => {
513                            debug!("Shared reporter completed normally");
514                            true // Clean completion
515                        },
516                    }
517                }
518            };
519
520            let (complete, checkpoint) = prev_checkpoint
521                .map_or_else(
522                    || {
523                        debug!("Starting initial run");
524                        deterministic::Runner::timed(Duration::from_secs(45))
525                    },
526                    |prev_checkpoint| {
527                        debug!(shutdown_count, "Restarting from previous context");
528                        deterministic::Runner::from(prev_checkpoint)
529                    },
530                )
531                .start_and_recover(f);
532
533            if complete && shutdown_count >= min_shutdowns {
534                debug!("Test completed successfully");
535                break;
536            }
537
538            prev_checkpoint = Some(checkpoint);
539            shutdown_count += 1;
540        }
541    }
542
    #[test_traced("INFO")]
    fn test_unclean_byzantine_shutdown() {
        // Exercise unclean-shutdown recovery under every supported signing
        // scheme: BLS threshold and multisig (both variants), plus ed25519.
        unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinPk, _>);
        unclean_byzantine_shutdown(bls12381_threshold::fixture::<MinSig, _>);
        unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinPk, _>);
        unclean_byzantine_shutdown(bls12381_multisig::fixture::<MinSig, _>);
        unclean_byzantine_shutdown(ed25519::fixture);
    }
551
    /// Verify that an index skipped by every signer during the first run is
    /// eventually confirmed after an unclean shutdown and restart.
    ///
    /// Run 1: all validators use the Skip strategy for `skip_index`, so the tip
    /// advances past it (bounded by `window`) without a certificate forming.
    /// Run 2 (restarted from the recovered checkpoint): validators sign
    /// everything, so the shared reporter can reach a contiguous tip at
    /// `target_index`, which requires `skip_index` to finally be confirmed.
    fn unclean_shutdown_with_unsigned_index<S, F>(fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: Fn(&mut StdRng, u32) -> Fixture<S>,
    {
        // Test parameters
        let num_validators = 4;
        let skip_index = 50; // Index where no one will sign
        let window = 10;
        let target_index = 100;
        let namespace = b"my testing namespace";

        // Generate fixture once (persists across restarts)
        let mut rng = StdRng::seed_from_u64(0);
        let fixture = fixture(&mut rng, num_validators);

        // First run: let validators skip signing at skip_index and reach beyond it
        let f = |context: Context| {
            let fixture = fixture.clone();
            async move {
                let epoch = Epoch::new(111);

                // Set up simulated network
                let (oracle, mut registrations) = initialize_simulation(
                    context.with_label("simulation"),
                    &fixture,
                    RELIABLE_LINK,
                )
                .await;

                // Create a shared reporter
                let (reporter, mut reporter_mailbox) =
                    mocks::Reporter::new(context.clone(), namespace, fixture.verifier.clone());
                context.with_label("reporter").spawn(|_| reporter.run());

                // Start validator engines with Skip strategy for skip_index
                for (idx, participant) in fixture.participants.iter().enumerate() {
                    let validator_context =
                        context.with_label(&format!("participant_{participant}"));

                    // Create Provider and register scheme for epoch
                    let provider = mocks::Provider::new();
                    assert!(provider.register(epoch, fixture.schemes[idx].clone()));

                    // Create monitor
                    let monitor = mocks::Monitor::new(epoch);

                    // All validators use Skip strategy for skip_index, so no
                    // certificate can form there during this run
                    let automaton =
                        mocks::Application::new(mocks::Strategy::Skip { index: skip_index });

                    // Create blocker
                    let blocker = oracle.control(participant.clone());

                    // Create and start engine
                    let engine = Engine::new(
                        validator_context.with_label("engine"),
                        Config {
                            monitor,
                            provider,
                            automaton,
                            reporter: reporter_mailbox.clone(),
                            blocker,
                            namespace: namespace.to_vec(),
                            priority_acks: false,
                            rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
                                100,
                            )),
                            epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                            window: std::num::NonZeroU64::new(window).unwrap(),
                            activity_timeout: 100,
                            journal_partition: format!("unsigned_index_test_{participant}"),
                            journal_write_buffer: NZUsize!(4096),
                            journal_replay_buffer: NZUsize!(4096),
                            journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                            journal_compression: Some(3),
                            journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                        },
                    );

                    let (sender, receiver) = registrations.remove(participant).unwrap();
                    engine.start((sender, receiver));
                }

                // Wait for validators to reach target_index (past skip_index)
                loop {
                    if let Some((tip_index, _)) = reporter_mailbox.get_tip().await {
                        debug!(tip_index, skip_index, target_index, "reporter status");
                        if tip_index >= skip_index + window - 1 {
                            // max we can proceed before item confirmed
                            return;
                        }
                    }
                    context.sleep(Duration::from_millis(50)).await;
                }
            }
        };

        // Recover the runtime checkpoint so run 2 resumes from run 1's state
        let (_, checkpoint) =
            deterministic::Runner::timed(Duration::from_secs(60)).start_and_recover(f);

        // Second run: restart and verify the skip_index gets confirmed
        let f2 = |context: Context| {
            async move {
                let epoch = Epoch::new(111);

                // Set up simulated network
                let (oracle, mut registrations) = initialize_simulation(
                    context.with_label("simulation"),
                    &fixture,
                    RELIABLE_LINK,
                )
                .await;

                // Create a shared reporter
                let (reporter, mut reporter_mailbox) =
                    mocks::Reporter::new(context.clone(), namespace, fixture.verifier.clone());
                context.with_label("reporter").spawn(|_| reporter.run());

                // Start validator engines with Correct strategy (will sign everything now)
                for (idx, participant) in fixture.participants.iter().enumerate() {
                    let validator_context =
                        context.with_label(&format!("participant_{participant}"));

                    // Create Provider and register scheme for epoch
                    let provider = mocks::Provider::new();
                    assert!(provider.register(epoch, fixture.schemes[idx].clone()));

                    // Create monitor
                    let monitor = mocks::Monitor::new(epoch);

                    // Now all validators use Correct strategy
                    let automaton = mocks::Application::new(mocks::Strategy::Correct);

                    // Create blocker
                    let blocker = oracle.control(participant.clone());

                    // Create and start engine (same journal partition as run 1,
                    // so prior state is replayed)
                    let engine = Engine::new(
                        validator_context.with_label("engine"),
                        Config {
                            monitor,
                            provider,
                            automaton,
                            reporter: reporter_mailbox.clone(),
                            blocker,
                            namespace: namespace.to_vec(),
                            priority_acks: false,
                            rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_millis(
                                100,
                            )),
                            epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
                            window: std::num::NonZeroU64::new(10).unwrap(),
                            activity_timeout: 100,
                            journal_partition: format!("unsigned_index_test_{participant}"),
                            journal_write_buffer: NZUsize!(4096),
                            journal_replay_buffer: NZUsize!(4096),
                            journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
                            journal_compression: Some(3),
                            journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                        },
                    );

                    let (sender, receiver) = registrations.remove(participant).unwrap();
                    engine.start((sender, receiver));
                }

                // Wait for skip_index to be confirmed (should happen on replay)
                loop {
                    if let Some(tip_index) = reporter_mailbox.get_contiguous_tip().await {
                        debug!(
                            tip_index,
                            skip_index, target_index, "reporter status on restart"
                        );
                        if tip_index >= target_index {
                            break;
                        }
                    }
                    context.sleep(Duration::from_millis(50)).await;
                }
            }
        };

        deterministic::Runner::from(checkpoint).start(f2);
    }
737
    /// Dispatch `unclean_shutdown_with_unsigned_index` across every supported
    /// signing scheme: BLS12-381 threshold and multisig (each over MinPk and
    /// MinSig) plus ed25519.
    #[test_traced("INFO")]
    fn test_unclean_shutdown_with_unsigned_index() {
        unclean_shutdown_with_unsigned_index(bls12381_threshold::fixture::<MinPk, _>);
        unclean_shutdown_with_unsigned_index(bls12381_threshold::fixture::<MinSig, _>);
        unclean_shutdown_with_unsigned_index(bls12381_multisig::fixture::<MinPk, _>);
        unclean_shutdown_with_unsigned_index(bls12381_multisig::fixture::<MinSig, _>);
        unclean_shutdown_with_unsigned_index(ed25519::fixture);
    }
746
747    fn slow_and_lossy_links<S, F>(fixture: F, seed: u64) -> String
748    where
749        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
750        F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
751    {
752        let cfg = deterministic::Config::new()
753            .with_seed(seed)
754            .with_timeout(Some(Duration::from_secs(120)));
755        let runner = deterministic::Runner::new(cfg);
756
757        runner.start(|mut context| async move {
758            let num_validators = 4;
759            let fixture = fixture(&mut context, num_validators);
760            let namespace = b"my testing namespace";
761            let epoch = Epoch::new(111);
762
763            // Use degraded network links with realistic conditions
764            let degraded_link = Link {
765                latency: Duration::from_millis(200),
766                jitter: Duration::from_millis(150),
767                success_rate: 0.5,
768            };
769
770            let (mut oracle, mut registrations) =
771                initialize_simulation(context.with_label("simulation"), &fixture, degraded_link)
772                    .await;
773
774            let reporters = spawn_validator_engines(
775                context.with_label("validator"),
776                &fixture,
777                &mut registrations,
778                &mut oracle,
779                namespace,
780                epoch,
781                Duration::from_secs(2),
782                vec![],
783            );
784
785            await_reporters(context.with_label("reporter"), &reporters, 100, epoch).await;
786
787            context.auditor().state()
788        })
789    }
790
    /// Dispatch `slow_and_lossy_links` (seed 0) across every supported
    /// signing scheme; the returned auditor state is ignored here.
    #[test_traced("INFO")]
    fn test_slow_and_lossy_links() {
        slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, 0);
        slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, 0);
        slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, 0);
        slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, 0);
        slow_and_lossy_links(ed25519::fixture, 0);
    }
799
800    #[test_group("slow")]
801    #[test_traced("INFO")]
802    fn test_determinism() {
803        // We use slow and lossy links as the deterministic test
804        // because it is the most complex test.
805        for seed in 1..6 {
806            // Test BLS threshold MinPk
807            let ts_pk_state_1 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
808            let ts_pk_state_2 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
809            assert_eq!(ts_pk_state_1, ts_pk_state_2);
810
811            // Test BLS threshold MinSig
812            let ts_sig_state_1 =
813                slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
814            let ts_sig_state_2 =
815                slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
816            assert_eq!(ts_sig_state_1, ts_sig_state_2);
817
818            // Test BLS multisig MinPk
819            let ms_pk_state_1 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
820            let ms_pk_state_2 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
821            assert_eq!(ms_pk_state_1, ms_pk_state_2);
822
823            // Test BLS multisig MinSig
824            let ms_sig_state_1 =
825                slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
826            let ms_sig_state_2 =
827                slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
828            assert_eq!(ms_sig_state_1, ms_sig_state_2);
829
830            // Test ed25519
831            let ed_state_1 = slow_and_lossy_links(ed25519::fixture, seed);
832            let ed_state_2 = slow_and_lossy_links(ed25519::fixture, seed);
833            assert_eq!(ed_state_1, ed_state_2);
834
835            let states = [
836                ("threshold-minpk", ts_pk_state_1),
837                ("threshold-minsig", ts_sig_state_1),
838                ("multisig-minpk", ms_pk_state_1),
839                ("multisig-minsig", ms_sig_state_1),
840                ("ed25519", ed_state_1),
841            ];
842
843            // Sanity check that different types can't be identical
844            for pair in states.windows(2) {
845                assert_ne!(
846                    pair[0].1, pair[1].1,
847                    "state {} equals state {}",
848                    pair[0].0, pair[1].0
849                );
850            }
851        }
852    }
853
854    fn one_offline<S, F>(fixture: F)
855    where
856        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
857        F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
858    {
859        let runner = deterministic::Runner::timed(Duration::from_secs(30));
860
861        runner.start(|mut context| async move {
862            let num_validators = 5;
863            let mut fixture = fixture(&mut context, num_validators);
864            let namespace = b"my testing namespace";
865            let epoch = Epoch::new(111);
866
867            // Truncate to only 4 validators (one offline)
868            fixture.participants.truncate(4);
869            fixture.schemes.truncate(4);
870
871            let (mut oracle, mut registrations) =
872                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
873                    .await;
874
875            let reporters = spawn_validator_engines(
876                context.with_label("validator"),
877                &fixture,
878                &mut registrations,
879                &mut oracle,
880                namespace,
881                epoch,
882                Duration::from_secs(5),
883                vec![],
884            );
885
886            await_reporters(context.with_label("reporter"), &reporters, 100, epoch).await;
887        });
888    }
889
    /// Dispatch `one_offline` across every supported signing scheme.
    #[test_traced("INFO")]
    fn test_one_offline() {
        one_offline(bls12381_threshold::fixture::<MinPk, _>);
        one_offline(bls12381_threshold::fixture::<MinSig, _>);
        one_offline(bls12381_multisig::fixture::<MinPk, _>);
        one_offline(bls12381_multisig::fixture::<MinSig, _>);
        one_offline(ed25519::fixture);
    }
898
899    /// Test consensus recovery after a network partition.
900    fn network_partition<S, F>(fixture: F)
901    where
902        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
903        F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
904    {
905        let runner = deterministic::Runner::timed(Duration::from_secs(60));
906
907        runner.start(|mut context| async move {
908            let num_validators = 4;
909            let fixture = fixture(&mut context, num_validators);
910            let namespace = b"my testing namespace";
911            let epoch = Epoch::new(111);
912
913            let (mut oracle, mut registrations) =
914                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
915                    .await;
916
917            let reporters = spawn_validator_engines(
918                context.with_label("validator"),
919                &fixture,
920                &mut registrations,
921                &mut oracle,
922                namespace,
923                epoch,
924                Duration::from_secs(5),
925                vec![],
926            );
927
928            // Partition network (remove all links)
929            for v1 in fixture.participants.iter() {
930                for v2 in fixture.participants.iter() {
931                    if v2 == v1 {
932                        continue;
933                    }
934                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
935                }
936            }
937            context.sleep(Duration::from_secs(20)).await;
938
939            // Restore network links
940            for v1 in fixture.participants.iter() {
941                for v2 in fixture.participants.iter() {
942                    if v2 == v1 {
943                        continue;
944                    }
945                    oracle
946                        .add_link(v1.clone(), v2.clone(), RELIABLE_LINK)
947                        .await
948                        .unwrap();
949                }
950            }
951
952            await_reporters(context.with_label("reporter"), &reporters, 100, epoch).await;
953        });
954    }
955
    /// Dispatch `network_partition` across every supported signing scheme.
    #[test_traced("INFO")]
    fn test_network_partition() {
        network_partition(bls12381_threshold::fixture::<MinPk, _>);
        network_partition(bls12381_threshold::fixture::<MinSig, _>);
        network_partition(bls12381_multisig::fixture::<MinPk, _>);
        network_partition(bls12381_multisig::fixture::<MinSig, _>);
        network_partition(ed25519::fixture);
    }
964
965    /// Test insufficient validator participation (below quorum).
966    fn insufficient_validators<S, F>(fixture: F)
967    where
968        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
969        F: FnOnce(&mut deterministic::Context, u32) -> Fixture<S>,
970    {
971        let runner = deterministic::Runner::timed(Duration::from_secs(15));
972
973        runner.start(|mut context| async move {
974            let num_validators = 5;
975            let fixture = fixture(&mut context, num_validators);
976            let namespace = b"my testing namespace";
977            let epoch = Epoch::new(111);
978
979            // Set up simulated network
980            let (oracle, mut registrations) =
981                initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
982                    .await;
983
984            // Create reporters (one per online validator)
985            let mut reporters =
986                BTreeMap::<PublicKey, mocks::ReporterMailbox<S, Sha256Digest>>::new();
987
988            // Start only 2 out of 5 validators (below quorum of 3)
989            for (idx, participant) in fixture.participants.iter().take(2).enumerate() {
990                let context = context.with_label(&format!("participant_{participant}"));
991
992                // Create Provider and register scheme for epoch
993                let provider = mocks::Provider::new();
994                assert!(provider.register(epoch, fixture.schemes[idx].clone()));
995
996                // Create monitor
997                let monitor = mocks::Monitor::new(epoch);
998
999                // Create automaton with Correct strategy
1000                let automaton = mocks::Application::new(mocks::Strategy::Correct);
1001
1002                // Create reporter with verifier scheme
1003                let (reporter, reporter_mailbox) =
1004                    mocks::Reporter::new(context.clone(), namespace, fixture.verifier.clone());
1005                context.with_label("reporter").spawn(|_| reporter.run());
1006                reporters.insert(participant.clone(), reporter_mailbox.clone());
1007
1008                // Create blocker
1009                let blocker = oracle.control(participant.clone());
1010
1011                // Create and start engine
1012                let engine = Engine::new(
1013                    context.with_label("engine"),
1014                    Config {
1015                        monitor,
1016                        provider,
1017                        automaton,
1018                        reporter: reporter_mailbox,
1019                        blocker,
1020                        namespace: namespace.to_vec(),
1021                        priority_acks: false,
1022                        rebroadcast_timeout: NonZeroDuration::new_panic(Duration::from_secs(3)),
1023                        epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
1024                        window: std::num::NonZeroU64::new(10).unwrap(),
1025                        activity_timeout: 100,
1026                        journal_partition: format!("aggregation-{participant}"),
1027                        journal_write_buffer: NZUsize!(4096),
1028                        journal_replay_buffer: NZUsize!(4096),
1029                        journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
1030                        journal_compression: Some(3),
1031                        journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1032                    },
1033                );
1034
1035                let (sender, receiver) = registrations.remove(participant).unwrap();
1036                engine.start((sender, receiver));
1037            }
1038
1039            // With insufficient validators, consensus should not be achievable
1040            // Wait long enough for any potential consensus attempts to complete
1041            context.sleep(Duration::from_secs(12)).await;
1042
1043            // Check that no validator achieved consensus
1044            let mut any_consensus = false;
1045            for (validator_pk, mut reporter_mailbox) in reporters {
1046                let (tip, _) = reporter_mailbox
1047                    .get_tip()
1048                    .await
1049                    .unwrap_or((0, Epoch::zero()));
1050                if tip > 0 {
1051                    any_consensus = true;
1052                    tracing::warn!(
1053                        ?validator_pk,
1054                        tip,
1055                        "Unexpected consensus with insufficient validators"
1056                    );
1057                }
1058            }
1059
1060            // With only 2 out of 5 validators (below quorum of 3), consensus should not succeed
1061            assert!(
1062                !any_consensus,
1063                "Consensus should not be achieved with insufficient validator participation (below quorum)"
1064            );
1065        });
1066    }
1067
    /// Dispatch `insufficient_validators` across every supported signing scheme.
    #[test_traced("INFO")]
    fn test_insufficient_validators() {
        insufficient_validators(bls12381_threshold::fixture::<MinPk, _>);
        insufficient_validators(bls12381_threshold::fixture::<MinSig, _>);
        insufficient_validators(bls12381_multisig::fixture::<MinPk, _>);
        insufficient_validators(bls12381_multisig::fixture::<MinSig, _>);
        insufficient_validators(ed25519::fixture);
    }
1076}