commonware_consensus/simplex/
mod.rs

1//! Simple and fast BFT agreement inspired by Simplex Consensus.
2//!
3//! Inspired by [Simplex Consensus](https://eprint.iacr.org/2023/463), `simplex` provides simple and fast BFT
4//! agreement with network-speed view (i.e. block time) latency and optimal finalization latency in a
5//! partially synchronous setting.
6//!
7//! # Features
8//!
9//! * Wicked Fast Block Times (2 Network Hops)
10//! * Optimal Finalization Latency (3 Network Hops)
11//! * Externalized Uptime and Fault Proofs
12//! * Require Certification Before Finalization
13//! * Decoupled Block Broadcast and Sync
14//! * Lazy Message Verification
15//! * Application-Defined Block Format
16//! * Pluggable Hashing and Cryptography
17//! * Embedded VRF (via [scheme::bls12381_threshold])
18//!
19//! # Design
20//!
21//! ## Protocol Description
22//!
23//! ### Specification for View `v`
24//!
25//! Upon entering view `v`:
26//! * Determine leader `l` for view `v`
27//! * Set timer for leader proposal `t_l = 2Δ` and advance `t_a = 3Δ`
28//!     * If leader `l` has not been active in last `r` views, set `t_l` to 0.
29//! * If leader `l`, broadcast `notarize(c,v)`
30//!   * If can't propose container in view `v` because missing notarization/nullification for a
31//!     previous view `v_m`, request `v_m`
32//!
33//! Upon receiving first `notarize(c,v)` from `l`:
34//! * Cancel `t_l`
35//! * If the container's parent `c_parent` is notarized at `v_parent` and we have nullifications for all views
36//!   between `v` and `v_parent`, verify `c` and broadcast `notarize(c,v)`
37//!
38//! Upon receiving `2f+1` `notarize(c,v)`:
39//! * Cancel `t_a`
40//! * Mark `c` as notarized
41//! * Broadcast `notarization(c,v)` (even if we have not verified `c`)
42//! * If have not broadcast `nullify(v)`, broadcast `finalize(c,v)`
43//! * Enter `v+1`
44//!
45//! Upon receiving `2f+1` `nullify(v)`:
46//! * Broadcast `nullification(v)`
47//! * Enter `v+1`
48//!
49//! Upon receiving `2f+1` `finalize(c,v)`:
50//! * Mark `c` as finalized (and recursively finalize its parents)
51//! * Broadcast `finalization(c,v)` (even if we have not verified `c`)
52//!
53//! Upon `t_l` or `t_a` firing:
54//! * Broadcast `nullify(v)`
55//! * Every `t_r` after `nullify(v)` broadcast that we are still in view `v`:
56//!    * Rebroadcast `nullify(v)` and either `notarization(v-1)` or `nullification(v-1)`
57//!
58//! _When `2f+1` votes of a given type (`notarize(c,v)`, `nullify(v)`, or `finalize(c,v)`) have been have been collected
59//! from unique participants, a certificate (`notarization(c,v)`, `nullification(v)`, or `finalization(c,v)`) can be assembled.
60//! These certificates serve as a standalone proof of consensus progress that downstream systems can ingest without executing
61//! the protocol._
62//!
63//! ### Joining Consensus
64//!
65//! As soon as `2f+1` notarizes, nullifies, or finalizes are observed for some view `v`, the `Voter` will
66//! enter `v+1`. This means that a new participant joining consensus will immediately jump ahead to the
67//! latest view and begin participating in consensus (assuming it can verify blocks).
68//!
69//! ### Deviations from Simplex Consensus
70//!
71//! * Fetch missing notarizations/nullifications as needed rather than assuming each proposal contains
72//!   a set of all notarizations/nullifications for all historical blocks.
73//! * Introduce distinct messages for `notarize` and `nullify` rather than referring to both as a `vote` for
74//!   either a "block" or a "dummy block", respectively.
75//! * Introduce a "leader timeout" to trigger early view transitions for unresponsive leaders.
76//! * Skip "leader timeout" and "notarization timeout" if a designated leader hasn't participated in
77//!   some number of views (again to trigger early view transition for an unresponsive leader).
78//! * Introduce message rebroadcast to continue making progress if messages from a given view are dropped (only way
79//!   to ensure messages are reliably delivered is with a heavyweight reliable broadcast protocol).
80//!
81//! ## Architecture
82//!
83//! All logic is split into four components: the `Batcher`, the `Voter`, the `Resolver`, and the `Application` (provided by the user).
84//! The `Batcher` is responsible for collecting messages from peers and lazily verifying them when a quorum is met. The `Voter`
85//! is responsible for directing participation in the current view. The `Resolver` is responsible for
86//! fetching artifacts from previous views required to verify proposed blocks in the latest view. Lastly, the `Application`
87//! is responsible for proposing new blocks and indicating whether some block is valid.
88//!
89//! To drive great performance, all interactions between `Batcher`, `Voter`, `Resolver`, and `Application` are
90//! non-blocking. This means that, for example, the `Voter` can continue processing messages while the
91//! `Application` verifies a proposed block or the `Resolver` fetches a notarization.
92//!
93//! ```txt
94//!                            +------------+          +++++++++++++++
95//!                            |            +--------->+             +
96//!                            |  Batcher   |          +    Peers    +
97//!                            |            |<---------+             +
98//!                            +-------+----+          +++++++++++++++
99//!                                |   ^
100//!                                |   |
101//!                                |   |
102//!                                |   |
103//!                                v   |
104//! +---------------+           +---------+            +++++++++++++++
105//! |               |<----------+         +----------->+             +
106//! |  Application  |           |  Voter  |            +    Peers    +
107//! |               +---------->|         |<-----------+             +
108//! +---------------+           +--+------+            +++++++++++++++
109//!                                |   ^
110//!                                |   |
111//!                                |   |
112//!                                |   |
113//!                                v   |
114//!                            +-------+----+          +++++++++++++++
115//!                            |            +--------->+             +
116//!                            |  Resolver  |          +    Peers    +
117//!                            |            |<---------+             +
118//!                            +------------+          +++++++++++++++
119//! ```
120//!
121//! ### Batched Verification
122//!
123//! Unlike other consensus constructions that verify all incoming messages received from peers, for schemes
124//! where [`Scheme::is_batchable()`](commonware_cryptography::certificate::Scheme::is_batchable) returns `true`
125//! (such as [scheme::ed25519], [scheme::bls12381_multisig] and [scheme::bls12381_threshold]), `simplex` lazily
126//! verifies messages (only when a quorum is met), enabling efficient batch verification. For schemes where
127//! `is_batchable()` returns `false` (such as [scheme::secp256r1]), signatures are verified eagerly as they
128//! arrive since there is no batching benefit.
129//!
130//! If an invalid signature is detected, the `Batcher` will perform repeated bisections over collected
131//! messages to find the offending message (and block the peer(s) that sent it via [commonware_p2p::Blocker]).
132//!
133//! _If using a p2p implementation that is not authenticated, it is not safe to employ this optimization
134//! as any attacking peer could simply reconnect from a different address. We recommend [commonware_p2p::authenticated]._
135//!
136//! ### Fetching Missing Certificates
137//!
138//! Instead of trying to fetch all possible certificates above the floor, we only attempt to fetch
139//! nullifications for all views from the floor (last certified notarization or finalization) to the current view.
140//! This technique, however, is not sufficient to guarantee progress.
141//!
142//! Consider the case where `f` honest participants have seen a finalization for a given view `v` (and nullifications only
143//! from `v` to the current view `c`) but the remaining `f+1` honest participants have not (they have exclusively seen
144//! nullifications from some view `o < v` to `c`). Neither partition of participants will vote for the other's proposals.
145//!
146//! To ensure progress is eventually made, leaders with nullified proposals directly broadcast the best finalization
147//! certificate they are aware of to ensure all honest participants eventually consider the same proposal ancestry valid.
148//!
149//! _While a more aggressive recovery mechanism could be employed, like requiring all participants to broadcast their highest
150//! finalization certificate after nullification, it would impose significant overhead under normal network
151//! conditions (whereas the approach described incurs no overhead under normal network conditions). Recall, honest participants
152//! already broadcast observed certificates to all other participants in each view (and misaligned participants should only ever
153//! be observed following severe network degradation)._
154//!
155//! ## Pluggable Hashing and Cryptography
156//!
157//! Hashing is abstracted via the [commonware_cryptography::Hasher] trait and cryptography is abstracted via
158//! the [commonware_cryptography::certificate::Scheme] trait, allowing deployments to employ approaches that best match their
159//! requirements (or to provide their own without modifying any consensus logic). The following schemes
160//! are supported out-of-the-box:
161//!
162//! ### [scheme::ed25519]
163//!
164//! [commonware_cryptography::ed25519] signatures are ["High-speed high-security signatures"](https://eprint.iacr.org/2011/368)
165//! with 32 byte public keys and 64 byte signatures. While they are well-supported by commercial HSMs and offer efficient batch
166//! verification, the signatures are not aggregatable (and certificates grow linearly with the quorum size).
167//!
168//! ### [scheme::bls12381_multisig]
169//!
170//! [commonware_cryptography::bls12381] is a ["digital signature scheme with aggregation properties"](https://www.ietf.org/archive/id/draft-irtf-cfrg-bls-signature-05.txt).
171//! Unlike [commonware_cryptography::ed25519], signatures from multiple participants (say the signers in a certificate) can be aggregated
172//! into a single signature (reducing bandwidth usage per broadcast). That being said, [commonware_cryptography::bls12381] is much slower
173//! to verify than [commonware_cryptography::ed25519] and isn't supported by most HSMs (a standardization effort expired in 2022).
174//!
175//! ### [scheme::secp256r1]
176//!
177//! [commonware_cryptography::secp256r1] signatures use the NIST P-256 elliptic curve (also known as prime256v1), which is widely
178//! supported by commercial HSMs and hardware security modules. Unlike [commonware_cryptography::ed25519], Secp256r1 does not
179//! benefit from batch verification, so signatures are verified individually. Certificates grow linearly with quorum size
180//! (similar to ed25519).
181//!
182//! ### [scheme::bls12381_threshold]
183//!
184//! Last but not least, [scheme::bls12381_threshold] employs threshold cryptography (specifically BLS12-381 threshold signatures
185//! with a `2f+1` of `3f+1` quorum) to generate both a bias-resistant beacon (for leader election and post-facto execution randomness)
186//! and succinct consensus certificates (any certificate can be verified with just the static public key of the consensus instance) for each view
187//! with zero message overhead (natively integrated). While powerful, this scheme requires both instantiating the shared secret
188//! via [commonware_cryptography::bls12381::dkg] and performing a resharing procedure whenever participants are added or removed.
189//!
190//! #### Embedded VRF
191//!
192//! Every `notarize(c,v)` or `nullify(v)` message includes an `attestation(v)` (a partial signature over the view `v`). After `2f+1`
193//! `notarize(c,v)` or `nullify(v)` messages are collected from unique participants, `seed(v)` can be recovered. Because `attestation(v)` is
194//! only over the view `v`, the seed derived for a given view `v` is the same regardless of whether or not a block was notarized in said
195//! view `v`.
196//!
197//! Because the value of `seed(v)` cannot be known prior to message broadcast by any participant (including the leader) in view `v`
198//! and cannot be manipulated by any participant (deterministic for any `2f+1` signers at a given view `v`), it can be used both as a beacon
199//! for leader election (where `seed(v)` determines the leader for `v+1`) and a source of randomness in execution (where `seed(v)`
200//! is used as a seed in `v`).
201//!
202//! #### Succinct Certificates
203//!
204//! All broadcast consensus messages (`notarize(c,v)`, `nullify(v)`, `finalize(c,v)`) contain attestations (partial signatures) for a static
205//! public key (derived from a group polynomial that can be recomputed during reconfiguration using [dkg](commonware_cryptography::bls12381::dkg)).
206//! As soon as `2f+1` messages are collected, a threshold signature over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)`
207//! can be recovered, respectively. Because the public key is static, any of these certificates can be verified by an external
208//! process without following the consensus instance and/or tracking the current set of participants (as is typically required
209//! to operate a lite client).
210//!
211//! These threshold signatures over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)` (i.e. the consensus certificates)
212//! can be used to secure interoperability between different consensus instances and user interactions with an infrastructure provider
213//! (where any data served can be proven to derive from some finalized block of some consensus instance with a known static public key).
214//!
215//! ## Certification
216//!
217//! After a payload is notarized, the application can optionally delay or prevent finalization via the
218//! [`CertifiableAutomaton::certify`](crate::CertifiableAutomaton::certify) method. This is particularly useful for systems that employ
219//! erasure coding, where participants may want to wait until they have received enough shards to reconstruct
220//! and validate the full block before voting to finalize.
221//!
222//! If `certify` returns `false`, the participant will not broadcast a `finalize` vote for the payload.
223//! Because finalization requires `2f+1` `finalize` votes, a payload will only be finalized if a quorum
224//! of participants certify it. By default, `certify` returns `true` for all payloads, meaning finalization
225//! proceeds immediately after notarization.
226//!
227//! _The decision returned by `certify` must be deterministic and consistent across all honest participants to ensure
228//! liveness._
229//!
230//! ## Persistence
231//!
232//! The `Voter` caches all data required to participate in consensus to avoid any disk reads on
233//! on the critical path. To enable recovery, the `Voter` writes valid messages it receives from
234//! consensus and messages it generates to a write-ahead log (WAL) implemented by [commonware_storage::journal::segmented::variable::Journal].
235//! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior
236//! on restart (especially in the case of unclean shutdown).
237
238pub mod elector;
239pub mod scheme;
240pub mod types;
241
242cfg_if::cfg_if! {
243    if #[cfg(not(target_arch = "wasm32"))] {
244        mod actors;
245        pub mod config;
246        pub use config::Config;
247        mod engine;
248        pub use engine::Engine;
249        mod metrics;
250    }
251}
252
253#[cfg(any(test, feature = "fuzz"))]
254pub mod mocks;
255
256use crate::types::{View, ViewDelta};
257
258/// The minimum view we are tracking both in-memory and on-disk.
259pub(crate) const fn min_active(activity_timeout: ViewDelta, last_finalized: View) -> View {
260    last_finalized.saturating_sub(activity_timeout)
261}
262
263/// Whether or not a view is interesting to us. This is a function
264/// of both `min_active` and whether or not the view is too far
265/// in the future (based on the view we are currently in).
266pub(crate) fn interesting(
267    activity_timeout: ViewDelta,
268    last_finalized: View,
269    current: View,
270    pending: View,
271    allow_future: bool,
272) -> bool {
273    // If the view is genesis, skip it, genesis doesn't have votes
274    if pending.is_zero() {
275        return false;
276    }
277    if pending < min_active(activity_timeout, last_finalized) {
278        return false;
279    }
280    if !allow_future && pending > current.next() {
281        return false;
282    }
283    true
284}
285
286/// Convenience alias for [`N3f1::quorum`].
287#[cfg(test)]
288pub(crate) fn quorum(n: u32) -> u32 {
289    use commonware_utils::{Faults, N3f1};
290
291    N3f1::quorum(n)
292}
293
294#[cfg(test)]
295mod tests {
296    use super::*;
297    use crate::{
298        simplex::{
299            elector::{Config as Elector, Random, RoundRobin},
300            mocks::twins::Strategy,
301            scheme::{
302                bls12381_multisig, bls12381_threshold, bls12381_threshold::Seedable, ed25519,
303                secp256r1, Scheme,
304            },
305            types::{
306                Certificate, Finalization as TFinalization, Finalize as TFinalize,
307                Notarization as TNotarization, Notarize as TNotarize,
308                Nullification as TNullification, Nullify as TNullify, Proposal, Vote,
309            },
310        },
311        types::{Epoch, Round},
312        Monitor, Viewable,
313    };
314    use bytes::Bytes;
315    use commonware_codec::{Decode, DecodeExt, Encode};
316    use commonware_cryptography::{
317        bls12381::primitives::variant::{MinPk, MinSig, Variant},
318        certificate::mocks::Fixture,
319        ed25519::{PrivateKey, PublicKey},
320        sha256::{Digest as Sha256Digest, Digest as D},
321        Hasher as _, Sha256, Signer as _,
322    };
323    use commonware_macros::{select, test_group, test_traced};
324    use commonware_p2p::{
325        simulated::{Config, Link, Network, Oracle, Receiver, Sender, SplitOrigin, SplitTarget},
326        Recipients, Sender as _,
327    };
328    use commonware_parallel::Sequential;
329    use commonware_runtime::{
330        buffer::PoolRef, count_running_tasks, deterministic, Clock, Metrics, Quota, Runner, Spawner,
331    };
332    use commonware_utils::{test_rng, Faults, N3f1, NZUsize, NZU16};
333    use engine::Engine;
334    use futures::{future::join_all, StreamExt};
335    use rand::{rngs::StdRng, Rng as _};
336    use std::{
337        collections::{BTreeMap, HashMap},
338        num::{NonZeroU16, NonZeroU32, NonZeroUsize},
339        sync::{Arc, Mutex},
340        time::Duration,
341    };
342    use tracing::{debug, info, warn};
343    use types::Activity;
344
345    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
346    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
347    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
348
349    #[test]
350    fn test_interesting() {
351        let activity_timeout = ViewDelta::new(10);
352
353        // Genesis view is never interesting
354        assert!(!interesting(
355            activity_timeout,
356            View::zero(),
357            View::zero(),
358            View::zero(),
359            false
360        ));
361        assert!(!interesting(
362            activity_timeout,
363            View::zero(),
364            View::new(1),
365            View::zero(),
366            true
367        ));
368
369        // View below min_active is not interesting
370        assert!(!interesting(
371            activity_timeout,
372            View::new(20),
373            View::new(25),
374            View::new(5), // below min_active (10)
375            false
376        ));
377
378        // View at min_active boundary is interesting
379        assert!(interesting(
380            activity_timeout,
381            View::new(20),
382            View::new(25),
383            View::new(10), // exactly min_active
384            false
385        ));
386
387        // Future view beyond current.next() is not interesting when allow_future is false
388        assert!(!interesting(
389            activity_timeout,
390            View::new(20),
391            View::new(25),
392            View::new(27),
393            false
394        ));
395
396        // Future view beyond current.next() is interesting when allow_future is true
397        assert!(interesting(
398            activity_timeout,
399            View::new(20),
400            View::new(25),
401            View::new(27),
402            true
403        ));
404
405        // View at current.next() is interesting
406        assert!(interesting(
407            activity_timeout,
408            View::new(20),
409            View::new(25),
410            View::new(26),
411            false
412        ));
413
414        // View within valid range is interesting
415        assert!(interesting(
416            activity_timeout,
417            View::new(20),
418            View::new(25),
419            View::new(22),
420            false
421        ));
422
423        // When last_finalized is 0 and activity_timeout would underflow
424        // min_active saturates at 0, so view 1 should still be interesting
425        assert!(interesting(
426            activity_timeout,
427            View::zero(),
428            View::new(5),
429            View::new(1),
430            false
431        ));
432    }
433
434    /// Register a validator with the oracle.
435    async fn register_validator(
436        oracle: &mut Oracle<PublicKey, deterministic::Context>,
437        validator: PublicKey,
438    ) -> (
439        (
440            Sender<PublicKey, deterministic::Context>,
441            Receiver<PublicKey>,
442        ),
443        (
444            Sender<PublicKey, deterministic::Context>,
445            Receiver<PublicKey>,
446        ),
447        (
448            Sender<PublicKey, deterministic::Context>,
449            Receiver<PublicKey>,
450        ),
451    ) {
452        let control = oracle.control(validator.clone());
453        let (vote_sender, vote_receiver) = control.register(0, TEST_QUOTA).await.unwrap();
454        let (certificate_sender, certificate_receiver) =
455            control.register(1, TEST_QUOTA).await.unwrap();
456        let (resolver_sender, resolver_receiver) = control.register(2, TEST_QUOTA).await.unwrap();
457        (
458            (vote_sender, vote_receiver),
459            (certificate_sender, certificate_receiver),
460            (resolver_sender, resolver_receiver),
461        )
462    }
463
464    /// Registers all validators using the oracle.
465    async fn register_validators(
466        oracle: &mut Oracle<PublicKey, deterministic::Context>,
467        validators: &[PublicKey],
468    ) -> HashMap<
469        PublicKey,
470        (
471            (
472                Sender<PublicKey, deterministic::Context>,
473                Receiver<PublicKey>,
474            ),
475            (
476                Sender<PublicKey, deterministic::Context>,
477                Receiver<PublicKey>,
478            ),
479            (
480                Sender<PublicKey, deterministic::Context>,
481                Receiver<PublicKey>,
482            ),
483        ),
484    > {
485        let mut registrations = HashMap::new();
486        for validator in validators.iter() {
487            let registration = register_validator(oracle, validator.clone()).await;
488            registrations.insert(validator.clone(), registration);
489        }
490        registrations
491    }
492
493    /// Enum to describe the action to take when linking validators.
494    enum Action {
495        Link(Link),
496        Update(Link), // Unlink and then link
497        Unlink,
498    }
499
500    /// Links (or unlinks) validators using the oracle.
501    ///
502    /// The `action` parameter determines the action (e.g. link, unlink) to take.
503    /// The `restrict_to` function can be used to restrict the linking to certain connections,
504    /// otherwise all validators will be linked to all other validators.
505    async fn link_validators(
506        oracle: &mut Oracle<PublicKey, deterministic::Context>,
507        validators: &[PublicKey],
508        action: Action,
509        restrict_to: Option<fn(usize, usize, usize) -> bool>,
510    ) {
511        for (i1, v1) in validators.iter().enumerate() {
512            for (i2, v2) in validators.iter().enumerate() {
513                // Ignore self
514                if v2 == v1 {
515                    continue;
516                }
517
518                // Restrict to certain connections
519                if let Some(f) = restrict_to {
520                    if !f(validators.len(), i1, i2) {
521                        continue;
522                    }
523                }
524
525                // Do any unlinking first
526                match action {
527                    Action::Update(_) | Action::Unlink => {
528                        oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
529                    }
530                    _ => {}
531                }
532
533                // Do any linking after
534                match action {
535                    Action::Link(ref link) | Action::Update(ref link) => {
536                        oracle
537                            .add_link(v1.clone(), v2.clone(), link.clone())
538                            .await
539                            .unwrap();
540                    }
541                    _ => {}
542                }
543            }
544        }
545    }
546
547    fn all_online<S, F, L>(mut fixture: F)
548    where
549        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
550        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
551        L: Elector<S>,
552    {
553        // Create context
554        let n = 5;
555        let quorum = quorum(n) as usize;
556        let required_containers = View::new(100);
557        let activity_timeout = ViewDelta::new(10);
558        let skip_timeout = ViewDelta::new(5);
559        let namespace = b"consensus".to_vec();
560        let executor = deterministic::Runner::timed(Duration::from_secs(300));
561        executor.start(|mut context| async move {
562            // Create simulated network
563            let (network, mut oracle) = Network::new(
564                context.with_label("network"),
565                Config {
566                    max_size: 1024 * 1024,
567                    disconnect_on_block: true,
568                    tracked_peer_sets: None,
569                },
570            );
571
572            // Start network
573            network.start();
574
575            // Register participants
576            let Fixture {
577                participants,
578                schemes,
579                ..
580            } = fixture(&mut context, &namespace, n);
581            let mut registrations = register_validators(&mut oracle, &participants).await;
582
583            // Link all validators
584            let link = Link {
585                latency: Duration::from_millis(10),
586                jitter: Duration::from_millis(1),
587                success_rate: 1.0,
588            };
589            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
590
591            // Create engines
592            let elector = L::default();
593            let relay = Arc::new(mocks::relay::Relay::new());
594            let mut reporters = Vec::new();
595            let mut engine_handlers = Vec::new();
596            for (idx, validator) in participants.iter().enumerate() {
597                // Create scheme context
598                let context = context.with_label(&format!("validator_{}", *validator));
599
600                // Configure engine
601                let reporter_config = mocks::reporter::Config {
602                    participants: participants.clone().try_into().unwrap(),
603                    scheme: schemes[idx].clone(),
604                    elector: elector.clone(),
605                };
606                let reporter =
607                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
608                reporters.push(reporter.clone());
609                let application_cfg = mocks::application::Config {
610                    hasher: Sha256::default(),
611                    relay: relay.clone(),
612                    me: validator.clone(),
613                    propose_latency: (10.0, 5.0),
614                    verify_latency: (10.0, 5.0),
615                    certify_latency: (10.0, 5.0),
616                    should_certify: mocks::application::Certifier::Sometimes,
617                };
618                let (actor, application) = mocks::application::Application::new(
619                    context.with_label("application"),
620                    application_cfg,
621                );
622                actor.start();
623                let blocker = oracle.control(validator.clone());
624                let cfg = config::Config {
625                    scheme: schemes[idx].clone(),
626                    elector: elector.clone(),
627                    blocker,
628                    automaton: application.clone(),
629                    relay: application.clone(),
630                    reporter: reporter.clone(),
631                    strategy: Sequential,
632                    partition: validator.to_string(),
633                    mailbox_size: 1024,
634                    epoch: Epoch::new(333),
635                    leader_timeout: Duration::from_secs(1),
636                    notarization_timeout: Duration::from_secs(2),
637                    nullify_retry: Duration::from_secs(10),
638                    fetch_timeout: Duration::from_secs(1),
639                    activity_timeout,
640                    skip_timeout,
641                    fetch_concurrent: 4,
642                    replay_buffer: NZUsize!(1024 * 1024),
643                    write_buffer: NZUsize!(1024 * 1024),
644                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
645                };
646                let engine = Engine::new(context.with_label("engine"), cfg);
647
648                // Start engine
649                let (pending, recovered, resolver) = registrations
650                    .remove(validator)
651                    .expect("validator should be registered");
652                engine_handlers.push(engine.start(pending, recovered, resolver));
653            }
654
655            // Wait for all engines to finish
656            let mut finalizers = Vec::new();
657            for reporter in reporters.iter_mut() {
658                let (mut latest, mut monitor) = reporter.subscribe().await;
659                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
660                    while latest < required_containers {
661                        latest = monitor.next().await.expect("event missing");
662                    }
663                }));
664            }
665            join_all(finalizers).await;
666
667            // Check reporters for correct activity
668            let latest_complete = required_containers.saturating_sub(activity_timeout);
669            for reporter in reporters.iter() {
670                // Ensure no faults
671                {
672                    let faults = reporter.faults.lock().unwrap();
673                    assert!(faults.is_empty());
674                }
675
676                // Ensure no invalid signatures
677                {
678                    let invalid = reporter.invalid.lock().unwrap();
679                    assert_eq!(*invalid, 0);
680                }
681
682                // Ensure certificates for all views
683                {
684                    let certified = reporter.certified.lock().unwrap();
685                    for view in View::range(View::new(1), latest_complete) {
686                        // Ensure certificate for every view
687                        if !certified.contains(&view) {
688                            panic!("view: {view}");
689                        }
690                    }
691                }
692
693                // Ensure no forks
694                let mut notarized = HashMap::new();
695                let mut finalized = HashMap::new();
696                {
697                    let notarizes = reporter.notarizes.lock().unwrap();
698                    for view in View::range(View::new(1), latest_complete) {
699                        // Ensure only one payload proposed per view
700                        let Some(payloads) = notarizes.get(&view) else {
701                            continue;
702                        };
703                        if payloads.len() > 1 {
704                            panic!("view: {view}");
705                        }
706                        let (digest, notarizers) = payloads.iter().next().unwrap();
707                        notarized.insert(view, *digest);
708
709                        if notarizers.len() < quorum {
710                            // We can't verify that everyone participated at every view because some nodes may
711                            // have started later.
712                            panic!("view: {view}");
713                        }
714                    }
715                }
716                {
717                    let notarizations = reporter.notarizations.lock().unwrap();
718                    for view in View::range(View::new(1), latest_complete) {
719                        // Ensure notarization matches digest from notarizes
720                        let Some(notarization) = notarizations.get(&view) else {
721                            continue;
722                        };
723                        let Some(digest) = notarized.get(&view) else {
724                            continue;
725                        };
726                        assert_eq!(&notarization.proposal.payload, digest);
727                    }
728                }
729                {
730                    let finalizes = reporter.finalizes.lock().unwrap();
731                    for view in View::range(View::new(1), latest_complete) {
732                        // Ensure only one payload proposed per view
733                        let Some(payloads) = finalizes.get(&view) else {
734                            continue;
735                        };
736                        if payloads.len() > 1 {
737                            panic!("view: {view}");
738                        }
739                        let (digest, finalizers) = payloads.iter().next().unwrap();
740                        finalized.insert(view, *digest);
741
742                        // Only check at views below timeout
743                        if view > latest_complete {
744                            continue;
745                        }
746
747                        // Ensure everyone participating
748                        if finalizers.len() < quorum {
749                            // We can't verify that everyone participated at every view because some nodes may
750                            // have started later.
751                            panic!("view: {view}");
752                        }
753
754                        // Ensure no nullifies for any finalizers
755                        let nullifies = reporter.nullifies.lock().unwrap();
756                        let Some(nullifies) = nullifies.get(&view) else {
757                            continue;
758                        };
759                        for (_, finalizers) in payloads.iter() {
760                            for finalizer in finalizers.iter() {
761                                if nullifies.contains(finalizer) {
762                                    panic!("should not nullify and finalize at same view");
763                                }
764                            }
765                        }
766                    }
767                }
768                {
769                    let finalizations = reporter.finalizations.lock().unwrap();
770                    for view in View::range(View::new(1), latest_complete) {
771                        // Ensure finalization matches digest from finalizes
772                        let Some(finalization) = finalizations.get(&view) else {
773                            continue;
774                        };
775                        let Some(digest) = finalized.get(&view) else {
776                            continue;
777                        };
778                        assert_eq!(&finalization.proposal.payload, digest);
779                    }
780                }
781            }
782
783            // Ensure no blocked connections
784            let blocked = oracle.blocked().await.unwrap();
785            assert!(blocked.is_empty());
786        });
787    }
788
789    #[test_traced]
790    fn test_all_online() {
791        all_online::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
792        all_online::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
793        all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
794        all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
795        all_online::<_, _, RoundRobin>(ed25519::fixture);
796        all_online::<_, _, RoundRobin>(secp256r1::fixture);
797    }
798
799    fn observer<S, F, L>(mut fixture: F)
800    where
801        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
802        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
803        L: Elector<S>,
804    {
805        // Create context
806        let n_active = 5;
807        let required_containers = View::new(100);
808        let activity_timeout = ViewDelta::new(10);
809        let skip_timeout = ViewDelta::new(5);
810        let namespace = b"consensus".to_vec();
811        let executor = deterministic::Runner::timed(Duration::from_secs(300));
812        executor.start(|mut context| async move {
813            // Create simulated network
814            let (network, mut oracle) = Network::new(
815                context.with_label("network"),
816                Config {
817                    max_size: 1024 * 1024,
818                    disconnect_on_block: true,
819                    tracked_peer_sets: None,
820                },
821            );
822
823            // Start network
824            network.start();
825
826            // Register participants (active)
827            let Fixture {
828                participants,
829                schemes,
830                verifier,
831                ..
832            } = fixture(&mut context, &namespace, n_active);
833
834            // Add observer (no share)
835            let private_key_observer = PrivateKey::from_seed(n_active as u64);
836            let public_key_observer = private_key_observer.public_key();
837
838            // Register all (including observer) with the network
839            let mut all_validators = participants.clone();
840            all_validators.push(public_key_observer.clone());
841            all_validators.sort();
842            let mut registrations = register_validators(&mut oracle, &all_validators).await;
843
844            // Link all peers (including observer)
845            let link = Link {
846                latency: Duration::from_millis(10),
847                jitter: Duration::from_millis(1),
848                success_rate: 1.0,
849            };
850            link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
851
852            // Create engines
853            let elector = L::default();
854            let relay = Arc::new(mocks::relay::Relay::new());
855            let mut reporters = Vec::new();
856
857            for (idx, validator) in participants.iter().enumerate() {
858                let is_observer = *validator == public_key_observer;
859
860                // Create scheme context
861                let context = context.with_label(&format!("validator_{}", *validator));
862
863                // Configure engine
864                let signing = if is_observer {
865                    verifier.clone()
866                } else {
867                    schemes[idx].clone()
868                };
869                let reporter_config = mocks::reporter::Config {
870                    participants: participants.clone().try_into().unwrap(),
871                    scheme: signing.clone(),
872                    elector: elector.clone(),
873                };
874                let reporter =
875                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
876                reporters.push(reporter.clone());
877                let application_cfg = mocks::application::Config {
878                    hasher: Sha256::default(),
879                    relay: relay.clone(),
880                    me: validator.clone(),
881                    propose_latency: (10.0, 5.0),
882                    verify_latency: (10.0, 5.0),
883                    certify_latency: (10.0, 5.0),
884                    should_certify: mocks::application::Certifier::Sometimes,
885                };
886                let (actor, application) = mocks::application::Application::new(
887                    context.with_label("application"),
888                    application_cfg,
889                );
890                actor.start();
891                let blocker = oracle.control(validator.clone());
892                let cfg = config::Config {
893                    scheme: signing.clone(),
894                    elector: elector.clone(),
895                    blocker,
896                    automaton: application.clone(),
897                    relay: application.clone(),
898                    reporter: reporter.clone(),
899                    strategy: Sequential,
900                    partition: validator.to_string(),
901                    mailbox_size: 1024,
902                    epoch: Epoch::new(333),
903                    leader_timeout: Duration::from_secs(1),
904                    notarization_timeout: Duration::from_secs(2),
905                    nullify_retry: Duration::from_secs(10),
906                    fetch_timeout: Duration::from_secs(1),
907                    activity_timeout,
908                    skip_timeout,
909                    fetch_concurrent: 4,
910                    replay_buffer: NZUsize!(1024 * 1024),
911                    write_buffer: NZUsize!(1024 * 1024),
912                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
913                };
914                let engine = Engine::new(context.with_label("engine"), cfg);
915
916                // Start engine
917                let (pending, recovered, resolver) = registrations
918                    .remove(validator)
919                    .expect("validator should be registered");
920                engine.start(pending, recovered, resolver);
921            }
922
923            // Wait for all  engines to finish
924            let mut finalizers = Vec::new();
925            for reporter in reporters.iter_mut() {
926                let (mut latest, mut monitor) = reporter.subscribe().await;
927                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
928                    while latest < required_containers {
929                        latest = monitor.next().await.expect("event missing");
930                    }
931                }));
932            }
933            join_all(finalizers).await;
934
935            // Sanity check
936            for reporter in reporters.iter() {
937                // Ensure no faults or invalid signatures
938                {
939                    let faults = reporter.faults.lock().unwrap();
940                    assert!(faults.is_empty());
941                }
942                {
943                    let invalid = reporter.invalid.lock().unwrap();
944                    assert_eq!(*invalid, 0);
945                }
946
947                // Ensure no blocked connections
948                let blocked = oracle.blocked().await.unwrap();
949                assert!(blocked.is_empty());
950            }
951        });
952    }
953
954    #[test_traced]
955    fn test_observer() {
956        observer::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
957        observer::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
958        observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
959        observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
960        observer::<_, _, RoundRobin>(ed25519::fixture);
961        observer::<_, _, RoundRobin>(secp256r1::fixture);
962    }
963
964    fn unclean_shutdown<S, F, L>(mut fixture: F)
965    where
966        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
967        F: FnMut(&mut StdRng, &[u8], u32) -> Fixture<S>,
968        L: Elector<S>,
969    {
970        // Create context
971        let n = 5;
972        let required_containers = View::new(100);
973        let activity_timeout = ViewDelta::new(10);
974        let skip_timeout = ViewDelta::new(5);
975        let namespace = b"consensus".to_vec();
976
977        // Random restarts every x seconds
978        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
979        let supervised = Arc::new(Mutex::new(Vec::new()));
980        let mut prev_checkpoint = None;
981
982        // Create validator keys
983        let mut rng = test_rng();
984        let Fixture {
985            participants,
986            schemes,
987            ..
988        } = fixture(&mut rng, &namespace, n);
989
990        // Create block relay, shared across restarts.
991        let relay = Arc::new(mocks::relay::Relay::<Sha256Digest, S::PublicKey>::new());
992
993        loop {
994            let rng = rng.clone();
995            let participants = participants.clone();
996            let schemes = schemes.clone();
997            let shutdowns = shutdowns.clone();
998            let supervised = supervised.clone();
999            let relay = relay.clone();
1000            relay.deregister_all(); // Clear all recipients from previous restart.
1001
1002            let f = |mut context: deterministic::Context| async move {
1003                // Create simulated network
1004                let (network, mut oracle) = Network::new(
1005                    context.with_label("network"),
1006                    Config {
1007                        max_size: 1024 * 1024,
1008                        disconnect_on_block: true,
1009                        tracked_peer_sets: None,
1010                    },
1011                );
1012
1013                // Start network
1014                network.start();
1015
1016                // Register participants
1017                let mut registrations = register_validators(&mut oracle, &participants).await;
1018
1019                // Link all validators
1020                let link = Link {
1021                    latency: Duration::from_millis(50),
1022                    jitter: Duration::from_millis(50),
1023                    success_rate: 1.0,
1024                };
1025                link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1026
1027                // Create engines
1028                let elector = L::default();
1029                let relay = Arc::new(mocks::relay::Relay::new());
1030                let mut reporters = HashMap::new();
1031                let mut engine_handlers = Vec::new();
1032                for (idx, validator) in participants.iter().enumerate() {
1033                    // Create scheme context
1034                    let context = context.with_label(&format!("validator_{}", *validator));
1035
1036                    // Configure engine
1037                    let reporter_config = mocks::reporter::Config {
1038                        participants: participants.clone().try_into().unwrap(),
1039                        scheme: schemes[idx].clone(),
1040                        elector: elector.clone(),
1041                    };
1042                    let reporter = mocks::reporter::Reporter::new(rng.clone(), reporter_config);
1043                    reporters.insert(validator.clone(), reporter.clone());
1044                    let application_cfg = mocks::application::Config {
1045                        hasher: Sha256::default(),
1046                        relay: relay.clone(),
1047                        me: validator.clone(),
1048                        propose_latency: (10.0, 5.0),
1049                        verify_latency: (10.0, 5.0),
1050                        certify_latency: (10.0, 5.0),
1051                        should_certify: mocks::application::Certifier::Sometimes,
1052                    };
1053                    let (actor, application) = mocks::application::Application::new(
1054                        context.with_label("application"),
1055                        application_cfg,
1056                    );
1057                    actor.start();
1058                    let blocker = oracle.control(validator.clone());
1059                    let cfg = config::Config {
1060                        scheme: schemes[idx].clone(),
1061                        elector: elector.clone(),
1062                        blocker,
1063                        automaton: application.clone(),
1064                        relay: application.clone(),
1065                        reporter: reporter.clone(),
1066                        strategy: Sequential,
1067                        partition: validator.to_string(),
1068                        mailbox_size: 1024,
1069                        epoch: Epoch::new(333),
1070                        leader_timeout: Duration::from_secs(1),
1071                        notarization_timeout: Duration::from_secs(2),
1072                        nullify_retry: Duration::from_secs(10),
1073                        fetch_timeout: Duration::from_secs(1),
1074                        activity_timeout,
1075                        skip_timeout,
1076                        fetch_concurrent: 4,
1077                        replay_buffer: NZUsize!(1024 * 1024),
1078                        write_buffer: NZUsize!(1024 * 1024),
1079                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1080                    };
1081                    let engine = Engine::new(context.with_label("engine"), cfg);
1082
1083                    // Start engine
1084                    let (pending, recovered, resolver) = registrations
1085                        .remove(validator)
1086                        .expect("validator should be registered");
1087                    engine_handlers.push(engine.start(pending, recovered, resolver));
1088                }
1089
1090                // Store all finalizer handles
1091                let mut finalizers = Vec::new();
1092                for (_, reporter) in reporters.iter_mut() {
1093                    let (mut latest, mut monitor) = reporter.subscribe().await;
1094                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1095                        while latest < required_containers {
1096                            latest = monitor.next().await.expect("event missing");
1097                        }
1098                    }));
1099                }
1100
1101                // Exit at random points for unclean shutdown of entire set
1102                let wait =
1103                    context.gen_range(Duration::from_millis(100)..Duration::from_millis(2_000));
1104                let result = select! {
1105                    _ = context.sleep(wait) => {
1106                        // Collect reporters to check faults
1107                        {
1108                            let mut shutdowns = shutdowns.lock().unwrap();
1109                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
1110                            *shutdowns += 1;
1111                        }
1112                        supervised.lock().unwrap().push(reporters);
1113                        false
1114                    },
1115                    _ = join_all(finalizers) => {
1116                        // Check reporters for faults activity
1117                        let supervised = supervised.lock().unwrap();
1118                        for reporters in supervised.iter() {
1119                            for (_, reporter) in reporters.iter() {
1120                                let faults = reporter.faults.lock().unwrap();
1121                                assert!(faults.is_empty());
1122                            }
1123                        }
1124                        true
1125                    }
1126                };
1127
1128                // Ensure no blocked connections
1129                let blocked = oracle.blocked().await.unwrap();
1130                assert!(blocked.is_empty());
1131
1132                result
1133            };
1134
1135            let (complete, checkpoint) = prev_checkpoint
1136                .map_or_else(
1137                    || deterministic::Runner::timed(Duration::from_secs(180)),
1138                    deterministic::Runner::from,
1139                )
1140                .start_and_recover(f);
1141
1142            // Check if we should exit
1143            if complete {
1144                break;
1145            }
1146
1147            prev_checkpoint = Some(checkpoint);
1148        }
1149    }
1150
1151    #[test_group("slow")]
1152    #[test_traced]
1153    fn test_unclean_shutdown() {
1154        unclean_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1155        unclean_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1156        unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1157        unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1158        unclean_shutdown::<_, _, RoundRobin>(ed25519::fixture);
1159        unclean_shutdown::<_, _, RoundRobin>(secp256r1::fixture);
1160    }
1161
1162    fn backfill<S, F, L>(mut fixture: F)
1163    where
1164        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1165        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1166        L: Elector<S>,
1167    {
1168        // Create context
1169        let n = 4;
1170        let required_containers = View::new(100);
1171        let activity_timeout = ViewDelta::new(10);
1172        let skip_timeout = ViewDelta::new(5);
1173        let namespace = b"consensus".to_vec();
1174        let executor = deterministic::Runner::timed(Duration::from_secs(240));
1175        executor.start(|mut context| async move {
1176            // Create simulated network
1177            let (network, mut oracle) = Network::new(
1178                context.with_label("network"),
1179                Config {
1180                    max_size: 1024 * 1024,
1181                    disconnect_on_block: true,
1182                    tracked_peer_sets: None,
1183                },
1184            );
1185
1186            // Start network
1187            network.start();
1188
1189            // Register participants
1190            let Fixture {
1191                participants,
1192                schemes,
1193                ..
1194            } = fixture(&mut context, &namespace, n);
1195            let mut registrations = register_validators(&mut oracle, &participants).await;
1196
1197            // Link all validators except first
1198            let link = Link {
1199                latency: Duration::from_millis(10),
1200                jitter: Duration::from_millis(1),
1201                success_rate: 1.0,
1202            };
1203            link_validators(
1204                &mut oracle,
1205                &participants,
1206                Action::Link(link),
1207                Some(|_, i, j| ![i, j].contains(&0usize)),
1208            )
1209            .await;
1210
1211            // Create engines
1212            let elector = L::default();
1213            let relay = Arc::new(mocks::relay::Relay::new());
1214            let mut reporters = Vec::new();
1215            let mut engine_handlers = Vec::new();
1216            for (idx_scheme, validator) in participants.iter().enumerate() {
1217                // Skip first peer
1218                if idx_scheme == 0 {
1219                    continue;
1220                }
1221
1222                // Create scheme context
1223                let context = context.with_label(&format!("validator_{}", *validator));
1224
1225                // Configure engine
1226                let reporter_config = mocks::reporter::Config {
1227                    participants: participants.clone().try_into().unwrap(),
1228                    scheme: schemes[idx_scheme].clone(),
1229                    elector: elector.clone(),
1230                };
1231                let reporter =
1232                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1233                reporters.push(reporter.clone());
1234                let application_cfg = mocks::application::Config {
1235                    hasher: Sha256::default(),
1236                    relay: relay.clone(),
1237                    me: validator.clone(),
1238                    propose_latency: (10.0, 5.0),
1239                    verify_latency: (10.0, 5.0),
1240                    certify_latency: (10.0, 5.0),
1241                    should_certify: mocks::application::Certifier::Sometimes,
1242                };
1243                let (actor, application) = mocks::application::Application::new(
1244                    context.with_label("application"),
1245                    application_cfg,
1246                );
1247                actor.start();
1248                let blocker = oracle.control(validator.clone());
1249                let cfg = config::Config {
1250                    scheme: schemes[idx_scheme].clone(),
1251                    elector: elector.clone(),
1252                    blocker,
1253                    automaton: application.clone(),
1254                    relay: application.clone(),
1255                    reporter: reporter.clone(),
1256                    strategy: Sequential,
1257                    partition: validator.to_string(),
1258                    mailbox_size: 1024,
1259                    epoch: Epoch::new(333),
1260                    leader_timeout: Duration::from_secs(1),
1261                    notarization_timeout: Duration::from_secs(2),
1262                    nullify_retry: Duration::from_secs(10),
1263                    fetch_timeout: Duration::from_secs(1),
1264                    activity_timeout,
1265                    skip_timeout,
1266                    fetch_concurrent: 4,
1267                    replay_buffer: NZUsize!(1024 * 1024),
1268                    write_buffer: NZUsize!(1024 * 1024),
1269                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1270                };
1271                let engine = Engine::new(context.with_label("engine"), cfg);
1272
1273                // Start engine
1274                let (pending, recovered, resolver) = registrations
1275                    .remove(validator)
1276                    .expect("validator should be registered");
1277                engine_handlers.push(engine.start(pending, recovered, resolver));
1278            }
1279
1280            // Wait for all engines to finish
1281            let mut finalizers = Vec::new();
1282            for reporter in reporters.iter_mut() {
1283                let (mut latest, mut monitor) = reporter.subscribe().await;
1284                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1285                    while latest < required_containers {
1286                        latest = monitor.next().await.expect("event missing");
1287                    }
1288                }));
1289            }
1290            join_all(finalizers).await;
1291
1292            // Degrade network connections for online peers
1293            let link = Link {
1294                latency: Duration::from_secs(3),
1295                jitter: Duration::from_millis(0),
1296                success_rate: 1.0,
1297            };
1298            link_validators(
1299                &mut oracle,
1300                &participants,
1301                Action::Update(link.clone()),
1302                Some(|_, i, j| ![i, j].contains(&0usize)),
1303            )
1304            .await;
1305
1306            // Wait for nullifications to accrue
1307            context.sleep(Duration::from_secs(60)).await;
1308
1309            // Unlink second peer from all (except first)
1310            link_validators(
1311                &mut oracle,
1312                &participants,
1313                Action::Unlink,
1314                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
1315            )
1316            .await;
1317
1318            // Configure engine for first peer
1319            let me = participants[0].clone();
1320            let context = context.with_label(&format!("validator_{me}"));
1321
1322            // Link first peer to all (except second)
1323            link_validators(
1324                &mut oracle,
1325                &participants,
1326                Action::Link(link),
1327                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
1328            )
1329            .await;
1330
1331            // Restore network connections for all online peers
1332            let link = Link {
1333                latency: Duration::from_millis(10),
1334                jitter: Duration::from_millis(3),
1335                success_rate: 1.0,
1336            };
1337            link_validators(
1338                &mut oracle,
1339                &participants,
1340                Action::Update(link),
1341                Some(|_, i, j| ![i, j].contains(&1usize)),
1342            )
1343            .await;
1344
1345            // Configure engine
1346            let reporter_config = mocks::reporter::Config {
1347                participants: participants.clone().try_into().unwrap(),
1348                scheme: schemes[0].clone(),
1349                elector: elector.clone(),
1350            };
1351            let mut reporter =
1352                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1353            reporters.push(reporter.clone());
1354            let application_cfg = mocks::application::Config {
1355                hasher: Sha256::default(),
1356                relay: relay.clone(),
1357                me: me.clone(),
1358                propose_latency: (10.0, 5.0),
1359                verify_latency: (10.0, 5.0),
1360                certify_latency: (10.0, 5.0),
1361                should_certify: mocks::application::Certifier::Sometimes,
1362            };
1363            let (actor, application) = mocks::application::Application::new(
1364                context.with_label("application"),
1365                application_cfg,
1366            );
1367            actor.start();
1368            let blocker = oracle.control(me.clone());
1369            let cfg = config::Config {
1370                scheme: schemes[0].clone(),
1371                elector: elector.clone(),
1372                blocker,
1373                automaton: application.clone(),
1374                relay: application.clone(),
1375                reporter: reporter.clone(),
1376                strategy: Sequential,
1377                partition: me.to_string(),
1378                mailbox_size: 1024,
1379                epoch: Epoch::new(333),
1380                leader_timeout: Duration::from_secs(1),
1381                notarization_timeout: Duration::from_secs(2),
1382                nullify_retry: Duration::from_secs(10),
1383                fetch_timeout: Duration::from_secs(1),
1384                activity_timeout,
1385                skip_timeout,
1386                fetch_concurrent: 4,
1387                replay_buffer: NZUsize!(1024 * 1024),
1388                write_buffer: NZUsize!(1024 * 1024),
1389                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1390            };
1391            let engine = Engine::new(context.with_label("engine"), cfg);
1392
1393            // Start engine
1394            let (pending, recovered, resolver) = registrations
1395                .remove(&me)
1396                .expect("validator should be registered");
1397            engine_handlers.push(engine.start(pending, recovered, resolver));
1398
1399            // Wait for new engine to finalize required
1400            let (mut latest, mut monitor) = reporter.subscribe().await;
1401            while latest < required_containers {
1402                latest = monitor.next().await.expect("event missing");
1403            }
1404
1405            // Ensure no blocked connections
1406            let blocked = oracle.blocked().await.unwrap();
1407            assert!(blocked.is_empty());
1408        });
1409    }
1410
1411    #[test_traced]
1412    fn test_backfill() {
1413        backfill::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1414        backfill::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1415        backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1416        backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1417        backfill::<_, _, RoundRobin>(ed25519::fixture);
1418        backfill::<_, _, RoundRobin>(secp256r1::fixture);
1419    }
1420
1421    fn one_offline<S, F, L>(mut fixture: F)
1422    where
1423        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1424        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1425        L: Elector<S>,
1426    {
1427        // Create context
1428        let n = 5;
1429        let quorum = quorum(n) as usize;
1430        let required_containers = View::new(100);
1431        let activity_timeout = ViewDelta::new(10);
1432        let skip_timeout = ViewDelta::new(5);
1433        let max_exceptions = 10;
1434        let namespace = b"consensus".to_vec();
1435        let executor = deterministic::Runner::timed(Duration::from_secs(300));
1436        executor.start(|mut context| async move {
1437            // Create simulated network
1438            let (network, mut oracle) = Network::new(
1439                context.with_label("network"),
1440                Config {
1441                    max_size: 1024 * 1024,
1442                    disconnect_on_block: true,
1443                    tracked_peer_sets: None,
1444                },
1445            );
1446
1447            // Start network
1448            network.start();
1449
1450            // Register participants
1451            let Fixture {
1452                participants,
1453                schemes,
1454                ..
1455            } = fixture(&mut context, &namespace, n);
1456            let mut registrations = register_validators(&mut oracle, &participants).await;
1457
1458            // Link all validators except first
1459            let link = Link {
1460                latency: Duration::from_millis(10),
1461                jitter: Duration::from_millis(1),
1462                success_rate: 1.0,
1463            };
1464            link_validators(
1465                &mut oracle,
1466                &participants,
1467                Action::Link(link),
1468                Some(|_, i, j| ![i, j].contains(&0usize)),
1469            )
1470            .await;
1471
1472            // Create engines
1473            let elector = L::default();
1474            let relay = Arc::new(mocks::relay::Relay::new());
1475            let mut reporters = Vec::new();
1476            let mut engine_handlers = Vec::new();
1477            for (idx_scheme, validator) in participants.iter().enumerate() {
1478                // Skip first peer
1479                if idx_scheme == 0 {
1480                    continue;
1481                }
1482
1483                // Create scheme context
1484                let context = context.with_label(&format!("validator_{}", *validator));
1485
1486                // Configure engine
1487                let reporter_config = mocks::reporter::Config {
1488                    participants: participants.clone().try_into().unwrap(),
1489                    scheme: schemes[idx_scheme].clone(),
1490                    elector: elector.clone(),
1491                };
1492                let reporter =
1493                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1494                reporters.push(reporter.clone());
1495                let application_cfg = mocks::application::Config {
1496                    hasher: Sha256::default(),
1497                    relay: relay.clone(),
1498                    me: validator.clone(),
1499                    propose_latency: (10.0, 5.0),
1500                    verify_latency: (10.0, 5.0),
1501                    certify_latency: (10.0, 5.0),
1502                    should_certify: mocks::application::Certifier::Sometimes,
1503                };
1504                let (actor, application) = mocks::application::Application::new(
1505                    context.with_label("application"),
1506                    application_cfg,
1507                );
1508                actor.start();
1509                let blocker = oracle.control(validator.clone());
1510                let cfg = config::Config {
1511                    scheme: schemes[idx_scheme].clone(),
1512                    elector: elector.clone(),
1513                    blocker,
1514                    automaton: application.clone(),
1515                    relay: application.clone(),
1516                    reporter: reporter.clone(),
1517                    strategy: Sequential,
1518                    partition: validator.to_string(),
1519                    mailbox_size: 1024,
1520                    epoch: Epoch::new(333),
1521                    leader_timeout: Duration::from_secs(1),
1522                    notarization_timeout: Duration::from_secs(2),
1523                    nullify_retry: Duration::from_secs(10),
1524                    fetch_timeout: Duration::from_secs(1),
1525                    activity_timeout,
1526                    skip_timeout,
1527                    fetch_concurrent: 4,
1528                    replay_buffer: NZUsize!(1024 * 1024),
1529                    write_buffer: NZUsize!(1024 * 1024),
1530                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1531                };
1532                let engine = Engine::new(context.with_label("engine"), cfg);
1533
1534                // Start engine
1535                let (pending, recovered, resolver) = registrations
1536                    .remove(validator)
1537                    .expect("validator should be registered");
1538                engine_handlers.push(engine.start(pending, recovered, resolver));
1539            }
1540
1541            // Wait for all engines to finish
1542            let mut finalizers = Vec::new();
1543            for reporter in reporters.iter_mut() {
1544                let (mut latest, mut monitor) = reporter.subscribe().await;
1545                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1546                    while latest < required_containers {
1547                        latest = monitor.next().await.expect("event missing");
1548                    }
1549                }));
1550            }
1551            join_all(finalizers).await;
1552
1553            // Check reporters for correct activity
1554            let exceptions = 0;
1555            let offline = &participants[0];
1556            for reporter in reporters.iter() {
1557                // Ensure no faults
1558                {
1559                    let faults = reporter.faults.lock().unwrap();
1560                    assert!(faults.is_empty());
1561                }
1562
1563                // Ensure no invalid signatures
1564                {
1565                    let invalid = reporter.invalid.lock().unwrap();
1566                    assert_eq!(*invalid, 0);
1567                }
1568
1569                // Ensure offline node is never active
1570                let mut exceptions = 0;
1571                {
1572                    let notarizes = reporter.notarizes.lock().unwrap();
1573                    for (view, payloads) in notarizes.iter() {
1574                        for (_, participants) in payloads.iter() {
1575                            if participants.contains(offline) {
1576                                panic!("view: {view}");
1577                            }
1578                        }
1579                    }
1580                }
1581                {
1582                    let nullifies = reporter.nullifies.lock().unwrap();
1583                    for (view, participants) in nullifies.iter() {
1584                        if participants.contains(offline) {
1585                            panic!("view: {view}");
1586                        }
1587                    }
1588                }
1589                {
1590                    let finalizes = reporter.finalizes.lock().unwrap();
1591                    for (view, payloads) in finalizes.iter() {
1592                        for (_, finalizers) in payloads.iter() {
1593                            if finalizers.contains(offline) {
1594                                panic!("view: {view}");
1595                            }
1596                        }
1597                    }
1598                }
1599
1600                // Identify offline views
1601                let mut offline_views = Vec::new();
1602                {
1603                    let leaders = reporter.leaders.lock().unwrap();
1604                    for (view, leader) in leaders.iter() {
1605                        if leader == offline {
1606                            offline_views.push(*view);
1607                        }
1608                    }
1609                }
1610                assert!(!offline_views.is_empty());
1611
1612                // Ensure nullifies/nullification collected for offline node
1613                {
1614                    let nullifies = reporter.nullifies.lock().unwrap();
1615                    for view in offline_views.iter() {
1616                        let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1617                        if nullifies < quorum {
1618                            warn!("missing expected view nullifies: {}", view);
1619                            exceptions += 1;
1620                        }
1621                    }
1622                }
1623                {
1624                    let nullifications = reporter.nullifications.lock().unwrap();
1625                    for view in offline_views.iter() {
1626                        if !nullifications.contains_key(view) {
1627                            warn!("missing expected view nullifies: {}", view);
1628                            exceptions += 1;
1629                        }
1630                    }
1631                }
1632
1633                // Ensure exceptions within allowed
1634                assert!(exceptions <= max_exceptions);
1635            }
1636            assert!(exceptions <= max_exceptions);
1637
1638            // Ensure no blocked connections
1639            let blocked = oracle.blocked().await.unwrap();
1640            assert!(blocked.is_empty());
1641
1642            // Ensure we are skipping views
1643            let encoded = context.encode();
1644            let lines = encoded.lines();
1645            let mut skipped_views = 0;
1646            let mut nodes_skipping = 0;
1647            for line in lines {
1648                if line.contains("_skipped_views_total") {
1649                    let parts: Vec<&str> = line.split_whitespace().collect();
1650                    if let Some(number_str) = parts.last() {
1651                        if let Ok(number) = number_str.parse::<u64>() {
1652                            if number > 0 {
1653                                nodes_skipping += 1;
1654                            }
1655                            if number > skipped_views {
1656                                skipped_views = number;
1657                            }
1658                        }
1659                    }
1660                }
1661            }
1662            assert!(
1663                skipped_views > 0,
1664                "expected skipped views to be greater than 0"
1665            );
1666            assert_eq!(
1667                nodes_skipping,
1668                n - 1,
1669                "expected all online nodes to be skipping views"
1670            );
1671        });
1672    }
1673
1674    #[test_traced]
1675    fn test_one_offline() {
1676        one_offline::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1677        one_offline::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1678        one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1679        one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1680        one_offline::<_, _, RoundRobin>(ed25519::fixture);
1681        one_offline::<_, _, RoundRobin>(secp256r1::fixture);
1682    }
1683
1684    fn slow_validator<S, F, L>(mut fixture: F)
1685    where
1686        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1687        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1688        L: Elector<S>,
1689    {
1690        // Create context
1691        let n = 5;
1692        let required_containers = View::new(50);
1693        let activity_timeout = ViewDelta::new(10);
1694        let skip_timeout = ViewDelta::new(5);
1695        let namespace = b"consensus".to_vec();
1696        let executor = deterministic::Runner::timed(Duration::from_secs(300));
1697        executor.start(|mut context| async move {
1698            // Create simulated network
1699            let (network, mut oracle) = Network::new(
1700                context.with_label("network"),
1701                Config {
1702                    max_size: 1024 * 1024,
1703                    disconnect_on_block: true,
1704                    tracked_peer_sets: None,
1705                },
1706            );
1707
1708            // Start network
1709            network.start();
1710
1711            // Register participants
1712            let Fixture {
1713                participants,
1714                schemes,
1715                ..
1716            } = fixture(&mut context, &namespace, n);
1717            let mut registrations = register_validators(&mut oracle, &participants).await;
1718
1719            // Link all validators
1720            let link = Link {
1721                latency: Duration::from_millis(10),
1722                jitter: Duration::from_millis(1),
1723                success_rate: 1.0,
1724            };
1725            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1726
1727            // Create engines
1728            let elector = L::default();
1729            let relay = Arc::new(mocks::relay::Relay::new());
1730            let mut reporters = Vec::new();
1731            let mut engine_handlers = Vec::new();
1732            for (idx_scheme, validator) in participants.iter().enumerate() {
1733                // Create scheme context
1734                let context = context.with_label(&format!("validator_{}", *validator));
1735
1736                // Configure engine
1737                let reporter_config = mocks::reporter::Config {
1738                    participants: participants.clone().try_into().unwrap(),
1739                    scheme: schemes[idx_scheme].clone(),
1740                    elector: elector.clone(),
1741                };
1742                let reporter =
1743                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1744                reporters.push(reporter.clone());
1745                let application_cfg = if idx_scheme == 0 {
1746                    mocks::application::Config {
1747                        hasher: Sha256::default(),
1748                        relay: relay.clone(),
1749                        me: validator.clone(),
1750                        propose_latency: (10_000.0, 0.0),
1751                        verify_latency: (10_000.0, 5.0),
1752                        certify_latency: (10_000.0, 5.0),
1753                        should_certify: mocks::application::Certifier::Sometimes,
1754                    }
1755                } else {
1756                    mocks::application::Config {
1757                        hasher: Sha256::default(),
1758                        relay: relay.clone(),
1759                        me: validator.clone(),
1760                        propose_latency: (10.0, 5.0),
1761                        verify_latency: (10.0, 5.0),
1762                        certify_latency: (10.0, 5.0),
1763                        should_certify: mocks::application::Certifier::Sometimes,
1764                    }
1765                };
1766                let (actor, application) = mocks::application::Application::new(
1767                    context.with_label("application"),
1768                    application_cfg,
1769                );
1770                actor.start();
1771                let blocker = oracle.control(validator.clone());
1772                let cfg = config::Config {
1773                    scheme: schemes[idx_scheme].clone(),
1774                    elector: elector.clone(),
1775                    blocker,
1776                    automaton: application.clone(),
1777                    relay: application.clone(),
1778                    reporter: reporter.clone(),
1779                    strategy: Sequential,
1780                    partition: validator.to_string(),
1781                    mailbox_size: 1024,
1782                    epoch: Epoch::new(333),
1783                    leader_timeout: Duration::from_secs(1),
1784                    notarization_timeout: Duration::from_secs(2),
1785                    nullify_retry: Duration::from_secs(10),
1786                    fetch_timeout: Duration::from_secs(1),
1787                    activity_timeout,
1788                    skip_timeout,
1789                    fetch_concurrent: 4,
1790                    replay_buffer: NZUsize!(1024 * 1024),
1791                    write_buffer: NZUsize!(1024 * 1024),
1792                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1793                };
1794                let engine = Engine::new(context.with_label("engine"), cfg);
1795
1796                // Start engine
1797                let (pending, recovered, resolver) = registrations
1798                    .remove(validator)
1799                    .expect("validator should be registered");
1800                engine_handlers.push(engine.start(pending, recovered, resolver));
1801            }
1802
1803            // Wait for all engines to finish
1804            let mut finalizers = Vec::new();
1805            for reporter in reporters.iter_mut() {
1806                let (mut latest, mut monitor) = reporter.subscribe().await;
1807                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1808                    while latest < required_containers {
1809                        latest = monitor.next().await.expect("event missing");
1810                    }
1811                }));
1812            }
1813            join_all(finalizers).await;
1814
1815            // Check reporters for correct activity
1816            let slow = &participants[0];
1817            for reporter in reporters.iter() {
1818                // Ensure no faults
1819                {
1820                    let faults = reporter.faults.lock().unwrap();
1821                    assert!(faults.is_empty());
1822                }
1823
1824                // Ensure no invalid signatures
1825                {
1826                    let invalid = reporter.invalid.lock().unwrap();
1827                    assert_eq!(*invalid, 0);
1828                }
1829
1830                // Ensure slow node still emits notarizes and finalizes (when receiving certificates)
1831                let mut observed = false;
1832                {
1833                    let notarizes = reporter.notarizes.lock().unwrap();
1834                    for (_, payloads) in notarizes.iter() {
1835                        for (_, participants) in payloads.iter() {
1836                            if participants.contains(slow) {
1837                                observed = true;
1838                                break;
1839                            }
1840                        }
1841                    }
1842                }
1843                {
1844                    let finalizes = reporter.finalizes.lock().unwrap();
1845                    for (_, payloads) in finalizes.iter() {
1846                        for (_, finalizers) in payloads.iter() {
1847                            if finalizers.contains(slow) {
1848                                observed = true;
1849                                break;
1850                            }
1851                        }
1852                    }
1853                }
1854                assert!(observed);
1855            }
1856
1857            // Ensure no blocked connections
1858            let blocked = oracle.blocked().await.unwrap();
1859            assert!(blocked.is_empty());
1860        });
1861    }
1862
1863    #[test_traced]
1864    fn test_slow_validator() {
1865        slow_validator::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1866        slow_validator::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1867        slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1868        slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1869        slow_validator::<_, _, RoundRobin>(ed25519::fixture);
1870        slow_validator::<_, _, RoundRobin>(secp256r1::fixture);
1871    }
1872
1873    fn all_recovery<S, F, L>(mut fixture: F)
1874    where
1875        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1876        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1877        L: Elector<S>,
1878    {
1879        // Create context
1880        let n = 5;
1881        let required_containers = View::new(100);
1882        let activity_timeout = ViewDelta::new(10);
1883        let skip_timeout = ViewDelta::new(2);
1884        let namespace = b"consensus".to_vec();
1885        let executor = deterministic::Runner::timed(Duration::from_secs(1800));
1886        executor.start(|mut context| async move {
1887            // Create simulated network
1888            let (network, mut oracle) = Network::new(
1889                context.with_label("network"),
1890                Config {
1891                    max_size: 1024 * 1024,
1892                    disconnect_on_block: false,
1893                    tracked_peer_sets: None,
1894                },
1895            );
1896
1897            // Start network
1898            network.start();
1899
1900            // Register participants
1901            let Fixture {
1902                participants,
1903                schemes,
1904                ..
1905            } = fixture(&mut context, &namespace, n);
1906            let mut registrations = register_validators(&mut oracle, &participants).await;
1907
1908            // Link all validators
1909            let link = Link {
1910                latency: Duration::from_secs(3),
1911                jitter: Duration::from_millis(0),
1912                success_rate: 1.0,
1913            };
1914            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1915
1916            // Create engines
1917            let elector = L::default();
1918            let relay = Arc::new(mocks::relay::Relay::new());
1919            let mut reporters = Vec::new();
1920            let mut engine_handlers = Vec::new();
1921            for (idx, validator) in participants.iter().enumerate() {
1922                // Create scheme context
1923                let context = context.with_label(&format!("validator_{}", *validator));
1924
1925                // Configure engine
1926                let reporter_config = mocks::reporter::Config {
1927                    participants: participants.clone().try_into().unwrap(),
1928                    scheme: schemes[idx].clone(),
1929                    elector: elector.clone(),
1930                };
1931                let reporter =
1932                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1933                reporters.push(reporter.clone());
1934                let application_cfg = mocks::application::Config {
1935                    hasher: Sha256::default(),
1936                    relay: relay.clone(),
1937                    me: validator.clone(),
1938                    propose_latency: (10.0, 5.0),
1939                    verify_latency: (10.0, 5.0),
1940                    certify_latency: (10.0, 5.0),
1941                    should_certify: mocks::application::Certifier::Sometimes,
1942                };
1943                let (actor, application) = mocks::application::Application::new(
1944                    context.with_label("application"),
1945                    application_cfg,
1946                );
1947                actor.start();
1948                let blocker = oracle.control(validator.clone());
1949                let cfg = config::Config {
1950                    scheme: schemes[idx].clone(),
1951                    elector: elector.clone(),
1952                    blocker,
1953                    automaton: application.clone(),
1954                    relay: application.clone(),
1955                    reporter: reporter.clone(),
1956                    strategy: Sequential,
1957                    partition: validator.to_string(),
1958                    mailbox_size: 1024,
1959                    epoch: Epoch::new(333),
1960                    leader_timeout: Duration::from_secs(1),
1961                    notarization_timeout: Duration::from_secs(2),
1962                    nullify_retry: Duration::from_secs(10),
1963                    fetch_timeout: Duration::from_secs(1),
1964                    activity_timeout,
1965                    skip_timeout,
1966                    fetch_concurrent: 4,
1967                    replay_buffer: NZUsize!(1024 * 1024),
1968                    write_buffer: NZUsize!(1024 * 1024),
1969                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1970                };
1971                let engine = Engine::new(context.with_label("engine"), cfg);
1972
1973                // Start engine
1974                let (pending, recovered, resolver) = registrations
1975                    .remove(validator)
1976                    .expect("validator should be registered");
1977                engine_handlers.push(engine.start(pending, recovered, resolver));
1978            }
1979
1980            // Wait for a few virtual minutes (shouldn't finalize anything)
1981            let mut finalizers = Vec::new();
1982            for reporter in reporters.iter_mut() {
1983                let (_, mut monitor) = reporter.subscribe().await;
1984                finalizers.push(
1985                    context
1986                        .with_label("finalizer")
1987                        .spawn(move |context| async move {
1988                            select! {
1989                                _timeout = context.sleep(Duration::from_secs(60)) => {},
1990                                _done = monitor.next() => {
1991                                    panic!("engine should not notarize or finalize anything");
1992                                }
1993                            }
1994                        }),
1995                );
1996            }
1997            join_all(finalizers).await;
1998
1999            // Unlink all validators to get latest view
2000            link_validators(&mut oracle, &participants, Action::Unlink, None).await;
2001
2002            // Wait for a virtual minute (nothing should happen)
2003            context.sleep(Duration::from_secs(60)).await;
2004
2005            // Get latest view
2006            let mut latest = View::zero();
2007            for reporter in reporters.iter() {
2008                let nullifies = reporter.nullifies.lock().unwrap();
2009                let max = nullifies.keys().max().unwrap();
2010                if *max > latest {
2011                    latest = *max;
2012                }
2013            }
2014
2015            // Update links
2016            let link = Link {
2017                latency: Duration::from_millis(10),
2018                jitter: Duration::from_millis(1),
2019                success_rate: 1.0,
2020            };
2021            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2022
2023            // Wait for all engines to finish
2024            let mut finalizers = Vec::new();
2025            for reporter in reporters.iter_mut() {
2026                let (mut latest, mut monitor) = reporter.subscribe().await;
2027                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2028                    while latest < required_containers {
2029                        latest = monitor.next().await.expect("event missing");
2030                    }
2031                }));
2032            }
2033            join_all(finalizers).await;
2034
2035            // Check reporters for correct activity
2036            for reporter in reporters.iter() {
2037                // Ensure no faults
2038                {
2039                    let faults = reporter.faults.lock().unwrap();
2040                    assert!(faults.is_empty());
2041                }
2042
2043                // Ensure no invalid signatures
2044                {
2045                    let invalid = reporter.invalid.lock().unwrap();
2046                    assert_eq!(*invalid, 0);
2047                }
2048
2049                // Ensure quick recovery.
2050                //
2051                // If the skip timeout isn't implemented correctly, we may go many views before participants
2052                // start to notarize a validator's proposal.
2053                {
2054                    // Ensure nearly all views around latest are notarized.
2055                    // We don't check for finalization since some of the blocks may fail to be
2056                    // certified for the purposes of testing.
2057                    let mut found = 0;
2058                    let notarizations = reporter.notarizations.lock().unwrap();
2059                    for view in View::range(latest, latest.saturating_add(activity_timeout)) {
2060                        if notarizations.contains_key(&view) {
2061                            found += 1;
2062                        }
2063                    }
2064                    assert!(
2065                        found >= activity_timeout.get().saturating_sub(2),
2066                        "found: {found}"
2067                    );
2068                }
2069            }
2070
2071            // Ensure no blocked connections
2072            let blocked = oracle.blocked().await.unwrap();
2073            assert!(blocked.is_empty());
2074        });
2075    }
2076
2077    #[test_traced]
2078    fn test_all_recovery() {
2079        all_recovery::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
2080        all_recovery::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
2081        all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2082        all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2083        all_recovery::<_, _, RoundRobin>(ed25519::fixture);
2084        all_recovery::<_, _, RoundRobin>(secp256r1::fixture);
2085    }
2086
2087    fn partition<S, F, L>(mut fixture: F)
2088    where
2089        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2090        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2091        L: Elector<S>,
2092    {
2093        // Create context
2094        let n = 10;
2095        let required_containers = View::new(50);
2096        let activity_timeout = ViewDelta::new(10);
2097        let skip_timeout = ViewDelta::new(5);
2098        let namespace = b"consensus".to_vec();
2099        let executor = deterministic::Runner::timed(Duration::from_secs(900));
2100        executor.start(|mut context| async move {
2101            // Create simulated network
2102            let (network, mut oracle) = Network::new(
2103                context.with_label("network"),
2104                Config {
2105                    max_size: 1024 * 1024,
2106                    disconnect_on_block: false,
2107                    tracked_peer_sets: None,
2108                },
2109            );
2110
2111            // Start network
2112            network.start();
2113
2114            // Register participants
2115            let Fixture {
2116                participants,
2117                schemes,
2118                ..
2119            } = fixture(&mut context, &namespace, n);
2120            let mut registrations = register_validators(&mut oracle, &participants).await;
2121
2122            // Link all validators
2123            let link = Link {
2124                latency: Duration::from_millis(10),
2125                jitter: Duration::from_millis(1),
2126                success_rate: 1.0,
2127            };
2128            link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
2129
2130            // Create engines
2131            let elector = L::default();
2132            let relay = Arc::new(mocks::relay::Relay::new());
2133            let mut reporters = Vec::new();
2134            let mut engine_handlers = Vec::new();
2135            for (idx, validator) in participants.iter().enumerate() {
2136                // Create scheme context
2137                let context = context.with_label(&format!("validator_{}", *validator));
2138
2139                // Configure engine
2140                let reporter_config = mocks::reporter::Config {
2141                    participants: participants.clone().try_into().unwrap(),
2142                    scheme: schemes[idx].clone(),
2143                    elector: elector.clone(),
2144                };
2145                let reporter =
2146                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2147                reporters.push(reporter.clone());
2148                let application_cfg = mocks::application::Config {
2149                    hasher: Sha256::default(),
2150                    relay: relay.clone(),
2151                    me: validator.clone(),
2152                    propose_latency: (10.0, 5.0),
2153                    verify_latency: (10.0, 5.0),
2154                    certify_latency: (10.0, 5.0),
2155                    should_certify: mocks::application::Certifier::Sometimes,
2156                };
2157                let (actor, application) = mocks::application::Application::new(
2158                    context.with_label("application"),
2159                    application_cfg,
2160                );
2161                actor.start();
2162                let blocker = oracle.control(validator.clone());
2163                let cfg = config::Config {
2164                    scheme: schemes[idx].clone(),
2165                    elector: elector.clone(),
2166                    blocker,
2167                    automaton: application.clone(),
2168                    relay: application.clone(),
2169                    reporter: reporter.clone(),
2170                    strategy: Sequential,
2171                    partition: validator.to_string(),
2172                    mailbox_size: 1024,
2173                    epoch: Epoch::new(333),
2174                    leader_timeout: Duration::from_secs(1),
2175                    notarization_timeout: Duration::from_secs(2),
2176                    nullify_retry: Duration::from_secs(10),
2177                    fetch_timeout: Duration::from_secs(1),
2178                    activity_timeout,
2179                    skip_timeout,
2180                    fetch_concurrent: 4,
2181                    replay_buffer: NZUsize!(1024 * 1024),
2182                    write_buffer: NZUsize!(1024 * 1024),
2183                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2184                };
2185                let engine = Engine::new(context.with_label("engine"), cfg);
2186
2187                // Start engine
2188                let (pending, recovered, resolver) = registrations
2189                    .remove(validator)
2190                    .expect("validator should be registered");
2191                engine_handlers.push(engine.start(pending, recovered, resolver));
2192            }
2193
2194            // Wait for all engines to finish
2195            let mut finalizers = Vec::new();
2196            for reporter in reporters.iter_mut() {
2197                let (mut latest, mut monitor) = reporter.subscribe().await;
2198                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2199                    while latest < required_containers {
2200                        latest = monitor.next().await.expect("event missing");
2201                    }
2202                }));
2203            }
2204            join_all(finalizers).await;
2205
2206            // Cut all links between validator halves
2207            fn separated(n: usize, a: usize, b: usize) -> bool {
2208                let m = n / 2;
2209                (a < m && b >= m) || (a >= m && b < m)
2210            }
2211            link_validators(&mut oracle, &participants, Action::Unlink, Some(separated)).await;
2212
2213            // Wait for any in-progress notarizations/finalizations to finish
2214            context.sleep(Duration::from_secs(10)).await;
2215
2216            // Wait for a few virtual minutes (shouldn't finalize anything)
2217            let mut finalizers = Vec::new();
2218            for reporter in reporters.iter_mut() {
2219                let (_, mut monitor) = reporter.subscribe().await;
2220                finalizers.push(
2221                    context
2222                        .with_label("finalizer")
2223                        .spawn(move |context| async move {
2224                            select! {
2225                                _timeout = context.sleep(Duration::from_secs(60)) => {},
2226                                _done = monitor.next() => {
2227                                    panic!("engine should not notarize or finalize anything");
2228                                }
2229                            }
2230                        }),
2231                );
2232            }
2233            join_all(finalizers).await;
2234
2235            // Restore links
2236            link_validators(
2237                &mut oracle,
2238                &participants,
2239                Action::Link(link),
2240                Some(separated),
2241            )
2242            .await;
2243
2244            // Wait for all engines to finish
2245            let mut finalizers = Vec::new();
2246            for reporter in reporters.iter_mut() {
2247                let (mut latest, mut monitor) = reporter.subscribe().await;
2248                let required = latest.saturating_add(ViewDelta::new(required_containers.get()));
2249                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2250                    while latest < required {
2251                        latest = monitor.next().await.expect("event missing");
2252                    }
2253                }));
2254            }
2255            join_all(finalizers).await;
2256
2257            // Check reporters for correct activity
2258            for reporter in reporters.iter() {
2259                // Ensure no faults
2260                {
2261                    let faults = reporter.faults.lock().unwrap();
2262                    assert!(faults.is_empty());
2263                }
2264
2265                // Ensure no invalid signatures
2266                {
2267                    let invalid = reporter.invalid.lock().unwrap();
2268                    assert_eq!(*invalid, 0);
2269                }
2270            }
2271
2272            // Ensure no blocked connections
2273            let blocked = oracle.blocked().await.unwrap();
2274            assert!(blocked.is_empty());
2275        });
2276    }
2277
2278    #[test_group("slow")]
2279    #[test_traced]
2280    fn test_partition() {
2281        partition::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
2282        partition::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
2283        partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2284        partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2285        partition::<_, _, RoundRobin>(ed25519::fixture);
2286        partition::<_, _, RoundRobin>(secp256r1::fixture);
2287    }
2288
2289    fn slow_and_lossy_links<S, F, L>(seed: u64, mut fixture: F) -> String
2290    where
2291        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2292        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2293        L: Elector<S>,
2294    {
2295        // Create context
2296        let n = 5;
2297        let required_containers = View::new(50);
2298        let activity_timeout = ViewDelta::new(10);
2299        let skip_timeout = ViewDelta::new(5);
2300        let namespace = b"consensus".to_vec();
2301        let cfg = deterministic::Config::new()
2302            .with_seed(seed)
2303            .with_timeout(Some(Duration::from_secs(5_000)));
2304        let executor = deterministic::Runner::new(cfg);
2305        executor.start(|mut context| async move {
2306            // Create simulated network
2307            let (network, mut oracle) = Network::new(
2308                context.with_label("network"),
2309                Config {
2310                    max_size: 1024 * 1024,
2311                    disconnect_on_block: false,
2312                    tracked_peer_sets: None,
2313                },
2314            );
2315
2316            // Start network
2317            network.start();
2318
2319            // Register participants
2320            let Fixture {
2321                participants,
2322                schemes,
2323                ..
2324            } = fixture(&mut context, &namespace, n);
2325            let mut registrations = register_validators(&mut oracle, &participants).await;
2326
2327            // Link all validators
2328            let degraded_link = Link {
2329                latency: Duration::from_millis(200),
2330                jitter: Duration::from_millis(150),
2331                success_rate: 0.5,
2332            };
2333            link_validators(
2334                &mut oracle,
2335                &participants,
2336                Action::Link(degraded_link),
2337                None,
2338            )
2339            .await;
2340
2341            // Create engines
2342            let elector = L::default();
2343            let relay = Arc::new(mocks::relay::Relay::new());
2344            let mut reporters = Vec::new();
2345            let mut engine_handlers = Vec::new();
2346            for (idx, validator) in participants.iter().enumerate() {
2347                // Create scheme context
2348                let context = context.with_label(&format!("validator_{}", *validator));
2349
2350                // Configure engine
2351                let reporter_config = mocks::reporter::Config {
2352                    participants: participants.clone().try_into().unwrap(),
2353                    scheme: schemes[idx].clone(),
2354                    elector: elector.clone(),
2355                };
2356                let reporter =
2357                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2358                reporters.push(reporter.clone());
2359                let application_cfg = mocks::application::Config {
2360                    hasher: Sha256::default(),
2361                    relay: relay.clone(),
2362                    me: validator.clone(),
2363                    propose_latency: (10.0, 5.0),
2364                    verify_latency: (10.0, 5.0),
2365                    certify_latency: (10.0, 5.0),
2366                    should_certify: mocks::application::Certifier::Sometimes,
2367                };
2368                let (actor, application) = mocks::application::Application::new(
2369                    context.with_label("application"),
2370                    application_cfg,
2371                );
2372                actor.start();
2373                let blocker = oracle.control(validator.clone());
2374                let cfg = config::Config {
2375                    scheme: schemes[idx].clone(),
2376                    elector: elector.clone(),
2377                    blocker,
2378                    automaton: application.clone(),
2379                    relay: application.clone(),
2380                    reporter: reporter.clone(),
2381                    strategy: Sequential,
2382                    partition: validator.to_string(),
2383                    mailbox_size: 1024,
2384                    epoch: Epoch::new(333),
2385                    leader_timeout: Duration::from_secs(1),
2386                    notarization_timeout: Duration::from_secs(2),
2387                    nullify_retry: Duration::from_secs(10),
2388                    fetch_timeout: Duration::from_secs(1),
2389                    activity_timeout,
2390                    skip_timeout,
2391                    fetch_concurrent: 4,
2392                    replay_buffer: NZUsize!(1024 * 1024),
2393                    write_buffer: NZUsize!(1024 * 1024),
2394                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2395                };
2396                let engine = Engine::new(context.with_label("engine"), cfg);
2397
2398                // Start engine
2399                let (pending, recovered, resolver) = registrations
2400                    .remove(validator)
2401                    .expect("validator should be registered");
2402                engine_handlers.push(engine.start(pending, recovered, resolver));
2403            }
2404
2405            // Wait for all engines to finish
2406            let mut finalizers = Vec::new();
2407            for reporter in reporters.iter_mut() {
2408                let (mut latest, mut monitor) = reporter.subscribe().await;
2409                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2410                    while latest < required_containers {
2411                        latest = monitor.next().await.expect("event missing");
2412                    }
2413                }));
2414            }
2415            join_all(finalizers).await;
2416
2417            // Check reporters for correct activity
2418            for reporter in reporters.iter() {
2419                // Ensure no faults
2420                {
2421                    let faults = reporter.faults.lock().unwrap();
2422                    assert!(faults.is_empty());
2423                }
2424
2425                // Ensure no invalid signatures
2426                {
2427                    let invalid = reporter.invalid.lock().unwrap();
2428                    assert_eq!(*invalid, 0);
2429                }
2430            }
2431
2432            // Ensure no blocked connections
2433            let blocked = oracle.blocked().await.unwrap();
2434            assert!(blocked.is_empty());
2435
2436            context.auditor().state()
2437        })
2438    }
2439
2440    #[test_traced]
2441    fn test_slow_and_lossy_links() {
2442        slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold::fixture::<MinPk, _>);
2443        slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold::fixture::<MinSig, _>);
2444        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinPk, _>);
2445        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinSig, _>);
2446        slow_and_lossy_links::<_, _, RoundRobin>(0, ed25519::fixture);
2447        slow_and_lossy_links::<_, _, RoundRobin>(0, secp256r1::fixture);
2448    }
2449
2450    #[test_group("slow")]
2451    #[test_traced]
2452    fn test_determinism() {
2453        // We use slow and lossy links as the deterministic test
2454        // because it is the most complex test.
2455        for seed in 1..6 {
2456            let ts_pk_state_1 =
2457                slow_and_lossy_links::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2458            let ts_pk_state_2 =
2459                slow_and_lossy_links::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2460            assert_eq!(ts_pk_state_1, ts_pk_state_2);
2461
2462            let ts_sig_state_1 = slow_and_lossy_links::<_, _, Random>(
2463                seed,
2464                bls12381_threshold::fixture::<MinSig, _>,
2465            );
2466            let ts_sig_state_2 = slow_and_lossy_links::<_, _, Random>(
2467                seed,
2468                bls12381_threshold::fixture::<MinSig, _>,
2469            );
2470            assert_eq!(ts_sig_state_1, ts_sig_state_2);
2471
2472            let ms_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2473                seed,
2474                bls12381_multisig::fixture::<MinPk, _>,
2475            );
2476            let ms_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2477                seed,
2478                bls12381_multisig::fixture::<MinPk, _>,
2479            );
2480            assert_eq!(ms_pk_state_1, ms_pk_state_2);
2481
2482            let ms_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2483                seed,
2484                bls12381_multisig::fixture::<MinSig, _>,
2485            );
2486            let ms_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2487                seed,
2488                bls12381_multisig::fixture::<MinSig, _>,
2489            );
2490            assert_eq!(ms_sig_state_1, ms_sig_state_2);
2491
2492            let ed_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2493            let ed_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2494            assert_eq!(ed_state_1, ed_state_2);
2495
2496            let secp_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2497            let secp_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2498            assert_eq!(secp_state_1, secp_state_2);
2499
2500            let states = [
2501                ("threshold-minpk", ts_pk_state_1),
2502                ("threshold-minsig", ts_sig_state_1),
2503                ("multisig-minpk", ms_pk_state_1),
2504                ("multisig-minsig", ms_sig_state_1),
2505                ("ed25519", ed_state_1),
2506                ("secp256r1", secp_state_1),
2507            ];
2508
2509            // Sanity check that different types can't be identical
2510            for pair in states.windows(2) {
2511                assert_ne!(
2512                    pair[0].1, pair[1].1,
2513                    "state {} equals state {}",
2514                    pair[0].0, pair[1].0
2515                );
2516            }
2517        }
2518    }
2519
2520    fn conflicter<S, F, L>(seed: u64, mut fixture: F)
2521    where
2522        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2523        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2524        L: Elector<S>,
2525    {
2526        // Create context
2527        let n = 4;
2528        let required_containers = View::new(50);
2529        let activity_timeout = ViewDelta::new(10);
2530        let skip_timeout = ViewDelta::new(5);
2531        let namespace = b"consensus".to_vec();
2532        let cfg = deterministic::Config::new()
2533            .with_seed(seed)
2534            .with_timeout(Some(Duration::from_secs(30)));
2535        let executor = deterministic::Runner::new(cfg);
2536        executor.start(|mut context| async move {
2537            // Create simulated network
2538            let (network, mut oracle) = Network::new(
2539                context.with_label("network"),
2540                Config {
2541                    max_size: 1024 * 1024,
2542                    disconnect_on_block: false,
2543                    tracked_peer_sets: None,
2544                },
2545            );
2546
2547            // Start network
2548            network.start();
2549
2550            // Register participants
2551            let Fixture {
2552                participants,
2553                schemes,
2554                ..
2555            } = fixture(&mut context, &namespace, n);
2556            let mut registrations = register_validators(&mut oracle, &participants).await;
2557
2558            // Link all validators
2559            let link = Link {
2560                latency: Duration::from_millis(10),
2561                jitter: Duration::from_millis(1),
2562                success_rate: 1.0,
2563            };
2564            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2565
2566            // Create engines
2567            let elector = L::default();
2568            let relay = Arc::new(mocks::relay::Relay::new());
2569            let mut reporters = Vec::new();
2570            for (idx_scheme, validator) in participants.iter().enumerate() {
2571                // Create scheme context
2572                let context = context.with_label(&format!("validator_{}", *validator));
2573
2574                // Start engine
2575                let reporter_config = mocks::reporter::Config {
2576                    participants: participants.clone().try_into().unwrap(),
2577                    scheme: schemes[idx_scheme].clone(),
2578                    elector: elector.clone(),
2579                };
2580                let reporter =
2581                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2582                let (pending, recovered, resolver) = registrations
2583                    .remove(validator)
2584                    .expect("validator should be registered");
2585                if idx_scheme == 0 {
2586                    let cfg = mocks::conflicter::Config {
2587                        scheme: schemes[idx_scheme].clone(),
2588                    };
2589
2590                    let engine: mocks::conflicter::Conflicter<_, _, Sha256> =
2591                        mocks::conflicter::Conflicter::new(
2592                            context.with_label("byzantine_engine"),
2593                            cfg,
2594                        );
2595                    engine.start(pending);
2596                } else {
2597                    reporters.push(reporter.clone());
2598                    let application_cfg = mocks::application::Config {
2599                        hasher: Sha256::default(),
2600                        relay: relay.clone(),
2601                        me: validator.clone(),
2602                        propose_latency: (10.0, 5.0),
2603                        verify_latency: (10.0, 5.0),
2604                        certify_latency: (10.0, 5.0),
2605                        should_certify: mocks::application::Certifier::Sometimes,
2606                    };
2607                    let (actor, application) = mocks::application::Application::new(
2608                        context.with_label("application"),
2609                        application_cfg,
2610                    );
2611                    actor.start();
2612                    let blocker = oracle.control(validator.clone());
2613                    let cfg = config::Config {
2614                        scheme: schemes[idx_scheme].clone(),
2615                        elector: elector.clone(),
2616                        blocker,
2617                        automaton: application.clone(),
2618                        relay: application.clone(),
2619                        reporter: reporter.clone(),
2620                        strategy: Sequential,
2621                        partition: validator.to_string(),
2622                        mailbox_size: 1024,
2623                        epoch: Epoch::new(333),
2624                        leader_timeout: Duration::from_secs(1),
2625                        notarization_timeout: Duration::from_secs(2),
2626                        nullify_retry: Duration::from_secs(10),
2627                        fetch_timeout: Duration::from_secs(1),
2628                        activity_timeout,
2629                        skip_timeout,
2630                        fetch_concurrent: 4,
2631                        replay_buffer: NZUsize!(1024 * 1024),
2632                        write_buffer: NZUsize!(1024 * 1024),
2633                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2634                    };
2635                    let engine = Engine::new(context.with_label("engine"), cfg);
2636                    engine.start(pending, recovered, resolver);
2637                }
2638            }
2639
2640            // Wait for all engines to finish
2641            let mut finalizers = Vec::new();
2642            for reporter in reporters.iter_mut() {
2643                let (mut latest, mut monitor) = reporter.subscribe().await;
2644                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2645                    while latest < required_containers {
2646                        latest = monitor.next().await.expect("event missing");
2647                    }
2648                }));
2649            }
2650            join_all(finalizers).await;
2651
2652            // Check reporters for correct activity
2653            let byz = &participants[0];
2654            let mut count_conflicting = 0;
2655            for reporter in reporters.iter() {
2656                // Ensure only faults for byz
2657                {
2658                    let faults = reporter.faults.lock().unwrap();
2659                    assert_eq!(faults.len(), 1);
2660                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
2661                    for (_, faults) in faulter.iter() {
2662                        for fault in faults.iter() {
2663                            match fault {
2664                                Activity::ConflictingNotarize(_) => {
2665                                    count_conflicting += 1;
2666                                }
2667                                Activity::ConflictingFinalize(_) => {
2668                                    count_conflicting += 1;
2669                                }
2670                                _ => panic!("unexpected fault: {fault:?}"),
2671                            }
2672                        }
2673                    }
2674                }
2675
2676                // Ensure no invalid signatures
2677                {
2678                    let invalid = reporter.invalid.lock().unwrap();
2679                    assert_eq!(*invalid, 0);
2680                }
2681            }
2682            assert!(count_conflicting > 0);
2683
2684            // Ensure conflicter is blocked
2685            let blocked = oracle.blocked().await.unwrap();
2686            assert!(!blocked.is_empty());
2687            for (a, b) in blocked {
2688                assert_ne!(&a, byz);
2689                assert_eq!(&b, byz);
2690            }
2691        });
2692    }
2693
2694    #[test_group("slow")]
2695    #[test_traced]
2696    fn test_conflicter() {
2697        for seed in 0..5 {
2698            conflicter::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2699            conflicter::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
2700            conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2701            conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2702            conflicter::<_, _, RoundRobin>(seed, ed25519::fixture);
2703            conflicter::<_, _, RoundRobin>(seed, secp256r1::fixture);
2704        }
2705    }
2706
2707    fn invalid<S, F, L>(seed: u64, mut fixture: F)
2708    where
2709        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2710        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2711        L: Elector<S>,
2712    {
2713        // Create context
2714        let n = 4;
2715        let required_containers = View::new(50);
2716        let activity_timeout = ViewDelta::new(10);
2717        let skip_timeout = ViewDelta::new(5);
2718        let namespace = b"consensus".to_vec();
2719        let cfg = deterministic::Config::new()
2720            .with_seed(seed)
2721            .with_timeout(Some(Duration::from_secs(30)));
2722        let executor = deterministic::Runner::new(cfg);
2723        executor.start(|mut context| async move {
2724            // Create simulated network
2725            let (network, mut oracle) = Network::new(
2726                context.with_label("network"),
2727                Config {
2728                    max_size: 1024 * 1024,
2729                    disconnect_on_block: false,
2730                    tracked_peer_sets: None,
2731                },
2732            );
2733
2734            // Start network
2735            network.start();
2736
2737            // Register participants
2738            let Fixture {
2739                participants,
2740                schemes,
2741                ..
2742            } = fixture(&mut context, &namespace, n);
2743
2744            // Create scheme with wrong namespace for byzantine node (index 0)
2745            let Fixture {
2746                schemes: wrong_schemes,
2747                ..
2748            } = fixture(&mut context, b"wrong-namespace", n);
2749
2750            let mut registrations = register_validators(&mut oracle, &participants).await;
2751
2752            // Link all validators
2753            let link = Link {
2754                latency: Duration::from_millis(10),
2755                jitter: Duration::from_millis(1),
2756                success_rate: 1.0,
2757            };
2758            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2759
2760            // Create engines
2761            let elector = L::default();
2762            let relay = Arc::new(mocks::relay::Relay::new());
2763            let mut reporters = Vec::new();
2764            for (idx_scheme, validator) in participants.iter().enumerate() {
2765                // Create scheme context
2766                let context = context.with_label(&format!("validator_{}", *validator));
2767
2768                // Byzantine node (idx 0) uses wrong namespace scheme to produce invalid signatures
2769                // Honest nodes use correct namespace schemes
2770                let scheme = if idx_scheme == 0 {
2771                    wrong_schemes[idx_scheme].clone()
2772                } else {
2773                    schemes[idx_scheme].clone()
2774                };
2775
2776                let reporter_config = mocks::reporter::Config {
2777                    participants: participants.clone().try_into().unwrap(),
2778                    scheme: schemes[idx_scheme].clone(),
2779                    elector: elector.clone(),
2780                };
2781                let reporter =
2782                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2783                reporters.push(reporter.clone());
2784
2785                let application_cfg = mocks::application::Config {
2786                    hasher: Sha256::default(),
2787                    relay: relay.clone(),
2788                    me: validator.clone(),
2789                    propose_latency: (10.0, 5.0),
2790                    verify_latency: (10.0, 5.0),
2791                    certify_latency: (10.0, 5.0),
2792                    should_certify: mocks::application::Certifier::Sometimes,
2793                };
2794                let (actor, application) = mocks::application::Application::new(
2795                    context.with_label("application"),
2796                    application_cfg,
2797                );
2798                actor.start();
2799                let blocker = oracle.control(validator.clone());
2800                let cfg = config::Config {
2801                    scheme,
2802                    elector: elector.clone(),
2803                    blocker,
2804                    automaton: application.clone(),
2805                    relay: application.clone(),
2806                    reporter: reporter.clone(),
2807                    strategy: Sequential,
2808                    partition: validator.clone().to_string(),
2809                    mailbox_size: 1024,
2810                    epoch: Epoch::new(333),
2811                    leader_timeout: Duration::from_secs(1),
2812                    notarization_timeout: Duration::from_secs(2),
2813                    nullify_retry: Duration::from_secs(10),
2814                    fetch_timeout: Duration::from_secs(1),
2815                    activity_timeout,
2816                    skip_timeout,
2817                    fetch_concurrent: 4,
2818                    replay_buffer: NZUsize!(1024 * 1024),
2819                    write_buffer: NZUsize!(1024 * 1024),
2820                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2821                };
2822                let engine = Engine::new(context.with_label("engine"), cfg);
2823                let (pending, recovered, resolver) = registrations
2824                    .remove(validator)
2825                    .expect("validator should be registered");
2826                engine.start(pending, recovered, resolver);
2827            }
2828
2829            // Wait for honest engines to finish (skip byzantine node at index 0)
2830            let mut finalizers = Vec::new();
2831            for reporter in reporters.iter_mut().skip(1) {
2832                let (mut latest, mut monitor) = reporter.subscribe().await;
2833                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2834                    while latest < required_containers {
2835                        latest = monitor.next().await.expect("event missing");
2836                    }
2837                }));
2838            }
2839            join_all(finalizers).await;
2840
2841            // Check honest reporters (reporters[1..]) for correct activity
2842            let mut invalid_count = 0;
2843            for reporter in reporters.iter().skip(1) {
2844                // Ensure no faults
2845                {
2846                    let faults = reporter.faults.lock().unwrap();
2847                    assert!(faults.is_empty());
2848                }
2849
2850                // Count invalid signatures
2851                {
2852                    let invalid = reporter.invalid.lock().unwrap();
2853                    if *invalid > 0 {
2854                        invalid_count += 1;
2855                    }
2856                }
2857            }
2858
2859            // All honest nodes should see invalid signatures from the byzantine node
2860            assert_eq!(invalid_count, n - 1);
2861
2862            // Ensure byzantine node is blocked by honest nodes
2863            let blocked = oracle.blocked().await.unwrap();
2864            assert!(!blocked.is_empty());
2865            for (a, b) in blocked {
2866                if a != participants[0] {
2867                    assert_eq!(b, participants[0]);
2868                }
2869            }
2870        });
2871    }
2872
2873    #[test_group("slow")]
2874    #[test_traced]
2875    fn test_invalid() {
2876        for seed in 0..5 {
2877            invalid::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2878            invalid::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
2879            invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2880            invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2881            invalid::<_, _, RoundRobin>(seed, ed25519::fixture);
2882            invalid::<_, _, RoundRobin>(seed, secp256r1::fixture);
2883        }
2884    }
2885
2886    fn impersonator<S, F, L>(seed: u64, mut fixture: F)
2887    where
2888        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2889        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2890        L: Elector<S>,
2891    {
2892        // Create context
2893        let n = 4;
2894        let required_containers = View::new(50);
2895        let activity_timeout = ViewDelta::new(10);
2896        let skip_timeout = ViewDelta::new(5);
2897        let namespace = b"consensus".to_vec();
2898        let cfg = deterministic::Config::new()
2899            .with_seed(seed)
2900            .with_timeout(Some(Duration::from_secs(30)));
2901        let executor = deterministic::Runner::new(cfg);
2902        executor.start(|mut context| async move {
2903            // Create simulated network
2904            let (network, mut oracle) = Network::new(
2905                context.with_label("network"),
2906                Config {
2907                    max_size: 1024 * 1024,
2908                    disconnect_on_block: false,
2909                    tracked_peer_sets: None,
2910                },
2911            );
2912
2913            // Start network
2914            network.start();
2915
2916            // Register participants
2917            let Fixture {
2918                participants,
2919                schemes,
2920                ..
2921            } = fixture(&mut context, &namespace, n);
2922            let mut registrations = register_validators(&mut oracle, &participants).await;
2923
2924            // Link all validators
2925            let link = Link {
2926                latency: Duration::from_millis(10),
2927                jitter: Duration::from_millis(1),
2928                success_rate: 1.0,
2929            };
2930            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2931
2932            // Create engines
2933            let elector = L::default();
2934            let relay = Arc::new(mocks::relay::Relay::new());
2935            let mut reporters = Vec::new();
2936            for (idx_scheme, validator) in participants.iter().enumerate() {
2937                // Create scheme context
2938                let context = context.with_label(&format!("validator_{}", *validator));
2939
2940                // Start engine
2941                let reporter_config = mocks::reporter::Config {
2942                    participants: participants.clone().try_into().unwrap(),
2943                    scheme: schemes[idx_scheme].clone(),
2944                    elector: elector.clone(),
2945                };
2946                let reporter =
2947                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2948                let (pending, recovered, resolver) = registrations
2949                    .remove(validator)
2950                    .expect("validator should be registered");
2951                if idx_scheme == 0 {
2952                    let cfg = mocks::impersonator::Config {
2953                        scheme: schemes[idx_scheme].clone(),
2954                    };
2955
2956                    let engine: mocks::impersonator::Impersonator<_, _, Sha256> =
2957                        mocks::impersonator::Impersonator::new(
2958                            context.with_label("byzantine_engine"),
2959                            cfg,
2960                        );
2961                    engine.start(pending);
2962                } else {
2963                    reporters.push(reporter.clone());
2964                    let application_cfg = mocks::application::Config {
2965                        hasher: Sha256::default(),
2966                        relay: relay.clone(),
2967                        me: validator.clone(),
2968                        propose_latency: (10.0, 5.0),
2969                        verify_latency: (10.0, 5.0),
2970                        certify_latency: (10.0, 5.0),
2971                        should_certify: mocks::application::Certifier::Sometimes,
2972                    };
2973                    let (actor, application) = mocks::application::Application::new(
2974                        context.with_label("application"),
2975                        application_cfg,
2976                    );
2977                    actor.start();
2978                    let blocker = oracle.control(validator.clone());
2979                    let cfg = config::Config {
2980                        scheme: schemes[idx_scheme].clone(),
2981                        elector: elector.clone(),
2982                        blocker,
2983                        automaton: application.clone(),
2984                        relay: application.clone(),
2985                        reporter: reporter.clone(),
2986                        strategy: Sequential,
2987                        partition: validator.clone().to_string(),
2988                        mailbox_size: 1024,
2989                        epoch: Epoch::new(333),
2990                        leader_timeout: Duration::from_secs(1),
2991                        notarization_timeout: Duration::from_secs(2),
2992                        nullify_retry: Duration::from_secs(10),
2993                        fetch_timeout: Duration::from_secs(1),
2994                        activity_timeout,
2995                        skip_timeout,
2996                        fetch_concurrent: 4,
2997                        replay_buffer: NZUsize!(1024 * 1024),
2998                        write_buffer: NZUsize!(1024 * 1024),
2999                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3000                    };
3001                    let engine = Engine::new(context.with_label("engine"), cfg);
3002                    engine.start(pending, recovered, resolver);
3003                }
3004            }
3005
3006            // Wait for all engines to finish
3007            let mut finalizers = Vec::new();
3008            for reporter in reporters.iter_mut() {
3009                let (mut latest, mut monitor) = reporter.subscribe().await;
3010                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3011                    while latest < required_containers {
3012                        latest = monitor.next().await.expect("event missing");
3013                    }
3014                }));
3015            }
3016            join_all(finalizers).await;
3017
3018            // Check reporters for correct activity
3019            let byz = &participants[0];
3020            for reporter in reporters.iter() {
3021                // Ensure no faults
3022                {
3023                    let faults = reporter.faults.lock().unwrap();
3024                    assert!(faults.is_empty());
3025                }
3026
3027                // Ensure no invalid signatures
3028                {
3029                    let invalid = reporter.invalid.lock().unwrap();
3030                    assert_eq!(*invalid, 0);
3031                }
3032            }
3033
3034            // Ensure invalid is blocked
3035            let blocked = oracle.blocked().await.unwrap();
3036            assert!(!blocked.is_empty());
3037            for (a, b) in blocked {
3038                assert_ne!(&a, byz);
3039                assert_eq!(&b, byz);
3040            }
3041        });
3042    }
3043
3044    #[test_group("slow")]
3045    #[test_traced]
3046    fn test_impersonator() {
3047        for seed in 0..5 {
3048            impersonator::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
3049            impersonator::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
3050            impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3051            impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3052            impersonator::<_, _, RoundRobin>(seed, ed25519::fixture);
3053            impersonator::<_, _, RoundRobin>(seed, secp256r1::fixture);
3054        }
3055    }
3056
3057    fn equivocator<S, F, L>(seed: u64, mut fixture: F) -> bool
3058    where
3059        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3060        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3061        L: Elector<S>,
3062    {
3063        // Create context
3064        let n = 7;
3065        let required_containers = View::new(50);
3066        let activity_timeout = ViewDelta::new(10);
3067        let skip_timeout = ViewDelta::new(5);
3068        let namespace = b"consensus".to_vec();
3069        let cfg = deterministic::Config::new()
3070            .with_seed(seed)
3071            .with_timeout(Some(Duration::from_secs(60)));
3072        let executor = deterministic::Runner::new(cfg);
3073        executor.start(|mut context| async move {
3074            // Create simulated network
3075            let (network, mut oracle) = Network::new(
3076                context.with_label("network"),
3077                Config {
3078                    max_size: 1024 * 1024,
3079                    disconnect_on_block: false,
3080                    tracked_peer_sets: None,
3081                },
3082            );
3083
3084            // Start network
3085            network.start();
3086
3087            // Register participants
3088            let Fixture {
3089                participants,
3090                schemes,
3091                ..
3092            } = fixture(&mut context, &namespace, n);
3093            let mut registrations = register_validators(&mut oracle, &participants).await;
3094
3095            // Link all validators
3096            let link = Link {
3097                latency: Duration::from_millis(10),
3098                jitter: Duration::from_millis(1),
3099                success_rate: 1.0,
3100            };
3101            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3102
3103            // Create engines
3104            let elector = L::default();
3105            let mut engines = Vec::new();
3106            let relay = Arc::new(mocks::relay::Relay::new());
3107            let mut reporters = Vec::new();
3108            for (idx_scheme, validator) in participants.iter().enumerate() {
3109                // Create scheme context
3110                let context = context.with_label(&format!("validator_{}", *validator));
3111
3112                // Start engine
3113                let reporter_config = mocks::reporter::Config {
3114                    participants: participants.clone().try_into().unwrap(),
3115                    scheme: schemes[idx_scheme].clone(),
3116                    elector: elector.clone(),
3117                };
3118                let reporter =
3119                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3120                reporters.push(reporter.clone());
3121                let (pending, recovered, resolver) = registrations
3122                    .remove(validator)
3123                    .expect("validator should be registered");
3124                if idx_scheme == 0 {
3125                    let cfg = mocks::equivocator::Config {
3126                        scheme: schemes[idx_scheme].clone(),
3127                        epoch: Epoch::new(333),
3128                        relay: relay.clone(),
3129                        hasher: Sha256::default(),
3130                        elector: elector.clone(),
3131                    };
3132
3133                    let engine = mocks::equivocator::Equivocator::new(
3134                        context.with_label("byzantine_engine"),
3135                        cfg,
3136                    );
3137                    engines.push(engine.start(pending, recovered));
3138                } else {
3139                    let application_cfg = mocks::application::Config {
3140                        hasher: Sha256::default(),
3141                        relay: relay.clone(),
3142                        me: validator.clone(),
3143                        propose_latency: (10.0, 5.0),
3144                        verify_latency: (10.0, 5.0),
3145                        certify_latency: (10.0, 5.0),
3146                        should_certify: mocks::application::Certifier::Sometimes,
3147                    };
3148                    let (actor, application) = mocks::application::Application::new(
3149                        context.with_label("application"),
3150                        application_cfg,
3151                    );
3152                    actor.start();
3153                    let blocker = oracle.control(validator.clone());
3154                    let cfg = config::Config {
3155                        scheme: schemes[idx_scheme].clone(),
3156                        elector: elector.clone(),
3157                        blocker,
3158                        automaton: application.clone(),
3159                        relay: application.clone(),
3160                        reporter: reporter.clone(),
3161                        strategy: Sequential,
3162                        partition: validator.to_string(),
3163                        mailbox_size: 1024,
3164                        epoch: Epoch::new(333),
3165                        leader_timeout: Duration::from_secs(1),
3166                        notarization_timeout: Duration::from_secs(2),
3167                        nullify_retry: Duration::from_secs(10),
3168                        fetch_timeout: Duration::from_secs(1),
3169                        activity_timeout,
3170                        skip_timeout,
3171                        fetch_concurrent: 4,
3172                        replay_buffer: NZUsize!(1024 * 1024),
3173                        write_buffer: NZUsize!(1024 * 1024),
3174                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3175                    };
3176                    let engine = Engine::new(context.with_label("engine"), cfg);
3177                    engines.push(engine.start(pending, recovered, resolver));
3178                }
3179            }
3180
3181            // Wait for all engines to hit required containers
3182            let mut finalizers = Vec::new();
3183            for reporter in reporters.iter_mut().skip(1) {
3184                let (mut latest, mut monitor) = reporter.subscribe().await;
3185                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3186                    while latest < required_containers {
3187                        latest = monitor.next().await.expect("event missing");
3188                    }
3189                }));
3190            }
3191            join_all(finalizers).await;
3192
3193            // Abort a validator
3194            let idx = context.gen_range(1..engines.len()); // skip byzantine validator
3195            let validator = &participants[idx];
3196            let handle = engines.remove(idx);
3197            handle.abort();
3198            let _ = handle.await;
3199            reporters.remove(idx);
3200            info!(idx, ?validator, "aborted validator");
3201
3202            // Wait for all engines to hit required containers
3203            let mut finalizers = Vec::new();
3204            for reporter in reporters.iter_mut().skip(1) {
3205                let (mut latest, mut monitor) = reporter.subscribe().await;
3206                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3207                    while latest < View::new(required_containers.get() * 2) {
3208                        latest = monitor.next().await.expect("event missing");
3209                    }
3210                }));
3211            }
3212            join_all(finalizers).await;
3213
3214            // Recreate engine
3215            info!(idx, ?validator, "restarting validator");
3216            let context = context.with_label(&format!("validator_{}_restarted", *validator));
3217
3218            // Start engine
3219            let reporter_config = mocks::reporter::Config {
3220                participants: participants.clone().try_into().unwrap(),
3221                scheme: schemes[idx].clone(),
3222                elector: elector.clone(),
3223            };
3224            let reporter =
3225                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3226            let (pending, recovered, resolver) =
3227                register_validator(&mut oracle, validator.clone()).await;
3228            reporters.push(reporter.clone());
3229            let application_cfg = mocks::application::Config {
3230                hasher: Sha256::default(),
3231                relay: relay.clone(),
3232                me: validator.clone(),
3233                propose_latency: (10.0, 5.0),
3234                verify_latency: (10.0, 5.0),
3235                certify_latency: (10.0, 5.0),
3236                should_certify: mocks::application::Certifier::Sometimes,
3237            };
3238            let (actor, application) = mocks::application::Application::new(
3239                context.with_label("application"),
3240                application_cfg,
3241            );
3242            actor.start();
3243            let blocker = oracle.control(validator.clone());
3244            let cfg = config::Config {
3245                scheme: schemes[idx].clone(),
3246                elector: elector.clone(),
3247                blocker,
3248                automaton: application.clone(),
3249                relay: application.clone(),
3250                reporter: reporter.clone(),
3251                strategy: Sequential,
3252                partition: validator.to_string(),
3253                mailbox_size: 1024,
3254                epoch: Epoch::new(333),
3255                leader_timeout: Duration::from_secs(1),
3256                notarization_timeout: Duration::from_secs(2),
3257                nullify_retry: Duration::from_secs(10),
3258                fetch_timeout: Duration::from_secs(1),
3259                activity_timeout,
3260                skip_timeout,
3261                fetch_concurrent: 4,
3262                replay_buffer: NZUsize!(1024 * 1024),
3263                write_buffer: NZUsize!(1024 * 1024),
3264                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3265            };
3266            let engine = Engine::new(context.with_label("engine"), cfg);
3267            engine.start(pending, recovered, resolver);
3268
3269            // Wait for all engines to hit required containers
3270            let mut finalizers = Vec::new();
3271            for reporter in reporters.iter_mut().skip(1) {
3272                let (mut latest, mut monitor) = reporter.subscribe().await;
3273                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3274                    while latest < View::new(required_containers.get() * 3) {
3275                        latest = monitor.next().await.expect("event missing");
3276                    }
3277                }));
3278            }
3279            join_all(finalizers).await;
3280
3281            // Check equivocator blocking (we aren't guaranteed a fault will be produced
3282            // because it may not be possible to extract a conflicting vote from the certificate
3283            // we receive)
3284            let byz = &participants[0];
3285            let blocked = oracle.blocked().await.unwrap();
3286            for (a, b) in &blocked {
3287                assert_ne!(a, byz);
3288                assert_eq!(b, byz);
3289            }
3290            !blocked.is_empty()
3291        })
3292    }
3293
3294    #[test_group("slow")]
3295    #[test_traced]
3296    fn test_equivocator_bls12381_threshold_min_pk() {
3297        let detected = (0..5)
3298            .any(|seed| equivocator::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>));
3299        assert!(
3300            detected,
3301            "expected at least one seed to detect equivocation"
3302        );
3303    }
3304
3305    #[test_group("slow")]
3306    #[test_traced]
3307    fn test_equivocator_bls12381_threshold_min_sig() {
3308        let detected = (0..5).any(|seed| {
3309            equivocator::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>)
3310        });
3311        assert!(
3312            detected,
3313            "expected at least one seed to detect equivocation"
3314        );
3315    }
3316
3317    #[test_group("slow")]
3318    #[test_traced]
3319    fn test_equivocator_bls12381_multisig_min_pk() {
3320        let detected = (0..5).any(|seed| {
3321            equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>)
3322        });
3323        assert!(
3324            detected,
3325            "expected at least one seed to detect equivocation"
3326        );
3327    }
3328
3329    #[test_group("slow")]
3330    #[test_traced]
3331    fn test_equivocator_bls12381_multisig_min_sig() {
3332        let detected = (0..5).any(|seed| {
3333            equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>)
3334        });
3335        assert!(
3336            detected,
3337            "expected at least one seed to detect equivocation"
3338        );
3339    }
3340
3341    #[test_group("slow")]
3342    #[test_traced]
3343    fn test_equivocator_ed25519() {
3344        let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, ed25519::fixture));
3345        assert!(
3346            detected,
3347            "expected at least one seed to detect equivocation"
3348        );
3349    }
3350
3351    #[test_group("slow")]
3352    #[test_traced]
3353    fn test_equivocator_secp256r1() {
3354        let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, secp256r1::fixture));
3355        assert!(
3356            detected,
3357            "expected at least one seed to detect equivocation"
3358        );
3359    }
3360
3361    fn reconfigurer<S, F, L>(seed: u64, mut fixture: F)
3362    where
3363        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3364        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3365        L: Elector<S>,
3366    {
3367        // Create context
3368        let n = 4;
3369        let required_containers = View::new(50);
3370        let activity_timeout = ViewDelta::new(10);
3371        let skip_timeout = ViewDelta::new(5);
3372        let namespace = b"consensus".to_vec();
3373        let cfg = deterministic::Config::new()
3374            .with_seed(seed)
3375            .with_timeout(Some(Duration::from_secs(30)));
3376        let executor = deterministic::Runner::new(cfg);
3377        executor.start(|mut context| async move {
3378            // Create simulated network
3379            let (network, mut oracle) = Network::new(
3380                context.with_label("network"),
3381                Config {
3382                    max_size: 1024 * 1024,
3383                    disconnect_on_block: false,
3384                    tracked_peer_sets: None,
3385                },
3386            );
3387
3388            // Start network
3389            network.start();
3390
3391            // Register participants
3392            let Fixture {
3393                participants,
3394                schemes,
3395                ..
3396            } = fixture(&mut context, &namespace, n);
3397            let mut registrations = register_validators(&mut oracle, &participants).await;
3398
3399            // Link all validators
3400            let link = Link {
3401                latency: Duration::from_millis(10),
3402                jitter: Duration::from_millis(1),
3403                success_rate: 1.0,
3404            };
3405            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3406
3407            // Create engines
3408            let elector = L::default();
3409            let relay = Arc::new(mocks::relay::Relay::new());
3410            let mut reporters = Vec::new();
3411            for (idx_scheme, validator) in participants.iter().enumerate() {
3412                // Create scheme context
3413                let context = context.with_label(&format!("validator_{}", *validator));
3414
3415                // Start engine
3416                let reporter_config = mocks::reporter::Config {
3417                    participants: participants.clone().try_into().unwrap(),
3418                    scheme: schemes[idx_scheme].clone(),
3419                    elector: elector.clone(),
3420                };
3421                let reporter =
3422                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3423                let (pending, recovered, resolver) = registrations
3424                    .remove(validator)
3425                    .expect("validator should be registered");
3426                if idx_scheme == 0 {
3427                    let cfg = mocks::reconfigurer::Config {
3428                        scheme: schemes[idx_scheme].clone(),
3429                    };
3430                    let engine: mocks::reconfigurer::Reconfigurer<_, _, Sha256> =
3431                        mocks::reconfigurer::Reconfigurer::new(
3432                            context.with_label("byzantine_engine"),
3433                            cfg,
3434                        );
3435                    engine.start(pending);
3436                } else {
3437                    reporters.push(reporter.clone());
3438                    let application_cfg = mocks::application::Config {
3439                        hasher: Sha256::default(),
3440                        relay: relay.clone(),
3441                        me: validator.clone(),
3442                        propose_latency: (10.0, 5.0),
3443                        verify_latency: (10.0, 5.0),
3444                        certify_latency: (10.0, 5.0),
3445                        should_certify: mocks::application::Certifier::Sometimes,
3446                    };
3447                    let (actor, application) = mocks::application::Application::new(
3448                        context.with_label("application"),
3449                        application_cfg,
3450                    );
3451                    actor.start();
3452                    let blocker = oracle.control(validator.clone());
3453                    let cfg = config::Config {
3454                        scheme: schemes[idx_scheme].clone(),
3455                        elector: elector.clone(),
3456                        blocker,
3457                        automaton: application.clone(),
3458                        relay: application.clone(),
3459                        reporter: reporter.clone(),
3460                        strategy: Sequential,
3461                        partition: validator.to_string(),
3462                        mailbox_size: 1024,
3463                        epoch: Epoch::new(333),
3464                        leader_timeout: Duration::from_secs(1),
3465                        notarization_timeout: Duration::from_secs(2),
3466                        nullify_retry: Duration::from_secs(10),
3467                        fetch_timeout: Duration::from_secs(1),
3468                        activity_timeout,
3469                        skip_timeout,
3470                        fetch_concurrent: 4,
3471                        replay_buffer: NZUsize!(1024 * 1024),
3472                        write_buffer: NZUsize!(1024 * 1024),
3473                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3474                    };
3475                    let engine = Engine::new(context.with_label("engine"), cfg);
3476                    engine.start(pending, recovered, resolver);
3477                }
3478            }
3479
3480            // Wait for all engines to finish
3481            let mut finalizers = Vec::new();
3482            for reporter in reporters.iter_mut() {
3483                let (mut latest, mut monitor) = reporter.subscribe().await;
3484                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3485                    while latest < required_containers {
3486                        latest = monitor.next().await.expect("event missing");
3487                    }
3488                }));
3489            }
3490            join_all(finalizers).await;
3491
3492            // Check reporters for correct activity
3493            let byz = &participants[0];
3494            for reporter in reporters.iter() {
3495                // Ensure no faults
3496                {
3497                    let faults = reporter.faults.lock().unwrap();
3498                    assert!(faults.is_empty());
3499                }
3500
3501                // Ensure no invalid signatures
3502                {
3503                    let invalid = reporter.invalid.lock().unwrap();
3504                    assert_eq!(*invalid, 0);
3505                }
3506            }
3507
3508            // Ensure reconfigurer is blocked (epoch mismatch)
3509            let blocked = oracle.blocked().await.unwrap();
3510            assert!(!blocked.is_empty());
3511            for (a, b) in blocked {
3512                assert_ne!(&a, byz);
3513                assert_eq!(&b, byz);
3514            }
3515        });
3516    }
3517
3518    #[test_group("slow")]
3519    #[test_traced]
3520    fn test_reconfigurer() {
3521        for seed in 0..5 {
3522            reconfigurer::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
3523            reconfigurer::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
3524            reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3525            reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3526            reconfigurer::<_, _, RoundRobin>(seed, ed25519::fixture);
3527            reconfigurer::<_, _, RoundRobin>(seed, secp256r1::fixture);
3528        }
3529    }
3530
3531    fn nuller<S, F, L>(seed: u64, mut fixture: F)
3532    where
3533        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3534        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3535        L: Elector<S>,
3536    {
3537        // Create context
3538        let n = 4;
3539        let required_containers = View::new(50);
3540        let activity_timeout = ViewDelta::new(10);
3541        let skip_timeout = ViewDelta::new(5);
3542        let namespace = b"consensus".to_vec();
3543        let cfg = deterministic::Config::new()
3544            .with_seed(seed)
3545            .with_timeout(Some(Duration::from_secs(30)));
3546        let executor = deterministic::Runner::new(cfg);
3547        executor.start(|mut context| async move {
3548            // Create simulated network
3549            let (network, mut oracle) = Network::new(
3550                context.with_label("network"),
3551                Config {
3552                    max_size: 1024 * 1024,
3553                    disconnect_on_block: false,
3554                    tracked_peer_sets: None,
3555                },
3556            );
3557
3558            // Start network
3559            network.start();
3560
3561            // Register participants
3562            let Fixture {
3563                participants,
3564                schemes,
3565                ..
3566            } = fixture(&mut context, &namespace, n);
3567            let mut registrations = register_validators(&mut oracle, &participants).await;
3568
3569            // Link all validators
3570            let link = Link {
3571                latency: Duration::from_millis(10),
3572                jitter: Duration::from_millis(1),
3573                success_rate: 1.0,
3574            };
3575            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3576
3577            // Create engines
3578            let elector = L::default();
3579            let relay = Arc::new(mocks::relay::Relay::new());
3580            let mut reporters = Vec::new();
3581            for (idx_scheme, validator) in participants.iter().enumerate() {
3582                // Create scheme context
3583                let context = context.with_label(&format!("validator_{}", *validator));
3584
3585                // Start engine
3586                let reporter_config = mocks::reporter::Config {
3587                    participants: participants.clone().try_into().unwrap(),
3588                    scheme: schemes[idx_scheme].clone(),
3589                    elector: elector.clone(),
3590                };
3591                let reporter =
3592                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3593                let (pending, recovered, resolver) = registrations
3594                    .remove(validator)
3595                    .expect("validator should be registered");
3596                if idx_scheme == 0 {
3597                    let cfg = mocks::nuller::Config {
3598                        scheme: schemes[idx_scheme].clone(),
3599                    };
3600                    let engine: mocks::nuller::Nuller<_, _, Sha256> =
3601                        mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
3602                    engine.start(pending);
3603                } else {
3604                    reporters.push(reporter.clone());
3605                    let application_cfg = mocks::application::Config {
3606                        hasher: Sha256::default(),
3607                        relay: relay.clone(),
3608                        me: validator.clone(),
3609                        propose_latency: (10.0, 5.0),
3610                        verify_latency: (10.0, 5.0),
3611                        certify_latency: (10.0, 5.0),
3612                        should_certify: mocks::application::Certifier::Sometimes,
3613                    };
3614                    let (actor, application) = mocks::application::Application::new(
3615                        context.with_label("application"),
3616                        application_cfg,
3617                    );
3618                    actor.start();
3619                    let blocker = oracle.control(validator.clone());
3620                    let cfg = config::Config {
3621                        scheme: schemes[idx_scheme].clone(),
3622                        elector: elector.clone(),
3623                        blocker,
3624                        automaton: application.clone(),
3625                        relay: application.clone(),
3626                        reporter: reporter.clone(),
3627                        strategy: Sequential,
3628                        partition: validator.clone().to_string(),
3629                        mailbox_size: 1024,
3630                        epoch: Epoch::new(333),
3631                        leader_timeout: Duration::from_secs(1),
3632                        notarization_timeout: Duration::from_secs(2),
3633                        nullify_retry: Duration::from_secs(10),
3634                        fetch_timeout: Duration::from_secs(1),
3635                        activity_timeout,
3636                        skip_timeout,
3637                        fetch_concurrent: 4,
3638                        replay_buffer: NZUsize!(1024 * 1024),
3639                        write_buffer: NZUsize!(1024 * 1024),
3640                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3641                    };
3642                    let engine = Engine::new(context.with_label("engine"), cfg);
3643                    engine.start(pending, recovered, resolver);
3644                }
3645            }
3646
3647            // Wait for all engines to finish
3648            let mut finalizers = Vec::new();
3649            for reporter in reporters.iter_mut() {
3650                let (mut latest, mut monitor) = reporter.subscribe().await;
3651                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3652                    while latest < required_containers {
3653                        latest = monitor.next().await.expect("event missing");
3654                    }
3655                }));
3656            }
3657            join_all(finalizers).await;
3658
3659            // Check reporters for correct activity
3660            let byz = &participants[0];
3661            let mut count_nullify_and_finalize = 0;
3662            for reporter in reporters.iter() {
3663                // Ensure only faults for byz
3664                {
3665                    let faults = reporter.faults.lock().unwrap();
3666                    assert_eq!(faults.len(), 1);
3667                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
3668                    for (_, faults) in faulter.iter() {
3669                        for fault in faults.iter() {
3670                            match fault {
3671                                Activity::NullifyFinalize(_) => {
3672                                    count_nullify_and_finalize += 1;
3673                                }
3674                                _ => panic!("unexpected fault: {fault:?}"),
3675                            }
3676                        }
3677                    }
3678                }
3679
3680                // Ensure no invalid signatures
3681                {
3682                    let invalid = reporter.invalid.lock().unwrap();
3683                    assert_eq!(*invalid, 0);
3684                }
3685            }
3686            assert!(count_nullify_and_finalize > 0);
3687
3688            // Ensure nullifier is blocked
3689            let blocked = oracle.blocked().await.unwrap();
3690            assert!(!blocked.is_empty());
3691            for (a, b) in blocked {
3692                assert_ne!(&a, byz);
3693                assert_eq!(&b, byz);
3694            }
3695        });
3696    }
3697
3698    #[test_group("slow")]
3699    #[test_traced]
3700    fn test_nuller() {
3701        for seed in 0..5 {
3702            nuller::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
3703            nuller::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
3704            nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3705            nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3706            nuller::<_, _, RoundRobin>(seed, ed25519::fixture);
3707            nuller::<_, _, RoundRobin>(seed, secp256r1::fixture);
3708        }
3709    }
3710
3711    fn outdated<S, F, L>(seed: u64, mut fixture: F)
3712    where
3713        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3714        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3715        L: Elector<S>,
3716    {
3717        // Create context
3718        let n = 4;
3719        let required_containers = View::new(100);
3720        let activity_timeout = ViewDelta::new(10);
3721        let skip_timeout = ViewDelta::new(5);
3722        let namespace = b"consensus".to_vec();
3723        let cfg = deterministic::Config::new()
3724            .with_seed(seed)
3725            .with_timeout(Some(Duration::from_secs(30)));
3726        let executor = deterministic::Runner::new(cfg);
3727        executor.start(|mut context| async move {
3728            // Create simulated network
3729            let (network, mut oracle) = Network::new(
3730                context.with_label("network"),
3731                Config {
3732                    max_size: 1024 * 1024,
3733                    disconnect_on_block: false,
3734                    tracked_peer_sets: None,
3735                },
3736            );
3737
3738            // Start network
3739            network.start();
3740
3741            // Register participants
3742            let Fixture {
3743                participants,
3744                schemes,
3745                ..
3746            } = fixture(&mut context, &namespace, n);
3747            let mut registrations = register_validators(&mut oracle, &participants).await;
3748
3749            // Link all validators
3750            let link = Link {
3751                latency: Duration::from_millis(10),
3752                jitter: Duration::from_millis(1),
3753                success_rate: 1.0,
3754            };
3755            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3756
3757            // Create engines
3758            let elector = L::default();
3759            let relay = Arc::new(mocks::relay::Relay::new());
3760            let mut reporters = Vec::new();
3761            for (idx_scheme, validator) in participants.iter().enumerate() {
3762                // Create scheme context
3763                let context = context.with_label(&format!("validator_{}", *validator));
3764
3765                // Start engine
3766                let reporter_config = mocks::reporter::Config {
3767                    participants: participants.clone().try_into().unwrap(),
3768                    scheme: schemes[idx_scheme].clone(),
3769                    elector: elector.clone(),
3770                };
3771                let reporter =
3772                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3773                let (pending, recovered, resolver) = registrations
3774                    .remove(validator)
3775                    .expect("validator should be registered");
3776                if idx_scheme == 0 {
3777                    let cfg = mocks::outdated::Config {
3778                        scheme: schemes[idx_scheme].clone(),
3779                        view_delta: ViewDelta::new(activity_timeout.get().saturating_mul(4)),
3780                    };
3781                    let engine: mocks::outdated::Outdated<_, _, Sha256> =
3782                        mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
3783                    engine.start(pending);
3784                } else {
3785                    reporters.push(reporter.clone());
3786                    let application_cfg = mocks::application::Config {
3787                        hasher: Sha256::default(),
3788                        relay: relay.clone(),
3789                        me: validator.clone(),
3790                        propose_latency: (10.0, 5.0),
3791                        verify_latency: (10.0, 5.0),
3792                        certify_latency: (10.0, 5.0),
3793                        should_certify: mocks::application::Certifier::Sometimes,
3794                    };
3795                    let (actor, application) = mocks::application::Application::new(
3796                        context.with_label("application"),
3797                        application_cfg,
3798                    );
3799                    actor.start();
3800                    let blocker = oracle.control(validator.clone());
3801                    let cfg = config::Config {
3802                        scheme: schemes[idx_scheme].clone(),
3803                        elector: elector.clone(),
3804                        blocker,
3805                        automaton: application.clone(),
3806                        relay: application.clone(),
3807                        reporter: reporter.clone(),
3808                        strategy: Sequential,
3809                        partition: validator.clone().to_string(),
3810                        mailbox_size: 1024,
3811                        epoch: Epoch::new(333),
3812                        leader_timeout: Duration::from_secs(1),
3813                        notarization_timeout: Duration::from_secs(2),
3814                        nullify_retry: Duration::from_secs(10),
3815                        fetch_timeout: Duration::from_secs(1),
3816                        activity_timeout,
3817                        skip_timeout,
3818                        fetch_concurrent: 4,
3819                        replay_buffer: NZUsize!(1024 * 1024),
3820                        write_buffer: NZUsize!(1024 * 1024),
3821                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3822                    };
3823                    let engine = Engine::new(context.with_label("engine"), cfg);
3824                    engine.start(pending, recovered, resolver);
3825                }
3826            }
3827
3828            // Wait for all engines to finish
3829            let mut finalizers = Vec::new();
3830            for reporter in reporters.iter_mut() {
3831                let (mut latest, mut monitor) = reporter.subscribe().await;
3832                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3833                    while latest < required_containers {
3834                        latest = monitor.next().await.expect("event missing");
3835                    }
3836                }));
3837            }
3838            join_all(finalizers).await;
3839
3840            // Check reporters for correct activity
3841            for reporter in reporters.iter() {
3842                // Ensure no faults
3843                {
3844                    let faults = reporter.faults.lock().unwrap();
3845                    assert!(faults.is_empty());
3846                }
3847
3848                // Ensure no invalid signatures
3849                {
3850                    let invalid = reporter.invalid.lock().unwrap();
3851                    assert_eq!(*invalid, 0);
3852                }
3853            }
3854
3855            // Ensure no blocked connections
3856            let blocked = oracle.blocked().await.unwrap();
3857            assert!(blocked.is_empty());
3858        });
3859    }
3860
3861    #[test_group("slow")]
3862    #[test_traced]
3863    fn test_outdated() {
3864        for seed in 0..5 {
3865            outdated::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
3866            outdated::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
3867            outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3868            outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3869            outdated::<_, _, RoundRobin>(seed, ed25519::fixture);
3870            outdated::<_, _, RoundRobin>(seed, secp256r1::fixture);
3871        }
3872    }
3873
3874    fn run_1k<S, F, L>(mut fixture: F)
3875    where
3876        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3877        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3878        L: Elector<S>,
3879    {
3880        // Create context
3881        let n = 10;
3882        let required_containers = View::new(1_000);
3883        let activity_timeout = ViewDelta::new(10);
3884        let skip_timeout = ViewDelta::new(5);
3885        let namespace = b"consensus".to_vec();
3886        let cfg = deterministic::Config::new();
3887        let executor = deterministic::Runner::new(cfg);
3888        executor.start(|mut context| async move {
3889            // Create simulated network
3890            let (network, mut oracle) = Network::new(
3891                context.with_label("network"),
3892                Config {
3893                    max_size: 1024 * 1024,
3894                    disconnect_on_block: false,
3895                    tracked_peer_sets: None,
3896                },
3897            );
3898
3899            // Start network
3900            network.start();
3901
3902            // Register participants
3903            let Fixture {
3904                participants,
3905                schemes,
3906                ..
3907            } = fixture(&mut context, &namespace, n);
3908            let mut registrations = register_validators(&mut oracle, &participants).await;
3909
3910            // Link all validators
3911            let link = Link {
3912                latency: Duration::from_millis(80),
3913                jitter: Duration::from_millis(10),
3914                success_rate: 0.98,
3915            };
3916            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3917
3918            // Create engines
3919            let elector = L::default();
3920            let relay = Arc::new(mocks::relay::Relay::new());
3921            let mut reporters = Vec::new();
3922            let mut engine_handlers = Vec::new();
3923            for (idx, validator) in participants.iter().enumerate() {
3924                // Create scheme context
3925                let context = context.with_label(&format!("validator_{}", *validator));
3926
3927                // Configure engine
3928                let reporter_config = mocks::reporter::Config {
3929                    participants: participants.clone().try_into().unwrap(),
3930                    scheme: schemes[idx].clone(),
3931                    elector: elector.clone(),
3932                };
3933                let reporter =
3934                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3935                reporters.push(reporter.clone());
3936                let application_cfg = mocks::application::Config {
3937                    hasher: Sha256::default(),
3938                    relay: relay.clone(),
3939                    me: validator.clone(),
3940                    propose_latency: (100.0, 50.0),
3941                    verify_latency: (50.0, 40.0),
3942                    certify_latency: (50.0, 40.0),
3943                    should_certify: mocks::application::Certifier::Sometimes,
3944                };
3945                let (actor, application) = mocks::application::Application::new(
3946                    context.with_label("application"),
3947                    application_cfg,
3948                );
3949                actor.start();
3950                let blocker = oracle.control(validator.clone());
3951                let cfg = config::Config {
3952                    scheme: schemes[idx].clone(),
3953                    elector: elector.clone(),
3954                    blocker,
3955                    automaton: application.clone(),
3956                    relay: application.clone(),
3957                    reporter: reporter.clone(),
3958                    strategy: Sequential,
3959                    partition: validator.to_string(),
3960                    mailbox_size: 1024,
3961                    epoch: Epoch::new(333),
3962                    leader_timeout: Duration::from_secs(1),
3963                    notarization_timeout: Duration::from_secs(2),
3964                    nullify_retry: Duration::from_secs(10),
3965                    fetch_timeout: Duration::from_secs(1),
3966                    activity_timeout,
3967                    skip_timeout,
3968                    fetch_concurrent: 4,
3969                    replay_buffer: NZUsize!(1024 * 1024),
3970                    write_buffer: NZUsize!(1024 * 1024),
3971                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3972                };
3973                let engine = Engine::new(context.with_label("engine"), cfg);
3974
3975                // Start engine
3976                let (pending, recovered, resolver) = registrations
3977                    .remove(validator)
3978                    .expect("validator should be registered");
3979                engine_handlers.push(engine.start(pending, recovered, resolver));
3980            }
3981
3982            // Wait for all engines to finish
3983            let mut finalizers = Vec::new();
3984            for reporter in reporters.iter_mut() {
3985                let (mut latest, mut monitor) = reporter.subscribe().await;
3986                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3987                    while latest < required_containers {
3988                        latest = monitor.next().await.expect("event missing");
3989                    }
3990                }));
3991            }
3992            join_all(finalizers).await;
3993
3994            // Check reporters for correct activity
3995            for reporter in reporters.iter() {
3996                // Ensure no faults
3997                {
3998                    let faults = reporter.faults.lock().unwrap();
3999                    assert!(faults.is_empty());
4000                }
4001
4002                // Ensure no invalid signatures
4003                {
4004                    let invalid = reporter.invalid.lock().unwrap();
4005                    assert_eq!(*invalid, 0);
4006                }
4007            }
4008
4009            // Ensure no blocked connections
4010            let blocked = oracle.blocked().await.unwrap();
4011            assert!(blocked.is_empty());
4012        })
4013    }
4014
4015    #[test_group("slow")]
4016    #[test_traced]
4017    fn test_1k_bls12381_threshold_min_pk() {
4018        run_1k::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
4019    }
4020
4021    #[test_group("slow")]
4022    #[test_traced]
4023    fn test_1k_bls12381_threshold_min_sig() {
4024        run_1k::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
4025    }
4026
4027    #[test_group("slow")]
4028    #[test_traced]
4029    fn test_1k_bls12381_multisig_min_pk() {
4030        run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4031    }
4032
4033    #[test_group("slow")]
4034    #[test_traced]
4035    fn test_1k_bls12381_multisig_min_sig() {
4036        run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
4037    }
4038
4039    #[test_group("slow")]
4040    #[test_traced]
4041    fn test_1k_ed25519() {
4042        run_1k::<_, _, RoundRobin>(ed25519::fixture);
4043    }
4044
4045    #[test_group("slow")]
4046    #[test_traced]
4047    fn test_1k_secp256r1() {
4048        run_1k::<_, _, RoundRobin>(secp256r1::fixture);
4049    }
4050
4051    fn engine_shutdown<S, F, L>(seed: u64, mut fixture: F, graceful: bool)
4052    where
4053        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4054        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4055        L: Elector<S>,
4056    {
4057        let n = 1;
4058        let namespace = b"consensus".to_vec();
4059        let cfg = deterministic::Config::default()
4060            .with_seed(seed)
4061            .with_timeout(Some(Duration::from_secs(10)));
4062        let executor = deterministic::Runner::new(cfg);
4063        executor.start(|mut context| async move {
4064            // Create simulated network
4065            let (network, mut oracle) = Network::new(
4066                context.with_label("network"),
4067                Config {
4068                    max_size: 1024 * 1024,
4069                    disconnect_on_block: true,
4070                    tracked_peer_sets: None,
4071                },
4072            );
4073
4074            // Start network
4075            network.start();
4076
4077            // Register a single participant
4078            let Fixture {
4079                participants,
4080                schemes,
4081                ..
4082            } = fixture(&mut context, &namespace, n);
4083            let mut registrations = register_validators(&mut oracle, &participants).await;
4084
4085            // Link the single validator to itself (no-ops for completeness)
4086            let link = Link {
4087                latency: Duration::from_millis(1),
4088                jitter: Duration::from_millis(0),
4089                success_rate: 1.0,
4090            };
4091            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4092
4093            // Create engine
4094            let elector = L::default();
4095            let reporter_config = mocks::reporter::Config {
4096                participants: participants.clone().try_into().unwrap(),
4097                scheme: schemes[0].clone(),
4098                elector: elector.clone(),
4099            };
4100            let reporter =
4101                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4102            let relay = Arc::new(mocks::relay::Relay::new());
4103            let application_cfg = mocks::application::Config {
4104                hasher: Sha256::default(),
4105                relay: relay.clone(),
4106                me: participants[0].clone(),
4107                propose_latency: (1.0, 0.0),
4108                verify_latency: (1.0, 0.0),
4109                certify_latency: (1.0, 0.0),
4110                should_certify: mocks::application::Certifier::Sometimes,
4111            };
4112            let (actor, application) = mocks::application::Application::new(
4113                context.with_label("application"),
4114                application_cfg,
4115            );
4116            actor.start();
4117            let blocker = oracle.control(participants[0].clone());
4118            let cfg = config::Config {
4119                scheme: schemes[0].clone(),
4120                elector: elector.clone(),
4121                blocker,
4122                automaton: application.clone(),
4123                relay: application.clone(),
4124                reporter: reporter.clone(),
4125                strategy: Sequential,
4126                partition: participants[0].clone().to_string(),
4127                mailbox_size: 64,
4128                epoch: Epoch::new(333),
4129                leader_timeout: Duration::from_millis(50),
4130                notarization_timeout: Duration::from_millis(100),
4131                nullify_retry: Duration::from_millis(250),
4132                fetch_timeout: Duration::from_millis(50),
4133                activity_timeout: ViewDelta::new(4),
4134                skip_timeout: ViewDelta::new(2),
4135                fetch_concurrent: 4,
4136                replay_buffer: NZUsize!(1024 * 16),
4137                write_buffer: NZUsize!(1024 * 16),
4138                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4139            };
4140            let engine = Engine::new(context.with_label("engine"), cfg);
4141
4142            // Start engine
4143            let (pending, recovered, resolver) = registrations
4144                .remove(&participants[0])
4145                .expect("validator should be registered");
4146            let handle = engine.start(pending, recovered, resolver);
4147
4148            // Allow tasks to start
4149            context.sleep(Duration::from_millis(1000)).await;
4150
4151            // Count running tasks under the engine prefix
4152            let running_before = count_running_tasks(&context, "engine");
4153            assert!(
4154                running_before > 0,
4155                "at least one engine task should be running"
4156            );
4157
4158            // Make sure the engine is still running after some time
4159            context.sleep(Duration::from_millis(1500)).await;
4160            assert!(
4161                count_running_tasks(&context, "engine") > 0,
4162                "engine tasks should still be running"
4163            );
4164
4165            // Shutdown engine and ensure children stop
4166            let running_after = if graceful {
4167                let metrics_context = context.clone();
4168                let result = context.stop(0, Some(Duration::from_secs(5))).await;
4169                assert!(
4170                    result.is_ok(),
4171                    "graceful shutdown should complete: {result:?}"
4172                );
4173                count_running_tasks(&metrics_context, "engine")
4174            } else {
4175                handle.abort();
4176                let _ = handle.await; // ensure parent tear-down runs
4177
4178                // Give the runtime a tick to process aborts
4179                context.sleep(Duration::from_millis(1000)).await;
4180                count_running_tasks(&context, "engine")
4181            };
4182            assert_eq!(
4183                running_after, 0,
4184                "all engine tasks should be stopped, but {running_after} still running"
4185            );
4186        });
4187    }
4188
4189    #[test_traced]
4190    fn test_children_shutdown_on_engine_abort() {
4191        for seed in 0..10 {
4192            engine_shutdown::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>, false);
4193            engine_shutdown::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>, false);
4194            engine_shutdown::<_, _, RoundRobin>(
4195                seed,
4196                bls12381_multisig::fixture::<MinPk, _>,
4197                false,
4198            );
4199            engine_shutdown::<_, _, RoundRobin>(
4200                seed,
4201                bls12381_multisig::fixture::<MinSig, _>,
4202                false,
4203            );
4204            engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, false);
4205            engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, false);
4206        }
4207    }
4208
4209    #[test_traced]
4210    fn test_graceful_shutdown() {
4211        for seed in 0..10 {
4212            engine_shutdown::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>, true);
4213            engine_shutdown::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>, true);
4214            engine_shutdown::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>, true);
4215            engine_shutdown::<_, _, RoundRobin>(
4216                seed,
4217                bls12381_multisig::fixture::<MinSig, _>,
4218                true,
4219            );
4220            engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, true);
4221            engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, true);
4222        }
4223    }
4224
4225    fn attributable_reporter_filtering<S, F, L>(mut fixture: F)
4226    where
4227        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4228        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4229        L: Elector<S>,
4230    {
4231        let n = 3;
4232        let required_containers = View::new(10);
4233        let activity_timeout = ViewDelta::new(10);
4234        let skip_timeout = ViewDelta::new(5);
4235        let namespace = b"consensus".to_vec();
4236        let executor = deterministic::Runner::timed(Duration::from_secs(30));
4237        executor.start(|mut context| async move {
4238            // Create simulated network
4239            let (network, mut oracle) = Network::new(
4240                context.with_label("network"),
4241                Config {
4242                    max_size: 1024 * 1024,
4243                    disconnect_on_block: false,
4244                    tracked_peer_sets: None,
4245                },
4246            );
4247            network.start();
4248
4249            // Register participants
4250            let Fixture {
4251                participants,
4252                schemes,
4253                ..
4254            } = fixture(&mut context, &namespace, n);
4255            let mut registrations = register_validators(&mut oracle, &participants).await;
4256
4257            // Link all validators
4258            let link = Link {
4259                latency: Duration::from_millis(10),
4260                jitter: Duration::from_millis(1),
4261                success_rate: 1.0,
4262            };
4263            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4264
4265            // Create engines with `AttributableReporter` wrapper
4266            let elector = L::default();
4267            let relay = Arc::new(mocks::relay::Relay::new());
4268            let mut reporters = Vec::new();
4269            for (idx, validator) in participants.iter().enumerate() {
4270                let context = context.with_label(&format!("validator_{}", *validator));
4271
4272                let reporter_config = mocks::reporter::Config {
4273                    participants: participants.clone().try_into().unwrap(),
4274                    scheme: schemes[idx].clone(),
4275                    elector: elector.clone(),
4276                };
4277                let mock_reporter = mocks::reporter::Reporter::new(
4278                    context.with_label("mock_reporter"),
4279                    reporter_config,
4280                );
4281
4282                // Wrap with `AttributableReporter`
4283                let attributable_reporter = scheme::reporter::AttributableReporter::new(
4284                    context.with_label("rng"),
4285                    schemes[idx].clone(),
4286                    mock_reporter.clone(),
4287                    Sequential,
4288                    true, // Enable verification
4289                );
4290                reporters.push(mock_reporter.clone());
4291
4292                let application_cfg = mocks::application::Config {
4293                    hasher: Sha256::default(),
4294                    relay: relay.clone(),
4295                    me: validator.clone(),
4296                    propose_latency: (10.0, 5.0),
4297                    verify_latency: (10.0, 5.0),
4298                    certify_latency: (10.0, 5.0),
4299                    should_certify: mocks::application::Certifier::Sometimes,
4300                };
4301                let (actor, application) = mocks::application::Application::new(
4302                    context.with_label("application"),
4303                    application_cfg,
4304                );
4305                actor.start();
4306                let blocker = oracle.control(validator.clone());
4307                let cfg = config::Config {
4308                    scheme: schemes[idx].clone(),
4309                    elector: elector.clone(),
4310                    blocker,
4311                    automaton: application.clone(),
4312                    relay: application.clone(),
4313                    reporter: attributable_reporter,
4314                    strategy: Sequential,
4315                    partition: validator.to_string(),
4316                    mailbox_size: 1024,
4317                    epoch: Epoch::new(333),
4318                    leader_timeout: Duration::from_secs(1),
4319                    notarization_timeout: Duration::from_secs(2),
4320                    nullify_retry: Duration::from_secs(10),
4321                    fetch_timeout: Duration::from_secs(1),
4322                    activity_timeout,
4323                    skip_timeout,
4324                    fetch_concurrent: 4,
4325                    replay_buffer: NZUsize!(1024 * 1024),
4326                    write_buffer: NZUsize!(1024 * 1024),
4327                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4328                };
4329                let engine = Engine::new(context.with_label("engine"), cfg);
4330
4331                // Start engine
4332                let (pending, recovered, resolver) = registrations
4333                    .remove(validator)
4334                    .expect("validator should be registered");
4335                engine.start(pending, recovered, resolver);
4336            }
4337
4338            // Wait for all engines to finish
4339            let mut finalizers = Vec::new();
4340            for reporter in reporters.iter_mut() {
4341                let (mut latest, mut monitor) = reporter.subscribe().await;
4342                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4343                    while latest < required_containers {
4344                        latest = monitor.next().await.expect("event missing");
4345                    }
4346                }));
4347            }
4348            join_all(finalizers).await;
4349
4350            // Verify filtering behavior based on scheme attributability
4351            for reporter in reporters.iter() {
4352                // Ensure no faults (normal operation)
4353                {
4354                    let faults = reporter.faults.lock().unwrap();
4355                    assert!(faults.is_empty(), "No faults should be reported");
4356                }
4357
4358                // Ensure no invalid signatures
4359                {
4360                    let invalid = reporter.invalid.lock().unwrap();
4361                    assert_eq!(*invalid, 0, "No invalid signatures");
4362                }
4363
4364                // Check that we have certificates reported
4365                {
4366                    let notarizations = reporter.notarizations.lock().unwrap();
4367                    let finalizations = reporter.finalizations.lock().unwrap();
4368                    assert!(
4369                        !notarizations.is_empty() || !finalizations.is_empty(),
4370                        "Certificates should be reported"
4371                    );
4372                }
4373
4374                // Check notarizes
4375                let notarizes = reporter.notarizes.lock().unwrap();
4376                let last_view = notarizes.keys().max().cloned().unwrap_or_default();
4377                for (view, payloads) in notarizes.iter() {
4378                    if *view == last_view {
4379                        continue; // Skip last view
4380                    }
4381
4382                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4383
4384                    // For attributable schemes, we should see peer activities
4385                    if S::is_attributable() {
4386                        assert!(signers > 1, "view {view}: {signers}");
4387                    } else {
4388                        // For non-attributable, we shouldn't see any peer activities
4389                        assert_eq!(signers, 0);
4390                    }
4391                }
4392
4393                // Check finalizes
4394                let finalizes = reporter.finalizes.lock().unwrap();
4395                for (_, payloads) in finalizes.iter() {
4396                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4397
4398                    // For attributable schemes, we should see peer activities
4399                    if S::is_attributable() {
4400                        assert!(signers > 1);
4401                    } else {
4402                        // For non-attributable, we shouldn't see any peer activities
4403                        assert_eq!(signers, 0);
4404                    }
4405                }
4406            }
4407
4408            // Ensure no blocked connections (normal operation)
4409            let blocked = oracle.blocked().await.unwrap();
4410            assert!(blocked.is_empty());
4411        });
4412    }
4413
4414    #[test_traced]
4415    fn test_attributable_reporter_filtering() {
4416        attributable_reporter_filtering::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
4417        attributable_reporter_filtering::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
4418        attributable_reporter_filtering::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4419        attributable_reporter_filtering::<_, _, RoundRobin>(
4420            bls12381_multisig::fixture::<MinSig, _>,
4421        );
4422        attributable_reporter_filtering::<_, _, RoundRobin>(ed25519::fixture);
4423        attributable_reporter_filtering::<_, _, RoundRobin>(secp256r1::fixture);
4424    }
4425
4426    fn split_views_no_lockup<S, F, L>(mut fixture: F)
4427    where
4428        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4429        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4430        L: Elector<S>,
4431    {
4432        // Scenario:
4433        // - View F: Finalization of B_1 seen by all participants.
4434        // - View F+1:
4435        //   - Nullification seen by honest (4..=6,7) and all 3 byzantines
4436        //   - Notarization of B_2A seen by honest (1..=3)
4437        // - View F+2:
4438        //   - Nullification seen by honest (1..=3,7) and all 3 byzantines
4439        //   - Notarization of B_2B seen by honest (4..=6)
4440        // - View F+3: Nullification. Seen by all participants.
4441        // - Then ensure progress resumes beyond F+3 after reconnecting
4442
4443        // Define participant types
4444        enum ParticipantType {
4445            Group1,    // receives notarization for f+1, nullification for f+2
4446            Group2,    // receives nullification for f+1, notarization for f+2
4447            Ignorant,  // receives nullification for f+1 and f+2
4448            Byzantine, // nullify-only
4449        }
4450        let get_type = |idx: usize| -> ParticipantType {
4451            match idx {
4452                0..3 => ParticipantType::Group1,
4453                3..6 => ParticipantType::Group2,
4454                6 => ParticipantType::Ignorant,
4455                7..10 => ParticipantType::Byzantine,
4456                _ => unreachable!(),
4457            }
4458        };
4459
4460        // Create context
4461        let n = 10;
4462        let quorum = quorum(n) as usize;
4463        assert_eq!(quorum, 7);
4464        let activity_timeout = ViewDelta::new(10);
4465        let skip_timeout = ViewDelta::new(5);
4466        let namespace = b"consensus".to_vec();
4467        let executor = deterministic::Runner::timed(Duration::from_secs(300));
4468        executor.start(|mut context| async move {
4469            // Create simulated network
4470            let (network, mut oracle) = Network::new(
4471                context.with_label("network"),
4472                Config {
4473                    max_size: 1024 * 1024,
4474                    disconnect_on_block: false,
4475                    tracked_peer_sets: None,
4476                },
4477            );
4478            network.start();
4479
4480            // Register participants
4481            let Fixture {
4482                participants,
4483                schemes,
4484                ..
4485            } = fixture(&mut context, &namespace, n);
4486            let mut registrations = register_validators(&mut oracle, &participants).await;
4487
4488            // ========== Create engines ==========
4489
4490            // Do not link validators yet; we will inject certificates first, then link everyone.
4491
4492            // Create engines: 7 honest engines, 3 byzantine
4493            let elector = L::default();
4494            let relay = Arc::new(mocks::relay::Relay::new());
4495            let mut honest_reporters = Vec::new();
4496            for (idx, validator) in participants.iter().enumerate() {
4497                let (pending, recovered, resolver) = registrations
4498                    .remove(validator)
4499                    .expect("validator should be registered");
4500                let participant_type = get_type(idx);
4501                if matches!(participant_type, ParticipantType::Byzantine) {
4502                    // Byzantine engines
4503                    let cfg = mocks::nullify_only::Config {
4504                        scheme: schemes[idx].clone(),
4505                    };
4506                    let engine: mocks::nullify_only::NullifyOnly<_, _, Sha256> =
4507                        mocks::nullify_only::NullifyOnly::new(
4508                            context.with_label(&format!("byzantine_{}", *validator)),
4509                            cfg,
4510                        );
4511                    engine.start(pending);
4512                    // Recovered/resolver channels are unused for byzantine actors.
4513                    drop(recovered);
4514                    drop(resolver);
4515                } else {
4516                    // Honest engines
4517                    let reporter_config = mocks::reporter::Config {
4518                        participants: participants.clone().try_into().unwrap(),
4519                        scheme: schemes[idx].clone(),
4520                        elector: elector.clone(),
4521                    };
4522                    let reporter = mocks::reporter::Reporter::new(
4523                        context.with_label(&format!("reporter_{}", *validator)),
4524                        reporter_config,
4525                    );
4526                    honest_reporters.push(reporter.clone());
4527
4528                    let application_cfg = mocks::application::Config {
4529                        hasher: Sha256::default(),
4530                        relay: relay.clone(),
4531                        me: validator.clone(),
4532                        propose_latency: (10.0, 5.0),
4533                        verify_latency: (10.0, 5.0),
4534                        certify_latency: (10.0, 5.0),
4535                        should_certify: mocks::application::Certifier::Sometimes,
4536                    };
4537                    let (actor, application) = mocks::application::Application::new(
4538                        context.with_label(&format!("application_{}", *validator)),
4539                        application_cfg,
4540                    );
4541                    actor.start();
4542                    let blocker = oracle.control(validator.clone());
4543                    let cfg = config::Config {
4544                        scheme: schemes[idx].clone(),
4545                        elector: elector.clone(),
4546                        blocker,
4547                        automaton: application.clone(),
4548                        relay: application.clone(),
4549                        reporter: reporter.clone(),
4550                        strategy: Sequential,
4551                        partition: validator.to_string(),
4552                        mailbox_size: 1024,
4553                        epoch: Epoch::new(333),
4554                        leader_timeout: Duration::from_secs(10),
4555                        notarization_timeout: Duration::from_secs(10),
4556                        nullify_retry: Duration::from_secs(10),
4557                        fetch_timeout: Duration::from_secs(1),
4558                        activity_timeout,
4559                        skip_timeout,
4560                        fetch_concurrent: 4,
4561                        replay_buffer: NZUsize!(1024 * 1024),
4562                        write_buffer: NZUsize!(1024 * 1024),
4563                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4564                    };
4565                    let engine =
4566                        Engine::new(context.with_label(&format!("engine_{}", *validator)), cfg);
4567                    engine.start(pending, recovered, resolver);
4568                }
4569            }
4570
4571            // ========== Build the certificates manually ==========
4572
4573            // Helper: assemble finalization from explicit signer indices
4574            let build_finalization = |proposal: &Proposal<D>| -> TFinalization<_, D> {
4575                let votes: Vec<_> = (0..=quorum)
4576                    .map(|i| TFinalize::sign(&schemes[i], proposal.clone()).unwrap())
4577                    .collect();
4578                TFinalization::from_finalizes(&schemes[0], &votes, &Sequential)
4579                    .expect("finalization quorum")
4580            };
4581            // Helper: assemble notarization from explicit signer indices
4582            let build_notarization = |proposal: &Proposal<D>| -> TNotarization<_, D> {
4583                let votes: Vec<_> = (0..=quorum)
4584                    .map(|i| TNotarize::sign(&schemes[i], proposal.clone()).unwrap())
4585                    .collect();
4586                TNotarization::from_notarizes(&schemes[0], &votes, &Sequential)
4587                    .expect("notarization quorum")
4588            };
4589            let build_nullification = |round: Round| -> TNullification<_> {
4590                let votes: Vec<_> = (0..=quorum)
4591                    .map(|i| TNullify::sign::<D>(&schemes[i], round).unwrap())
4592                    .collect();
4593                TNullification::from_nullifies(&schemes[0], &votes, &Sequential)
4594                    .expect("nullification quorum")
4595            };
4596            // Choose F=1 and construct B_1, B_2A, B_2B
4597            let f_view = 1;
4598            let round_f = Round::new(Epoch::new(333), View::new(f_view));
4599            let payload_b0 = Sha256::hash(b"B_F");
4600            let proposal_b0 = Proposal::new(round_f, View::new(f_view - 1), payload_b0);
4601            let payload_b1a = Sha256::hash(b"B_G1");
4602            let proposal_b1a = Proposal::new(
4603                Round::new(Epoch::new(333), View::new(f_view + 1)),
4604                View::new(f_view),
4605                payload_b1a,
4606            );
4607            let payload_b1b = Sha256::hash(b"B_G2");
4608            let proposal_b1b = Proposal::new(
4609                Round::new(Epoch::new(333), View::new(f_view + 2)),
4610                View::new(f_view),
4611                payload_b1b,
4612            );
4613
4614            // Build notarization and finalization for the first block
4615            let b0_notarization = build_notarization(&proposal_b0);
4616            let b0_finalization = build_finalization(&proposal_b0);
4617            // Build notarizations for F+1 and F+2
4618            let b1a_notarization = build_notarization(&proposal_b1a);
4619            let b1b_notarization = build_notarization(&proposal_b1b);
4620            // Build nullifications for F+1 and F+2
4621            let null_a = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 1)));
4622            let null_b = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 2)));
4623
4624            // Create an 11th non-participant injector and obtain senders
4625            let injector_pk = PrivateKey::from_seed(1_000_000).public_key();
4626            let (mut injector_sender, _inj_certificate_receiver) = oracle
4627                .control(injector_pk.clone())
4628                .register(1, TEST_QUOTA)
4629                .await
4630                .unwrap();
4631
4632            // Create minimal one-way links from injector to all participants (not full mesh)
4633            let link = Link {
4634                latency: Duration::from_millis(10),
4635                jitter: Duration::from_millis(0),
4636                success_rate: 1.0,
4637            };
4638            for p in participants.iter() {
4639                oracle
4640                    .add_link(injector_pk.clone(), p.clone(), link.clone())
4641                    .await
4642                    .unwrap();
4643            }
4644
4645            // ========== Broadcast certificates over recovered network. ==========
4646
4647            // Broadcasts are in reverse order of views to make the tests easier by preventing the
4648            // proposer from making a proposal in F+1 or F+2, as it may panic when it proposes
4649            // something but generates a certificate for a different proposal.
4650
4651            // View F+2:
4652            let notarization_msg = Certificate::<_, D>::Notarization(b1b_notarization);
4653            let nullification_msg = Certificate::<_, D>::Nullification(null_b.clone());
4654            for (i, participant) in participants.iter().enumerate() {
4655                let recipient = Recipients::One(participant.clone());
4656                let msg = match get_type(i) {
4657                    ParticipantType::Group2 => notarization_msg.encode(),
4658                    _ => nullification_msg.encode(),
4659                };
4660                injector_sender.send(recipient, msg, true).await.unwrap();
4661            }
4662            // View F+1:
4663            let notarization_msg = Certificate::<_, D>::Notarization(b1a_notarization);
4664            let nullification_msg = Certificate::<_, D>::Nullification(null_a.clone());
4665            for (i, participant) in participants.iter().enumerate() {
4666                let recipient = Recipients::One(participant.clone());
4667                let msg = match get_type(i) {
4668                    ParticipantType::Group1 => notarization_msg.encode(),
4669                    _ => nullification_msg.encode(),
4670                };
4671                injector_sender.send(recipient, msg, true).await.unwrap();
4672            }
4673            // View F:
4674            let msg = Certificate::<_, D>::Notarization(b0_notarization).encode();
4675            injector_sender
4676                .send(Recipients::All, msg, true)
4677                .await
4678                .unwrap();
4679            let msg = Certificate::<_, D>::Finalization(b0_finalization).encode();
4680            injector_sender
4681                .send(Recipients::All, msg, true)
4682                .await
4683                .unwrap();
4684
4685            // Wait for a while to let the certificates propagate, but not so long that we
4686            // nullify view F+2.
4687            debug!("waiting for certificates to propagate");
4688            context.sleep(Duration::from_secs(5)).await;
4689            debug!("certificates propagated");
4690
4691            // ========== Assert the exact certificates are seen in each view ==========
4692
4693            // Assert the exact certificates in view F
4694            // All participants should have finalized B_0
4695            let view = View::new(f_view);
4696            for reporter in honest_reporters.iter() {
4697                let finalizations = reporter.finalizations.lock().unwrap();
4698                assert!(finalizations.contains_key(&view));
4699            }
4700
4701            // Assert the exact certificates in view F+1
4702            // Group 1 should have notarized B_1A only
4703            // All other participants should have nullified F+1
4704            let view = View::new(f_view + 1);
4705            for (i, reporter) in honest_reporters.iter().enumerate() {
4706                let finalizations = reporter.finalizations.lock().unwrap();
4707                assert!(!finalizations.contains_key(&view));
4708                let nullifications = reporter.nullifications.lock().unwrap();
4709                let notarizations = reporter.notarizations.lock().unwrap();
4710                match get_type(i) {
4711                    ParticipantType::Group1 => {
4712                        assert!(notarizations.contains_key(&view));
4713                        assert!(!nullifications.contains_key(&view));
4714                    }
4715                    _ => {
4716                        assert!(nullifications.contains_key(&view));
4717                        assert!(!notarizations.contains_key(&view));
4718                    }
4719                }
4720            }
4721
4722            // Assert the exact certificates in view F+2
4723            // Group 2 should have notarized B_1B only
4724            // All other participants should have nullified F+2
4725            let view = View::new(f_view + 2);
4726            for (i, reporter) in honest_reporters.iter().enumerate() {
4727                let finalizations = reporter.finalizations.lock().unwrap();
4728                assert!(!finalizations.contains_key(&view));
4729                let nullifications = reporter.nullifications.lock().unwrap();
4730                let notarizations = reporter.notarizations.lock().unwrap();
4731                match get_type(i) {
4732                    ParticipantType::Group2 => {
4733                        assert!(notarizations.contains_key(&view));
4734                        assert!(!nullifications.contains_key(&view));
4735                    }
4736                    _ => {
4737                        assert!(nullifications.contains_key(&view));
4738                        assert!(!notarizations.contains_key(&view));
4739                    }
4740                }
4741            }
4742
4743            // Assert no members have yet nullified view F+3
4744            let next_view = View::new(f_view + 3);
4745            for (i, reporter) in honest_reporters.iter().enumerate() {
4746                let nullifies = reporter.nullifies.lock().unwrap();
4747                assert!(!nullifies.contains_key(&next_view), "reporter {i}");
4748            }
4749
4750            // ========== Reconnect all participants ==========
4751
4752            // Reconnect all participants fully using the helper
4753            link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
4754
4755            // Wait until all honest reporters finalize strictly past F+2 (e.g., at least F+3)
4756            {
4757                let target = View::new(f_view + 3);
4758                let mut finalizers = Vec::new();
4759                for reporter in honest_reporters.iter_mut() {
4760                    let (mut latest, mut monitor) = reporter.subscribe().await;
4761                    finalizers.push(context.with_label("resume_finalizer").spawn(
4762                        move |_| async move {
4763                            while latest < target {
4764                                latest = monitor.next().await.expect("event missing");
4765                            }
4766                        },
4767                    ));
4768                }
4769                join_all(finalizers).await;
4770            }
4771
4772            // Sanity checks: no faults/invalid signatures, and no peers blocked
4773            for reporter in honest_reporters.iter() {
4774                {
4775                    let faults = reporter.faults.lock().unwrap();
4776                    assert!(faults.is_empty());
4777                }
4778                {
4779                    let invalid = reporter.invalid.lock().unwrap();
4780                    assert_eq!(*invalid, 0);
4781                }
4782            }
4783            let blocked = oracle.blocked().await.unwrap();
4784            assert!(blocked.is_empty());
4785        });
4786    }
4787
4788    #[test_traced]
4789    fn test_split_views_no_lockup() {
4790        split_views_no_lockup::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
4791        split_views_no_lockup::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
4792        split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4793        split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
4794        split_views_no_lockup::<_, _, RoundRobin>(ed25519::fixture);
4795        split_views_no_lockup::<_, _, RoundRobin>(secp256r1::fixture);
4796    }
4797
4798    fn tle<V, L>()
4799    where
4800        V: Variant,
4801        L: Elector<bls12381_threshold::Scheme<PublicKey, V>>,
4802    {
4803        // Create context
4804        let n = 4;
4805        let namespace = b"consensus".to_vec();
4806        let activity_timeout = ViewDelta::new(100);
4807        let skip_timeout = ViewDelta::new(50);
4808        let executor = deterministic::Runner::timed(Duration::from_secs(30));
4809        executor.start(|mut context| async move {
4810            // Create simulated network
4811            let (network, mut oracle) = Network::new(
4812                context.with_label("network"),
4813                Config {
4814                    max_size: 1024 * 1024,
4815                    disconnect_on_block: false,
4816                    tracked_peer_sets: None,
4817                },
4818            );
4819
4820            // Start network
4821            network.start();
4822
4823            // Register participants
4824            let Fixture {
4825                participants,
4826                schemes,
4827                ..
4828            } = bls12381_threshold::fixture::<V, _>(&mut context, &namespace, n);
4829            let mut registrations = register_validators(&mut oracle, &participants).await;
4830
4831            // Link all validators
4832            let link = Link {
4833                latency: Duration::from_millis(10),
4834                jitter: Duration::from_millis(5),
4835                success_rate: 1.0,
4836            };
4837            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4838
4839            // Create engines and reporters
4840            let elector = L::default();
4841            let relay = Arc::new(mocks::relay::Relay::new());
4842            let mut reporters = Vec::new();
4843            let mut engine_handlers = Vec::new();
4844            let monitor_reporter = Arc::new(Mutex::new(None));
4845            for (idx, validator) in participants.iter().enumerate() {
4846                // Create scheme context
4847                let context = context.with_label(&format!("validator_{}", *validator));
4848
4849                // Store first reporter for monitoring
4850                let reporter_config = mocks::reporter::Config {
4851                    participants: participants.clone().try_into().unwrap(),
4852                    scheme: schemes[idx].clone(),
4853                    elector: elector.clone(),
4854                };
4855                let reporter =
4856                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4857                reporters.push(reporter.clone());
4858                if idx == 0 {
4859                    *monitor_reporter.lock().unwrap() = Some(reporter.clone());
4860                }
4861
4862                // Configure application
4863                let application_cfg = mocks::application::Config {
4864                    hasher: Sha256::default(),
4865                    relay: relay.clone(),
4866                    me: validator.clone(),
4867                    propose_latency: (10.0, 5.0),
4868                    verify_latency: (10.0, 5.0),
4869                    certify_latency: (10.0, 5.0),
4870                    should_certify: mocks::application::Certifier::Sometimes,
4871                };
4872                let (actor, application) = mocks::application::Application::new(
4873                    context.with_label("application"),
4874                    application_cfg,
4875                );
4876                actor.start();
4877                let blocker = oracle.control(validator.clone());
4878                let cfg = config::Config {
4879                    scheme: schemes[idx].clone(),
4880                    elector: elector.clone(),
4881                    blocker,
4882                    automaton: application.clone(),
4883                    relay: application.clone(),
4884                    reporter: reporter.clone(),
4885                    strategy: Sequential,
4886                    partition: validator.to_string(),
4887                    mailbox_size: 1024,
4888                    epoch: Epoch::new(333),
4889                    leader_timeout: Duration::from_millis(100),
4890                    notarization_timeout: Duration::from_millis(200),
4891                    nullify_retry: Duration::from_millis(500),
4892                    fetch_timeout: Duration::from_millis(100),
4893                    activity_timeout,
4894                    skip_timeout,
4895                    fetch_concurrent: 4,
4896                    replay_buffer: NZUsize!(1024 * 1024),
4897                    write_buffer: NZUsize!(1024 * 1024),
4898                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4899                };
4900                let engine = Engine::new(context.with_label("engine"), cfg);
4901
4902                // Start engine
4903                let (pending, recovered, resolver) = registrations
4904                    .remove(validator)
4905                    .expect("validator should be registered");
4906                engine_handlers.push(engine.start(pending, recovered, resolver));
4907            }
4908
4909            // Prepare TLE test data
4910            let target = Round::new(Epoch::new(333), View::new(10)); // Encrypt for round (epoch 333, view 10)
4911            let message = b"Secret message for future view10"; // 32 bytes
4912
4913            // Encrypt message
4914            let ciphertext = schemes[0].encrypt(&mut context, target, *message);
4915
4916            // Wait for consensus to reach the target view and then decrypt
4917            let reporter = monitor_reporter.lock().unwrap().clone().unwrap();
4918            loop {
4919                // Wait for notarization
4920                context.sleep(Duration::from_millis(100)).await;
4921                let notarizations = reporter.notarizations.lock().unwrap();
4922                let Some(notarization) = notarizations.get(&target.view()) else {
4923                    continue;
4924                };
4925
4926                // Decrypt the message using the seed
4927                let seed = notarization.seed();
4928                let decrypted = seed
4929                    .decrypt(&ciphertext)
4930                    .expect("Decryption should succeed with valid seed signature");
4931                assert_eq!(
4932                    message,
4933                    decrypted.as_ref(),
4934                    "Decrypted message should match original message"
4935                );
4936                break;
4937            }
4938        });
4939    }
4940
4941    #[test_traced]
4942    fn test_tle() {
4943        tle::<MinPk, Random>();
4944        tle::<MinSig, Random>();
4945    }
4946
4947    fn hailstorm<S, F, L>(
4948        seed: u64,
4949        shutdowns: usize,
4950        interval: ViewDelta,
4951        mut fixture: F,
4952    ) -> String
4953    where
4954        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4955        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4956        L: Elector<S>,
4957    {
4958        // Create context
4959        let n = 5;
4960        let activity_timeout = ViewDelta::new(10);
4961        let skip_timeout = ViewDelta::new(5);
4962        let namespace = b"consensus".to_vec();
4963        let cfg = deterministic::Config::new().with_seed(seed);
4964        let executor = deterministic::Runner::new(cfg);
4965        executor.start(|mut context| async move {
4966            // Create simulated network
4967            let (network, mut oracle) = Network::new(
4968                context.with_label("network"),
4969                Config {
4970                    max_size: 1024 * 1024,
4971                    disconnect_on_block: true,
4972                    tracked_peer_sets: None,
4973                },
4974            );
4975
4976            // Start network
4977            network.start();
4978
4979            // Register participants
4980            let Fixture {
4981                participants,
4982                schemes,
4983                ..
4984            } = fixture(&mut context, &namespace, n);
4985            let mut registrations = register_validators(&mut oracle, &participants).await;
4986
4987            // Link all validators
4988            let link = Link {
4989                latency: Duration::from_millis(10),
4990                jitter: Duration::from_millis(1),
4991                success_rate: 1.0,
4992            };
4993            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4994
4995            // Create engines
4996            let elector = L::default();
4997            let relay = Arc::new(mocks::relay::Relay::new());
4998            let mut reporters = BTreeMap::new();
4999            let mut engine_handlers = BTreeMap::new();
5000            for (idx, validator) in participants.iter().enumerate() {
5001                // Create scheme context
5002                let context = context.with_label(&format!("validator_{}", *validator));
5003
5004                // Configure engine
5005                let reporter_config = mocks::reporter::Config {
5006                    participants: participants.clone().try_into().unwrap(),
5007                    scheme: schemes[idx].clone(),
5008                    elector: elector.clone(),
5009                };
5010                let reporter =
5011                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5012                reporters.insert(idx, reporter.clone());
5013                let application_cfg = mocks::application::Config {
5014                    hasher: Sha256::default(),
5015                    relay: relay.clone(),
5016                    me: validator.clone(),
5017                    propose_latency: (10.0, 5.0),
5018                    verify_latency: (10.0, 5.0),
5019                    certify_latency: (10.0, 5.0),
5020                    should_certify: mocks::application::Certifier::Sometimes,
5021                };
5022                let (actor, application) = mocks::application::Application::new(
5023                    context.with_label("application"),
5024                    application_cfg,
5025                );
5026                actor.start();
5027                let blocker = oracle.control(validator.clone());
5028                let cfg = config::Config {
5029                    scheme: schemes[idx].clone(),
5030                    elector: elector.clone(),
5031                    blocker,
5032                    automaton: application.clone(),
5033                    relay: application.clone(),
5034                    reporter: reporter.clone(),
5035                    strategy: Sequential,
5036                    partition: validator.to_string(),
5037                    mailbox_size: 1024,
5038                    epoch: Epoch::new(333),
5039                    leader_timeout: Duration::from_secs(1),
5040                    notarization_timeout: Duration::from_secs(2),
5041                    nullify_retry: Duration::from_secs(10),
5042                    fetch_timeout: Duration::from_secs(1),
5043                    activity_timeout,
5044                    skip_timeout,
5045                    fetch_concurrent: 4,
5046                    replay_buffer: NZUsize!(1024 * 1024),
5047                    write_buffer: NZUsize!(1024 * 1024),
5048                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5049                };
5050                let engine = Engine::new(context.with_label("engine"), cfg);
5051
5052                // Start engine
5053                let (pending, recovered, resolver) = registrations
5054                    .remove(validator)
5055                    .expect("validator should be registered");
5056                engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5057            }
5058
5059            // Run shutdowns
5060            let mut target = View::zero();
5061            for i in 0..shutdowns {
5062                // Update target
5063                target = target.saturating_add(interval);
5064
5065                // Wait for all engines to finish
5066                let mut finalizers = Vec::new();
5067                for (_, reporter) in reporters.iter_mut() {
5068                    let (mut latest, mut monitor) = reporter.subscribe().await;
5069                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5070                        while latest < target {
5071                            latest = monitor.next().await.expect("event missing");
5072                        }
5073                    }));
5074                }
5075                join_all(finalizers).await;
5076                target = target.saturating_add(interval);
5077
5078                // Select a random engine to shutdown
5079                let idx = context.gen_range(0..engine_handlers.len());
5080                let validator = &participants[idx];
5081                let handle = engine_handlers.remove(&idx).unwrap();
5082                handle.abort();
5083                let _ = handle.await;
5084                let selected_reporter = reporters.remove(&idx).unwrap();
5085                info!(idx, ?validator, "shutdown validator");
5086
5087                // Wait for all engines to finish
5088                let mut finalizers = Vec::new();
5089                for (_, reporter) in reporters.iter_mut() {
5090                    let (mut latest, mut monitor) = reporter.subscribe().await;
5091                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5092                        while latest < target {
5093                            latest = monitor.next().await.expect("event missing");
5094                        }
5095                    }));
5096                }
5097                join_all(finalizers).await;
5098                target = target.saturating_add(interval);
5099
5100                // Recreate engine
5101                info!(idx, ?validator, "restarting validator");
5102                let context =
5103                    context.with_label(&format!("validator_{}_restarted_{}", *validator, i));
5104
5105                // Start engine
5106                let (pending, recovered, resolver) =
5107                    register_validator(&mut oracle, validator.clone()).await;
5108                let application_cfg = mocks::application::Config {
5109                    hasher: Sha256::default(),
5110                    relay: relay.clone(),
5111                    me: validator.clone(),
5112                    propose_latency: (10.0, 5.0),
5113                    verify_latency: (10.0, 5.0),
5114                    certify_latency: (10.0, 5.0),
5115                    should_certify: mocks::application::Certifier::Sometimes,
5116                };
5117                let (actor, application) = mocks::application::Application::new(
5118                    context.with_label("application"),
5119                    application_cfg,
5120                );
5121                actor.start();
5122                reporters.insert(idx, selected_reporter.clone());
5123                let blocker = oracle.control(validator.clone());
5124                let cfg = config::Config {
5125                    scheme: schemes[idx].clone(),
5126                    elector: elector.clone(),
5127                    blocker,
5128                    automaton: application.clone(),
5129                    relay: application.clone(),
5130                    reporter: selected_reporter,
5131                    strategy: Sequential,
5132                    partition: validator.to_string(),
5133                    mailbox_size: 1024,
5134                    epoch: Epoch::new(333),
5135                    leader_timeout: Duration::from_secs(1),
5136                    notarization_timeout: Duration::from_secs(2),
5137                    nullify_retry: Duration::from_secs(10),
5138                    fetch_timeout: Duration::from_secs(1),
5139                    activity_timeout,
5140                    skip_timeout,
5141                    fetch_concurrent: 4,
5142                    replay_buffer: NZUsize!(1024 * 1024),
5143                    write_buffer: NZUsize!(1024 * 1024),
5144                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5145                };
5146                let engine = Engine::new(context.with_label("engine"), cfg);
5147                engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5148
5149                // Wait for all engines to hit required containers
5150                let mut finalizers = Vec::new();
5151                for (_, reporter) in reporters.iter_mut() {
5152                    let (mut latest, mut monitor) = reporter.subscribe().await;
5153                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5154                        while latest < target {
5155                            latest = monitor.next().await.expect("event missing");
5156                        }
5157                    }));
5158                }
5159                join_all(finalizers).await;
5160                info!(idx, ?validator, "validator recovered");
5161            }
5162
5163            // Check reporters for correct activity
5164            let latest_complete = target.saturating_sub(activity_timeout);
5165            for (_, reporter) in reporters.iter() {
5166                // Ensure no faults
5167                {
5168                    let faults = reporter.faults.lock().unwrap();
5169                    assert!(faults.is_empty());
5170                }
5171
5172                // Ensure no invalid signatures
5173                {
5174                    let invalid = reporter.invalid.lock().unwrap();
5175                    assert_eq!(*invalid, 0);
5176                }
5177
5178                // Ensure no forks
5179                let mut notarized = HashMap::new();
5180                let mut finalized = HashMap::new();
5181                {
5182                    let notarizes = reporter.notarizes.lock().unwrap();
5183                    for view in View::range(View::new(1), latest_complete) {
5184                        // Ensure only one payload proposed per view
5185                        let Some(payloads) = notarizes.get(&view) else {
5186                            continue;
5187                        };
5188                        if payloads.len() > 1 {
5189                            panic!("view: {view}");
5190                        }
5191                        let (digest, _) = payloads.iter().next().unwrap();
5192                        notarized.insert(view, *digest);
5193                    }
5194                }
5195                {
5196                    let notarizations = reporter.notarizations.lock().unwrap();
5197                    for view in View::range(View::new(1), latest_complete) {
5198                        // Ensure notarization matches digest from notarizes
5199                        let Some(notarization) = notarizations.get(&view) else {
5200                            continue;
5201                        };
5202                        let Some(digest) = notarized.get(&view) else {
5203                            continue;
5204                        };
5205                        assert_eq!(&notarization.proposal.payload, digest);
5206                    }
5207                }
5208                {
5209                    let finalizes = reporter.finalizes.lock().unwrap();
5210                    for view in View::range(View::new(1), latest_complete) {
5211                        // Ensure only one payload proposed per view
5212                        let Some(payloads) = finalizes.get(&view) else {
5213                            continue;
5214                        };
5215                        if payloads.len() > 1 {
5216                            panic!("view: {view}");
5217                        }
5218                        let (digest, _) = payloads.iter().next().unwrap();
5219                        finalized.insert(view, *digest);
5220
5221                        // Only check at views below timeout
5222                        if view > latest_complete {
5223                            continue;
5224                        }
5225
5226                        // Ensure no nullifies for any finalizers
5227                        let nullifies = reporter.nullifies.lock().unwrap();
5228                        let Some(nullifies) = nullifies.get(&view) else {
5229                            continue;
5230                        };
5231                        for (_, finalizers) in payloads.iter() {
5232                            for finalizer in finalizers.iter() {
5233                                if nullifies.contains(finalizer) {
5234                                    panic!("should not nullify and finalize at same view");
5235                                }
5236                            }
5237                        }
5238                    }
5239                }
5240                {
5241                    let finalizations = reporter.finalizations.lock().unwrap();
5242                    for view in View::range(View::new(1), latest_complete) {
5243                        // Ensure finalization matches digest from finalizes
5244                        let Some(finalization) = finalizations.get(&view) else {
5245                            continue;
5246                        };
5247                        let Some(digest) = finalized.get(&view) else {
5248                            continue;
5249                        };
5250                        assert_eq!(&finalization.proposal.payload, digest);
5251                    }
5252                }
5253            }
5254
5255            // Ensure no blocked connections
5256            let blocked = oracle.blocked().await.unwrap();
5257            assert!(blocked.is_empty());
5258
5259            // Return state for audit
5260            context.auditor().state()
5261        })
5262    }
5263
5264    #[test_group("slow")]
5265    #[test_traced]
5266    fn test_hailstorm_bls12381_threshold_min_pk() {
5267        assert_eq!(
5268            hailstorm::<_, _, Random>(
5269                0,
5270                10,
5271                ViewDelta::new(15),
5272                bls12381_threshold::fixture::<MinPk, _>
5273            ),
5274            hailstorm::<_, _, Random>(
5275                0,
5276                10,
5277                ViewDelta::new(15),
5278                bls12381_threshold::fixture::<MinPk, _>
5279            ),
5280        );
5281    }
5282
5283    #[test_group("slow")]
5284    #[test_traced]
5285    fn test_hailstorm_bls12381_threshold_min_sig() {
5286        assert_eq!(
5287            hailstorm::<_, _, Random>(
5288                0,
5289                10,
5290                ViewDelta::new(15),
5291                bls12381_threshold::fixture::<MinSig, _>
5292            ),
5293            hailstorm::<_, _, Random>(
5294                0,
5295                10,
5296                ViewDelta::new(15),
5297                bls12381_threshold::fixture::<MinSig, _>
5298            ),
5299        );
5300    }
5301
5302    #[test_group("slow")]
5303    #[test_traced]
5304    fn test_hailstorm_bls12381_multisig_min_pk() {
5305        assert_eq!(
5306            hailstorm::<_, _, RoundRobin>(
5307                0,
5308                10,
5309                ViewDelta::new(15),
5310                bls12381_multisig::fixture::<MinPk, _>
5311            ),
5312            hailstorm::<_, _, RoundRobin>(
5313                0,
5314                10,
5315                ViewDelta::new(15),
5316                bls12381_multisig::fixture::<MinPk, _>
5317            ),
5318        );
5319    }
5320
5321    #[test_group("slow")]
5322    #[test_traced]
5323    fn test_hailstorm_bls12381_multisig_min_sig() {
5324        assert_eq!(
5325            hailstorm::<_, _, RoundRobin>(
5326                0,
5327                10,
5328                ViewDelta::new(15),
5329                bls12381_multisig::fixture::<MinSig, _>
5330            ),
5331            hailstorm::<_, _, RoundRobin>(
5332                0,
5333                10,
5334                ViewDelta::new(15),
5335                bls12381_multisig::fixture::<MinSig, _>
5336            ),
5337        );
5338    }
5339
5340    #[test_group("slow")]
5341    #[test_traced]
5342    fn test_hailstorm_ed25519() {
5343        assert_eq!(
5344            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture),
5345            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture)
5346        );
5347    }
5348
5349    #[test_group("slow")]
5350    #[test_traced]
5351    fn test_hailstorm_secp256r1() {
5352        assert_eq!(
5353            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture),
5354            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture)
5355        );
5356    }
5357
5358    /// Implementation of [Twins: BFT Systems Made Robust](https://arxiv.org/abs/2004.10617).
5359    fn twins<S, F, L>(seed: u64, n: u32, strategy: Strategy, link: Link, mut fixture: F)
5360    where
5361        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5362        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5363        L: Elector<S>,
5364    {
5365        let faults = N3f1::max_faults(n);
5366        let required_containers = View::new(100);
5367        let activity_timeout = ViewDelta::new(10);
5368        let skip_timeout = ViewDelta::new(5);
5369        let namespace = b"consensus".to_vec();
5370        let cfg = deterministic::Config::new()
5371            .with_seed(seed)
5372            .with_timeout(Some(Duration::from_secs(900)));
5373        let executor = deterministic::Runner::new(cfg);
5374        executor.start(|mut context| async move {
5375            let (network, mut oracle) = Network::new(
5376                context.with_label("network"),
5377                Config {
5378                    max_size: 1024 * 1024,
5379                    disconnect_on_block: false,
5380                    tracked_peer_sets: None,
5381                },
5382            );
5383            network.start();
5384
5385            let Fixture {
5386                participants,
5387                schemes,
5388                ..
5389            } = fixture(&mut context, &namespace, n);
5390            let participants: Arc<[_]> = participants.into();
5391            let mut registrations = register_validators(&mut oracle, &participants).await;
5392            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5393
5394            // We don't apply partitioning to the relay explicitly, however, a participant will only query the relay by digest
5395            // after receiving a vote (implicitly respecting the partitioning)
5396            let elector = L::default();
5397            let relay = Arc::new(mocks::relay::Relay::new());
5398            let mut reporters = Vec::new();
5399            let mut engine_handlers = Vec::new();
5400
5401            // Create twin engines (f Byzantine twins)
5402            for (idx, validator) in participants.iter().enumerate().take(faults as usize) {
5403                let (
5404                    (vote_sender, vote_receiver),
5405                    (certificate_sender, certificate_receiver),
5406                    (resolver_sender, resolver_receiver),
5407                ) = registrations
5408                    .remove(validator)
5409                    .expect("validator should be registered");
5410
5411                // Create forwarder closures for votes
5412                let make_vote_forwarder = || {
5413                    let participants = participants.clone();
5414                    move |origin: SplitOrigin, _: &Recipients<_>, message: &Bytes| {
5415                        let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5416                        let (primary, secondary) =
5417                            strategy.partitions(msg.view(), participants.as_ref());
5418                        match origin {
5419                            SplitOrigin::Primary => Some(Recipients::Some(primary)),
5420                            SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5421                        }
5422                    }
5423                };
5424                // Create forwarder closures for certificates
5425                let make_certificate_forwarder = || {
5426                    let codec = schemes[idx].certificate_codec_config();
5427                    let participants = participants.clone();
5428                    move |origin: SplitOrigin, _: &Recipients<_>, message: &Bytes| {
5429                        let msg: Certificate<S, D> =
5430                            Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5431                        let (primary, secondary) =
5432                            strategy.partitions(msg.view(), participants.as_ref());
5433                        match origin {
5434                            SplitOrigin::Primary => Some(Recipients::Some(primary)),
5435                            SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5436                        }
5437                    }
5438                };
5439                let make_drop_forwarder =
5440                    || move |_: SplitOrigin, _: &Recipients<_>, _: &Bytes| None;
5441
5442                // Create router closures for votes
5443                let make_vote_router = || {
5444                    let participants = participants.clone();
5445                    move |(sender, message): &(_, Bytes)| {
5446                        let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5447                        strategy.route(msg.view(), sender, participants.as_ref())
5448                    }
5449                };
5450                // Create router closures for certificates
5451                let make_certificate_router = || {
5452                    let codec = schemes[idx].certificate_codec_config();
5453                    let participants = participants.clone();
5454                    move |(sender, message): &(_, Bytes)| {
5455                        let msg: Certificate<S, D> =
5456                            Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5457                        strategy.route(msg.view(), sender, participants.as_ref())
5458                    }
5459                };
5460                let make_drop_router = || move |(_, _): &(_, _)| SplitTarget::None;
5461
5462                // Apply view-based forwarder and router to pending and recovered channel
5463                let (vote_sender_primary, vote_sender_secondary) =
5464                    vote_sender.split_with(make_vote_forwarder());
5465                let (vote_receiver_primary, vote_receiver_secondary) = vote_receiver.split_with(
5466                    context.with_label(&format!("pending_split_{idx}")),
5467                    make_vote_router(),
5468                );
5469                let (certificate_sender_primary, certificate_sender_secondary) =
5470                    certificate_sender.split_with(make_certificate_forwarder());
5471                let (certificate_receiver_primary, certificate_receiver_secondary) =
5472                    certificate_receiver.split_with(
5473                        context.with_label(&format!("recovered_split_{idx}")),
5474                        make_certificate_router(),
5475                    );
5476
5477                // Prevent any resolver messages from being sent or received by twins (these messages aren't cleanly mapped to a view and allowing them to bypass partitions seems wrong)
5478                let (resolver_sender_primary, resolver_sender_secondary) =
5479                    resolver_sender.split_with(make_drop_forwarder());
5480                let (resolver_receiver_primary, resolver_receiver_secondary) = resolver_receiver
5481                    .split_with(
5482                        context.with_label(&format!("resolver_split_{idx}")),
5483                        make_drop_router(),
5484                    );
5485
5486                for (twin_label, pending, recovered, resolver) in [
5487                    (
5488                        "primary",
5489                        (vote_sender_primary, vote_receiver_primary),
5490                        (certificate_sender_primary, certificate_receiver_primary),
5491                        (resolver_sender_primary, resolver_receiver_primary),
5492                    ),
5493                    (
5494                        "secondary",
5495                        (vote_sender_secondary, vote_receiver_secondary),
5496                        (certificate_sender_secondary, certificate_receiver_secondary),
5497                        (resolver_sender_secondary, resolver_receiver_secondary),
5498                    ),
5499                ] {
5500                    let label = format!("twin_{idx}_{twin_label}");
5501                    let context = context.with_label(&label);
5502
5503                    let reporter_config = mocks::reporter::Config {
5504                        participants: participants.as_ref().try_into().unwrap(),
5505                        scheme: schemes[idx].clone(),
5506                        elector: elector.clone(),
5507                    };
5508                    let reporter = mocks::reporter::Reporter::new(
5509                        context.with_label("reporter"),
5510                        reporter_config,
5511                    );
5512                    reporters.push(reporter.clone());
5513
5514                    let application_cfg = mocks::application::Config {
5515                        hasher: Sha256::default(),
5516                        relay: relay.clone(),
5517                        me: validator.clone(),
5518                        propose_latency: (10.0, 5.0),
5519                        verify_latency: (10.0, 5.0),
5520                        certify_latency: (10.0, 5.0),
5521                        should_certify: mocks::application::Certifier::Sometimes,
5522                    };
5523                    let (actor, application) = mocks::application::Application::new(
5524                        context.with_label("application"),
5525                        application_cfg,
5526                    );
5527                    actor.start();
5528
5529                    let blocker = oracle.control(validator.clone());
5530                    let cfg = config::Config {
5531                        scheme: schemes[idx].clone(),
5532                        elector: elector.clone(),
5533                        blocker,
5534                        automaton: application.clone(),
5535                        relay: application.clone(),
5536                        reporter: reporter.clone(),
5537                        strategy: Sequential,
5538                        partition: label,
5539                        mailbox_size: 1024,
5540                        epoch: Epoch::new(333),
5541                        leader_timeout: Duration::from_secs(1),
5542                        notarization_timeout: Duration::from_secs(2),
5543                        nullify_retry: Duration::from_secs(10),
5544                        fetch_timeout: Duration::from_secs(1),
5545                        activity_timeout,
5546                        skip_timeout,
5547                        fetch_concurrent: 4,
5548                        replay_buffer: NZUsize!(1024 * 1024),
5549                        write_buffer: NZUsize!(1024 * 1024),
5550                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5551                    };
5552                    let engine = Engine::new(context.with_label("engine"), cfg);
5553                    engine_handlers.push(engine.start(pending, recovered, resolver));
5554                }
5555            }
5556
5557            // Create honest engines
5558            for (idx, validator) in participants.iter().enumerate().skip(faults as usize) {
5559                let label = format!("honest_{idx}");
5560                let context = context.with_label(&label);
5561
5562                let reporter_config = mocks::reporter::Config {
5563                    participants: participants.as_ref().try_into().unwrap(),
5564                    scheme: schemes[idx].clone(),
5565                    elector: elector.clone(),
5566                };
5567                let reporter =
5568                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5569                reporters.push(reporter.clone());
5570
5571                let application_cfg = mocks::application::Config {
5572                    hasher: Sha256::default(),
5573                    relay: relay.clone(),
5574                    me: validator.clone(),
5575                    propose_latency: (10.0, 5.0),
5576                    verify_latency: (10.0, 5.0),
5577                    certify_latency: (10.0, 5.0),
5578                    should_certify: mocks::application::Certifier::Sometimes,
5579                };
5580                let (actor, application) = mocks::application::Application::new(
5581                    context.with_label("application"),
5582                    application_cfg,
5583                );
5584                actor.start();
5585
5586                let blocker = oracle.control(validator.clone());
5587                let cfg = config::Config {
5588                    scheme: schemes[idx].clone(),
5589                    elector: elector.clone(),
5590                    blocker,
5591                    automaton: application.clone(),
5592                    relay: application.clone(),
5593                    reporter: reporter.clone(),
5594                    strategy: Sequential,
5595                    partition: label,
5596                    mailbox_size: 1024,
5597                    epoch: Epoch::new(333),
5598                    leader_timeout: Duration::from_secs(1),
5599                    notarization_timeout: Duration::from_secs(2),
5600                    nullify_retry: Duration::from_secs(10),
5601                    fetch_timeout: Duration::from_secs(1),
5602                    activity_timeout,
5603                    skip_timeout,
5604                    fetch_concurrent: 4,
5605                    replay_buffer: NZUsize!(1024 * 1024),
5606                    write_buffer: NZUsize!(1024 * 1024),
5607                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5608                };
5609                let engine = Engine::new(context.with_label("engine"), cfg);
5610
5611                let (pending, recovered, resolver) = registrations
5612                    .remove(validator)
5613                    .expect("validator should be registered");
5614                engine_handlers.push(engine.start(pending, recovered, resolver));
5615            }
5616
5617            // Wait for progress (liveness check)
5618            let mut finalizers = Vec::new();
5619            for reporter in reporters.iter_mut() {
5620                let (mut latest, mut monitor) = reporter.subscribe().await;
5621                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5622                    while latest < required_containers {
5623                        latest = monitor.next().await.expect("event missing");
5624                    }
5625                }));
5626            }
5627            join_all(finalizers).await;
5628
5629            // Verify safety: no conflicting finalizations across honest reporters
5630            let honest_start = faults as usize * 2; // Each twin produces 2 reporters
5631            let mut finalized_at_view: BTreeMap<View, D> = BTreeMap::new();
5632            for reporter in reporters.iter().skip(honest_start) {
5633                let finalizations = reporter.finalizations.lock().unwrap();
5634                for (view, finalization) in finalizations.iter() {
5635                    let digest = finalization.proposal.payload;
5636                    if let Some(existing) = finalized_at_view.get(view) {
5637                        assert_eq!(
5638                            existing, &digest,
5639                            "safety violation: conflicting finalizations at view {view}"
5640                        );
5641                    } else {
5642                        finalized_at_view.insert(*view, digest);
5643                    }
5644                }
5645            }
5646
5647            // Verify no invalid signatures were observed
5648            for reporter in reporters.iter().skip(honest_start) {
5649                let invalid = reporter.invalid.lock().unwrap();
5650                assert_eq!(*invalid, 0, "invalid signatures detected");
5651            }
5652
5653            // Ensure faults are attributable to twins
5654            let twin_identities: Vec<_> = participants.iter().take(faults as usize).collect();
5655            for reporter in reporters.iter().skip(honest_start) {
5656                let faults = reporter.faults.lock().unwrap();
5657                for (faulter, _) in faults.iter() {
5658                    assert!(
5659                        twin_identities.contains(&faulter),
5660                        "fault from non-twin participant"
5661                    );
5662                }
5663            }
5664
5665            // Ensure blocked connections are attributable to twins
5666            let blocked = oracle.blocked().await.unwrap();
5667            for (_, faulter) in blocked.iter() {
5668                assert!(
5669                    twin_identities.contains(&faulter),
5670                    "blocked connection from non-twin participant"
5671                );
5672            }
5673        });
5674    }
5675
5676    fn test_twins<S, F, L>(mut fixture: F)
5677    where
5678        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5679        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5680        L: Elector<S>,
5681    {
5682        for strategy in [
5683            Strategy::View,
5684            Strategy::Fixed(3),
5685            Strategy::Isolate(4),
5686            Strategy::Broadcast,
5687            Strategy::Shuffle,
5688        ] {
5689            for link in [
5690                Link {
5691                    latency: Duration::from_millis(10),
5692                    jitter: Duration::from_millis(1),
5693                    success_rate: 1.0,
5694                },
5695                Link {
5696                    latency: Duration::from_millis(200),
5697                    jitter: Duration::from_millis(150),
5698                    success_rate: 0.75,
5699                },
5700            ] {
5701                twins::<S, _, L>(0, 5, strategy, link, |namespace, context, n| {
5702                    fixture(namespace, context, n)
5703                });
5704            }
5705        }
5706    }
5707
5708    #[test_group("slow")]
5709    #[test_traced]
5710    fn test_twins_multisig_min_pk() {
5711        test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
5712    }
5713
5714    #[test_group("slow")]
5715    #[test_traced]
5716    fn test_twins_multisig_min_sig() {
5717        test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
5718    }
5719
5720    #[test_group("slow")]
5721    #[test_traced]
5722    fn test_twins_threshold_min_pk() {
5723        test_twins::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
5724    }
5725
5726    #[test_group("slow")]
5727    #[test_traced]
5728    fn test_twins_threshold_min_sig() {
5729        test_twins::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
5730    }
5731
5732    #[test_group("slow")]
5733    #[test_traced]
5734    fn test_twins_ed25519() {
5735        test_twins::<_, _, RoundRobin>(ed25519::fixture);
5736    }
5737
5738    #[test_group("slow")]
5739    #[test_traced]
5740    fn test_twins_secp256r1() {
5741        test_twins::<_, _, RoundRobin>(secp256r1::fixture);
5742    }
5743
5744    #[test_group("slow")]
5745    #[test_traced]
5746    fn test_twins_large_view() {
5747        twins::<_, _, Random>(
5748            0,
5749            10,
5750            Strategy::View,
5751            Link {
5752                latency: Duration::from_millis(200),
5753                jitter: Duration::from_millis(150),
5754                success_rate: 0.75,
5755            },
5756            bls12381_threshold::fixture::<MinPk, _>,
5757        );
5758    }
5759
5760    #[test_group("slow")]
5761    #[test_traced]
5762    fn test_twins_large_shuffle() {
5763        twins::<_, _, Random>(
5764            0,
5765            10,
5766            Strategy::Shuffle,
5767            Link {
5768                latency: Duration::from_millis(200),
5769                jitter: Duration::from_millis(150),
5770                success_rate: 0.75,
5771            },
5772            bls12381_threshold::fixture::<MinPk, _>,
5773        );
5774    }
5775}