Skip to main content

commonware_consensus/simplex/
mod.rs

1//! Simple and fast BFT agreement inspired by Simplex Consensus.
2//!
3//! Inspired by [Simplex Consensus](https://eprint.iacr.org/2023/463), `simplex` provides simple and fast BFT
4//! agreement with network-speed view (i.e. block time) latency and optimal finalization latency in a
5//! partially synchronous setting.
6//!
7//! # Features
8//!
9//! * Wicked Fast Block Times (2 Network Hops)
10//! * Optimal Finalization Latency (3 Network Hops)
11//! * Externalized Uptime and Fault Proofs
12//! * Require Certification Before Finalization
13//! * Decoupled Block Broadcast and Sync
14//! * Lazy Message Verification
15//! * Application-Defined Block Format
16//! * Pluggable Hashing and Cryptography
17//! * Embedded VRF (via [scheme::bls12381_threshold::vrf])
18//!
19//! # Design
20//!
21//! ## Protocol Description
22//!
23//! ### Genesis
24//!
25//! Genesis (view 0) is implicitly finalized. There is no finalization certificate for genesis;
26//! the digest returned by [`Automaton::genesis`](crate::Automaton::genesis) serves as the initial
27//! finalized state. Voting begins at view 1, with the first proposal referencing genesis as its parent.
28//!
29//! ### Specification for View `v`
30//!
31//! Upon entering view `v`:
32//! * Determine leader `l` for view `v`
33//! * Set timer for leader proposal `t_l = 2Δ` and advance `t_a = 3Δ`
34//!     * If leader `l` has not been active in last `r` views, set `t_l` to 0.
35//! * If leader `l`, broadcast `notarize(c,v)`
36//!   * If can't propose container in view `v` because missing notarization/nullification for a
37//!     previous view `v_m`, request `v_m`
38//!
39//! Upon receiving first `notarize(c,v)` from `l`:
40//! * Cancel `t_l`
41//! * If the container's parent `c_parent` is finalized (or both notarized and certified) at `v_parent`
42//!   and we have nullifications for all views between `v` and `v_parent`, verify `c` and broadcast `notarize(c,v)`
43//!     * If verification of `c` fails, immediately broadcast `nullify(v)`
44//!
45//! Upon receiving `2f+1` `notarize(c,v)`:
46//! * Cancel `t_a`
47//! * Mark `c` as notarized
48//! * Broadcast `notarization(c,v)` (even if we have not verified `c`)
49//! * Attempt to certify `c` (see [Certification](#certification))
50//!     * On success: broadcast `finalize(c,v)` (if have not broadcast `nullify(v)`) and enter `v+1`
51//!     * On failure: broadcast `nullify(v)`
52//!
53//! Upon receiving `2f+1` `nullify(v)`:
54//! * Broadcast `nullification(v)`
55//! * Enter `v+1`
56//!
57//! Upon receiving `2f+1` `finalize(c,v)`:
58//! * Mark `c` as finalized (and recursively finalize its parents)
59//! * Broadcast `finalization(c,v)` (even if we have not verified `c`)
60//!
61//! Upon `t_l` or `t_a` firing:
62//! * Broadcast `nullify(v)`
63//! * Every `t_r` after `nullify(v)` broadcast that we are still in view `v`:
64//!    * Rebroadcast `nullify(v)` and either `notarization(v-1)` or `nullification(v-1)`
65//!
66//! _When `2f+1` votes of a given type (`notarize(c,v)`, `nullify(v)`, or `finalize(c,v)`) have been have been collected
67//! from unique participants, a certificate (`notarization(c,v)`, `nullification(v)`, or `finalization(c,v)`) can be assembled.
68//! These certificates serve as a standalone proof of consensus progress that downstream systems can ingest without executing
69//! the protocol._
70//!
71//! ### Joining Consensus
72//!
73//! As soon as `2f+1` nullifies or finalizes are observed for some view `v`, the `Voter` will
74//! enter `v+1`. Notarizations advance the view if-and-only-if the application certifies them.
75//! This means that a new participant joining consensus will immediately jump ahead on the previous
76//! view's nullification or finalization and begin participating in consensus at the current view.
77//!
78//! ### Certification
79//!
80//! After a payload is notarized, the application can optionally delay or prevent finalization via the
81//! [`CertifiableAutomaton::certify`](crate::CertifiableAutomaton::certify) method. By default, `certify`
82//! returns `true` for all payloads, meaning finalization proceeds immediately after notarization.
83//!
84//! Customizing `certify` is useful for systems that employ erasure coding, where participants may want
85//! to wait until they have received enough shards to reconstruct and validate the full block before
86//! voting to finalize.
87//!
88//! If `certify` returns `true`, the participant broadcasts a `finalize` vote for the payload and enters the
89//! next view. If `certify` returns `false`, the participant broadcasts `nullify` for the view instead (treating
90//! it as an immediate timeout), and will refuse to build upon the proposal or notarize proposals that build upon it.
91//! Thus, a payload can only be finalized if a quorum of participants certify it.
92//!
93//! Certification of some notarization should only be abandoned once a finalization at the same or higher view is observed.
94//! Until then (say a nullification certificate for a view arrives before certification completes), the application should continue
95//! attempting to complete certification. This increases the likelihood that we can vote on the next honest proposer's block (which
96//! may build on our in-flight certification or the nullification). If we did not do this, it is possible that different parts of
97//! the network (neither with quorum) would refuse to vote on each other's blocks (halting consensus).
98//!
99//! _The decision returned by `certify` must be deterministic and consistent across all honest participants to ensure
100//! liveness._
101//!
102//! ### Deviations from Simplex Consensus
103//!
104//! * Fetch missing notarizations/nullifications as needed rather than assuming each proposal contains
105//!   a set of all notarizations/nullifications for all historical blocks.
106//! * Introduce distinct messages for `notarize` and `nullify` rather than referring to both as a `vote` for
107//!   either a "block" or a "dummy block", respectively.
108//! * Introduce a "leader timeout" to trigger early view transitions for unresponsive leaders.
109//! * Skip "leader timeout" and "certification timeout" if a designated leader hasn't participated in
110//!   some number of views (again to trigger early view transition for an unresponsive leader).
111//! * Introduce message rebroadcast to continue making progress if messages from a given view are dropped (only way
112//!   to ensure messages are reliably delivered is with a heavyweight reliable broadcast protocol).
113//! * Treat local proposal failure as immediate timeout expiry and broadcast `nullify(v)`.
114//! * Treat local verification failure as immediate timeout expiry and broadcast `nullify(v)`.
115//! * Consider the current leader's `nullify(v)` as immediate timeout expiry and broadcast `nullify(v)`.
116//! * Upon seeing `notarization(c,v)`, instead of moving to the view `v+1` immediately, request certification from
117//!   the application (see [Certification](#certification)). Only move to view `v+1` and broadcast `finalize(c,v)`
118//!   if certification succeeds, otherwise broadcast `nullify(v)` and refuse to build upon `c`.
119//!
120//! ## Protocol Properties
121//!
122//! ### Forced Inclusion (Tail-Forking Resistance)
123//!
124//! A notarized payload in view `v` must appear in the canonical chain if no nullification
125//! certificate exists for `v`. This follows directly from the protocol rules:
126//!
127//! 1. To propose in view `v+k`, the leader must reference a certified parent in some view `v_p`
128//!    and possess nullification certificates for every view between `v_p` and `v+k`.
129//! 2. A nullification certificate for view `v` requires `2f+1` `nullify(v)` votes.
130//! 3. An honest participant only broadcasts `nullify(v)` when a timeout fires (`t_l` or `t_a`)
131//!    or when certification fails.
132//!
133//! Therefore, if view `v` completes without timeout and certification succeeds, no honest
134//! participant has broadcast `nullify(v)`. With at most `f` Byzantine participants, at most `f`
135//! `nullify(v)` votes exist, which is insufficient to form a nullification certificate. Without
136//! that certificate, no future leader can skip view `v`, and the notarized payload must be
137//! included as an ancestor in all subsequent proposals.
138//!
139//! ### Optimistic Finality
140//!
141//! The forced inclusion property provides a weaker but faster form of finality: once a
142//! notarization certificate is observed for view `v` (without any timeout having fired),
143//! the notarized payload can be treated as speculatively final. No future sequence of
144//! proposals can exclude it from the canonical chain.
145//!
146//! This "speculative finality" is available after just 2 network hops (proposal + notarization),
147//! compared to the 3 hops required for full finalization (proposal + notarization + finalization).
148//! A notarized-but-not-yet-finalized payload can only be excluded in two scenarios:
149//! `f+1` or more honest participants timed out, or certification failed. Because
150//! certification is deterministic, it either fails for all honest participants or none,
151//! so a certification failure always produces a nullification. In the common case
152//! (no faults, no timeouts), exclusion cannot happen.
153//!
154//! ### Unchained Finalization
155//!
156//! Finalization does not require consecutive honest views. When a participant certifies
157//! `notarization(c,v)`, it broadcasts `finalize(c,v)` and immediately enters `v+1`,
158//! regardless of what happens in subsequent views. These `finalize(c,v)` votes accumulate
159//! independently of the current view: even if views `v+1` through `v+k` all time out
160//! (producing nullifications), the `finalize(c,v)` votes still count toward the `2f+1`
161//! threshold needed to form `finalization(c,v)`.
162//!
163//! This means a payload notarized in view `v` can be finalized while the network is
164//! in view `v+k` for any `k >= 1`. There is no requirement that a particular view
165//! after `v` succeeds or that any subsequent leader cooperates. As long as `2f+1`
166//! participants eventually certify and broadcast `finalize(c,v)`, the finalization
167//! certificate will form.
168//!
169//! ## Architecture
170//!
171//! All logic is split into four components: the `Batcher`, the `Voter`, the `Resolver`, and the `Application` (provided by the user).
172//! The `Batcher` is responsible for collecting messages from peers and lazily verifying them when a quorum is met. The `Voter`
173//! is responsible for directing participation in the current view. The `Resolver` is responsible for
174//! fetching artifacts from previous views required to verify proposed blocks in the latest view. Lastly, the `Application`
175//! is responsible for proposing new blocks and indicating whether some block is valid.
176//!
177//! To drive great performance, all interactions between `Batcher`, `Voter`, `Resolver`, and `Application` are
178//! non-blocking. This means that, for example, the `Voter` can continue processing messages while the
179//! `Application` verifies a proposed block or the `Resolver` fetches a notarization.
180//!
181//! ```txt
182//!                            +------------+          +++++++++++++++
183//!                            |            +--------->+             +
184//!                            |  Batcher   |          +    Peers    +
185//!                            |            |<---------+             +
186//!                            +-------+----+          +++++++++++++++
187//!                                |   ^
188//!                                |   |
189//!                                |   |
190//!                                |   |
191//!                                v   |
192//! +---------------+           +---------+            +++++++++++++++
193//! |               |<----------+         +----------->+             +
194//! |  Application  |           |  Voter  |            +    Peers    +
195//! |               +---------->|         |<-----------+             +
196//! +---------------+           +--+------+            +++++++++++++++
197//!                                |   ^
198//!                                |   |
199//!                                |   |
200//!                                |   |
201//!                                v   |
202//!                            +-------+----+          +++++++++++++++
203//!                            |            +--------->+             +
204//!                            |  Resolver  |          +    Peers    +
205//!                            |            |<---------+             +
206//!                            +------------+          +++++++++++++++
207//! ```
208//!
209//! ### Batched Verification
210//!
211//! Unlike other consensus constructions that verify all incoming messages received from peers, for schemes
212//! where [`Scheme::is_batchable()`](commonware_cryptography::certificate::Scheme::is_batchable) returns `true`
213//! (such as [scheme::ed25519], [scheme::bls12381_multisig] and [scheme::bls12381_threshold]), `simplex` lazily
214//! verifies messages (only when a quorum is met), enabling efficient batch verification. For schemes where
215//! `is_batchable()` returns `false` (such as [scheme::secp256r1]), signatures are verified eagerly as they
216//! arrive since there is no batching benefit.
217//!
218//! If an invalid signature is detected, the `Batcher` will perform repeated bisections over collected
219//! messages to find the offending message (and block the peer(s) that sent it via [commonware_p2p::Blocker]).
220//!
221//! _If using a p2p implementation that is not authenticated, it is not safe to employ this optimization
222//! as any attacking peer could simply reconnect from a different address. We recommend [commonware_p2p::authenticated]._
223//!
224//! ### Fetching Missing Certificates
225//!
226//! Instead of trying to fetch all possible certificates above the floor, we only attempt to fetch
227//! nullifications for all views from the floor (last certified notarization or finalization) to the current view.
228//! This technique, however, is not sufficient to guarantee progress.
229//!
230//! Consider the case where `f` honest participants have seen a finalization for a given view `v` (and nullifications only
231//! from `v` to the current view `c`) but the remaining `f+1` honest participants have not (they have exclusively seen
232//! nullifications from some view `o < v` to `c`). Neither partition of participants will vote for the other's proposals.
233//!
234//! To ensure progress is eventually made, leaders with nullified proposals directly broadcast the best finalization
235//! certificate they are aware of to ensure all honest participants eventually consider the same proposal ancestry valid.
236//!
237//! _While a more aggressive recovery mechanism could be employed, like requiring all participants to broadcast their highest
238//! finalization certificate after nullification, it would impose significant overhead under normal network
239//! conditions (whereas the approach described incurs no overhead under normal network conditions). Recall, honest participants
240//! already broadcast observed certificates to all other participants in each view (and misaligned participants should only ever
241//! be observed following severe network degradation)._
242//!
243//! ## Pluggable Hashing and Cryptography
244//!
245//! Hashing is abstracted via the [commonware_cryptography::Hasher] trait and cryptography is abstracted via
246//! the [commonware_cryptography::certificate::Scheme] trait, allowing deployments to employ approaches that best match their
247//! requirements (or to provide their own without modifying any consensus logic). The following schemes
248//! are supported out-of-the-box:
249//!
250//! ### [scheme::ed25519]
251//!
252//! [commonware_cryptography::ed25519] signatures are ["High-speed high-security signatures"](https://eprint.iacr.org/2011/368)
253//! with 32 byte public keys and 64 byte signatures. While they are well-supported by commercial HSMs and offer efficient batch
254//! verification, the signatures are not aggregatable (and certificates grow linearly with the quorum size).
255//!
256//! ### [scheme::bls12381_multisig]
257//!
258//! [commonware_cryptography::bls12381] is a ["digital signature scheme with aggregation properties"](https://www.ietf.org/archive/id/draft-irtf-cfrg-bls-signature-05.txt).
259//! Unlike [commonware_cryptography::ed25519], signatures from multiple participants (say the signers in a certificate) can be aggregated
260//! into a single signature (reducing bandwidth usage per broadcast). That being said, [commonware_cryptography::bls12381] is much slower
261//! to verify than [commonware_cryptography::ed25519] and isn't supported by most HSMs (a standardization effort expired in 2022).
262//!
263//! ### [scheme::secp256r1]
264//!
265//! [commonware_cryptography::secp256r1] signatures use the NIST P-256 elliptic curve (also known as prime256v1), which is widely
266//! supported by commercial HSMs and hardware security modules. Unlike [commonware_cryptography::ed25519], Secp256r1 does not
267//! benefit from batch verification, so signatures are verified individually. Certificates grow linearly with quorum size
268//! (similar to ed25519).
269//!
270//! ### [scheme::bls12381_threshold]
271//!
272//! [scheme::bls12381_threshold] employs threshold cryptography (BLS12-381 threshold signatures with a `2f+1` of `3f+1` quorum)
273//! to generate succinct consensus certificates (verifiable with just the static public key). This scheme requires instantiating
274//! the shared secret via [commonware_cryptography::bls12381::dkg] and resharing whenever participants change.
275//!
276//! Two (non-attributable) variants are provided:
277//!
278//! - [scheme::bls12381_threshold::standard]: Certificates contain only a vote signature.
279//!
280//! - [scheme::bls12381_threshold::vrf]: Certificates contain a vote signature and a view signature (i.e. a seed that can be used
281//!   as a VRF). This variant can be configured for random leader election (via [elector::Random]) and/or incorporate this randomness
282//!   into execution.
283//!
284//! #### Embedded VRF ([scheme::bls12381_threshold::vrf])
285//!
286//! Every `notarize(c,v)` or `nullify(v)` message includes an `attestation(v)` (a partial signature over the view `v`). After `2f+1`
287//! `notarize(c,v)` or `nullify(v)` messages are collected from unique participants, `seed(v)` can be recovered. Because `attestation(v)` is
288//! only over the view `v`, the seed derived for a given view `v` is the same regardless of whether or not a block was notarized in said
289//! view `v`.
290//!
291//! Because the value of `seed(v)` cannot be known prior to message broadcast by any participant (including the leader) in view `v`
292//! and cannot be manipulated by any participant (deterministic for any `2f+1` signers at a given view `v`), it can be used both as a beacon
293//! for leader election (where `seed(v)` determines the leader for `v+1`) and a source of randomness in execution (where `seed(v)`
294//! is used as a seed in `v`).
295//!
296//! #### Succinct Certificates
297//!
298//! All broadcast consensus messages (`notarize(c,v)`, `nullify(v)`, `finalize(c,v)`) contain attestations (partial signatures) for a static
299//! public key (derived from a group polynomial that can be recomputed during reconfiguration using [dkg](commonware_cryptography::bls12381::dkg)).
300//! As soon as `2f+1` messages are collected, a threshold signature over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)`
301//! can be recovered, respectively. Because the public key is static, any of these certificates can be verified by an external
302//! process without following the consensus instance and/or tracking the current set of participants (as is typically required
303//! to operate a lite client).
304//!
305//! These threshold signatures over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)` (i.e. the consensus certificates)
306//! can be used to secure interoperability between different consensus instances and user interactions with an infrastructure provider
307//! (where any data served can be proven to derive from some finalized block of some consensus instance with a known static public key).
308//!
309//! ## Persistence
310//!
311//! The `Voter` caches all data required to participate in consensus to avoid any disk reads on
312//! on the critical path. To enable recovery, the `Voter` writes valid messages it receives from
313//! consensus and messages it generates to a write-ahead log (WAL) implemented by [commonware_storage::journal::segmented::variable::Journal].
314//! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior
315//! on restart (especially in the case of unclean shutdown).
316
317use crate::types::Round;
318use commonware_cryptography::PublicKey;
319
320pub mod elector;
321pub mod scheme;
322pub mod types;
323
324cfg_if::cfg_if! {
325    if #[cfg(not(target_arch = "wasm32"))] {
326        mod actors;
327        pub mod config;
328        pub use config::{Config, ForwardingPolicy};
329        mod engine;
330        pub use engine::Engine;
331        mod metrics;
332    }
333}
334
335#[cfg(any(test, feature = "mocks"))]
336pub mod mocks;
337
338#[cfg(not(target_arch = "wasm32"))]
339use crate::types::{View, ViewDelta};
340
341/// The minimum view we are tracking both in-memory and on-disk.
342#[cfg(not(target_arch = "wasm32"))]
343pub(crate) const fn min_active(activity_timeout: ViewDelta, last_finalized: View) -> View {
344    last_finalized.saturating_sub(activity_timeout)
345}
346
347/// Whether or not a view is interesting to us. This is a function
348/// of both `min_active` and whether or not the view is too far
349/// in the future (based on the view we are currently in).
350#[cfg(not(target_arch = "wasm32"))]
351pub(crate) fn interesting(
352    activity_timeout: ViewDelta,
353    last_finalized: View,
354    current: View,
355    pending: View,
356    allow_future: bool,
357) -> bool {
358    // If the view is genesis, skip it, genesis doesn't have votes
359    if pending.is_zero() {
360        return false;
361    }
362    if pending < min_active(activity_timeout, last_finalized) {
363        return false;
364    }
365    if !allow_future && pending > current.next() {
366        return false;
367    }
368    true
369}
370
371/// Describes how a payload should be broadcast to the network.
372pub enum Plan<P: PublicKey> {
373    /// Initial broadcast of a newly proposed block to all participants.
374    Propose,
375    /// Forward a block to a specific set of peers.
376    Forward {
377        /// The round in which the forwarded block was proposed.
378        round: Round,
379        /// The peers to forward the block to.
380        peers: Vec<P>,
381    },
382}
383
384/// Convenience alias for [`N3f1::quorum`].
385#[cfg(test)]
386pub(crate) fn quorum(n: u32) -> u32 {
387    use commonware_utils::{Faults, N3f1};
388
389    N3f1::quorum(n)
390}
391
392#[cfg(test)]
393mod tests {
394    use super::*;
395    use crate::{
396        simplex::{
397            elector::{Config as Elector, Random, RoundRobin},
398            mocks::{
399                scheme as scheme_mocks,
400                twins::{self, Elector as TwinsElector},
401                wrapped,
402            },
403            scheme::{
404                bls12381_multisig,
405                bls12381_threshold::{
406                    standard as bls12381_threshold_std,
407                    vrf::{self as bls12381_threshold_vrf, Seedable},
408                },
409                ed25519, secp256r1, Scheme,
410            },
411            types::{
412                Certificate, Finalization as TFinalization, Finalize as TFinalize,
413                Notarization as TNotarization, Notarize as TNotarize,
414                Nullification as TNullification, Nullify as TNullify, Proposal, Vote,
415            },
416        },
417        types::{Epoch, Round},
418        Monitor, Viewable,
419    };
420    use commonware_codec::{Decode, DecodeExt, Encode};
421    use commonware_cryptography::{
422        bls12381::primitives::variant::{MinPk, MinSig, Variant},
423        certificate::mocks::Fixture,
424        ed25519::{PrivateKey, PublicKey},
425        sha256::{Digest as Sha256Digest, Digest as D},
426        Hasher as _, Sha256, Signer as _,
427    };
428    use commonware_macros::{select, test_group, test_traced};
429    use commonware_p2p::{
430        simulated::{Config, Link, Network, Oracle, Receiver, Sender, SplitOrigin},
431        utils::mocks::inert_channel,
432        Manager as _, Recipients, Sender as _, TrackedPeers,
433    };
434    use commonware_parallel::Sequential;
435    use commonware_runtime::{
436        buffer::paged::CacheRef, count_running_tasks, deterministic, Clock, IoBuf, Metrics, Quota,
437        Runner, Spawner,
438    };
439    use commonware_utils::{ordered::Set, sync::Mutex, test_rng, Faults, N3f1, NZUsize, NZU16};
440    use engine::Engine;
441    use futures::future::join_all;
442    use rand::{rngs::StdRng, Rng as _, SeedableRng};
443    use std::{
444        collections::{BTreeMap, HashMap, HashSet},
445        num::{NonZeroU16, NonZeroU32, NonZeroUsize},
446        sync::Arc,
447        time::Duration,
448    };
449    use tracing::{debug, info, warn};
450    use types::Activity;
451
452    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
453    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
454    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
455
456    #[test]
457    fn test_interesting() {
458        let activity_timeout = ViewDelta::new(10);
459
460        // Genesis view is never interesting
461        assert!(!interesting(
462            activity_timeout,
463            View::zero(),
464            View::zero(),
465            View::zero(),
466            false
467        ));
468        assert!(!interesting(
469            activity_timeout,
470            View::zero(),
471            View::new(1),
472            View::zero(),
473            true
474        ));
475
476        // View below min_active is not interesting
477        assert!(!interesting(
478            activity_timeout,
479            View::new(20),
480            View::new(25),
481            View::new(5), // below min_active (10)
482            false
483        ));
484
485        // View at min_active boundary is interesting
486        assert!(interesting(
487            activity_timeout,
488            View::new(20),
489            View::new(25),
490            View::new(10), // exactly min_active
491            false
492        ));
493
494        // Future view beyond current.next() is not interesting when allow_future is false
495        assert!(!interesting(
496            activity_timeout,
497            View::new(20),
498            View::new(25),
499            View::new(27),
500            false
501        ));
502
503        // Future view beyond current.next() is interesting when allow_future is true
504        assert!(interesting(
505            activity_timeout,
506            View::new(20),
507            View::new(25),
508            View::new(27),
509            true
510        ));
511
512        // View at current.next() is interesting
513        assert!(interesting(
514            activity_timeout,
515            View::new(20),
516            View::new(25),
517            View::new(26),
518            false
519        ));
520
521        // View within valid range is interesting
522        assert!(interesting(
523            activity_timeout,
524            View::new(20),
525            View::new(25),
526            View::new(22),
527            false
528        ));
529
530        // When last_finalized is 0 and activity_timeout would underflow
531        // min_active saturates at 0, so view 1 should still be interesting
532        assert!(interesting(
533            activity_timeout,
534            View::zero(),
535            View::new(5),
536            View::new(1),
537            false
538        ));
539    }
540
541    /// Register a validator with the oracle.
542    async fn register_validator(
543        oracle: &mut Oracle<PublicKey, deterministic::Context>,
544        validator: PublicKey,
545    ) -> (
546        (
547            Sender<PublicKey, deterministic::Context>,
548            Receiver<PublicKey>,
549        ),
550        (
551            Sender<PublicKey, deterministic::Context>,
552            Receiver<PublicKey>,
553        ),
554        (
555            Sender<PublicKey, deterministic::Context>,
556            Receiver<PublicKey>,
557        ),
558    ) {
559        let control = oracle.control(validator.clone());
560        let (vote_sender, vote_receiver) = control.register(0, TEST_QUOTA).await.unwrap();
561        let (certificate_sender, certificate_receiver) =
562            control.register(1, TEST_QUOTA).await.unwrap();
563        let (resolver_sender, resolver_receiver) = control.register(2, TEST_QUOTA).await.unwrap();
564        (
565            (vote_sender, vote_receiver),
566            (certificate_sender, certificate_receiver),
567            (resolver_sender, resolver_receiver),
568        )
569    }
570
571    /// Registers all validators using the oracle.
572    async fn register_validators(
573        oracle: &mut Oracle<PublicKey, deterministic::Context>,
574        validators: &[PublicKey],
575    ) -> HashMap<
576        PublicKey,
577        (
578            (
579                Sender<PublicKey, deterministic::Context>,
580                Receiver<PublicKey>,
581            ),
582            (
583                Sender<PublicKey, deterministic::Context>,
584                Receiver<PublicKey>,
585            ),
586            (
587                Sender<PublicKey, deterministic::Context>,
588                Receiver<PublicKey>,
589            ),
590        ),
591    > {
592        let mut registrations = HashMap::new();
593        for validator in validators.iter() {
594            let registration = register_validator(oracle, validator.clone()).await;
595            registrations.insert(validator.clone(), registration);
596        }
597        registrations
598    }
599
600    async fn start_test_network_with_peers<I>(
601        context: deterministic::Context,
602        peers: I,
603        disconnect_on_block: bool,
604    ) -> Oracle<PublicKey, deterministic::Context>
605    where
606        I: IntoIterator<Item = PublicKey>,
607    {
608        let (network, oracle) = Network::new_with_peers(
609            context.with_label("network"),
610            Config {
611                max_size: 1024 * 1024,
612                disconnect_on_block,
613                tracked_peer_sets: NZUsize!(1),
614            },
615            peers,
616        )
617        .await;
618        network.start();
619        oracle
620    }
621
622    async fn start_test_network_with_split_peers<I, J>(
623        context: deterministic::Context,
624        primary: I,
625        secondary: J,
626        disconnect_on_block: bool,
627    ) -> Oracle<PublicKey, deterministic::Context>
628    where
629        I: IntoIterator<Item = PublicKey>,
630        J: IntoIterator<Item = PublicKey>,
631    {
632        let (network, oracle) = Network::new_with_split_peers(
633            context.with_label("network"),
634            Config {
635                max_size: 1024 * 1024,
636                disconnect_on_block,
637                tracked_peer_sets: NZUsize!(1),
638            },
639            primary,
640            secondary,
641        )
642        .await;
643        network.start();
644        oracle
645    }
646
647    /// Enum to describe the action to take when linking validators.
648    enum Action {
649        Link(Link),
650        Update(Link), // Unlink and then link
651        Unlink,
652    }
653
654    /// Links (or unlinks) validators using the oracle.
655    ///
656    /// The `action` parameter determines the action (e.g. link, unlink) to take.
657    /// The `restrict_to` function can be used to restrict the linking to certain connections,
658    /// otherwise all validators will be linked to all other validators.
659    async fn link_validators(
660        oracle: &mut Oracle<PublicKey, deterministic::Context>,
661        validators: &[PublicKey],
662        action: Action,
663        restrict_to: Option<fn(usize, usize, usize) -> bool>,
664    ) {
665        for (i1, v1) in validators.iter().enumerate() {
666            for (i2, v2) in validators.iter().enumerate() {
667                // Ignore self
668                if v2 == v1 {
669                    continue;
670                }
671
672                // Restrict to certain connections
673                if let Some(f) = restrict_to {
674                    if !f(validators.len(), i1, i2) {
675                        continue;
676                    }
677                }
678
679                // Do any unlinking first
680                match action {
681                    Action::Update(_) | Action::Unlink => {
682                        oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
683                    }
684                    _ => {}
685                }
686
687                // Do any linking after
688                match action {
689                    Action::Link(ref link) | Action::Update(ref link) => {
690                        oracle
691                            .add_link(v1.clone(), v2.clone(), link.clone())
692                            .await
693                            .unwrap();
694                    }
695                    _ => {}
696                }
697            }
698        }
699    }
700
701    /// Counts lines where all patterns match and the trailing value is non-zero.
702    fn count_nonzero_metric_lines(encoded: &str, patterns: &[&str]) -> u32 {
703        encoded
704            .lines()
705            .filter(|line| patterns.iter().all(|p| line.contains(p)))
706            .filter(|line| {
707                line.split_whitespace()
708                    .last()
709                    .and_then(|s| s.parse::<u64>().ok())
710                    .is_some_and(|n| n > 0)
711            })
712            .count() as u32
713    }
714
715    fn all_online<S, F, L>(mut fixture: F)
716    where
717        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
718        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
719        L: Elector<S>,
720    {
721        // Create context
722        let n = 5;
723        let quorum = quorum(n) as usize;
724        let required_containers = View::new(100);
725        let activity_timeout = ViewDelta::new(10);
726        let skip_timeout = ViewDelta::new(5);
727        let namespace = b"consensus".to_vec();
728        let executor = deterministic::Runner::timed(Duration::from_secs(300));
729        executor.start(|mut context| async move {
730            // Register participants
731            let Fixture {
732                participants,
733                schemes,
734                ..
735            } = fixture(&mut context, &namespace, n);
736            let mut oracle =
737                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
738            let mut registrations = register_validators(&mut oracle, &participants).await;
739
740            // Link all validators
741            let link = Link {
742                latency: Duration::from_millis(10),
743                jitter: Duration::from_millis(1),
744                success_rate: 1.0,
745            };
746            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
747
748            // Create engines
749            let elector = L::default();
750            let relay = Arc::new(mocks::relay::Relay::new());
751            let mut reporters = Vec::new();
752            let mut engine_handlers = Vec::new();
753            for (idx, validator) in participants.iter().enumerate() {
754                // Create scheme context
755                let context = context.with_label(&format!("validator_{}", *validator));
756
757                // Configure engine
758                let reporter_config = mocks::reporter::Config {
759                    participants: participants.clone().try_into().unwrap(),
760                    scheme: schemes[idx].clone(),
761                    elector: elector.clone(),
762                };
763                let reporter =
764                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
765                reporters.push(reporter.clone());
766                let application_cfg = mocks::application::Config {
767                    hasher: Sha256::default(),
768                    relay: relay.clone(),
769                    me: validator.clone(),
770                    propose_latency: (10.0, 5.0),
771                    verify_latency: (10.0, 5.0),
772                    certify_latency: (10.0, 5.0),
773                    should_certify: mocks::application::Certifier::Sometimes,
774                };
775                let (actor, application) = mocks::application::Application::new(
776                    context.with_label("application"),
777                    application_cfg,
778                );
779                actor.start();
780                let blocker = oracle.control(validator.clone());
781                let cfg = config::Config {
782                    scheme: schemes[idx].clone(),
783                    elector: elector.clone(),
784                    blocker,
785                    automaton: application.clone(),
786                    relay: application.clone(),
787                    reporter: reporter.clone(),
788                    strategy: Sequential,
789                    partition: validator.to_string(),
790                    mailbox_size: 1024,
791                    epoch: Epoch::new(333),
792                    leader_timeout: Duration::from_secs(1),
793                    certification_timeout: Duration::from_secs(2),
794                    timeout_retry: Duration::from_secs(10),
795                    fetch_timeout: Duration::from_secs(1),
796                    activity_timeout,
797                    skip_timeout,
798                    fetch_concurrent: 4,
799                    replay_buffer: NZUsize!(1024 * 1024),
800                    write_buffer: NZUsize!(1024 * 1024),
801                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
802                    forwarding: ForwardingPolicy::Disabled,
803                };
804                let engine = Engine::new(context.with_label("engine"), cfg);
805
806                // Start engine
807                let (pending, recovered, resolver) = registrations
808                    .remove(validator)
809                    .expect("validator should be registered");
810                engine_handlers.push(engine.start(pending, recovered, resolver));
811            }
812
813            // Wait for all engines to finish
814            let mut finalizers = Vec::new();
815            for reporter in reporters.iter_mut() {
816                let (mut latest, mut monitor) = reporter.subscribe().await;
817                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
818                    while latest < required_containers {
819                        latest = monitor.recv().await.expect("event missing");
820                    }
821                }));
822            }
823            join_all(finalizers).await;
824
825            // Check reporters for correct activity
826            let latest_complete = required_containers.saturating_sub(activity_timeout);
827            for reporter in reporters.iter() {
828                // Ensure no faults
829                reporter.assert_no_faults();
830
831                // Ensure no invalid signatures
832                reporter.assert_no_invalid();
833
834                // Ensure certificates for all views
835                {
836                    let certified = reporter.certified.lock();
837                    for view in View::range(View::new(1), latest_complete) {
838                        // Ensure certificate for every view
839                        if !certified.contains(&view) {
840                            panic!("view: {view}");
841                        }
842                    }
843                }
844
845                // Ensure no forks
846                let mut notarized = HashMap::new();
847                let mut finalized = HashMap::new();
848                {
849                    let notarizes = reporter.notarizes.lock();
850                    for view in View::range(View::new(1), latest_complete) {
851                        // Ensure only one payload proposed per view
852                        let Some(payloads) = notarizes.get(&view) else {
853                            continue;
854                        };
855                        if payloads.len() > 1 {
856                            panic!("view: {view}");
857                        }
858                        let (digest, notarizers) = payloads.iter().next().unwrap();
859                        notarized.insert(view, *digest);
860
861                        if notarizers.len() < quorum {
862                            // We can't verify that everyone participated at every view because some nodes may
863                            // have started later.
864                            panic!("view: {view}");
865                        }
866                    }
867                }
868                {
869                    let notarizations = reporter.notarizations.lock();
870                    for view in View::range(View::new(1), latest_complete) {
871                        // Ensure notarization matches digest from notarizes
872                        let Some(notarization) = notarizations.get(&view) else {
873                            continue;
874                        };
875                        let Some(digest) = notarized.get(&view) else {
876                            continue;
877                        };
878                        assert_eq!(&notarization.proposal.payload, digest);
879                    }
880                }
881                {
882                    let finalizes = reporter.finalizes.lock();
883                    for view in View::range(View::new(1), latest_complete) {
884                        // Ensure only one payload proposed per view
885                        let Some(payloads) = finalizes.get(&view) else {
886                            continue;
887                        };
888                        if payloads.len() > 1 {
889                            panic!("view: {view}");
890                        }
891                        let (digest, finalizers) = payloads.iter().next().unwrap();
892                        finalized.insert(view, *digest);
893
894                        // Only check at views below timeout
895                        if view > latest_complete {
896                            continue;
897                        }
898
899                        // Ensure everyone participating
900                        if finalizers.len() < quorum {
901                            // We can't verify that everyone participated at every view because some nodes may
902                            // have started later.
903                            panic!("view: {view}");
904                        }
905
906                        // Ensure no nullifies for any finalizers
907                        let nullifies = reporter.nullifies.lock();
908                        let Some(nullifies) = nullifies.get(&view) else {
909                            continue;
910                        };
911                        for (_, finalizers) in payloads.iter() {
912                            for finalizer in finalizers.iter() {
913                                if nullifies.contains(finalizer) {
914                                    panic!("should not nullify and finalize at same view");
915                                }
916                            }
917                        }
918                    }
919                }
920                {
921                    let finalizations = reporter.finalizations.lock();
922                    for view in View::range(View::new(1), latest_complete) {
923                        // Ensure finalization matches digest from finalizes
924                        let Some(finalization) = finalizations.get(&view) else {
925                            continue;
926                        };
927                        let Some(digest) = finalized.get(&view) else {
928                            continue;
929                        };
930                        assert_eq!(&finalization.proposal.payload, digest);
931                    }
932                }
933            }
934
935            // Ensure no blocked connections
936            let blocked = oracle.blocked().await.unwrap();
937            assert!(blocked.is_empty());
938        });
939    }
940
941    #[test_group("slow")]
942    #[test_traced]
943    fn test_all_online() {
944        all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
945        all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
946        all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
947        all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
948        all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
949        all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
950        all_online::<_, _, RoundRobin>(ed25519::fixture);
951        all_online::<_, _, RoundRobin>(secp256r1::fixture);
952    }
953
954    fn observer<S, F, L>(mut fixture: F)
955    where
956        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
957        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
958        L: Elector<S>,
959    {
960        // Create context
961        let n_active = 5;
962        let required_containers = View::new(100);
963        let activity_timeout = ViewDelta::new(10);
964        let skip_timeout = ViewDelta::new(5);
965        let namespace = b"consensus".to_vec();
966        let executor = deterministic::Runner::timed(Duration::from_secs(300));
967        executor.start(|mut context| async move {
968            // Register participants (active)
969            let Fixture {
970                participants,
971                schemes,
972                verifier,
973                ..
974            } = fixture(&mut context, &namespace, n_active);
975
976            // Add observer (no share)
977            let private_key_observer = PrivateKey::from_seed(n_active as u64);
978            let public_key_observer = private_key_observer.public_key();
979
980            let mut oracle = start_test_network_with_split_peers(
981                context.clone(),
982                participants.clone(),
983                [public_key_observer.clone()],
984                true,
985            )
986            .await;
987
988            // Register all (including observer) with the network
989            let mut all_validators = participants.clone();
990            all_validators.push(public_key_observer.clone());
991            all_validators.sort();
992            let mut registrations = register_validators(&mut oracle, &all_validators).await;
993
994            // Link all peers (including observer)
995            let link = Link {
996                latency: Duration::from_millis(10),
997                jitter: Duration::from_millis(1),
998                success_rate: 1.0,
999            };
1000            link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
1001
1002            // Create engines
1003            let elector = L::default();
1004            let relay = Arc::new(mocks::relay::Relay::new());
1005            let mut reporters = Vec::new();
1006
1007            for (idx, validator) in participants.iter().enumerate() {
1008                let is_observer = *validator == public_key_observer;
1009
1010                // Create scheme context
1011                let context = context.with_label(&format!("validator_{}", *validator));
1012
1013                // Configure engine
1014                let signing = if is_observer {
1015                    verifier.clone()
1016                } else {
1017                    schemes[idx].clone()
1018                };
1019                let reporter_config = mocks::reporter::Config {
1020                    participants: participants.clone().try_into().unwrap(),
1021                    scheme: signing.clone(),
1022                    elector: elector.clone(),
1023                };
1024                let reporter =
1025                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1026                reporters.push(reporter.clone());
1027                let application_cfg = mocks::application::Config {
1028                    hasher: Sha256::default(),
1029                    relay: relay.clone(),
1030                    me: validator.clone(),
1031                    propose_latency: (10.0, 5.0),
1032                    verify_latency: (10.0, 5.0),
1033                    certify_latency: (10.0, 5.0),
1034                    should_certify: mocks::application::Certifier::Sometimes,
1035                };
1036                let (actor, application) = mocks::application::Application::new(
1037                    context.with_label("application"),
1038                    application_cfg,
1039                );
1040                actor.start();
1041                let blocker = oracle.control(validator.clone());
1042                let cfg = config::Config {
1043                    scheme: signing.clone(),
1044                    elector: elector.clone(),
1045                    blocker,
1046                    automaton: application.clone(),
1047                    relay: application.clone(),
1048                    reporter: reporter.clone(),
1049                    strategy: Sequential,
1050                    partition: validator.to_string(),
1051                    mailbox_size: 1024,
1052                    epoch: Epoch::new(333),
1053                    leader_timeout: Duration::from_secs(1),
1054                    certification_timeout: Duration::from_secs(2),
1055                    timeout_retry: Duration::from_secs(10),
1056                    fetch_timeout: Duration::from_secs(1),
1057                    activity_timeout,
1058                    skip_timeout,
1059                    fetch_concurrent: 4,
1060                    replay_buffer: NZUsize!(1024 * 1024),
1061                    write_buffer: NZUsize!(1024 * 1024),
1062                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1063                    forwarding: ForwardingPolicy::Disabled,
1064                };
1065                let engine = Engine::new(context.with_label("engine"), cfg);
1066
1067                // Start engine
1068                let (pending, recovered, resolver) = registrations
1069                    .remove(validator)
1070                    .expect("validator should be registered");
1071                engine.start(pending, recovered, resolver);
1072            }
1073
1074            // Wait for all  engines to finish
1075            let mut finalizers = Vec::new();
1076            for reporter in reporters.iter_mut() {
1077                let (mut latest, mut monitor) = reporter.subscribe().await;
1078                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1079                    while latest < required_containers {
1080                        latest = monitor.recv().await.expect("event missing");
1081                    }
1082                }));
1083            }
1084            join_all(finalizers).await;
1085
1086            // Sanity check. The standalone secondary observer should still
1087            // process the chain to the same progress threshold as validators.
1088            for reporter in reporters.iter() {
1089                // Ensure no faults or invalid signatures
1090                reporter.assert_no_faults();
1091                reporter.assert_no_invalid();
1092
1093                // Ensure no blocked connections
1094                let blocked = oracle.blocked().await.unwrap();
1095                assert!(blocked.is_empty());
1096            }
1097        });
1098    }
1099
1100    #[test_group("slow")]
1101    #[test_traced]
1102    fn test_observer() {
1103        observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1104        observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1105        observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1106        observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1107        observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1108        observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1109        observer::<_, _, RoundRobin>(ed25519::fixture);
1110        observer::<_, _, RoundRobin>(secp256r1::fixture);
1111    }
1112
1113    fn unclean_shutdown<S, F, L>(mut fixture: F)
1114    where
1115        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1116        F: FnMut(&mut StdRng, &[u8], u32) -> Fixture<S>,
1117        L: Elector<S>,
1118    {
1119        // Create context
1120        let n = 5;
1121        let required_containers = View::new(100);
1122        let activity_timeout = ViewDelta::new(10);
1123        let skip_timeout = ViewDelta::new(5);
1124        let namespace = b"consensus".to_vec();
1125
1126        // Random restarts every x seconds
1127        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
1128        let supervised = Arc::new(Mutex::new(Vec::new()));
1129        let mut prev_checkpoint = None;
1130
1131        // Create validator keys
1132        let mut rng = test_rng();
1133        let Fixture {
1134            participants,
1135            schemes,
1136            ..
1137        } = fixture(&mut rng, &namespace, n);
1138
1139        // Create block relay, shared across restarts.
1140        let relay = Arc::new(mocks::relay::Relay::<Sha256Digest, S::PublicKey>::new());
1141
1142        loop {
1143            let rng = rng.clone();
1144            let participants = participants.clone();
1145            let schemes = schemes.clone();
1146            let shutdowns = shutdowns.clone();
1147            let supervised = supervised.clone();
1148            let relay = relay.clone();
1149            relay.deregister_all(); // Clear all recipients from previous restart.
1150
1151            let f = |mut context: deterministic::Context| async move {
1152                // Register participants
1153                let mut oracle =
1154                    start_test_network_with_peers(context.clone(), participants.clone(), true)
1155                        .await;
1156                let mut registrations = register_validators(&mut oracle, &participants).await;
1157
1158                // Link all validators
1159                let link = Link {
1160                    latency: Duration::from_millis(50),
1161                    jitter: Duration::from_millis(50),
1162                    success_rate: 1.0,
1163                };
1164                link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1165
1166                // Create engines
1167                let elector = L::default();
1168                let relay = Arc::new(mocks::relay::Relay::new());
1169                let mut reporters = HashMap::new();
1170                let mut engine_handlers = Vec::new();
1171                for (idx, validator) in participants.iter().enumerate() {
1172                    // Create scheme context
1173                    let context = context.with_label(&format!("validator_{}", *validator));
1174
1175                    // Configure engine
1176                    let reporter_config = mocks::reporter::Config {
1177                        participants: participants.clone().try_into().unwrap(),
1178                        scheme: schemes[idx].clone(),
1179                        elector: elector.clone(),
1180                    };
1181                    let reporter = mocks::reporter::Reporter::new(rng.clone(), reporter_config);
1182                    reporters.insert(validator.clone(), reporter.clone());
1183                    let application_cfg = mocks::application::Config {
1184                        hasher: Sha256::default(),
1185                        relay: relay.clone(),
1186                        me: validator.clone(),
1187                        propose_latency: (10.0, 5.0),
1188                        verify_latency: (10.0, 5.0),
1189                        certify_latency: (10.0, 5.0),
1190                        should_certify: mocks::application::Certifier::Sometimes,
1191                    };
1192                    let (actor, application) = mocks::application::Application::new(
1193                        context.with_label("application"),
1194                        application_cfg,
1195                    );
1196                    actor.start();
1197                    let blocker = oracle.control(validator.clone());
1198                    let cfg = config::Config {
1199                        scheme: schemes[idx].clone(),
1200                        elector: elector.clone(),
1201                        blocker,
1202                        automaton: application.clone(),
1203                        relay: application.clone(),
1204                        reporter: reporter.clone(),
1205                        strategy: Sequential,
1206                        partition: validator.to_string(),
1207                        mailbox_size: 1024,
1208                        epoch: Epoch::new(333),
1209                        leader_timeout: Duration::from_secs(1),
1210                        certification_timeout: Duration::from_secs(2),
1211                        timeout_retry: Duration::from_secs(10),
1212                        fetch_timeout: Duration::from_secs(1),
1213                        activity_timeout,
1214                        skip_timeout,
1215                        fetch_concurrent: 4,
1216                        replay_buffer: NZUsize!(1024 * 1024),
1217                        write_buffer: NZUsize!(1024 * 1024),
1218                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1219                        forwarding: ForwardingPolicy::Disabled,
1220                    };
1221                    let engine = Engine::new(context.with_label("engine"), cfg);
1222
1223                    // Start engine
1224                    let (pending, recovered, resolver) = registrations
1225                        .remove(validator)
1226                        .expect("validator should be registered");
1227                    engine_handlers.push(engine.start(pending, recovered, resolver));
1228                }
1229
1230                // Store all finalizer handles
1231                let mut finalizers = Vec::new();
1232                for (_, reporter) in reporters.iter_mut() {
1233                    let (mut latest, mut monitor) = reporter.subscribe().await;
1234                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1235                        while latest < required_containers {
1236                            latest = monitor.recv().await.expect("event missing");
1237                        }
1238                    }));
1239                }
1240
1241                // Exit at random points for unclean shutdown of entire set
1242                let wait =
1243                    context.gen_range(Duration::from_millis(100)..Duration::from_millis(2_000));
1244                let result = select! {
1245                    _ = context.sleep(wait) => {
1246                        // Collect reporters to check faults
1247                        {
1248                            let mut shutdowns = shutdowns.lock();
1249                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
1250                            *shutdowns += 1;
1251                        }
1252                        supervised.lock().push(reporters);
1253                        false
1254                    },
1255                    _ = join_all(finalizers) => {
1256                        // Check reporters for faults activity
1257                        let supervised = supervised.lock();
1258                        for reporters in supervised.iter() {
1259                            for (_, reporter) in reporters.iter() {
1260                                reporter.assert_no_faults();
1261                            }
1262                        }
1263                        true
1264                    },
1265                };
1266
1267                // Ensure no blocked connections
1268                let blocked = oracle.blocked().await.unwrap();
1269                assert!(blocked.is_empty());
1270
1271                result
1272            };
1273
1274            let (complete, checkpoint) = prev_checkpoint
1275                .map_or_else(
1276                    || deterministic::Runner::timed(Duration::from_secs(180)),
1277                    deterministic::Runner::from,
1278                )
1279                .start_and_recover(f);
1280
1281            // Check if we should exit
1282            if complete {
1283                break;
1284            }
1285
1286            prev_checkpoint = Some(checkpoint);
1287        }
1288    }
1289
1290    #[test_group("slow")]
1291    #[test_traced]
1292    fn test_unclean_shutdown() {
1293        unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1294        unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1295        unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1296        unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1297        unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1298        unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1299        unclean_shutdown::<_, _, RoundRobin>(ed25519::fixture);
1300        unclean_shutdown::<_, _, RoundRobin>(secp256r1::fixture);
1301    }
1302
1303    fn backfill<S, F, L>(mut fixture: F)
1304    where
1305        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1306        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1307        L: Elector<S>,
1308    {
1309        // Create context
1310        let n = 4;
1311        let required_containers = View::new(100);
1312        let activity_timeout = ViewDelta::new(10);
1313        let skip_timeout = ViewDelta::new(5);
1314        let namespace = b"consensus".to_vec();
1315        let executor = deterministic::Runner::timed(Duration::from_secs(240));
1316        executor.start(|mut context| async move {
1317            // Register participants
1318            let Fixture {
1319                participants,
1320                schemes,
1321                ..
1322            } = fixture(&mut context, &namespace, n);
1323            let mut oracle =
1324                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
1325            let mut registrations = register_validators(&mut oracle, &participants).await;
1326
1327            // Link all validators except first
1328            let link = Link {
1329                latency: Duration::from_millis(10),
1330                jitter: Duration::from_millis(1),
1331                success_rate: 1.0,
1332            };
1333            link_validators(
1334                &mut oracle,
1335                &participants,
1336                Action::Link(link),
1337                Some(|_, i, j| ![i, j].contains(&0usize)),
1338            )
1339            .await;
1340
1341            // Create engines
1342            let elector = L::default();
1343            let relay = Arc::new(mocks::relay::Relay::new());
1344            let mut reporters = Vec::new();
1345            let mut engine_handlers = Vec::new();
1346            for (idx_scheme, validator) in participants.iter().enumerate() {
1347                // Skip first peer
1348                if idx_scheme == 0 {
1349                    continue;
1350                }
1351
1352                // Create scheme context
1353                let context = context.with_label(&format!("validator_{}", *validator));
1354
1355                // Configure engine
1356                let reporter_config = mocks::reporter::Config {
1357                    participants: participants.clone().try_into().unwrap(),
1358                    scheme: schemes[idx_scheme].clone(),
1359                    elector: elector.clone(),
1360                };
1361                let reporter =
1362                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1363                reporters.push(reporter.clone());
1364                let application_cfg = mocks::application::Config {
1365                    hasher: Sha256::default(),
1366                    relay: relay.clone(),
1367                    me: validator.clone(),
1368                    propose_latency: (10.0, 5.0),
1369                    verify_latency: (10.0, 5.0),
1370                    certify_latency: (10.0, 5.0),
1371                    should_certify: mocks::application::Certifier::Sometimes,
1372                };
1373                let (actor, application) = mocks::application::Application::new(
1374                    context.with_label("application"),
1375                    application_cfg,
1376                );
1377                actor.start();
1378                let blocker = oracle.control(validator.clone());
1379                let cfg = config::Config {
1380                    scheme: schemes[idx_scheme].clone(),
1381                    elector: elector.clone(),
1382                    blocker,
1383                    automaton: application.clone(),
1384                    relay: application.clone(),
1385                    reporter: reporter.clone(),
1386                    strategy: Sequential,
1387                    partition: validator.to_string(),
1388                    mailbox_size: 1024,
1389                    epoch: Epoch::new(333),
1390                    leader_timeout: Duration::from_secs(1),
1391                    certification_timeout: Duration::from_secs(2),
1392                    timeout_retry: Duration::from_secs(10),
1393                    fetch_timeout: Duration::from_secs(1),
1394                    activity_timeout,
1395                    skip_timeout,
1396                    fetch_concurrent: 4,
1397                    replay_buffer: NZUsize!(1024 * 1024),
1398                    write_buffer: NZUsize!(1024 * 1024),
1399                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1400                    forwarding: ForwardingPolicy::Disabled,
1401                };
1402                let engine = Engine::new(context.with_label("engine"), cfg);
1403
1404                // Start engine
1405                let (pending, recovered, resolver) = registrations
1406                    .remove(validator)
1407                    .expect("validator should be registered");
1408                engine_handlers.push(engine.start(pending, recovered, resolver));
1409            }
1410
1411            // Wait for all engines to finish
1412            let mut finalizers = Vec::new();
1413            for reporter in reporters.iter_mut() {
1414                let (mut latest, mut monitor) = reporter.subscribe().await;
1415                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1416                    while latest < required_containers {
1417                        latest = monitor.recv().await.expect("event missing");
1418                    }
1419                }));
1420            }
1421            join_all(finalizers).await;
1422
1423            // Degrade network connections for online peers
1424            let link = Link {
1425                latency: Duration::from_secs(3),
1426                jitter: Duration::from_millis(0),
1427                success_rate: 1.0,
1428            };
1429            link_validators(
1430                &mut oracle,
1431                &participants,
1432                Action::Update(link.clone()),
1433                Some(|_, i, j| ![i, j].contains(&0usize)),
1434            )
1435            .await;
1436
1437            // Wait for nullifications to accrue
1438            context.sleep(Duration::from_secs(60)).await;
1439
1440            // Unlink second peer from all (except first)
1441            link_validators(
1442                &mut oracle,
1443                &participants,
1444                Action::Unlink,
1445                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
1446            )
1447            .await;
1448
1449            // Configure engine for first peer
1450            let me = participants[0].clone();
1451            let context = context.with_label(&format!("validator_{me}"));
1452
1453            // Link first peer to all (except second)
1454            link_validators(
1455                &mut oracle,
1456                &participants,
1457                Action::Link(link),
1458                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
1459            )
1460            .await;
1461
1462            // Restore network connections for all online peers
1463            let link = Link {
1464                latency: Duration::from_millis(10),
1465                jitter: Duration::from_millis(3),
1466                success_rate: 1.0,
1467            };
1468            link_validators(
1469                &mut oracle,
1470                &participants,
1471                Action::Update(link),
1472                Some(|_, i, j| ![i, j].contains(&1usize)),
1473            )
1474            .await;
1475
1476            // Configure engine
1477            let reporter_config = mocks::reporter::Config {
1478                participants: participants.clone().try_into().unwrap(),
1479                scheme: schemes[0].clone(),
1480                elector: elector.clone(),
1481            };
1482            let mut reporter =
1483                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1484            reporters.push(reporter.clone());
1485            let application_cfg = mocks::application::Config {
1486                hasher: Sha256::default(),
1487                relay: relay.clone(),
1488                me: me.clone(),
1489                propose_latency: (10.0, 5.0),
1490                verify_latency: (10.0, 5.0),
1491                certify_latency: (10.0, 5.0),
1492                should_certify: mocks::application::Certifier::Sometimes,
1493            };
1494            let (actor, application) = mocks::application::Application::new(
1495                context.with_label("application"),
1496                application_cfg,
1497            );
1498            actor.start();
1499            let blocker = oracle.control(me.clone());
1500            let cfg = config::Config {
1501                scheme: schemes[0].clone(),
1502                elector: elector.clone(),
1503                blocker,
1504                automaton: application.clone(),
1505                relay: application.clone(),
1506                reporter: reporter.clone(),
1507                strategy: Sequential,
1508                partition: me.to_string(),
1509                mailbox_size: 1024,
1510                epoch: Epoch::new(333),
1511                leader_timeout: Duration::from_secs(1),
1512                certification_timeout: Duration::from_secs(2),
1513                timeout_retry: Duration::from_secs(10),
1514                fetch_timeout: Duration::from_secs(1),
1515                activity_timeout,
1516                skip_timeout,
1517                fetch_concurrent: 4,
1518                replay_buffer: NZUsize!(1024 * 1024),
1519                write_buffer: NZUsize!(1024 * 1024),
1520                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1521                forwarding: ForwardingPolicy::Disabled,
1522            };
1523            let engine = Engine::new(context.with_label("engine"), cfg);
1524
1525            // Start engine
1526            let (pending, recovered, resolver) = registrations
1527                .remove(&me)
1528                .expect("validator should be registered");
1529            engine_handlers.push(engine.start(pending, recovered, resolver));
1530
1531            // Wait for new engine to finalize required
1532            let (mut latest, mut monitor) = reporter.subscribe().await;
1533            while latest < required_containers {
1534                latest = monitor.recv().await.expect("event missing");
1535            }
1536
1537            // Ensure no blocked connections
1538            let blocked = oracle.blocked().await.unwrap();
1539            assert!(blocked.is_empty());
1540        });
1541    }
1542
1543    #[test_group("slow")]
1544    #[test_traced]
1545    fn test_backfill() {
1546        backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1547        backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1548        backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1549        backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1550        backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1551        backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1552        backfill::<_, _, RoundRobin>(ed25519::fixture);
1553        backfill::<_, _, RoundRobin>(secp256r1::fixture);
1554    }
1555
1556    fn one_offline<S, F, L>(mut fixture: F)
1557    where
1558        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1559        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1560        L: Elector<S>,
1561    {
1562        // Create context
1563        let n = 5;
1564        let quorum = quorum(n) as usize;
1565        let required_containers = View::new(100);
1566        let activity_timeout = ViewDelta::new(10);
1567        let skip_timeout = ViewDelta::new(5);
1568        let max_exceptions = 10;
1569        let namespace = b"consensus".to_vec();
1570        let executor = deterministic::Runner::timed(Duration::from_secs(300));
1571        executor.start(|mut context| async move {
1572            // Register participants
1573            let Fixture {
1574                participants,
1575                schemes,
1576                ..
1577            } = fixture(&mut context, &namespace, n);
1578            let mut oracle =
1579                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
1580            let mut registrations = register_validators(&mut oracle, &participants).await;
1581
1582            // Link all validators except first
1583            let link = Link {
1584                latency: Duration::from_millis(10),
1585                jitter: Duration::from_millis(1),
1586                success_rate: 1.0,
1587            };
1588            link_validators(
1589                &mut oracle,
1590                &participants,
1591                Action::Link(link),
1592                Some(|_, i, j| ![i, j].contains(&0usize)),
1593            )
1594            .await;
1595
1596            // Create engines
1597            let elector = L::default();
1598            let relay = Arc::new(mocks::relay::Relay::new());
1599            let mut reporters = Vec::new();
1600            let mut engine_handlers = Vec::new();
1601            for (idx_scheme, validator) in participants.iter().enumerate() {
1602                // Skip first peer
1603                if idx_scheme == 0 {
1604                    continue;
1605                }
1606
1607                // Create scheme context
1608                let context = context.with_label(&format!("validator_{}", *validator));
1609
1610                // Configure engine
1611                let reporter_config = mocks::reporter::Config {
1612                    participants: participants.clone().try_into().unwrap(),
1613                    scheme: schemes[idx_scheme].clone(),
1614                    elector: elector.clone(),
1615                };
1616                let reporter =
1617                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1618                reporters.push(reporter.clone());
1619                let application_cfg = mocks::application::Config {
1620                    hasher: Sha256::default(),
1621                    relay: relay.clone(),
1622                    me: validator.clone(),
1623                    propose_latency: (10.0, 5.0),
1624                    verify_latency: (10.0, 5.0),
1625                    certify_latency: (10.0, 5.0),
1626                    should_certify: mocks::application::Certifier::Sometimes,
1627                };
1628                let (actor, application) = mocks::application::Application::new(
1629                    context.with_label("application"),
1630                    application_cfg,
1631                );
1632                actor.start();
1633                let blocker = oracle.control(validator.clone());
1634                let cfg = config::Config {
1635                    scheme: schemes[idx_scheme].clone(),
1636                    elector: elector.clone(),
1637                    blocker,
1638                    automaton: application.clone(),
1639                    relay: application.clone(),
1640                    reporter: reporter.clone(),
1641                    strategy: Sequential,
1642                    partition: validator.to_string(),
1643                    mailbox_size: 1024,
1644                    epoch: Epoch::new(333),
1645                    leader_timeout: Duration::from_secs(1),
1646                    certification_timeout: Duration::from_secs(2),
1647                    timeout_retry: Duration::from_secs(10),
1648                    fetch_timeout: Duration::from_secs(1),
1649                    activity_timeout,
1650                    skip_timeout,
1651                    fetch_concurrent: 4,
1652                    replay_buffer: NZUsize!(1024 * 1024),
1653                    write_buffer: NZUsize!(1024 * 1024),
1654                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1655                    forwarding: ForwardingPolicy::Disabled,
1656                };
1657                let engine = Engine::new(context.with_label("engine"), cfg);
1658
1659                // Start engine
1660                let (pending, recovered, resolver) = registrations
1661                    .remove(validator)
1662                    .expect("validator should be registered");
1663                engine_handlers.push(engine.start(pending, recovered, resolver));
1664            }
1665
1666            // Wait for all engines to finish
1667            let mut finalizers = Vec::new();
1668            for reporter in reporters.iter_mut() {
1669                let (mut latest, mut monitor) = reporter.subscribe().await;
1670                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1671                    while latest < required_containers {
1672                        latest = monitor.recv().await.expect("event missing");
1673                    }
1674                }));
1675            }
1676            join_all(finalizers).await;
1677
1678            // Check reporters for correct activity
1679            let exceptions = 0;
1680            let offline = &participants[0];
1681            for reporter in reporters.iter() {
1682                // Ensure no faults
1683                reporter.assert_no_faults();
1684
1685                // Ensure no invalid signatures
1686                reporter.assert_no_invalid();
1687
1688                // Ensure offline node is never active
1689                let mut exceptions = 0;
1690                {
1691                    let notarizes = reporter.notarizes.lock();
1692                    for (view, payloads) in notarizes.iter() {
1693                        for (_, participants) in payloads.iter() {
1694                            if participants.contains(offline) {
1695                                panic!("view: {view}");
1696                            }
1697                        }
1698                    }
1699                }
1700                {
1701                    let nullifies = reporter.nullifies.lock();
1702                    for (view, participants) in nullifies.iter() {
1703                        if participants.contains(offline) {
1704                            panic!("view: {view}");
1705                        }
1706                    }
1707                }
1708                {
1709                    let finalizes = reporter.finalizes.lock();
1710                    for (view, payloads) in finalizes.iter() {
1711                        for (_, finalizers) in payloads.iter() {
1712                            if finalizers.contains(offline) {
1713                                panic!("view: {view}");
1714                            }
1715                        }
1716                    }
1717                }
1718
1719                // Identify offline views
1720                let mut offline_views = Vec::new();
1721                {
1722                    let leaders = reporter.leaders.lock();
1723                    for (view, leader) in leaders.iter() {
1724                        if leader == offline {
1725                            offline_views.push(*view);
1726                        }
1727                    }
1728                }
1729                assert!(!offline_views.is_empty());
1730
1731                // Ensure nullifies/nullification collected for offline node
1732                {
1733                    let nullifies = reporter.nullifies.lock();
1734                    for view in offline_views.iter() {
1735                        let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1736                        if nullifies < quorum {
1737                            warn!("missing expected view nullifies: {}", view);
1738                            exceptions += 1;
1739                        }
1740                    }
1741                }
1742                {
1743                    let nullifications = reporter.nullifications.lock();
1744                    for view in offline_views.iter() {
1745                        if !nullifications.contains_key(view) {
1746                            warn!("missing expected view nullifies: {}", view);
1747                            exceptions += 1;
1748                        }
1749                    }
1750                }
1751
1752                // Ensure exceptions within allowed
1753                assert!(exceptions <= max_exceptions);
1754            }
1755            assert!(exceptions <= max_exceptions);
1756
1757            // Ensure no blocked connections
1758            let blocked = oracle.blocked().await.unwrap();
1759            assert!(blocked.is_empty());
1760
1761            // Ensure online nodes are recording timeouts/nullifications for the offline leader
1762            let encoded = context.encode();
1763            let leader_label = format!("leader=\"{}\"", offline);
1764            assert!(
1765                count_nonzero_metric_lines(&encoded, &["_timeouts", &leader_label]) >= n - 1,
1766                "expected timeout metrics for offline leader"
1767            );
1768            assert_eq!(
1769                count_nonzero_metric_lines(&encoded, &["_nullifications", &leader_label]),
1770                n - 1,
1771                "expected all online nodes to record _nullifications for offline leader"
1772            );
1773        });
1774    }
1775
1776    #[test_group("slow")]
1777    #[test_traced]
1778    fn test_one_offline() {
1779        one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1780        one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1781        one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1782        one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1783        one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1784        one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1785        one_offline::<_, _, RoundRobin>(ed25519::fixture);
1786        one_offline::<_, _, RoundRobin>(secp256r1::fixture);
1787    }
1788
1789    fn slow_validator<S, F, L>(mut fixture: F)
1790    where
1791        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1792        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1793        L: Elector<S>,
1794    {
1795        // Create context
1796        let n = 5;
1797        let required_containers = View::new(50);
1798        let activity_timeout = ViewDelta::new(10);
1799        let skip_timeout = ViewDelta::new(5);
1800        let namespace = b"consensus".to_vec();
1801        let executor = deterministic::Runner::timed(Duration::from_secs(300));
1802        executor.start(|mut context| async move {
1803            // Register participants
1804            let Fixture {
1805                participants,
1806                schemes,
1807                ..
1808            } = fixture(&mut context, &namespace, n);
1809            let mut oracle =
1810                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
1811            let mut registrations = register_validators(&mut oracle, &participants).await;
1812
1813            // Link all validators
1814            let link = Link {
1815                latency: Duration::from_millis(10),
1816                jitter: Duration::from_millis(1),
1817                success_rate: 1.0,
1818            };
1819            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1820
1821            // Create engines
1822            let elector = L::default();
1823            let relay = Arc::new(mocks::relay::Relay::new());
1824            let mut reporters = Vec::new();
1825            let mut engine_handlers = Vec::new();
1826            for (idx_scheme, validator) in participants.iter().enumerate() {
1827                // Create scheme context
1828                let context = context.with_label(&format!("validator_{}", *validator));
1829
1830                // Configure engine
1831                let reporter_config = mocks::reporter::Config {
1832                    participants: participants.clone().try_into().unwrap(),
1833                    scheme: schemes[idx_scheme].clone(),
1834                    elector: elector.clone(),
1835                };
1836                let reporter =
1837                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1838                reporters.push(reporter.clone());
1839                let application_cfg = if idx_scheme == 0 {
1840                    mocks::application::Config {
1841                        hasher: Sha256::default(),
1842                        relay: relay.clone(),
1843                        me: validator.clone(),
1844                        propose_latency: (10_000.0, 0.0),
1845                        verify_latency: (10_000.0, 5.0),
1846                        certify_latency: (10_000.0, 5.0),
1847                        should_certify: mocks::application::Certifier::Sometimes,
1848                    }
1849                } else {
1850                    mocks::application::Config {
1851                        hasher: Sha256::default(),
1852                        relay: relay.clone(),
1853                        me: validator.clone(),
1854                        propose_latency: (10.0, 5.0),
1855                        verify_latency: (10.0, 5.0),
1856                        certify_latency: (10.0, 5.0),
1857                        should_certify: mocks::application::Certifier::Sometimes,
1858                    }
1859                };
1860                let (actor, application) = mocks::application::Application::new(
1861                    context.with_label("application"),
1862                    application_cfg,
1863                );
1864                actor.start();
1865                let blocker = oracle.control(validator.clone());
1866                let cfg = config::Config {
1867                    scheme: schemes[idx_scheme].clone(),
1868                    elector: elector.clone(),
1869                    blocker,
1870                    automaton: application.clone(),
1871                    relay: application.clone(),
1872                    reporter: reporter.clone(),
1873                    strategy: Sequential,
1874                    partition: validator.to_string(),
1875                    mailbox_size: 1024,
1876                    epoch: Epoch::new(333),
1877                    leader_timeout: Duration::from_secs(1),
1878                    certification_timeout: Duration::from_secs(2),
1879                    timeout_retry: Duration::from_secs(10),
1880                    fetch_timeout: Duration::from_secs(1),
1881                    activity_timeout,
1882                    skip_timeout,
1883                    fetch_concurrent: 4,
1884                    replay_buffer: NZUsize!(1024 * 1024),
1885                    write_buffer: NZUsize!(1024 * 1024),
1886                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1887                    forwarding: ForwardingPolicy::Disabled,
1888                };
1889                let engine = Engine::new(context.with_label("engine"), cfg);
1890
1891                // Start engine
1892                let (pending, recovered, resolver) = registrations
1893                    .remove(validator)
1894                    .expect("validator should be registered");
1895                engine_handlers.push(engine.start(pending, recovered, resolver));
1896            }
1897
1898            // Wait for all engines to finish
1899            let mut finalizers = Vec::new();
1900            for reporter in reporters.iter_mut() {
1901                let (mut latest, mut monitor) = reporter.subscribe().await;
1902                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1903                    while latest < required_containers {
1904                        latest = monitor.recv().await.expect("event missing");
1905                    }
1906                }));
1907            }
1908            join_all(finalizers).await;
1909
1910            // Check reporters for correct activity
1911            let slow = &participants[0];
1912            for reporter in reporters.iter() {
1913                // Ensure no faults
1914                reporter.assert_no_faults();
1915
1916                // Ensure no invalid signatures
1917                reporter.assert_no_invalid();
1918
1919                // Ensure the slow validator never emits notarize or finalize
1920                // votes. It may still emit nullifies after timing out.
1921                {
1922                    let notarizes = reporter.notarizes.lock();
1923                    assert!(notarizes.values().all(|payloads| {
1924                        payloads
1925                            .values()
1926                            .all(|participants| !participants.contains(slow))
1927                    }));
1928                }
1929                {
1930                    let finalizes = reporter.finalizes.lock();
1931                    assert!(finalizes.values().all(|payloads| {
1932                        payloads
1933                            .values()
1934                            .all(|participants| !participants.contains(slow))
1935                    }));
1936                }
1937
1938                // Ensure every reporter observes finalization progress to at least the target view.
1939                {
1940                    let finalizations = reporter.finalizations.lock();
1941                    assert!(finalizations
1942                        .keys()
1943                        .any(|view| *view >= required_containers));
1944                }
1945            }
1946
1947            // Ensure no blocked connections
1948            let blocked = oracle.blocked().await.unwrap();
1949            assert!(blocked.is_empty());
1950        });
1951    }
1952
1953    #[test_group("slow")]
1954    #[test_traced]
1955    fn test_slow_validator() {
1956        slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1957        slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1958        slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1959        slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1960        slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1961        slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1962        slow_validator::<_, _, RoundRobin>(ed25519::fixture);
1963        slow_validator::<_, _, RoundRobin>(secp256r1::fixture);
1964    }
1965
1966    fn all_recovery<S, F, L>(mut fixture: F)
1967    where
1968        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1969        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1970        L: Elector<S>,
1971    {
1972        // Create context
1973        let n = 5;
1974        let required_containers = View::new(100);
1975        let activity_timeout = ViewDelta::new(10);
1976        let skip_timeout = ViewDelta::new(2);
1977        let namespace = b"consensus".to_vec();
1978        let executor = deterministic::Runner::timed(Duration::from_secs(1800));
1979        executor.start(|mut context| async move {
1980            // Register participants
1981            let Fixture {
1982                participants,
1983                schemes,
1984                ..
1985            } = fixture(&mut context, &namespace, n);
1986            let mut oracle =
1987                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
1988            let mut registrations = register_validators(&mut oracle, &participants).await;
1989
1990            // Link all validators
1991            let link = Link {
1992                latency: Duration::from_secs(3),
1993                jitter: Duration::from_millis(0),
1994                success_rate: 1.0,
1995            };
1996            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1997
1998            // Create engines
1999            let elector = L::default();
2000            let relay = Arc::new(mocks::relay::Relay::new());
2001            let mut reporters = Vec::new();
2002            let mut engine_handlers = Vec::new();
2003            for (idx, validator) in participants.iter().enumerate() {
2004                // Create scheme context
2005                let context = context.with_label(&format!("validator_{}", *validator));
2006
2007                // Configure engine
2008                let reporter_config = mocks::reporter::Config {
2009                    participants: participants.clone().try_into().unwrap(),
2010                    scheme: schemes[idx].clone(),
2011                    elector: elector.clone(),
2012                };
2013                let reporter =
2014                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2015                reporters.push(reporter.clone());
2016                let application_cfg = mocks::application::Config {
2017                    hasher: Sha256::default(),
2018                    relay: relay.clone(),
2019                    me: validator.clone(),
2020                    propose_latency: (10.0, 5.0),
2021                    verify_latency: (10.0, 5.0),
2022                    certify_latency: (10.0, 5.0),
2023                    should_certify: mocks::application::Certifier::Sometimes,
2024                };
2025                let (actor, application) = mocks::application::Application::new(
2026                    context.with_label("application"),
2027                    application_cfg,
2028                );
2029                actor.start();
2030                let blocker = oracle.control(validator.clone());
2031                let cfg = config::Config {
2032                    scheme: schemes[idx].clone(),
2033                    elector: elector.clone(),
2034                    blocker,
2035                    automaton: application.clone(),
2036                    relay: application.clone(),
2037                    reporter: reporter.clone(),
2038                    strategy: Sequential,
2039                    partition: validator.to_string(),
2040                    mailbox_size: 1024,
2041                    epoch: Epoch::new(333),
2042                    leader_timeout: Duration::from_secs(1),
2043                    certification_timeout: Duration::from_secs(2),
2044                    timeout_retry: Duration::from_secs(10),
2045                    fetch_timeout: Duration::from_secs(1),
2046                    activity_timeout,
2047                    skip_timeout,
2048                    fetch_concurrent: 4,
2049                    replay_buffer: NZUsize!(1024 * 1024),
2050                    write_buffer: NZUsize!(1024 * 1024),
2051                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2052                    forwarding: ForwardingPolicy::Disabled,
2053                };
2054                let engine = Engine::new(context.with_label("engine"), cfg);
2055
2056                // Start engine
2057                let (pending, recovered, resolver) = registrations
2058                    .remove(validator)
2059                    .expect("validator should be registered");
2060                engine_handlers.push(engine.start(pending, recovered, resolver));
2061            }
2062
2063            // Wait for a few virtual minutes (shouldn't finalize anything)
2064            let mut finalizers = Vec::new();
2065            for reporter in reporters.iter_mut() {
2066                let (_, mut monitor) = reporter.subscribe().await;
2067                finalizers.push(
2068                    context
2069                        .with_label("finalizer")
2070                        .spawn(move |context| async move {
2071                            select! {
2072                                _timeout = context.sleep(Duration::from_secs(60)) => {},
2073                                _done = monitor.recv() => {
2074                                    panic!("engine should not notarize or finalize anything");
2075                                },
2076                            }
2077                        }),
2078                );
2079            }
2080            join_all(finalizers).await;
2081
2082            // Unlink all validators to get latest view
2083            link_validators(&mut oracle, &participants, Action::Unlink, None).await;
2084
2085            // Wait for a virtual minute (nothing should happen)
2086            context.sleep(Duration::from_secs(60)).await;
2087
2088            // Get latest view
2089            let mut latest = View::zero();
2090            for reporter in reporters.iter() {
2091                let nullifies = reporter.nullifies.lock();
2092                let max = nullifies.keys().max().unwrap();
2093                if *max > latest {
2094                    latest = *max;
2095                }
2096            }
2097
2098            // Update links
2099            let link = Link {
2100                latency: Duration::from_millis(10),
2101                jitter: Duration::from_millis(1),
2102                success_rate: 1.0,
2103            };
2104            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2105
2106            // Wait for all engines to finish
2107            let mut finalizers = Vec::new();
2108            for reporter in reporters.iter_mut() {
2109                let (mut latest, mut monitor) = reporter.subscribe().await;
2110                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2111                    while latest < required_containers {
2112                        latest = monitor.recv().await.expect("event missing");
2113                    }
2114                }));
2115            }
2116            join_all(finalizers).await;
2117
2118            // Check reporters for correct activity
2119            for reporter in reporters.iter() {
2120                // Ensure no faults
2121                reporter.assert_no_faults();
2122
2123                // Ensure no invalid signatures
2124                reporter.assert_no_invalid();
2125
2126                // Ensure quick recovery.
2127                //
2128                // If the skip timeout isn't implemented correctly, we may go many views before participants
2129                // start to notarize a validator's proposal.
2130                {
2131                    // Ensure nearly all views around latest are notarized.
2132                    // We don't check for finalization since some of the blocks may fail to be
2133                    // certified for the purposes of testing.
2134                    let mut found = 0;
2135                    let notarizations = reporter.notarizations.lock();
2136                    for view in View::range(latest, latest.saturating_add(activity_timeout)) {
2137                        if notarizations.contains_key(&view) {
2138                            found += 1;
2139                        }
2140                    }
2141                    assert!(
2142                        found >= activity_timeout.get().saturating_sub(2),
2143                        "found: {found}"
2144                    );
2145                }
2146            }
2147
2148            // Ensure no blocked connections
2149            let blocked = oracle.blocked().await.unwrap();
2150            assert!(blocked.is_empty());
2151        });
2152    }
2153
2154    #[test_group("slow")]
2155    #[test_traced]
2156    fn test_all_recovery() {
2157        all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2158        all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2159        all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2160        all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2161        all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2162        all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2163        all_recovery::<_, _, RoundRobin>(ed25519::fixture);
2164        all_recovery::<_, _, RoundRobin>(secp256r1::fixture);
2165    }
2166
2167    fn partition<S, F, L>(mut fixture: F)
2168    where
2169        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2170        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2171        L: Elector<S>,
2172    {
2173        // Create context
2174        let n = 10;
2175        let required_containers = View::new(50);
2176        let activity_timeout = ViewDelta::new(10);
2177        let skip_timeout = ViewDelta::new(5);
2178        let namespace = b"consensus".to_vec();
2179        let executor = deterministic::Runner::timed(Duration::from_secs(900));
2180        executor.start(|mut context| async move {
2181            // Register participants
2182            let Fixture {
2183                participants,
2184                schemes,
2185                ..
2186            } = fixture(&mut context, &namespace, n);
2187            let mut oracle =
2188                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
2189            let mut registrations = register_validators(&mut oracle, &participants).await;
2190
2191            // Link all validators
2192            let link = Link {
2193                latency: Duration::from_millis(10),
2194                jitter: Duration::from_millis(1),
2195                success_rate: 1.0,
2196            };
2197            link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
2198
2199            // Create engines
2200            let elector = L::default();
2201            let relay = Arc::new(mocks::relay::Relay::new());
2202            let mut reporters = Vec::new();
2203            let mut engine_handlers = Vec::new();
2204            for (idx, validator) in participants.iter().enumerate() {
2205                // Create scheme context
2206                let context = context.with_label(&format!("validator_{}", *validator));
2207
2208                // Configure engine
2209                let reporter_config = mocks::reporter::Config {
2210                    participants: participants.clone().try_into().unwrap(),
2211                    scheme: schemes[idx].clone(),
2212                    elector: elector.clone(),
2213                };
2214                let reporter =
2215                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2216                reporters.push(reporter.clone());
2217                let application_cfg = mocks::application::Config {
2218                    hasher: Sha256::default(),
2219                    relay: relay.clone(),
2220                    me: validator.clone(),
2221                    propose_latency: (10.0, 5.0),
2222                    verify_latency: (10.0, 5.0),
2223                    certify_latency: (10.0, 5.0),
2224                    should_certify: mocks::application::Certifier::Sometimes,
2225                };
2226                let (actor, application) = mocks::application::Application::new(
2227                    context.with_label("application"),
2228                    application_cfg,
2229                );
2230                actor.start();
2231                let blocker = oracle.control(validator.clone());
2232                let cfg = config::Config {
2233                    scheme: schemes[idx].clone(),
2234                    elector: elector.clone(),
2235                    blocker,
2236                    automaton: application.clone(),
2237                    relay: application.clone(),
2238                    reporter: reporter.clone(),
2239                    strategy: Sequential,
2240                    partition: validator.to_string(),
2241                    mailbox_size: 1024,
2242                    epoch: Epoch::new(333),
2243                    leader_timeout: Duration::from_secs(1),
2244                    certification_timeout: Duration::from_secs(2),
2245                    timeout_retry: Duration::from_secs(10),
2246                    fetch_timeout: Duration::from_secs(1),
2247                    activity_timeout,
2248                    skip_timeout,
2249                    fetch_concurrent: 4,
2250                    replay_buffer: NZUsize!(1024 * 1024),
2251                    write_buffer: NZUsize!(1024 * 1024),
2252                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2253                    forwarding: ForwardingPolicy::Disabled,
2254                };
2255                let engine = Engine::new(context.with_label("engine"), cfg);
2256
2257                // Start engine
2258                let (pending, recovered, resolver) = registrations
2259                    .remove(validator)
2260                    .expect("validator should be registered");
2261                engine_handlers.push(engine.start(pending, recovered, resolver));
2262            }
2263
2264            // Wait for all engines to finish
2265            let mut finalizers = Vec::new();
2266            for reporter in reporters.iter_mut() {
2267                let (mut latest, mut monitor) = reporter.subscribe().await;
2268                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2269                    while latest < required_containers {
2270                        latest = monitor.recv().await.expect("event missing");
2271                    }
2272                }));
2273            }
2274            join_all(finalizers).await;
2275
2276            // Cut all links between validator halves
2277            fn separated(n: usize, a: usize, b: usize) -> bool {
2278                let m = n / 2;
2279                (a < m && b >= m) || (a >= m && b < m)
2280            }
2281            link_validators(&mut oracle, &participants, Action::Unlink, Some(separated)).await;
2282
2283            // Wait for any in-progress notarizations/finalizations to finish
2284            context.sleep(Duration::from_secs(10)).await;
2285
2286            // Wait for a few virtual minutes (shouldn't finalize anything)
2287            let mut finalizers = Vec::new();
2288            for reporter in reporters.iter_mut() {
2289                let (_, mut monitor) = reporter.subscribe().await;
2290                finalizers.push(
2291                    context
2292                        .with_label("finalizer")
2293                        .spawn(move |context| async move {
2294                            select! {
2295                                _timeout = context.sleep(Duration::from_secs(60)) => {},
2296                                _done = monitor.recv() => {
2297                                    panic!("engine should not notarize or finalize anything");
2298                                },
2299                            }
2300                        }),
2301                );
2302            }
2303            join_all(finalizers).await;
2304
2305            // Restore links
2306            link_validators(
2307                &mut oracle,
2308                &participants,
2309                Action::Link(link),
2310                Some(separated),
2311            )
2312            .await;
2313
2314            // Wait for all engines to finish
2315            let mut finalizers = Vec::new();
2316            for reporter in reporters.iter_mut() {
2317                let (mut latest, mut monitor) = reporter.subscribe().await;
2318                let required = latest.saturating_add(ViewDelta::new(required_containers.get()));
2319                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2320                    while latest < required {
2321                        latest = monitor.recv().await.expect("event missing");
2322                    }
2323                }));
2324            }
2325            join_all(finalizers).await;
2326
2327            // Check reporters for correct activity
2328            for reporter in reporters.iter() {
2329                // Ensure no faults
2330                reporter.assert_no_faults();
2331
2332                // Ensure no invalid signatures
2333                reporter.assert_no_invalid();
2334            }
2335
2336            // Ensure no blocked connections
2337            let blocked = oracle.blocked().await.unwrap();
2338            assert!(blocked.is_empty());
2339        });
2340    }
2341
2342    #[test_group("slow")]
2343    #[test_traced]
2344    fn test_partition() {
2345        partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2346        partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2347        partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2348        partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2349        partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2350        partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2351        partition::<_, _, RoundRobin>(ed25519::fixture);
2352        partition::<_, _, RoundRobin>(secp256r1::fixture);
2353    }
2354
2355    fn slow_and_lossy_links<S, F, L>(seed: u64, mut fixture: F) -> String
2356    where
2357        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2358        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2359        L: Elector<S>,
2360    {
2361        // Create context
2362        let n = 5;
2363        let required_containers = View::new(50);
2364        let activity_timeout = ViewDelta::new(10);
2365        let skip_timeout = ViewDelta::new(5);
2366        let namespace = b"consensus".to_vec();
2367        let cfg = deterministic::Config::new()
2368            .with_seed(seed)
2369            .with_timeout(Some(Duration::from_secs(5_000)));
2370        let executor = deterministic::Runner::new(cfg);
2371        executor.start(|mut context| async move {
2372            // Register participants
2373            let Fixture {
2374                participants,
2375                schemes,
2376                ..
2377            } = fixture(&mut context, &namespace, n);
2378            let mut oracle =
2379                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
2380            let mut registrations = register_validators(&mut oracle, &participants).await;
2381
2382            // Link all validators
2383            let degraded_link = Link {
2384                latency: Duration::from_millis(200),
2385                jitter: Duration::from_millis(150),
2386                success_rate: 0.5,
2387            };
2388            link_validators(
2389                &mut oracle,
2390                &participants,
2391                Action::Link(degraded_link),
2392                None,
2393            )
2394            .await;
2395
2396            // Create engines
2397            let elector = L::default();
2398            let relay = Arc::new(mocks::relay::Relay::new());
2399            let mut reporters = Vec::new();
2400            let mut engine_handlers = Vec::new();
2401            for (idx, validator) in participants.iter().enumerate() {
2402                // Create scheme context
2403                let context = context.with_label(&format!("validator_{}", *validator));
2404
2405                // Configure engine
2406                let reporter_config = mocks::reporter::Config {
2407                    participants: participants.clone().try_into().unwrap(),
2408                    scheme: schemes[idx].clone(),
2409                    elector: elector.clone(),
2410                };
2411                let reporter =
2412                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2413                reporters.push(reporter.clone());
2414                let application_cfg = mocks::application::Config {
2415                    hasher: Sha256::default(),
2416                    relay: relay.clone(),
2417                    me: validator.clone(),
2418                    propose_latency: (10.0, 5.0),
2419                    verify_latency: (10.0, 5.0),
2420                    certify_latency: (10.0, 5.0),
2421                    should_certify: mocks::application::Certifier::Sometimes,
2422                };
2423                let (actor, application) = mocks::application::Application::new(
2424                    context.with_label("application"),
2425                    application_cfg,
2426                );
2427                actor.start();
2428                let blocker = oracle.control(validator.clone());
2429                let cfg = config::Config {
2430                    scheme: schemes[idx].clone(),
2431                    elector: elector.clone(),
2432                    blocker,
2433                    automaton: application.clone(),
2434                    relay: application.clone(),
2435                    reporter: reporter.clone(),
2436                    strategy: Sequential,
2437                    partition: validator.to_string(),
2438                    mailbox_size: 1024,
2439                    epoch: Epoch::new(333),
2440                    leader_timeout: Duration::from_secs(1),
2441                    certification_timeout: Duration::from_secs(2),
2442                    timeout_retry: Duration::from_secs(10),
2443                    fetch_timeout: Duration::from_secs(1),
2444                    activity_timeout,
2445                    skip_timeout,
2446                    fetch_concurrent: 4,
2447                    replay_buffer: NZUsize!(1024 * 1024),
2448                    write_buffer: NZUsize!(1024 * 1024),
2449                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2450                    forwarding: ForwardingPolicy::Disabled,
2451                };
2452                let engine = Engine::new(context.with_label("engine"), cfg);
2453
2454                // Start engine
2455                let (pending, recovered, resolver) = registrations
2456                    .remove(validator)
2457                    .expect("validator should be registered");
2458                engine_handlers.push(engine.start(pending, recovered, resolver));
2459            }
2460
2461            // Wait for all engines to finish
2462            let mut finalizers = Vec::new();
2463            for reporter in reporters.iter_mut() {
2464                let (mut latest, mut monitor) = reporter.subscribe().await;
2465                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2466                    while latest < required_containers {
2467                        latest = monitor.recv().await.expect("event missing");
2468                    }
2469                }));
2470            }
2471            join_all(finalizers).await;
2472
2473            // Check reporters for correct activity
2474            for reporter in reporters.iter() {
2475                // Ensure no faults
2476                reporter.assert_no_faults();
2477
2478                // Ensure no invalid signatures
2479                reporter.assert_no_invalid();
2480            }
2481
2482            // Ensure no blocked connections
2483            let blocked = oracle.blocked().await.unwrap();
2484            assert!(blocked.is_empty());
2485
2486            context.auditor().state()
2487        })
2488    }
2489
2490    #[test_group("slow")]
2491    #[test_traced]
2492    fn test_slow_and_lossy_links() {
2493        slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinPk, _>);
2494        slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinSig, _>);
2495        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinPk, _>);
2496        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinSig, _>);
2497        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinPk, _>);
2498        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinSig, _>);
2499        slow_and_lossy_links::<_, _, RoundRobin>(0, ed25519::fixture);
2500        slow_and_lossy_links::<_, _, RoundRobin>(0, secp256r1::fixture);
2501    }
2502
2503    #[test_group("slow")]
2504    #[test_traced]
2505    fn test_determinism() {
2506        // We use slow and lossy links as the deterministic test
2507        // because it is the most complex test.
2508        for seed in 1..6 {
2509            let ts_vrf_pk_state_1 = slow_and_lossy_links::<_, _, Random>(
2510                seed,
2511                bls12381_threshold_vrf::fixture::<MinPk, _>,
2512            );
2513            let ts_vrf_pk_state_2 = slow_and_lossy_links::<_, _, Random>(
2514                seed,
2515                bls12381_threshold_vrf::fixture::<MinPk, _>,
2516            );
2517            assert_eq!(ts_vrf_pk_state_1, ts_vrf_pk_state_2);
2518
2519            let ts_vrf_sig_state_1 = slow_and_lossy_links::<_, _, Random>(
2520                seed,
2521                bls12381_threshold_vrf::fixture::<MinSig, _>,
2522            );
2523            let ts_vrf_sig_state_2 = slow_and_lossy_links::<_, _, Random>(
2524                seed,
2525                bls12381_threshold_vrf::fixture::<MinSig, _>,
2526            );
2527            assert_eq!(ts_vrf_sig_state_1, ts_vrf_sig_state_2);
2528
2529            let ts_std_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2530                seed,
2531                bls12381_threshold_std::fixture::<MinPk, _>,
2532            );
2533            let ts_std_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2534                seed,
2535                bls12381_threshold_std::fixture::<MinPk, _>,
2536            );
2537            assert_eq!(ts_std_pk_state_1, ts_std_pk_state_2);
2538
2539            let ts_std_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2540                seed,
2541                bls12381_threshold_std::fixture::<MinSig, _>,
2542            );
2543            let ts_std_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2544                seed,
2545                bls12381_threshold_std::fixture::<MinSig, _>,
2546            );
2547            assert_eq!(ts_std_sig_state_1, ts_std_sig_state_2);
2548
2549            let ms_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2550                seed,
2551                bls12381_multisig::fixture::<MinPk, _>,
2552            );
2553            let ms_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2554                seed,
2555                bls12381_multisig::fixture::<MinPk, _>,
2556            );
2557            assert_eq!(ms_pk_state_1, ms_pk_state_2);
2558
2559            let ms_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2560                seed,
2561                bls12381_multisig::fixture::<MinSig, _>,
2562            );
2563            let ms_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2564                seed,
2565                bls12381_multisig::fixture::<MinSig, _>,
2566            );
2567            assert_eq!(ms_sig_state_1, ms_sig_state_2);
2568
2569            let ed_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2570            let ed_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2571            assert_eq!(ed_state_1, ed_state_2);
2572
2573            let secp_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2574            let secp_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2575            assert_eq!(secp_state_1, secp_state_2);
2576
2577            let states = [
2578                ("threshold-vrf-minpk", ts_vrf_pk_state_1),
2579                ("threshold-vrf-minsig", ts_vrf_sig_state_1),
2580                ("threshold-std-minpk", ts_std_pk_state_1),
2581                ("threshold-std-minsig", ts_std_sig_state_1),
2582                ("multisig-minpk", ms_pk_state_1),
2583                ("multisig-minsig", ms_sig_state_1),
2584                ("ed25519", ed_state_1),
2585                ("secp256r1", secp_state_1),
2586            ];
2587
2588            // Sanity check that different types can't be identical
2589            for pair in states.windows(2) {
2590                assert_ne!(
2591                    pair[0].1, pair[1].1,
2592                    "state {} equals state {}",
2593                    pair[0].0, pair[1].0
2594                );
2595            }
2596        }
2597    }
2598
2599    fn conflicter<S, F, L>(seed: u64, mut fixture: F)
2600    where
2601        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2602        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2603        L: Elector<S>,
2604    {
2605        // Create context
2606        let n = 4;
2607        let required_containers = View::new(50);
2608        let activity_timeout = ViewDelta::new(10);
2609        let skip_timeout = ViewDelta::new(5);
2610        let namespace = b"consensus".to_vec();
2611        let cfg = deterministic::Config::new()
2612            .with_seed(seed)
2613            .with_timeout(Some(Duration::from_secs(30)));
2614        let executor = deterministic::Runner::new(cfg);
2615        executor.start(|mut context| async move {
2616            // Register participants
2617            let Fixture {
2618                participants,
2619                schemes,
2620                ..
2621            } = fixture(&mut context, &namespace, n);
2622            let mut oracle =
2623                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
2624            let mut registrations = register_validators(&mut oracle, &participants).await;
2625
2626            // Link all validators
2627            let link = Link {
2628                latency: Duration::from_millis(10),
2629                jitter: Duration::from_millis(1),
2630                success_rate: 1.0,
2631            };
2632            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2633
2634            // Create engines
2635            let elector = L::default();
2636            let relay = Arc::new(mocks::relay::Relay::new());
2637            let mut reporters = Vec::new();
2638            for (idx_scheme, validator) in participants.iter().enumerate() {
2639                // Create scheme context
2640                let context = context.with_label(&format!("validator_{}", *validator));
2641
2642                // Start engine
2643                let reporter_config = mocks::reporter::Config {
2644                    participants: participants.clone().try_into().unwrap(),
2645                    scheme: schemes[idx_scheme].clone(),
2646                    elector: elector.clone(),
2647                };
2648                let reporter =
2649                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2650                let (pending, recovered, resolver) = registrations
2651                    .remove(validator)
2652                    .expect("validator should be registered");
2653                if idx_scheme == 0 {
2654                    let cfg = mocks::conflicter::Config {
2655                        scheme: schemes[idx_scheme].clone(),
2656                    };
2657
2658                    let engine: mocks::conflicter::Conflicter<_, _, Sha256> =
2659                        mocks::conflicter::Conflicter::new(
2660                            context.with_label("byzantine_engine"),
2661                            cfg,
2662                        );
2663                    engine.start(pending);
2664                } else {
2665                    reporters.push(reporter.clone());
2666                    let application_cfg = mocks::application::Config {
2667                        hasher: Sha256::default(),
2668                        relay: relay.clone(),
2669                        me: validator.clone(),
2670                        propose_latency: (10.0, 5.0),
2671                        verify_latency: (10.0, 5.0),
2672                        certify_latency: (10.0, 5.0),
2673                        should_certify: mocks::application::Certifier::Sometimes,
2674                    };
2675                    let (actor, application) = mocks::application::Application::new(
2676                        context.with_label("application"),
2677                        application_cfg,
2678                    );
2679                    actor.start();
2680                    let blocker = oracle.control(validator.clone());
2681                    let cfg = config::Config {
2682                        scheme: schemes[idx_scheme].clone(),
2683                        elector: elector.clone(),
2684                        blocker,
2685                        automaton: application.clone(),
2686                        relay: application.clone(),
2687                        reporter: reporter.clone(),
2688                        strategy: Sequential,
2689                        partition: validator.to_string(),
2690                        mailbox_size: 1024,
2691                        epoch: Epoch::new(333),
2692                        leader_timeout: Duration::from_secs(1),
2693                        certification_timeout: Duration::from_secs(2),
2694                        timeout_retry: Duration::from_secs(10),
2695                        fetch_timeout: Duration::from_secs(1),
2696                        activity_timeout,
2697                        skip_timeout,
2698                        fetch_concurrent: 4,
2699                        replay_buffer: NZUsize!(1024 * 1024),
2700                        write_buffer: NZUsize!(1024 * 1024),
2701                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2702                        forwarding: ForwardingPolicy::Disabled,
2703                    };
2704                    let engine = Engine::new(context.with_label("engine"), cfg);
2705                    engine.start(pending, recovered, resolver);
2706                }
2707            }
2708
2709            // Wait for all engines to finish
2710            let mut finalizers = Vec::new();
2711            for reporter in reporters.iter_mut() {
2712                let (mut latest, mut monitor) = reporter.subscribe().await;
2713                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2714                    while latest < required_containers {
2715                        latest = monitor.recv().await.expect("event missing");
2716                    }
2717                }));
2718            }
2719            join_all(finalizers).await;
2720
2721            // Check reporters for correct activity
2722            let byz = &participants[0];
2723            let mut count_conflicting = 0;
2724            for reporter in reporters.iter() {
2725                // Ensure only faults for byz
2726                {
2727                    let faults = reporter.faults.lock();
2728                    assert_eq!(faults.len(), 1);
2729                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
2730                    for (_, faults) in faulter.iter() {
2731                        for fault in faults.iter() {
2732                            match fault {
2733                                Activity::ConflictingNotarize(_) => {
2734                                    count_conflicting += 1;
2735                                }
2736                                Activity::ConflictingFinalize(_) => {
2737                                    count_conflicting += 1;
2738                                }
2739                                _ => panic!("unexpected fault: {fault:?}"),
2740                            }
2741                        }
2742                    }
2743                }
2744
2745                // Ensure no invalid signatures
2746                reporter.assert_no_invalid();
2747            }
2748            assert!(count_conflicting > 0);
2749
2750            // Ensure conflicter is blocked
2751            let blocked = oracle.blocked().await.unwrap();
2752            assert!(!blocked.is_empty());
2753            for (a, b) in blocked {
2754                assert_ne!(&a, byz);
2755                assert_eq!(&b, byz);
2756            }
2757        });
2758    }
2759
2760    #[test_group("slow")]
2761    #[test_traced]
2762    fn test_conflicter() {
2763        for seed in 0..5 {
2764            conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
2765            conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
2766            conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
2767            conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
2768            conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2769            conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2770            conflicter::<_, _, RoundRobin>(seed, ed25519::fixture);
2771            conflicter::<_, _, RoundRobin>(seed, secp256r1::fixture);
2772        }
2773    }
2774
2775    fn invalid<S, F, L>(seed: u64, mut fixture: F)
2776    where
2777        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2778        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2779        L: Elector<S>,
2780    {
2781        // Create context
2782        let n = 4;
2783        let required_containers = View::new(50);
2784        let activity_timeout = ViewDelta::new(10);
2785        let skip_timeout = ViewDelta::new(5);
2786        let namespace = b"consensus".to_vec();
2787        let cfg = deterministic::Config::new()
2788            .with_seed(seed)
2789            .with_timeout(Some(Duration::from_secs(30)));
2790        let executor = deterministic::Runner::new(cfg);
2791        executor.start(|mut context| async move {
2792            // Register participants
2793            let Fixture {
2794                participants,
2795                schemes,
2796                ..
2797            } = fixture(&mut context, &namespace, n);
2798
2799            let schemes: Vec<_> = schemes
2800                .into_iter()
2801                .enumerate()
2802                .map(|(idx, scheme)| {
2803                    let is_byzantine = idx == 0;
2804                    let behavior = if is_byzantine {
2805                        wrapped::Behavior::CorruptSignature
2806                    } else {
2807                        wrapped::Behavior::Honest
2808                    };
2809                    wrapped::Scheme::new(scheme, behavior)
2810                })
2811                .collect();
2812
2813            let mut oracle =
2814                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
2815            let mut registrations = register_validators(&mut oracle, &participants).await;
2816
2817            // Link all validators
2818            let link = Link {
2819                latency: Duration::from_millis(10),
2820                jitter: Duration::from_millis(1),
2821                success_rate: 1.0,
2822            };
2823            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2824
2825            // Create engines
2826            let elector = wrapped::Config(L::default());
2827            let relay = Arc::new(mocks::relay::Relay::new());
2828            let mut reporters = Vec::new();
2829            for (idx_scheme, validator) in participants.iter().enumerate() {
2830                // Create scheme context
2831                let context = context.with_label(&format!("validator_{}", *validator));
2832
2833                let reporter_config = mocks::reporter::Config {
2834                    participants: participants.clone().try_into().unwrap(),
2835                    scheme: schemes[idx_scheme].clone(),
2836                    elector: elector.clone(),
2837                };
2838                let reporter =
2839                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2840                reporters.push(reporter.clone());
2841
2842                let application_cfg = mocks::application::Config {
2843                    hasher: Sha256::default(),
2844                    relay: relay.clone(),
2845                    me: validator.clone(),
2846                    propose_latency: (10.0, 5.0),
2847                    verify_latency: (10.0, 5.0),
2848                    certify_latency: (10.0, 5.0),
2849                    should_certify: mocks::application::Certifier::Sometimes,
2850                };
2851                let (actor, application) = mocks::application::Application::new(
2852                    context.with_label("application"),
2853                    application_cfg,
2854                );
2855                actor.start();
2856                let blocker = oracle.control(validator.clone());
2857                let cfg = config::Config {
2858                    scheme: schemes[idx_scheme].clone(),
2859                    elector: elector.clone(),
2860                    blocker,
2861                    automaton: application.clone(),
2862                    relay: application.clone(),
2863                    reporter: reporter.clone(),
2864                    strategy: Sequential,
2865                    partition: validator.clone().to_string(),
2866                    mailbox_size: 1024,
2867                    epoch: Epoch::new(333),
2868                    leader_timeout: Duration::from_secs(1),
2869                    certification_timeout: Duration::from_secs(2),
2870                    timeout_retry: Duration::from_secs(10),
2871                    fetch_timeout: Duration::from_secs(1),
2872                    activity_timeout,
2873                    skip_timeout,
2874                    fetch_concurrent: 4,
2875                    replay_buffer: NZUsize!(1024 * 1024),
2876                    write_buffer: NZUsize!(1024 * 1024),
2877                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2878                    forwarding: ForwardingPolicy::Disabled,
2879                };
2880                let engine = Engine::new(context.with_label("engine"), cfg);
2881                let (pending, recovered, resolver) = registrations
2882                    .remove(validator)
2883                    .expect("validator should be registered");
2884                engine.start(pending, recovered, resolver);
2885            }
2886
2887            // Wait for all engines to finish.
2888            // The byzantine node will not finish since it will mark any finalization
2889            // certificates it creates (using its own invalid signature) as invalid.
2890            let mut finalizers = Vec::new();
2891            for reporter in reporters.iter_mut().skip(1) {
2892                let (mut latest, mut monitor) = reporter.subscribe().await;
2893                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2894                    while latest < required_containers {
2895                        latest = monitor.recv().await.expect("event missing");
2896                    }
2897                }));
2898            }
2899            join_all(finalizers).await;
2900
2901            // Check all reporters for activity
2902            for (i, reporter) in reporters.iter().enumerate() {
2903                // Ensure no faults
2904                reporter.assert_no_faults();
2905
2906                // All nodes see invalid signatures since the honest reporters get unfiltered votes
2907                // once they pass the view.
2908                assert!(*reporter.invalid_votes.lock() > 0);
2909
2910                // Only the byzantine node sees invalid certificates since it constructs them from
2911                // its own invalid vote. The honest nodes reject them before reaching the reporter.
2912                let is_byzantine = i == 0;
2913                if is_byzantine {
2914                    assert!(*reporter.invalid_certificates.lock() > 0);
2915                } else {
2916                    assert_eq!(*reporter.invalid_certificates.lock(), 0);
2917                }
2918            }
2919
2920            // Ensure byzantine node is blocked by honest nodes.
2921            // The ">=" is because the Byzantine node may block itself.
2922            let blocked = oracle.blocked().await.unwrap();
2923            assert!(blocked.len() >= participants.len() - 1);
2924            let byz = &participants[0];
2925            for (_, b) in blocked {
2926                // Assert only the byzantine node is blocked.
2927                assert_eq!(&b, byz);
2928            }
2929        });
2930    }
2931
2932    #[test_group("slow")]
2933    #[test_traced]
2934    fn test_invalid() {
2935        for seed in 0..5 {
2936            invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
2937            invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
2938            invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
2939            invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
2940            invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2941            invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2942            invalid::<_, _, RoundRobin>(seed, ed25519::fixture);
2943            invalid::<_, _, RoundRobin>(seed, secp256r1::fixture);
2944        }
2945    }
2946
2947    fn received_certificates_are_reported<S, F, L>(seed: u64, mut fixture: F)
2948    where
2949        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2950        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2951        L: Elector<S>,
2952    {
2953        let n = 4;
2954        let required_containers = View::new(10);
2955        let activity_timeout = ViewDelta::new(10);
2956        let skip_timeout = ViewDelta::new(5);
2957        let namespace = b"consensus".to_vec();
2958        let cfg = deterministic::Config::new()
2959            .with_seed(seed)
2960            .with_timeout(Some(Duration::from_secs(30)));
2961        let executor = deterministic::Runner::new(cfg);
2962        executor.start(|mut context| async move {
2963            let Fixture {
2964                participants,
2965                schemes,
2966                ..
2967            } = fixture(&mut context, &namespace, n);
2968
2969            let mut oracle =
2970                start_test_network_with_peers(context.clone(), participants.clone(), false).await;
2971            let mut registrations = register_validators(&mut oracle, &participants).await;
2972
2973            // Link all honest nodes. Only link node 0 to node 1.
2974            //
2975            // Node 0 cannot locally form a certificate because it only sees itself plus one honest
2976            // peer, but it should still receive the certificates relayed by that peer.
2977            let link = Link {
2978                latency: Duration::from_millis(100),
2979                jitter: Duration::from_millis(1),
2980                success_rate: 1.0,
2981            };
2982            fn link_graph(_: usize, i: usize, j: usize) -> bool {
2983                if i == 0 || j == 0 {
2984                    return i == 1 || j == 1;
2985                }
2986                true
2987            }
2988            link_validators(
2989                &mut oracle,
2990                &participants,
2991                Action::Link(link),
2992                Some(link_graph),
2993            )
2994            .await;
2995
2996            let elector = L::default();
2997            let relay = Arc::new(mocks::relay::Relay::new());
2998            let mut reporters = Vec::new();
2999            for (idx_scheme, validator) in participants.iter().enumerate() {
3000                let context = context.with_label(&format!("validator_{}", *validator));
3001                let reporter_config = mocks::reporter::Config {
3002                    participants: participants.clone().try_into().unwrap(),
3003                    scheme: schemes[idx_scheme].clone(),
3004                    elector: elector.clone(),
3005                };
3006                let reporter =
3007                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3008                reporters.push(reporter.clone());
3009
3010                let application_cfg = mocks::application::Config {
3011                    hasher: Sha256::default(),
3012                    relay: relay.clone(),
3013                    me: validator.clone(),
3014                    propose_latency: (10.0, 5.0),
3015                    verify_latency: (10.0, 5.0),
3016                    certify_latency: (10.0, 5.0),
3017                    should_certify: mocks::application::Certifier::Sometimes,
3018                };
3019                let (actor, application) = mocks::application::Application::new(
3020                    context.with_label("application"),
3021                    application_cfg,
3022                );
3023                actor.start();
3024                let blocker = oracle.control(validator.clone());
3025                let cfg = config::Config {
3026                    scheme: schemes[idx_scheme].clone(),
3027                    elector: elector.clone(),
3028                    blocker,
3029                    automaton: application.clone(),
3030                    relay: application.clone(),
3031                    reporter: reporter.clone(),
3032                    strategy: Sequential,
3033                    partition: validator.clone().to_string(),
3034                    mailbox_size: 1024,
3035                    epoch: Epoch::new(333),
3036                    leader_timeout: Duration::from_secs(1),
3037                    certification_timeout: Duration::from_secs(2),
3038                    timeout_retry: Duration::from_secs(10),
3039                    fetch_timeout: Duration::from_secs(1),
3040                    activity_timeout,
3041                    skip_timeout,
3042                    fetch_concurrent: 4,
3043                    replay_buffer: NZUsize!(1024 * 1024),
3044                    write_buffer: NZUsize!(1024 * 1024),
3045                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3046                    forwarding: ForwardingPolicy::Disabled,
3047                };
3048                let engine = Engine::new(context.with_label("engine"), cfg);
3049                let (pending, recovered, resolver) = registrations
3050                    .remove(validator)
3051                    .expect("validator should be registered");
3052                engine.start(pending, recovered, resolver);
3053            }
3054            // Wait for an honest node to observe the finalizations
3055            let excluded_reporter = reporters[0].clone();
3056            let mut honest_reporter = reporters[1].clone();
3057            let (mut honest_latest, mut honest_monitor) = honest_reporter.subscribe().await;
3058            while honest_latest < required_containers {
3059                honest_latest = honest_monitor.recv().await.expect("event missing");
3060            }
3061
3062            // Wait for all in-flight certificates to arrive at excluded node and be reported.
3063            context.sleep(Duration::from_secs(1)).await;
3064
3065            // It should have received similar certificates to the honest node (with some
3066            // tolerance for initial views in which may not have yet been connected).
3067            let honest_notarized = {
3068                let notarizations = honest_reporter.notarizations.lock();
3069                View::range(View::new(1), required_containers.next())
3070                    .filter(|view| notarizations.contains_key(view))
3071                    .count()
3072            };
3073            let excluded_notarized = {
3074                let notarizations = excluded_reporter.notarizations.lock();
3075                View::range(View::new(1), required_containers.next())
3076                    .filter(|view| notarizations.contains_key(view))
3077                    .count()
3078            };
3079            assert!(
3080                excluded_notarized >= honest_notarized.saturating_sub(2),
3081                "honest_notarized: {honest_notarized}, excluded_notarized: {excluded_notarized}"
3082            );
3083
3084            let honest_finalized = {
3085                let finalizations = honest_reporter.finalizations.lock();
3086                View::range(View::new(1), required_containers.next())
3087                    .filter(|view| finalizations.contains_key(view))
3088                    .count()
3089            };
3090            let excluded_finalized = {
3091                let finalizations = excluded_reporter.finalizations.lock();
3092                View::range(View::new(1), required_containers.next())
3093                    .filter(|view| finalizations.contains_key(view))
3094                    .count()
3095            };
3096            assert!(
3097                excluded_finalized >= honest_finalized.saturating_sub(2),
3098                "honest_finalized: {honest_finalized}, excluded_finalized: {excluded_finalized}"
3099            );
3100        });
3101    }
3102
3103    // Test that when a node receives finalizations, it reports them.
3104    #[test_group("slow")]
3105    #[test_traced]
3106    fn test_received_certificates_are_reported() {
3107        received_certificates_are_reported::<_, _, Random>(
3108            0,
3109            bls12381_threshold_vrf::fixture::<MinPk, _>,
3110        );
3111        received_certificates_are_reported::<_, _, Random>(
3112            0,
3113            bls12381_threshold_vrf::fixture::<MinSig, _>,
3114        );
3115        received_certificates_are_reported::<_, _, RoundRobin>(
3116            0,
3117            bls12381_threshold_std::fixture::<MinPk, _>,
3118        );
3119        received_certificates_are_reported::<_, _, RoundRobin>(
3120            0,
3121            bls12381_threshold_std::fixture::<MinSig, _>,
3122        );
3123        received_certificates_are_reported::<_, _, RoundRobin>(
3124            0,
3125            bls12381_multisig::fixture::<MinPk, _>,
3126        );
3127        received_certificates_are_reported::<_, _, RoundRobin>(
3128            0,
3129            bls12381_multisig::fixture::<MinSig, _>,
3130        );
3131        received_certificates_are_reported::<_, _, RoundRobin>(0, ed25519::fixture);
3132        received_certificates_are_reported::<_, _, RoundRobin>(0, secp256r1::fixture);
3133    }
3134
3135    fn impersonator<S, F, L>(seed: u64, mut fixture: F)
3136    where
3137        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3138        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3139        L: Elector<S>,
3140    {
3141        // Create context
3142        let n = 4;
3143        let required_containers = View::new(50);
3144        let activity_timeout = ViewDelta::new(10);
3145        let skip_timeout = ViewDelta::new(5);
3146        let namespace = b"consensus".to_vec();
3147        let cfg = deterministic::Config::new()
3148            .with_seed(seed)
3149            .with_timeout(Some(Duration::from_secs(30)));
3150        let executor = deterministic::Runner::new(cfg);
3151        executor.start(|mut context| async move {
3152            // Register participants
3153            let Fixture {
3154                participants,
3155                schemes,
3156                ..
3157            } = fixture(&mut context, &namespace, n);
3158            let mut oracle =
3159                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
3160            let mut registrations = register_validators(&mut oracle, &participants).await;
3161
3162            // Link all validators
3163            let link = Link {
3164                latency: Duration::from_millis(10),
3165                jitter: Duration::from_millis(1),
3166                success_rate: 1.0,
3167            };
3168            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3169
3170            // Create engines
3171            let elector = L::default();
3172            let relay = Arc::new(mocks::relay::Relay::new());
3173            let mut reporters = Vec::new();
3174            for (idx_scheme, validator) in participants.iter().enumerate() {
3175                // Create scheme context
3176                let context = context.with_label(&format!("validator_{}", *validator));
3177
3178                // Start engine
3179                let reporter_config = mocks::reporter::Config {
3180                    participants: participants.clone().try_into().unwrap(),
3181                    scheme: schemes[idx_scheme].clone(),
3182                    elector: elector.clone(),
3183                };
3184                let reporter =
3185                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3186                let (pending, recovered, resolver) = registrations
3187                    .remove(validator)
3188                    .expect("validator should be registered");
3189                if idx_scheme == 0 {
3190                    let cfg = mocks::impersonator::Config {
3191                        scheme: schemes[idx_scheme].clone(),
3192                    };
3193
3194                    let engine: mocks::impersonator::Impersonator<_, _, Sha256> =
3195                        mocks::impersonator::Impersonator::new(
3196                            context.with_label("byzantine_engine"),
3197                            cfg,
3198                        );
3199                    engine.start(pending);
3200                } else {
3201                    reporters.push(reporter.clone());
3202                    let application_cfg = mocks::application::Config {
3203                        hasher: Sha256::default(),
3204                        relay: relay.clone(),
3205                        me: validator.clone(),
3206                        propose_latency: (10.0, 5.0),
3207                        verify_latency: (10.0, 5.0),
3208                        certify_latency: (10.0, 5.0),
3209                        should_certify: mocks::application::Certifier::Sometimes,
3210                    };
3211                    let (actor, application) = mocks::application::Application::new(
3212                        context.with_label("application"),
3213                        application_cfg,
3214                    );
3215                    actor.start();
3216                    let blocker = oracle.control(validator.clone());
3217                    let cfg = config::Config {
3218                        scheme: schemes[idx_scheme].clone(),
3219                        elector: elector.clone(),
3220                        blocker,
3221                        automaton: application.clone(),
3222                        relay: application.clone(),
3223                        reporter: reporter.clone(),
3224                        strategy: Sequential,
3225                        partition: validator.clone().to_string(),
3226                        mailbox_size: 1024,
3227                        epoch: Epoch::new(333),
3228                        leader_timeout: Duration::from_secs(1),
3229                        certification_timeout: Duration::from_secs(2),
3230                        timeout_retry: Duration::from_secs(10),
3231                        fetch_timeout: Duration::from_secs(1),
3232                        activity_timeout,
3233                        skip_timeout,
3234                        fetch_concurrent: 4,
3235                        replay_buffer: NZUsize!(1024 * 1024),
3236                        write_buffer: NZUsize!(1024 * 1024),
3237                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3238                        forwarding: ForwardingPolicy::Disabled,
3239                    };
3240                    let engine = Engine::new(context.with_label("engine"), cfg);
3241                    engine.start(pending, recovered, resolver);
3242                }
3243            }
3244
3245            // Wait for all engines to finish
3246            let mut finalizers = Vec::new();
3247            for reporter in reporters.iter_mut() {
3248                let (mut latest, mut monitor) = reporter.subscribe().await;
3249                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3250                    while latest < required_containers {
3251                        latest = monitor.recv().await.expect("event missing");
3252                    }
3253                }));
3254            }
3255            join_all(finalizers).await;
3256
3257            // Check reporters for correct activity
3258            let byz = &participants[0];
3259            for reporter in reporters.iter() {
3260                // Ensure no faults
3261                reporter.assert_no_faults();
3262
3263                // Ensure no invalid signatures
3264                reporter.assert_no_invalid();
3265            }
3266
3267            // Ensure invalid is blocked
3268            let blocked = oracle.blocked().await.unwrap();
3269            assert!(!blocked.is_empty());
3270            for (a, b) in blocked {
3271                assert_ne!(&a, byz);
3272                assert_eq!(&b, byz);
3273            }
3274        });
3275    }
3276
3277    #[test_group("slow")]
3278    #[test_traced]
3279    fn test_impersonator() {
3280        for seed in 0..5 {
3281            impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3282            impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3283            impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3284            impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3285            impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3286            impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3287            impersonator::<_, _, RoundRobin>(seed, ed25519::fixture);
3288            impersonator::<_, _, RoundRobin>(seed, secp256r1::fixture);
3289        }
3290    }
3291
3292    fn equivocator<S, F, L>(seed: u64, mut fixture: F) -> bool
3293    where
3294        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3295        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3296        L: Elector<S>,
3297    {
3298        // Create context
3299        let n = 7;
3300        let required_containers = View::new(50);
3301        let activity_timeout = ViewDelta::new(10);
3302        let skip_timeout = ViewDelta::new(5);
3303        let namespace = b"consensus".to_vec();
3304        let cfg = deterministic::Config::new()
3305            .with_seed(seed)
3306            .with_timeout(Some(Duration::from_secs(60)));
3307        let executor = deterministic::Runner::new(cfg);
3308        executor.start(|mut context| async move {
3309            // Register participants
3310            let Fixture {
3311                participants,
3312                schemes,
3313                ..
3314            } = fixture(&mut context, &namespace, n);
3315            let mut oracle =
3316                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
3317            let mut registrations = register_validators(&mut oracle, &participants).await;
3318
3319            // Link all validators
3320            let link = Link {
3321                latency: Duration::from_millis(10),
3322                jitter: Duration::from_millis(1),
3323                success_rate: 1.0,
3324            };
3325            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3326
3327            // Create engines
3328            let elector = L::default();
3329            let mut engines = Vec::new();
3330            let relay = Arc::new(mocks::relay::Relay::new());
3331            let mut reporters = Vec::new();
3332            for (idx_scheme, validator) in participants.iter().enumerate() {
3333                // Create scheme context
3334                let context = context.with_label(&format!("validator_{}", *validator));
3335
3336                // Start engine
3337                let reporter_config = mocks::reporter::Config {
3338                    participants: participants.clone().try_into().unwrap(),
3339                    scheme: schemes[idx_scheme].clone(),
3340                    elector: elector.clone(),
3341                };
3342                let reporter =
3343                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3344                reporters.push(reporter.clone());
3345                let (pending, recovered, resolver) = registrations
3346                    .remove(validator)
3347                    .expect("validator should be registered");
3348                if idx_scheme == 0 {
3349                    let cfg = mocks::equivocator::Config {
3350                        scheme: schemes[idx_scheme].clone(),
3351                        epoch: Epoch::new(333),
3352                        relay: relay.clone(),
3353                        hasher: Sha256::default(),
3354                        elector: elector.clone(),
3355                    };
3356
3357                    let engine = mocks::equivocator::Equivocator::new(
3358                        context.with_label("byzantine_engine"),
3359                        cfg,
3360                    );
3361                    engines.push(engine.start(pending, recovered));
3362                } else {
3363                    let application_cfg = mocks::application::Config {
3364                        hasher: Sha256::default(),
3365                        relay: relay.clone(),
3366                        me: validator.clone(),
3367                        propose_latency: (10.0, 5.0),
3368                        verify_latency: (10.0, 5.0),
3369                        certify_latency: (10.0, 5.0),
3370                        should_certify: mocks::application::Certifier::Sometimes,
3371                    };
3372                    let (actor, application) = mocks::application::Application::new(
3373                        context.with_label("application"),
3374                        application_cfg,
3375                    );
3376                    actor.start();
3377                    let blocker = oracle.control(validator.clone());
3378                    let cfg = config::Config {
3379                        scheme: schemes[idx_scheme].clone(),
3380                        elector: elector.clone(),
3381                        blocker,
3382                        automaton: application.clone(),
3383                        relay: application.clone(),
3384                        reporter: reporter.clone(),
3385                        strategy: Sequential,
3386                        partition: validator.to_string(),
3387                        mailbox_size: 1024,
3388                        epoch: Epoch::new(333),
3389                        leader_timeout: Duration::from_secs(1),
3390                        certification_timeout: Duration::from_secs(2),
3391                        timeout_retry: Duration::from_secs(10),
3392                        fetch_timeout: Duration::from_secs(1),
3393                        activity_timeout,
3394                        skip_timeout,
3395                        fetch_concurrent: 4,
3396                        replay_buffer: NZUsize!(1024 * 1024),
3397                        write_buffer: NZUsize!(1024 * 1024),
3398                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3399                        forwarding: ForwardingPolicy::Disabled,
3400                    };
3401                    let engine = Engine::new(context.with_label("engine"), cfg);
3402                    engines.push(engine.start(pending, recovered, resolver));
3403                }
3404            }
3405
3406            // Wait for all engines to hit required containers
3407            let mut finalizers = Vec::new();
3408            for reporter in reporters.iter_mut().skip(1) {
3409                let (mut latest, mut monitor) = reporter.subscribe().await;
3410                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3411                    while latest < required_containers {
3412                        latest = monitor.recv().await.expect("event missing");
3413                    }
3414                }));
3415            }
3416            join_all(finalizers).await;
3417
3418            // Abort a validator
3419            let idx = context.gen_range(1..engines.len()); // skip byzantine validator
3420            let validator = &participants[idx];
3421            let handle = engines.remove(idx);
3422            handle.abort();
3423            let _ = handle.await;
3424            reporters.remove(idx);
3425            info!(idx, ?validator, "aborted validator");
3426
3427            // Wait for all engines to hit required containers
3428            let mut finalizers = Vec::new();
3429            for reporter in reporters.iter_mut().skip(1) {
3430                let (mut latest, mut monitor) = reporter.subscribe().await;
3431                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3432                    while latest < View::new(required_containers.get() * 2) {
3433                        latest = monitor.recv().await.expect("event missing");
3434                    }
3435                }));
3436            }
3437            join_all(finalizers).await;
3438
3439            // Recreate engine
3440            info!(idx, ?validator, "restarting validator");
3441            let context = context.with_label(&format!("validator_{}_restarted", *validator));
3442
3443            // Start engine
3444            let reporter_config = mocks::reporter::Config {
3445                participants: participants.clone().try_into().unwrap(),
3446                scheme: schemes[idx].clone(),
3447                elector: elector.clone(),
3448            };
3449            let reporter =
3450                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3451            let (pending, recovered, resolver) =
3452                register_validator(&mut oracle, validator.clone()).await;
3453            reporters.push(reporter.clone());
3454            let application_cfg = mocks::application::Config {
3455                hasher: Sha256::default(),
3456                relay: relay.clone(),
3457                me: validator.clone(),
3458                propose_latency: (10.0, 5.0),
3459                verify_latency: (10.0, 5.0),
3460                certify_latency: (10.0, 5.0),
3461                should_certify: mocks::application::Certifier::Sometimes,
3462            };
3463            let (actor, application) = mocks::application::Application::new(
3464                context.with_label("application"),
3465                application_cfg,
3466            );
3467            actor.start();
3468            let blocker = oracle.control(validator.clone());
3469            let cfg = config::Config {
3470                scheme: schemes[idx].clone(),
3471                elector: elector.clone(),
3472                blocker,
3473                automaton: application.clone(),
3474                relay: application.clone(),
3475                reporter: reporter.clone(),
3476                strategy: Sequential,
3477                partition: validator.to_string(),
3478                mailbox_size: 1024,
3479                epoch: Epoch::new(333),
3480                leader_timeout: Duration::from_secs(1),
3481                certification_timeout: Duration::from_secs(2),
3482                timeout_retry: Duration::from_secs(10),
3483                fetch_timeout: Duration::from_secs(1),
3484                activity_timeout,
3485                skip_timeout,
3486                fetch_concurrent: 4,
3487                replay_buffer: NZUsize!(1024 * 1024),
3488                write_buffer: NZUsize!(1024 * 1024),
3489                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3490                forwarding: ForwardingPolicy::Disabled,
3491            };
3492            let engine = Engine::new(context.with_label("engine"), cfg);
3493            engine.start(pending, recovered, resolver);
3494
3495            // Wait for all engines to hit required containers
3496            let mut finalizers = Vec::new();
3497            for reporter in reporters.iter_mut().skip(1) {
3498                let (mut latest, mut monitor) = reporter.subscribe().await;
3499                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3500                    while latest < View::new(required_containers.get() * 3) {
3501                        latest = monitor.recv().await.expect("event missing");
3502                    }
3503                }));
3504            }
3505            join_all(finalizers).await;
3506
3507            // Check equivocator blocking (we aren't guaranteed a fault will be produced
3508            // because it may not be possible to extract a conflicting vote from the certificate
3509            // we receive)
3510            let byz = &participants[0];
3511            let blocked = oracle.blocked().await.unwrap();
3512            for (a, b) in &blocked {
3513                assert_ne!(a, byz);
3514                assert_eq!(b, byz);
3515            }
3516            !blocked.is_empty()
3517        })
3518    }
3519
3520    #[test_group("slow")]
3521    #[test_traced]
3522    fn test_equivocator_bls12381_threshold_vrf_min_pk() {
3523        let detected = (0..5).any(|seed| {
3524            equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>)
3525        });
3526        assert!(
3527            detected,
3528            "expected at least one seed to detect equivocation"
3529        );
3530    }
3531
3532    #[test_group("slow")]
3533    #[test_traced]
3534    fn test_equivocator_bls12381_threshold_vrf_min_sig() {
3535        let detected = (0..5).any(|seed| {
3536            equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>)
3537        });
3538        assert!(
3539            detected,
3540            "expected at least one seed to detect equivocation"
3541        );
3542    }
3543
3544    #[test_group("slow")]
3545    #[test_traced]
3546    fn test_equivocator_bls12381_threshold_std_min_pk() {
3547        let detected = (0..5).any(|seed| {
3548            equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>)
3549        });
3550        assert!(
3551            detected,
3552            "expected at least one seed to detect equivocation"
3553        );
3554    }
3555
3556    #[test_group("slow")]
3557    #[test_traced]
3558    fn test_equivocator_bls12381_threshold_std_min_sig() {
3559        let detected = (0..5).any(|seed| {
3560            equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>)
3561        });
3562        assert!(
3563            detected,
3564            "expected at least one seed to detect equivocation"
3565        );
3566    }
3567
3568    #[test_group("slow")]
3569    #[test_traced]
3570    fn test_equivocator_bls12381_multisig_min_pk() {
3571        let detected = (0..5).any(|seed| {
3572            equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>)
3573        });
3574        assert!(
3575            detected,
3576            "expected at least one seed to detect equivocation"
3577        );
3578    }
3579
3580    #[test_group("slow")]
3581    #[test_traced]
3582    fn test_equivocator_bls12381_multisig_min_sig() {
3583        let detected = (0..5).any(|seed| {
3584            equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>)
3585        });
3586        assert!(
3587            detected,
3588            "expected at least one seed to detect equivocation"
3589        );
3590    }
3591
3592    #[test_group("slow")]
3593    #[test_traced]
3594    fn test_equivocator_ed25519() {
3595        let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, ed25519::fixture));
3596        assert!(
3597            detected,
3598            "expected at least one seed to detect equivocation"
3599        );
3600    }
3601
3602    #[test_group("slow")]
3603    #[test_traced]
3604    fn test_equivocator_secp256r1() {
3605        let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, secp256r1::fixture));
3606        assert!(
3607            detected,
3608            "expected at least one seed to detect equivocation"
3609        );
3610    }
3611
3612    fn reconfigurer<S, F, L>(seed: u64, mut fixture: F)
3613    where
3614        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3615        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3616        L: Elector<S>,
3617    {
3618        // Create context
3619        let n = 4;
3620        let required_containers = View::new(50);
3621        let activity_timeout = ViewDelta::new(10);
3622        let skip_timeout = ViewDelta::new(5);
3623        let namespace = b"consensus".to_vec();
3624        let cfg = deterministic::Config::new()
3625            .with_seed(seed)
3626            .with_timeout(Some(Duration::from_secs(30)));
3627        let executor = deterministic::Runner::new(cfg);
3628        executor.start(|mut context| async move {
3629            // Register participants
3630            let Fixture {
3631                participants,
3632                schemes,
3633                ..
3634            } = fixture(&mut context, &namespace, n);
3635            let mut oracle =
3636                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
3637            let mut registrations = register_validators(&mut oracle, &participants).await;
3638
3639            // Link all validators
3640            let link = Link {
3641                latency: Duration::from_millis(10),
3642                jitter: Duration::from_millis(1),
3643                success_rate: 1.0,
3644            };
3645            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3646
3647            // Create engines
3648            let elector = L::default();
3649            let relay = Arc::new(mocks::relay::Relay::new());
3650            let mut reporters = Vec::new();
3651            for (idx_scheme, validator) in participants.iter().enumerate() {
3652                // Create scheme context
3653                let context = context.with_label(&format!("validator_{}", *validator));
3654
3655                // Start engine
3656                let reporter_config = mocks::reporter::Config {
3657                    participants: participants.clone().try_into().unwrap(),
3658                    scheme: schemes[idx_scheme].clone(),
3659                    elector: elector.clone(),
3660                };
3661                let reporter =
3662                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3663                let (pending, recovered, resolver) = registrations
3664                    .remove(validator)
3665                    .expect("validator should be registered");
3666                if idx_scheme == 0 {
3667                    let cfg = mocks::reconfigurer::Config {
3668                        scheme: schemes[idx_scheme].clone(),
3669                    };
3670                    let engine: mocks::reconfigurer::Reconfigurer<_, _, Sha256> =
3671                        mocks::reconfigurer::Reconfigurer::new(
3672                            context.with_label("byzantine_engine"),
3673                            cfg,
3674                        );
3675                    engine.start(pending);
3676                } else {
3677                    reporters.push(reporter.clone());
3678                    let application_cfg = mocks::application::Config {
3679                        hasher: Sha256::default(),
3680                        relay: relay.clone(),
3681                        me: validator.clone(),
3682                        propose_latency: (10.0, 5.0),
3683                        verify_latency: (10.0, 5.0),
3684                        certify_latency: (10.0, 5.0),
3685                        should_certify: mocks::application::Certifier::Sometimes,
3686                    };
3687                    let (actor, application) = mocks::application::Application::new(
3688                        context.with_label("application"),
3689                        application_cfg,
3690                    );
3691                    actor.start();
3692                    let blocker = oracle.control(validator.clone());
3693                    let cfg = config::Config {
3694                        scheme: schemes[idx_scheme].clone(),
3695                        elector: elector.clone(),
3696                        blocker,
3697                        automaton: application.clone(),
3698                        relay: application.clone(),
3699                        reporter: reporter.clone(),
3700                        strategy: Sequential,
3701                        partition: validator.to_string(),
3702                        mailbox_size: 1024,
3703                        epoch: Epoch::new(333),
3704                        leader_timeout: Duration::from_secs(1),
3705                        certification_timeout: Duration::from_secs(2),
3706                        timeout_retry: Duration::from_secs(10),
3707                        fetch_timeout: Duration::from_secs(1),
3708                        activity_timeout,
3709                        skip_timeout,
3710                        fetch_concurrent: 4,
3711                        replay_buffer: NZUsize!(1024 * 1024),
3712                        write_buffer: NZUsize!(1024 * 1024),
3713                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3714                        forwarding: ForwardingPolicy::Disabled,
3715                    };
3716                    let engine = Engine::new(context.with_label("engine"), cfg);
3717                    engine.start(pending, recovered, resolver);
3718                }
3719            }
3720
3721            // Wait for all engines to finish
3722            let mut finalizers = Vec::new();
3723            for reporter in reporters.iter_mut() {
3724                let (mut latest, mut monitor) = reporter.subscribe().await;
3725                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3726                    while latest < required_containers {
3727                        latest = monitor.recv().await.expect("event missing");
3728                    }
3729                }));
3730            }
3731            join_all(finalizers).await;
3732
3733            // Check reporters for correct activity
3734            let byz = &participants[0];
3735            for reporter in reporters.iter() {
3736                // Ensure no faults
3737                reporter.assert_no_faults();
3738
3739                // Ensure no invalid signatures
3740                reporter.assert_no_invalid();
3741            }
3742
3743            // Ensure reconfigurer is blocked (epoch mismatch)
3744            let blocked = oracle.blocked().await.unwrap();
3745            assert!(!blocked.is_empty());
3746            for (a, b) in blocked {
3747                assert_ne!(&a, byz);
3748                assert_eq!(&b, byz);
3749            }
3750        });
3751    }
3752
3753    #[test_group("slow")]
3754    #[test_traced]
3755    fn test_reconfigurer() {
3756        for seed in 0..5 {
3757            reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3758            reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3759            reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3760            reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3761            reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3762            reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3763            reconfigurer::<_, _, RoundRobin>(seed, ed25519::fixture);
3764            reconfigurer::<_, _, RoundRobin>(seed, secp256r1::fixture);
3765        }
3766    }
3767
3768    fn nuller<S, F, L>(seed: u64, mut fixture: F)
3769    where
3770        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3771        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3772        L: Elector<S>,
3773    {
3774        // Create context
3775        let n = 4;
3776        let required_containers = View::new(50);
3777        let activity_timeout = ViewDelta::new(10);
3778        let skip_timeout = ViewDelta::new(5);
3779        let namespace = b"consensus".to_vec();
3780        let cfg = deterministic::Config::new()
3781            .with_seed(seed)
3782            .with_timeout(Some(Duration::from_secs(30)));
3783        let executor = deterministic::Runner::new(cfg);
3784        executor.start(|mut context| async move {
3785            // Register participants
3786            let Fixture {
3787                participants,
3788                schemes,
3789                ..
3790            } = fixture(&mut context, &namespace, n);
3791            let mut oracle =
3792                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
3793            let mut registrations = register_validators(&mut oracle, &participants).await;
3794
3795            // Link all validators
3796            let link = Link {
3797                latency: Duration::from_millis(10),
3798                jitter: Duration::from_millis(1),
3799                success_rate: 1.0,
3800            };
3801            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3802
3803            // Create engines
3804            let elector = L::default();
3805            let relay = Arc::new(mocks::relay::Relay::new());
3806            let mut reporters = Vec::new();
3807            for (idx_scheme, validator) in participants.iter().enumerate() {
3808                // Create scheme context
3809                let context = context.with_label(&format!("validator_{}", *validator));
3810
3811                // Start engine
3812                let reporter_config = mocks::reporter::Config {
3813                    participants: participants.clone().try_into().unwrap(),
3814                    scheme: schemes[idx_scheme].clone(),
3815                    elector: elector.clone(),
3816                };
3817                let reporter =
3818                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3819                let (pending, recovered, resolver) = registrations
3820                    .remove(validator)
3821                    .expect("validator should be registered");
3822                if idx_scheme == 0 {
3823                    let cfg = mocks::nuller::Config {
3824                        scheme: schemes[idx_scheme].clone(),
3825                    };
3826                    let engine: mocks::nuller::Nuller<_, _, Sha256> =
3827                        mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
3828                    engine.start(pending);
3829                } else {
3830                    reporters.push(reporter.clone());
3831                    let application_cfg = mocks::application::Config {
3832                        hasher: Sha256::default(),
3833                        relay: relay.clone(),
3834                        me: validator.clone(),
3835                        propose_latency: (10.0, 5.0),
3836                        verify_latency: (10.0, 5.0),
3837                        certify_latency: (10.0, 5.0),
3838                        should_certify: mocks::application::Certifier::Sometimes,
3839                    };
3840                    let (actor, application) = mocks::application::Application::new(
3841                        context.with_label("application"),
3842                        application_cfg,
3843                    );
3844                    actor.start();
3845                    let blocker = oracle.control(validator.clone());
3846                    let cfg = config::Config {
3847                        scheme: schemes[idx_scheme].clone(),
3848                        elector: elector.clone(),
3849                        blocker,
3850                        automaton: application.clone(),
3851                        relay: application.clone(),
3852                        reporter: reporter.clone(),
3853                        strategy: Sequential,
3854                        partition: validator.clone().to_string(),
3855                        mailbox_size: 1024,
3856                        epoch: Epoch::new(333),
3857                        leader_timeout: Duration::from_secs(1),
3858                        certification_timeout: Duration::from_secs(2),
3859                        timeout_retry: Duration::from_secs(10),
3860                        fetch_timeout: Duration::from_secs(1),
3861                        activity_timeout,
3862                        skip_timeout,
3863                        fetch_concurrent: 4,
3864                        replay_buffer: NZUsize!(1024 * 1024),
3865                        write_buffer: NZUsize!(1024 * 1024),
3866                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3867                        forwarding: ForwardingPolicy::Disabled,
3868                    };
3869                    let engine = Engine::new(context.with_label("engine"), cfg);
3870                    engine.start(pending, recovered, resolver);
3871                }
3872            }
3873
3874            // Wait for all engines to finish
3875            let mut finalizers = Vec::new();
3876            for reporter in reporters.iter_mut() {
3877                let (mut latest, mut monitor) = reporter.subscribe().await;
3878                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3879                    while latest < required_containers {
3880                        latest = monitor.recv().await.expect("event missing");
3881                    }
3882                }));
3883            }
3884            join_all(finalizers).await;
3885
3886            // Check reporters for correct activity
3887            let byz = &participants[0];
3888            let mut count_nullify_and_finalize = 0;
3889            for reporter in reporters.iter() {
3890                // Ensure only faults for byz
3891                {
3892                    let faults = reporter.faults.lock();
3893                    assert_eq!(faults.len(), 1);
3894                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
3895                    for (_, faults) in faulter.iter() {
3896                        for fault in faults.iter() {
3897                            match fault {
3898                                Activity::NullifyFinalize(_) => {
3899                                    count_nullify_and_finalize += 1;
3900                                }
3901                                _ => panic!("unexpected fault: {fault:?}"),
3902                            }
3903                        }
3904                    }
3905                }
3906
3907                // Ensure no invalid signatures
3908                reporter.assert_no_invalid();
3909            }
3910            assert!(count_nullify_and_finalize > 0);
3911
3912            // Ensure nullifier is blocked
3913            let blocked = oracle.blocked().await.unwrap();
3914            assert!(!blocked.is_empty());
3915            for (a, b) in blocked {
3916                assert_ne!(&a, byz);
3917                assert_eq!(&b, byz);
3918            }
3919        });
3920    }
3921
3922    #[test_group("slow")]
3923    #[test_traced]
3924    fn test_nuller() {
3925        for seed in 0..5 {
3926            nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3927            nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3928            nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3929            nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3930            nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3931            nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3932            nuller::<_, _, RoundRobin>(seed, ed25519::fixture);
3933            nuller::<_, _, RoundRobin>(seed, secp256r1::fixture);
3934        }
3935    }
3936
3937    fn outdated<S, F, L>(seed: u64, mut fixture: F)
3938    where
3939        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3940        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3941        L: Elector<S>,
3942    {
3943        // Create context
3944        let n = 4;
3945        let required_containers = View::new(100);
3946        let activity_timeout = ViewDelta::new(10);
3947        let skip_timeout = ViewDelta::new(5);
3948        let namespace = b"consensus".to_vec();
3949        let cfg = deterministic::Config::new()
3950            .with_seed(seed)
3951            .with_timeout(Some(Duration::from_secs(30)));
3952        let executor = deterministic::Runner::new(cfg);
3953        executor.start(|mut context| async move {
3954            // Register participants
3955            let Fixture {
3956                participants,
3957                schemes,
3958                ..
3959            } = fixture(&mut context, &namespace, n);
3960            let mut oracle =
3961                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
3962            let mut registrations = register_validators(&mut oracle, &participants).await;
3963
3964            // Link all validators
3965            let link = Link {
3966                latency: Duration::from_millis(10),
3967                jitter: Duration::from_millis(1),
3968                success_rate: 1.0,
3969            };
3970            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3971
3972            // Create engines
3973            let elector = L::default();
3974            let relay = Arc::new(mocks::relay::Relay::new());
3975            let mut reporters = Vec::new();
3976            for (idx_scheme, validator) in participants.iter().enumerate() {
3977                // Create scheme context
3978                let context = context.with_label(&format!("validator_{}", *validator));
3979
3980                // Start engine
3981                let reporter_config = mocks::reporter::Config {
3982                    participants: participants.clone().try_into().unwrap(),
3983                    scheme: schemes[idx_scheme].clone(),
3984                    elector: elector.clone(),
3985                };
3986                let reporter =
3987                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3988                let (pending, recovered, resolver) = registrations
3989                    .remove(validator)
3990                    .expect("validator should be registered");
3991                if idx_scheme == 0 {
3992                    let cfg = mocks::outdated::Config {
3993                        scheme: schemes[idx_scheme].clone(),
3994                        view_delta: ViewDelta::new(activity_timeout.get().saturating_mul(4)),
3995                    };
3996                    let engine: mocks::outdated::Outdated<_, _, Sha256> =
3997                        mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
3998                    engine.start(pending);
3999                } else {
4000                    reporters.push(reporter.clone());
4001                    let application_cfg = mocks::application::Config {
4002                        hasher: Sha256::default(),
4003                        relay: relay.clone(),
4004                        me: validator.clone(),
4005                        propose_latency: (10.0, 5.0),
4006                        verify_latency: (10.0, 5.0),
4007                        certify_latency: (10.0, 5.0),
4008                        should_certify: mocks::application::Certifier::Sometimes,
4009                    };
4010                    let (actor, application) = mocks::application::Application::new(
4011                        context.with_label("application"),
4012                        application_cfg,
4013                    );
4014                    actor.start();
4015                    let blocker = oracle.control(validator.clone());
4016                    let cfg = config::Config {
4017                        scheme: schemes[idx_scheme].clone(),
4018                        elector: elector.clone(),
4019                        blocker,
4020                        automaton: application.clone(),
4021                        relay: application.clone(),
4022                        reporter: reporter.clone(),
4023                        strategy: Sequential,
4024                        partition: validator.clone().to_string(),
4025                        mailbox_size: 1024,
4026                        epoch: Epoch::new(333),
4027                        leader_timeout: Duration::from_secs(1),
4028                        certification_timeout: Duration::from_secs(2),
4029                        timeout_retry: Duration::from_secs(10),
4030                        fetch_timeout: Duration::from_secs(1),
4031                        activity_timeout,
4032                        skip_timeout,
4033                        fetch_concurrent: 4,
4034                        replay_buffer: NZUsize!(1024 * 1024),
4035                        write_buffer: NZUsize!(1024 * 1024),
4036                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4037                        forwarding: ForwardingPolicy::Disabled,
4038                    };
4039                    let engine = Engine::new(context.with_label("engine"), cfg);
4040                    engine.start(pending, recovered, resolver);
4041                }
4042            }
4043
4044            // Wait for all engines to finish
4045            let mut finalizers = Vec::new();
4046            for reporter in reporters.iter_mut() {
4047                let (mut latest, mut monitor) = reporter.subscribe().await;
4048                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4049                    while latest < required_containers {
4050                        latest = monitor.recv().await.expect("event missing");
4051                    }
4052                }));
4053            }
4054            join_all(finalizers).await;
4055
4056            // Check reporters for correct activity
4057            for reporter in reporters.iter() {
4058                // Ensure no faults
4059                reporter.assert_no_faults();
4060
4061                // Ensure no invalid signatures
4062                reporter.assert_no_invalid();
4063            }
4064
4065            // Ensure no blocked connections
4066            let blocked = oracle.blocked().await.unwrap();
4067            assert!(blocked.is_empty());
4068        });
4069    }
4070
4071    #[test_group("slow")]
4072    #[test_traced]
4073    fn test_outdated() {
4074        for seed in 0..5 {
4075            outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
4076            outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
4077            outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
4078            outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
4079            outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
4080            outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
4081            outdated::<_, _, RoundRobin>(seed, ed25519::fixture);
4082            outdated::<_, _, RoundRobin>(seed, secp256r1::fixture);
4083        }
4084    }
4085
4086    fn run_1k<S, F, L>(mut fixture: F)
4087    where
4088        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4089        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4090        L: Elector<S>,
4091    {
4092        // Create context
4093        let n = 10;
4094        let required_containers = View::new(1_000);
4095        let activity_timeout = ViewDelta::new(10);
4096        let skip_timeout = ViewDelta::new(5);
4097        let namespace = b"consensus".to_vec();
4098        let cfg = deterministic::Config::new();
4099        let executor = deterministic::Runner::new(cfg);
4100        executor.start(|mut context| async move {
4101            // Register participants
4102            let Fixture {
4103                participants,
4104                schemes,
4105                ..
4106            } = fixture(&mut context, &namespace, n);
4107            let mut oracle =
4108                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
4109            let mut registrations = register_validators(&mut oracle, &participants).await;
4110
4111            // Link all validators
4112            let link = Link {
4113                latency: Duration::from_millis(80),
4114                jitter: Duration::from_millis(10),
4115                success_rate: 0.98,
4116            };
4117            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4118
4119            // Create engines
4120            let elector = L::default();
4121            let relay = Arc::new(mocks::relay::Relay::new());
4122            let mut reporters = Vec::new();
4123            let mut engine_handlers = Vec::new();
4124            for (idx, validator) in participants.iter().enumerate() {
4125                // Create scheme context
4126                let context = context.with_label(&format!("validator_{}", *validator));
4127
4128                // Configure engine
4129                let reporter_config = mocks::reporter::Config {
4130                    participants: participants.clone().try_into().unwrap(),
4131                    scheme: schemes[idx].clone(),
4132                    elector: elector.clone(),
4133                };
4134                let reporter =
4135                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4136                reporters.push(reporter.clone());
4137                let application_cfg = mocks::application::Config {
4138                    hasher: Sha256::default(),
4139                    relay: relay.clone(),
4140                    me: validator.clone(),
4141                    propose_latency: (100.0, 50.0),
4142                    verify_latency: (50.0, 40.0),
4143                    certify_latency: (50.0, 40.0),
4144                    should_certify: mocks::application::Certifier::Sometimes,
4145                };
4146                let (actor, application) = mocks::application::Application::new(
4147                    context.with_label("application"),
4148                    application_cfg,
4149                );
4150                actor.start();
4151                let blocker = oracle.control(validator.clone());
4152                let cfg = config::Config {
4153                    scheme: schemes[idx].clone(),
4154                    elector: elector.clone(),
4155                    blocker,
4156                    automaton: application.clone(),
4157                    relay: application.clone(),
4158                    reporter: reporter.clone(),
4159                    strategy: Sequential,
4160                    partition: validator.to_string(),
4161                    mailbox_size: 1024,
4162                    epoch: Epoch::new(333),
4163                    leader_timeout: Duration::from_secs(1),
4164                    certification_timeout: Duration::from_secs(2),
4165                    timeout_retry: Duration::from_secs(10),
4166                    fetch_timeout: Duration::from_secs(1),
4167                    activity_timeout,
4168                    skip_timeout,
4169                    fetch_concurrent: 4,
4170                    replay_buffer: NZUsize!(1024 * 1024),
4171                    write_buffer: NZUsize!(1024 * 1024),
4172                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4173                    forwarding: ForwardingPolicy::Disabled,
4174                };
4175                let engine = Engine::new(context.with_label("engine"), cfg);
4176
4177                // Start engine
4178                let (pending, recovered, resolver) = registrations
4179                    .remove(validator)
4180                    .expect("validator should be registered");
4181                engine_handlers.push(engine.start(pending, recovered, resolver));
4182            }
4183
4184            // Wait for all engines to finish
4185            let mut finalizers = Vec::new();
4186            for reporter in reporters.iter_mut() {
4187                let (mut latest, mut monitor) = reporter.subscribe().await;
4188                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4189                    while latest < required_containers {
4190                        latest = monitor.recv().await.expect("event missing");
4191                    }
4192                }));
4193            }
4194            join_all(finalizers).await;
4195
4196            // Check reporters for correct activity
4197            for reporter in reporters.iter() {
4198                // Ensure no faults
4199                reporter.assert_no_faults();
4200
4201                // Ensure no invalid signatures
4202                reporter.assert_no_invalid();
4203            }
4204
4205            // Ensure no blocked connections
4206            let blocked = oracle.blocked().await.unwrap();
4207            assert!(blocked.is_empty());
4208        })
4209    }
4210
4211    #[test_group("slow")]
4212    #[test_traced]
4213    fn test_1k_bls12381_threshold_vrf_min_pk() {
4214        run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
4215    }
4216
4217    #[test_group("slow")]
4218    #[test_traced]
4219    fn test_1k_bls12381_threshold_vrf_min_sig() {
4220        run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
4221    }
4222
4223    #[test_group("slow")]
4224    #[test_traced]
4225    fn test_1k_bls12381_threshold_std_min_pk() {
4226        run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
4227    }
4228
4229    #[test_group("slow")]
4230    #[test_traced]
4231    fn test_1k_bls12381_threshold_std_min_sig() {
4232        run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
4233    }
4234
4235    #[test_group("slow")]
4236    #[test_traced]
4237    fn test_1k_bls12381_multisig_min_pk() {
4238        run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4239    }
4240
4241    #[test_group("slow")]
4242    #[test_traced]
4243    fn test_1k_bls12381_multisig_min_sig() {
4244        run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
4245    }
4246
4247    #[test_group("slow")]
4248    #[test_traced]
4249    fn test_1k_ed25519() {
4250        run_1k::<_, _, RoundRobin>(ed25519::fixture);
4251    }
4252
4253    #[test_group("slow")]
4254    #[test_traced]
4255    fn test_1k_secp256r1() {
4256        run_1k::<_, _, RoundRobin>(secp256r1::fixture);
4257    }
4258
4259    fn engine_shutdown<S, F, L>(seed: u64, mut fixture: F, graceful: bool)
4260    where
4261        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4262        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4263        L: Elector<S>,
4264    {
4265        let n = 1;
4266        let namespace = b"consensus".to_vec();
4267        let cfg = deterministic::Config::default()
4268            .with_seed(seed)
4269            .with_timeout(Some(Duration::from_secs(10)));
4270        let executor = deterministic::Runner::new(cfg);
4271        executor.start(|mut context| async move {
4272            // Register a single participant
4273            let Fixture {
4274                participants,
4275                schemes,
4276                ..
4277            } = fixture(&mut context, &namespace, n);
4278            let mut oracle =
4279                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
4280            let mut registrations = register_validators(&mut oracle, &participants).await;
4281
4282            // Link the single validator to itself (no-ops for completeness)
4283            let link = Link {
4284                latency: Duration::from_millis(1),
4285                jitter: Duration::from_millis(0),
4286                success_rate: 1.0,
4287            };
4288            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4289
4290            // Create engine
4291            let elector = L::default();
4292            let reporter_config = mocks::reporter::Config {
4293                participants: participants.clone().try_into().unwrap(),
4294                scheme: schemes[0].clone(),
4295                elector: elector.clone(),
4296            };
4297            let reporter =
4298                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4299            let relay = Arc::new(mocks::relay::Relay::new());
4300            let application_cfg = mocks::application::Config {
4301                hasher: Sha256::default(),
4302                relay: relay.clone(),
4303                me: participants[0].clone(),
4304                propose_latency: (1.0, 0.0),
4305                verify_latency: (1.0, 0.0),
4306                certify_latency: (1.0, 0.0),
4307                should_certify: mocks::application::Certifier::Sometimes,
4308            };
4309            let (actor, application) = mocks::application::Application::new(
4310                context.with_label("application"),
4311                application_cfg,
4312            );
4313            actor.start();
4314            let blocker = oracle.control(participants[0].clone());
4315            let cfg = config::Config {
4316                scheme: schemes[0].clone(),
4317                elector: elector.clone(),
4318                blocker,
4319                automaton: application.clone(),
4320                relay: application.clone(),
4321                reporter: reporter.clone(),
4322                strategy: Sequential,
4323                partition: participants[0].clone().to_string(),
4324                mailbox_size: 64,
4325                epoch: Epoch::new(333),
4326                leader_timeout: Duration::from_millis(50),
4327                certification_timeout: Duration::from_millis(100),
4328                timeout_retry: Duration::from_millis(250),
4329                fetch_timeout: Duration::from_millis(50),
4330                activity_timeout: ViewDelta::new(4),
4331                skip_timeout: ViewDelta::new(2),
4332                fetch_concurrent: 4,
4333                replay_buffer: NZUsize!(1024 * 16),
4334                write_buffer: NZUsize!(1024 * 16),
4335                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4336                forwarding: ForwardingPolicy::Disabled,
4337            };
4338            let engine = Engine::new(context.with_label("engine"), cfg);
4339
4340            // Start engine
4341            let (pending, recovered, resolver) = registrations
4342                .remove(&participants[0])
4343                .expect("validator should be registered");
4344            let handle = engine.start(pending, recovered, resolver);
4345
4346            // Allow tasks to start
4347            context.sleep(Duration::from_millis(1000)).await;
4348
4349            // Count running tasks under the engine prefix
4350            let running_before = count_running_tasks(&context, "engine");
4351            assert!(
4352                running_before > 0,
4353                "at least one engine task should be running"
4354            );
4355
4356            // Make sure the engine is still running after some time
4357            context.sleep(Duration::from_millis(1500)).await;
4358            assert!(
4359                count_running_tasks(&context, "engine") > 0,
4360                "engine tasks should still be running"
4361            );
4362
4363            // Shutdown engine and ensure children stop
4364            let running_after = if graceful {
4365                let metrics_context = context.clone();
4366                let result = context.stop(0, Some(Duration::from_secs(5))).await;
4367                assert!(
4368                    result.is_ok(),
4369                    "graceful shutdown should complete: {result:?}"
4370                );
4371                count_running_tasks(&metrics_context, "engine")
4372            } else {
4373                handle.abort();
4374                let _ = handle.await; // ensure parent tear-down runs
4375
4376                // Give the runtime a tick to process aborts
4377                context.sleep(Duration::from_millis(1000)).await;
4378                count_running_tasks(&context, "engine")
4379            };
4380            assert_eq!(
4381                running_after, 0,
4382                "all engine tasks should be stopped, but {running_after} still running"
4383            );
4384        });
4385    }
4386
4387    #[test_group("slow")]
4388    #[test_traced]
4389    fn test_children_shutdown_on_engine_abort() {
4390        for seed in 0..10 {
4391            engine_shutdown::<_, _, Random>(
4392                seed,
4393                bls12381_threshold_vrf::fixture::<MinPk, _>,
4394                false,
4395            );
4396            engine_shutdown::<_, _, Random>(
4397                seed,
4398                bls12381_threshold_vrf::fixture::<MinSig, _>,
4399                false,
4400            );
4401            engine_shutdown::<_, _, RoundRobin>(
4402                seed,
4403                bls12381_threshold_std::fixture::<MinPk, _>,
4404                false,
4405            );
4406            engine_shutdown::<_, _, RoundRobin>(
4407                seed,
4408                bls12381_threshold_std::fixture::<MinSig, _>,
4409                false,
4410            );
4411            engine_shutdown::<_, _, RoundRobin>(
4412                seed,
4413                bls12381_multisig::fixture::<MinPk, _>,
4414                false,
4415            );
4416            engine_shutdown::<_, _, RoundRobin>(
4417                seed,
4418                bls12381_multisig::fixture::<MinSig, _>,
4419                false,
4420            );
4421            engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, false);
4422            engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, false);
4423        }
4424    }
4425
4426    #[test_group("slow")]
4427    #[test_traced]
4428    fn test_graceful_shutdown() {
4429        for seed in 0..10 {
4430            engine_shutdown::<_, _, Random>(
4431                seed,
4432                bls12381_threshold_vrf::fixture::<MinPk, _>,
4433                true,
4434            );
4435            engine_shutdown::<_, _, Random>(
4436                seed,
4437                bls12381_threshold_vrf::fixture::<MinSig, _>,
4438                true,
4439            );
4440            engine_shutdown::<_, _, RoundRobin>(
4441                seed,
4442                bls12381_threshold_std::fixture::<MinPk, _>,
4443                true,
4444            );
4445            engine_shutdown::<_, _, RoundRobin>(
4446                seed,
4447                bls12381_threshold_std::fixture::<MinSig, _>,
4448                true,
4449            );
4450            engine_shutdown::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>, true);
4451            engine_shutdown::<_, _, RoundRobin>(
4452                seed,
4453                bls12381_multisig::fixture::<MinSig, _>,
4454                true,
4455            );
4456            engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, true);
4457            engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, true);
4458        }
4459    }
4460
4461    fn attributable_reporter_filtering<S, F, L>(mut fixture: F)
4462    where
4463        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4464        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4465        L: Elector<S>,
4466    {
4467        let n = 3;
4468        let required_containers = View::new(10);
4469        let activity_timeout = ViewDelta::new(10);
4470        let skip_timeout = ViewDelta::new(5);
4471        let namespace = b"consensus".to_vec();
4472        let executor = deterministic::Runner::timed(Duration::from_secs(30));
4473        executor.start(|mut context| async move {
4474            // Register participants
4475            let Fixture {
4476                participants,
4477                schemes,
4478                ..
4479            } = fixture(&mut context, &namespace, n);
4480            let mut oracle =
4481                start_test_network_with_peers(context.clone(), participants.clone(), false).await;
4482            let mut registrations = register_validators(&mut oracle, &participants).await;
4483
4484            // Link all validators
4485            let link = Link {
4486                latency: Duration::from_millis(10),
4487                jitter: Duration::from_millis(1),
4488                success_rate: 1.0,
4489            };
4490            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4491
4492            // Create engines with `AttributableReporter` wrapper
4493            let elector = L::default();
4494            let relay = Arc::new(mocks::relay::Relay::new());
4495            let mut reporters = Vec::new();
4496            for (idx, validator) in participants.iter().enumerate() {
4497                let context = context.with_label(&format!("validator_{}", *validator));
4498
4499                let reporter_config = mocks::reporter::Config {
4500                    participants: participants.clone().try_into().unwrap(),
4501                    scheme: schemes[idx].clone(),
4502                    elector: elector.clone(),
4503                };
4504                let mock_reporter = mocks::reporter::Reporter::new(
4505                    context.with_label("mock_reporter"),
4506                    reporter_config,
4507                );
4508
4509                // Wrap with `AttributableReporter`
4510                let attributable_reporter = scheme::reporter::AttributableReporter::new(
4511                    context.with_label("rng"),
4512                    schemes[idx].clone(),
4513                    mock_reporter.clone(),
4514                    Sequential,
4515                    true, // Enable verification
4516                );
4517                reporters.push(mock_reporter.clone());
4518
4519                let application_cfg = mocks::application::Config {
4520                    hasher: Sha256::default(),
4521                    relay: relay.clone(),
4522                    me: validator.clone(),
4523                    propose_latency: (10.0, 5.0),
4524                    verify_latency: (10.0, 5.0),
4525                    certify_latency: (10.0, 5.0),
4526                    should_certify: mocks::application::Certifier::Sometimes,
4527                };
4528                let (actor, application) = mocks::application::Application::new(
4529                    context.with_label("application"),
4530                    application_cfg,
4531                );
4532                actor.start();
4533                let blocker = oracle.control(validator.clone());
4534                let cfg = config::Config {
4535                    scheme: schemes[idx].clone(),
4536                    elector: elector.clone(),
4537                    blocker,
4538                    automaton: application.clone(),
4539                    relay: application.clone(),
4540                    reporter: attributable_reporter,
4541                    strategy: Sequential,
4542                    partition: validator.to_string(),
4543                    mailbox_size: 1024,
4544                    epoch: Epoch::new(333),
4545                    leader_timeout: Duration::from_secs(1),
4546                    certification_timeout: Duration::from_secs(2),
4547                    timeout_retry: Duration::from_secs(10),
4548                    fetch_timeout: Duration::from_secs(1),
4549                    activity_timeout,
4550                    skip_timeout,
4551                    fetch_concurrent: 4,
4552                    replay_buffer: NZUsize!(1024 * 1024),
4553                    write_buffer: NZUsize!(1024 * 1024),
4554                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4555                    forwarding: ForwardingPolicy::Disabled,
4556                };
4557                let engine = Engine::new(context.with_label("engine"), cfg);
4558
4559                // Start engine
4560                let (pending, recovered, resolver) = registrations
4561                    .remove(validator)
4562                    .expect("validator should be registered");
4563                engine.start(pending, recovered, resolver);
4564            }
4565
4566            // Wait for all engines to finish
4567            let mut finalizers = Vec::new();
4568            for reporter in reporters.iter_mut() {
4569                let (mut latest, mut monitor) = reporter.subscribe().await;
4570                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4571                    while latest < required_containers {
4572                        latest = monitor.recv().await.expect("event missing");
4573                    }
4574                }));
4575            }
4576            join_all(finalizers).await;
4577
4578            // Verify filtering behavior based on scheme attributability
4579            for reporter in reporters.iter() {
4580                // Ensure no faults (normal operation)
4581                reporter.assert_no_faults();
4582
4583                // Ensure no invalid signatures
4584                reporter.assert_no_invalid();
4585
4586                // Check that we have certificates reported
4587                {
4588                    let notarizations = reporter.notarizations.lock();
4589                    let finalizations = reporter.finalizations.lock();
4590                    assert!(
4591                        !notarizations.is_empty() || !finalizations.is_empty(),
4592                        "Certificates should be reported"
4593                    );
4594                }
4595
4596                // Check notarizes
4597                let notarizes = reporter.notarizes.lock();
4598                let last_view = notarizes.keys().max().cloned().unwrap_or_default();
4599                for (view, payloads) in notarizes.iter() {
4600                    if *view == last_view {
4601                        continue; // Skip last view
4602                    }
4603
4604                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4605
4606                    // For attributable schemes, we should see peer activities
4607                    if S::is_attributable() {
4608                        assert!(signers > 1, "view {view}: {signers}");
4609                    } else {
4610                        // For non-attributable, we shouldn't see any peer activities
4611                        assert_eq!(signers, 0);
4612                    }
4613                }
4614
4615                // Check finalizes
4616                let finalizes = reporter.finalizes.lock();
4617                for (_, payloads) in finalizes.iter() {
4618                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4619
4620                    // For attributable schemes, we should see peer activities
4621                    if S::is_attributable() {
4622                        assert!(signers > 1);
4623                    } else {
4624                        // For non-attributable, we shouldn't see any peer activities
4625                        assert_eq!(signers, 0);
4626                    }
4627                }
4628            }
4629
4630            // Ensure no blocked connections (normal operation)
4631            let blocked = oracle.blocked().await.unwrap();
4632            assert!(blocked.is_empty());
4633        });
4634    }
4635
4636    #[test_traced]
4637    fn test_attributable_reporter_filtering() {
4638        attributable_reporter_filtering::<_, _, Random>(
4639            bls12381_threshold_vrf::fixture::<MinPk, _>,
4640        );
4641        attributable_reporter_filtering::<_, _, Random>(
4642            bls12381_threshold_vrf::fixture::<MinSig, _>,
4643        );
4644        attributable_reporter_filtering::<_, _, RoundRobin>(
4645            bls12381_threshold_std::fixture::<MinPk, _>,
4646        );
4647        attributable_reporter_filtering::<_, _, RoundRobin>(
4648            bls12381_threshold_std::fixture::<MinSig, _>,
4649        );
4650        attributable_reporter_filtering::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4651        attributable_reporter_filtering::<_, _, RoundRobin>(
4652            bls12381_multisig::fixture::<MinSig, _>,
4653        );
4654        attributable_reporter_filtering::<_, _, RoundRobin>(ed25519::fixture);
4655        attributable_reporter_filtering::<_, _, RoundRobin>(secp256r1::fixture);
4656    }
4657
4658    fn split_views_no_lockup<S, F, L>(mut fixture: F)
4659    where
4660        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4661        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4662        L: Elector<S>,
4663    {
4664        // Scenario:
4665        // - View F: Finalization of B_1 seen by all participants.
4666        // - View F+1:
4667        //   - Nullification seen by honest (4..=6,7) and all 3 byzantines
4668        //   - Notarization of B_2A seen by honest (1..=3)
4669        // - View F+2:
4670        //   - Nullification seen by honest (1..=3,7) and all 3 byzantines
4671        //   - Notarization of B_2B seen by honest (4..=6)
4672        // - View F+3: Nullification. Seen by all participants.
4673        // - Then ensure progress resumes beyond F+3 after reconnecting
4674
4675        // Define participant types
4676        enum ParticipantType {
4677            Group1,    // receives notarization for f+1, nullification for f+2
4678            Group2,    // receives nullification for f+1, notarization for f+2
4679            Ignorant,  // receives nullification for f+1 and f+2
4680            Byzantine, // nullify-only
4681        }
4682        let get_type = |idx: usize| -> ParticipantType {
4683            match idx {
4684                0..3 => ParticipantType::Group1,
4685                3..6 => ParticipantType::Group2,
4686                6 => ParticipantType::Ignorant,
4687                7..10 => ParticipantType::Byzantine,
4688                _ => unreachable!(),
4689            }
4690        };
4691
4692        // Create context
4693        let n = 10;
4694        let quorum = quorum(n) as usize;
4695        assert_eq!(quorum, 7);
4696        let activity_timeout = ViewDelta::new(10);
4697        let skip_timeout = ViewDelta::new(5);
4698        let namespace = b"consensus".to_vec();
4699        let executor = deterministic::Runner::timed(Duration::from_secs(300));
4700        executor.start(|mut context| async move {
4701            // Register participants
4702            let Fixture {
4703                participants,
4704                schemes,
4705                ..
4706            } = fixture(&mut context, &namespace, n);
4707            let mut oracle =
4708                start_test_network_with_peers(context.clone(), participants.clone(), false).await;
4709            let mut registrations = register_validators(&mut oracle, &participants).await;
4710
4711            // ========== Build the certificates manually ==========
4712
4713            // Helper: assemble finalization from explicit signer indices
4714            let build_finalization = |proposal: &Proposal<D>| -> TFinalization<_, D> {
4715                let votes: Vec<_> = (0..=quorum)
4716                    .map(|i| TFinalize::sign(&schemes[i], proposal.clone()).unwrap())
4717                    .collect();
4718                TFinalization::from_finalizes(&schemes[0], &votes, &Sequential)
4719                    .expect("finalization quorum")
4720            };
4721            // Helper: assemble notarization from explicit signer indices
4722            let build_notarization = |proposal: &Proposal<D>| -> TNotarization<_, D> {
4723                let votes: Vec<_> = (0..=quorum)
4724                    .map(|i| TNotarize::sign(&schemes[i], proposal.clone()).unwrap())
4725                    .collect();
4726                TNotarization::from_notarizes(&schemes[0], &votes, &Sequential)
4727                    .expect("notarization quorum")
4728            };
4729            let build_nullification = |round: Round| -> TNullification<_> {
4730                let votes: Vec<_> = (0..=quorum)
4731                    .map(|i| TNullify::sign::<D>(&schemes[i], round).unwrap())
4732                    .collect();
4733                TNullification::from_nullifies(&schemes[0], &votes, &Sequential)
4734                    .expect("nullification quorum")
4735            };
4736            // Choose F=1 and construct B_1, B_2A, B_2B
4737            let f_view = 1;
4738            let round_f = Round::new(Epoch::new(333), View::new(f_view));
4739            let payload_b0 = Sha256::hash(b"B_F");
4740            let proposal_b0 = Proposal::new(round_f, View::new(f_view - 1), payload_b0);
4741            let payload_b1a = Sha256::hash(b"B_G1");
4742            let proposal_b1a = Proposal::new(
4743                Round::new(Epoch::new(333), View::new(f_view + 1)),
4744                View::new(f_view),
4745                payload_b1a,
4746            );
4747            let payload_b1b = Sha256::hash(b"B_G2");
4748            let proposal_b1b = Proposal::new(
4749                Round::new(Epoch::new(333), View::new(f_view + 2)),
4750                View::new(f_view),
4751                payload_b1b,
4752            );
4753
4754            // Build notarization and finalization for the first block
4755            let b0_notarization = build_notarization(&proposal_b0);
4756            let b0_finalization = build_finalization(&proposal_b0);
4757            // Build notarizations for F+1 and F+2
4758            let b1a_notarization = build_notarization(&proposal_b1a);
4759            let b1b_notarization = build_notarization(&proposal_b1b);
4760            // Build nullifications for F+1 and F+2
4761            let null_a = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 1)));
4762            let null_b = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 2)));
4763
4764            // Create an 11th non-participant injector and obtain senders
4765            let injector_pk = PrivateKey::from_seed(1_000_000).public_key();
4766            let (mut injector_sender, _inj_certificate_receiver) = oracle
4767                .control(injector_pk.clone())
4768                .register(1, TEST_QUOTA)
4769                .await
4770                .unwrap();
4771
4772            // Create minimal one-way links from injector to all participants (not full mesh)
4773            let link = Link {
4774                latency: Duration::from_millis(10),
4775                jitter: Duration::from_millis(0),
4776                success_rate: 1.0,
4777            };
4778            for p in participants.iter() {
4779                oracle
4780                    .add_link(injector_pk.clone(), p.clone(), link.clone())
4781                    .await
4782                    .unwrap();
4783            }
4784            oracle
4785                .manager()
4786                .track(
4787                    1,
4788                    TrackedPeers::new(
4789                        Set::from_iter_dedup(participants.iter().cloned()),
4790                        Set::from_iter_dedup(std::slice::from_ref(&injector_pk).iter().cloned()),
4791                    ),
4792                )
4793                .await;
4794            context.sleep(Duration::from_millis(10)).await;
4795
4796            // ========== Broadcast certificates over recovered network. ==========
4797
4798            // View F:
4799            let msg = Certificate::<_, D>::Notarization(b0_notarization).encode();
4800            injector_sender
4801                .send(Recipients::All, msg, true)
4802                .await
4803                .unwrap();
4804            let msg = Certificate::<_, D>::Finalization(b0_finalization).encode();
4805            injector_sender
4806                .send(Recipients::All, msg, true)
4807                .await
4808                .unwrap();
4809            // View F+1:
4810            let notarization_msg = Certificate::<_, D>::Notarization(b1a_notarization);
4811            let nullification_msg = Certificate::<_, D>::Nullification(null_a.clone());
4812            for (i, participant) in participants.iter().enumerate() {
4813                let recipient = Recipients::One(participant.clone());
4814                let msg = match get_type(i) {
4815                    ParticipantType::Group1 => notarization_msg.encode(),
4816                    _ => nullification_msg.encode(),
4817                };
4818                injector_sender.send(recipient, msg, true).await.unwrap();
4819            }
4820            // View F+2:
4821            let notarization_msg = Certificate::<_, D>::Notarization(b1b_notarization);
4822            let nullification_msg = Certificate::<_, D>::Nullification(null_b.clone());
4823            for (i, participant) in participants.iter().enumerate() {
4824                let recipient = Recipients::One(participant.clone());
4825                let msg = match get_type(i) {
4826                    ParticipantType::Group2 => notarization_msg.encode(),
4827                    _ => nullification_msg.encode(),
4828                };
4829                injector_sender.send(recipient, msg, true).await.unwrap();
4830            }
4831
4832            // ========== Create engines ==========
4833
4834            // Start engines after preloading certificates into each participant's
4835            // recovered channel (ensuring processing before any leader attempts to issue a
4836            // conflicting vote).
4837            let elector = L::default();
4838            let relay = Arc::new(mocks::relay::Relay::new());
4839            let mut honest_reporters = Vec::new();
4840            for (idx, validator) in participants.iter().enumerate() {
4841                let (pending, recovered, resolver) = registrations
4842                    .remove(validator)
4843                    .expect("validator should be registered");
4844                let participant_type = get_type(idx);
4845                if matches!(participant_type, ParticipantType::Byzantine) {
4846                    // Byzantine engines
4847                    let cfg = mocks::nullify_only::Config {
4848                        scheme: schemes[idx].clone(),
4849                    };
4850                    let engine: mocks::nullify_only::NullifyOnly<_, _, Sha256> =
4851                        mocks::nullify_only::NullifyOnly::new(
4852                            context.with_label(&format!("byzantine_{}", *validator)),
4853                            cfg,
4854                        );
4855                    engine.start(pending);
4856                    // Recovered/resolver channels are unused for byzantine actors.
4857                    drop(recovered);
4858                    drop(resolver);
4859                } else {
4860                    // Honest engines
4861                    let reporter_config = mocks::reporter::Config {
4862                        participants: participants.clone().try_into().unwrap(),
4863                        scheme: schemes[idx].clone(),
4864                        elector: elector.clone(),
4865                    };
4866                    let reporter = mocks::reporter::Reporter::new(
4867                        context.with_label(&format!("reporter_{}", *validator)),
4868                        reporter_config,
4869                    );
4870                    honest_reporters.push(reporter.clone());
4871
4872                    let application_cfg = mocks::application::Config {
4873                        hasher: Sha256::default(),
4874                        relay: relay.clone(),
4875                        me: validator.clone(),
4876                        propose_latency: (250.0, 50.0), // ensure we process certificates first
4877                        verify_latency: (10.0, 5.0),
4878                        certify_latency: (10.0, 5.0),
4879                        should_certify: mocks::application::Certifier::Sometimes,
4880                    };
4881                    let (actor, application) = mocks::application::Application::new(
4882                        context.with_label(&format!("application_{}", *validator)),
4883                        application_cfg,
4884                    );
4885                    actor.start();
4886                    let blocker = oracle.control(validator.clone());
4887                    let cfg = config::Config {
4888                        scheme: schemes[idx].clone(),
4889                        elector: elector.clone(),
4890                        blocker,
4891                        automaton: application.clone(),
4892                        relay: application.clone(),
4893                        reporter: reporter.clone(),
4894                        strategy: Sequential,
4895                        partition: validator.to_string(),
4896                        mailbox_size: 1024,
4897                        epoch: Epoch::new(333),
4898                        leader_timeout: Duration::from_secs(10),
4899                        certification_timeout: Duration::from_secs(10),
4900                        timeout_retry: Duration::from_secs(10),
4901                        fetch_timeout: Duration::from_secs(1),
4902                        activity_timeout,
4903                        skip_timeout,
4904                        fetch_concurrent: 4,
4905                        replay_buffer: NZUsize!(1024 * 1024),
4906                        write_buffer: NZUsize!(1024 * 1024),
4907                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4908                        forwarding: ForwardingPolicy::Disabled,
4909                    };
4910                    let engine =
4911                        Engine::new(context.with_label(&format!("engine_{}", *validator)), cfg);
4912                    engine.start(pending, recovered, resolver);
4913                }
4914            }
4915
4916            // Allow started engines to consume preloaded certificates.
4917            context.sleep(Duration::from_secs(2)).await;
4918
4919            // ========== Assert the exact certificates are seen in each view ==========
4920
4921            // Assert the exact certificates in view F
4922            // All participants should have finalized B_0
4923            let view = View::new(f_view);
4924            for reporter in honest_reporters.iter() {
4925                let finalizations = reporter.finalizations.lock();
4926                assert!(finalizations.contains_key(&view));
4927            }
4928
4929            // Assert the exact certificates in view F+1
4930            // Group 1 should have notarized B_1A only
4931            // All other participants should have nullified F+1
4932            let view = View::new(f_view + 1);
4933            for (i, reporter) in honest_reporters.iter().enumerate() {
4934                let finalizations = reporter.finalizations.lock();
4935                assert!(!finalizations.contains_key(&view));
4936                let nullifications = reporter.nullifications.lock();
4937                let notarizations = reporter.notarizations.lock();
4938                match get_type(i) {
4939                    ParticipantType::Group1 => {
4940                        assert!(notarizations.contains_key(&view));
4941                        assert!(!nullifications.contains_key(&view));
4942                    }
4943                    _ => {
4944                        assert!(nullifications.contains_key(&view));
4945                        assert!(!notarizations.contains_key(&view));
4946                    }
4947                }
4948            }
4949
4950            // Assert the exact certificates in view F+2
4951            // Group 2 should have notarized B_1B only
4952            // All other participants should have nullified F+2
4953            let view = View::new(f_view + 2);
4954            for (i, reporter) in honest_reporters.iter().enumerate() {
4955                let finalizations = reporter.finalizations.lock();
4956                assert!(!finalizations.contains_key(&view));
4957                let nullifications = reporter.nullifications.lock();
4958                let notarizations = reporter.notarizations.lock();
4959                match get_type(i) {
4960                    ParticipantType::Group2 => {
4961                        assert!(notarizations.contains_key(&view));
4962                        assert!(!nullifications.contains_key(&view));
4963                    }
4964                    _ => {
4965                        assert!(nullifications.contains_key(&view));
4966                        assert!(!notarizations.contains_key(&view));
4967                    }
4968                }
4969            }
4970
4971            // Assert no members have yet nullified view F+3
4972            let next_view = View::new(f_view + 3);
4973            for (i, reporter) in honest_reporters.iter().enumerate() {
4974                let nullifies = reporter.nullifies.lock();
4975                assert!(!nullifies.contains_key(&next_view), "reporter {i}");
4976            }
4977
4978            // ========== Reconnect all participants ==========
4979
4980            // Reconnect all participants fully using the helper
4981            link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
4982
4983            // Wait until all honest reporters finalize strictly past F+2 (e.g., at least F+3)
4984            {
4985                let target = View::new(f_view + 3);
4986                let mut finalizers = Vec::new();
4987                for reporter in honest_reporters.iter_mut() {
4988                    let (mut latest, mut monitor) = reporter.subscribe().await;
4989                    finalizers.push(context.with_label("resume_finalizer").spawn(
4990                        move |_| async move {
4991                            while latest < target {
4992                                latest = monitor.recv().await.expect("event missing");
4993                            }
4994                        },
4995                    ));
4996                }
4997                join_all(finalizers).await;
4998            }
4999
5000            // Sanity checks: no faults/invalid signatures, and no peers blocked
5001            for reporter in honest_reporters.iter() {
5002                reporter.assert_no_faults();
5003                reporter.assert_no_invalid();
5004            }
5005            let blocked = oracle.blocked().await.unwrap();
5006            assert!(blocked.is_empty(), "blocked peers: {blocked:?}");
5007        });
5008    }
5009
5010    #[test_group("slow")]
5011    #[test_traced]
5012    fn test_split_views_no_lockup() {
5013        split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
5014        split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
5015        split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
5016        split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
5017        split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
5018        split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
5019        split_views_no_lockup::<_, _, RoundRobin>(ed25519::fixture);
5020        split_views_no_lockup::<_, _, RoundRobin>(secp256r1::fixture);
5021    }
5022
5023    fn tle<V, L>()
5024    where
5025        V: Variant,
5026        L: Elector<bls12381_threshold_vrf::Scheme<PublicKey, V>>,
5027    {
5028        // Create context
5029        let n = 4;
5030        let namespace = b"consensus".to_vec();
5031        let activity_timeout = ViewDelta::new(100);
5032        let skip_timeout = ViewDelta::new(50);
5033        let executor = deterministic::Runner::timed(Duration::from_secs(30));
5034        executor.start(|mut context| async move {
5035            // Register participants
5036            let Fixture {
5037                participants,
5038                schemes,
5039                ..
5040            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, &namespace, n);
5041            let mut oracle =
5042                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
5043            let mut registrations = register_validators(&mut oracle, &participants).await;
5044
5045            // Link all validators
5046            let link = Link {
5047                latency: Duration::from_millis(10),
5048                jitter: Duration::from_millis(5),
5049                success_rate: 1.0,
5050            };
5051            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5052
5053            // Create engines and reporters
5054            let elector = L::default();
5055            let relay = Arc::new(mocks::relay::Relay::new());
5056            let mut reporters = Vec::new();
5057            let mut engine_handlers = Vec::new();
5058            let monitor_reporter = Arc::new(Mutex::new(None));
5059            for (idx, validator) in participants.iter().enumerate() {
5060                // Create scheme context
5061                let context = context.with_label(&format!("validator_{}", *validator));
5062
5063                // Store first reporter for monitoring
5064                let reporter_config = mocks::reporter::Config {
5065                    participants: participants.clone().try_into().unwrap(),
5066                    scheme: schemes[idx].clone(),
5067                    elector: elector.clone(),
5068                };
5069                let reporter =
5070                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5071                reporters.push(reporter.clone());
5072                if idx == 0 {
5073                    *monitor_reporter.lock() = Some(reporter.clone());
5074                }
5075
5076                // Configure application
5077                let application_cfg = mocks::application::Config {
5078                    hasher: Sha256::default(),
5079                    relay: relay.clone(),
5080                    me: validator.clone(),
5081                    propose_latency: (10.0, 5.0),
5082                    verify_latency: (10.0, 5.0),
5083                    certify_latency: (10.0, 5.0),
5084                    should_certify: mocks::application::Certifier::Sometimes,
5085                };
5086                let (actor, application) = mocks::application::Application::new(
5087                    context.with_label("application"),
5088                    application_cfg,
5089                );
5090                actor.start();
5091                let blocker = oracle.control(validator.clone());
5092                let cfg = config::Config {
5093                    scheme: schemes[idx].clone(),
5094                    elector: elector.clone(),
5095                    blocker,
5096                    automaton: application.clone(),
5097                    relay: application.clone(),
5098                    reporter: reporter.clone(),
5099                    strategy: Sequential,
5100                    partition: validator.to_string(),
5101                    mailbox_size: 1024,
5102                    epoch: Epoch::new(333),
5103                    leader_timeout: Duration::from_millis(100),
5104                    certification_timeout: Duration::from_millis(200),
5105                    timeout_retry: Duration::from_millis(500),
5106                    fetch_timeout: Duration::from_millis(100),
5107                    activity_timeout,
5108                    skip_timeout,
5109                    fetch_concurrent: 4,
5110                    replay_buffer: NZUsize!(1024 * 1024),
5111                    write_buffer: NZUsize!(1024 * 1024),
5112                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5113                    forwarding: ForwardingPolicy::Disabled,
5114                };
5115                let engine = Engine::new(context.with_label("engine"), cfg);
5116
5117                // Start engine
5118                let (pending, recovered, resolver) = registrations
5119                    .remove(validator)
5120                    .expect("validator should be registered");
5121                engine_handlers.push(engine.start(pending, recovered, resolver));
5122            }
5123
5124            // Prepare TLE test data
5125            let target = Round::new(Epoch::new(333), View::new(10)); // Encrypt for round (epoch 333, view 10)
5126            let message = b"Secret message for future view10"; // 32 bytes
5127
5128            // Encrypt message
5129            let ciphertext = schemes[0].encrypt(&mut context, target, *message);
5130
5131            // Wait for consensus to reach the target view and then decrypt
5132            let reporter = monitor_reporter.lock().clone().unwrap();
5133            loop {
5134                // Wait for notarization
5135                context.sleep(Duration::from_millis(100)).await;
5136                let notarizations = reporter.notarizations.lock();
5137                let Some(notarization) = notarizations.get(&target.view()) else {
5138                    continue;
5139                };
5140
5141                // Decrypt the message using the seed
5142                let seed = notarization.seed();
5143                let decrypted = seed
5144                    .decrypt(&ciphertext)
5145                    .expect("Decryption should succeed with valid seed signature");
5146                assert_eq!(
5147                    message,
5148                    decrypted.as_ref(),
5149                    "Decrypted message should match original message"
5150                );
5151                break;
5152            }
5153        });
5154    }
5155
5156    #[test_traced]
5157    fn test_tle() {
5158        tle::<MinPk, Random>();
5159        tle::<MinSig, Random>();
5160    }
5161
5162    fn hailstorm<S, F, L>(
5163        seed: u64,
5164        shutdowns: usize,
5165        interval: ViewDelta,
5166        mut fixture: F,
5167    ) -> String
5168    where
5169        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5170        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5171        L: Elector<S>,
5172    {
5173        // Create context
5174        let n = 5;
5175        let activity_timeout = ViewDelta::new(10);
5176        let skip_timeout = ViewDelta::new(5);
5177        let namespace = b"consensus".to_vec();
5178        let cfg = deterministic::Config::new().with_seed(seed);
5179        let executor = deterministic::Runner::new(cfg);
5180        executor.start(|mut context| async move {
5181            // Register participants
5182            let Fixture {
5183                participants,
5184                schemes,
5185                ..
5186            } = fixture(&mut context, &namespace, n);
5187            let mut oracle =
5188                start_test_network_with_peers(context.clone(), participants.clone(), true).await;
5189            let mut registrations = register_validators(&mut oracle, &participants).await;
5190
5191            // Link all validators
5192            let link = Link {
5193                latency: Duration::from_millis(10),
5194                jitter: Duration::from_millis(1),
5195                success_rate: 1.0,
5196            };
5197            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5198
5199            // Create engines
5200            let elector = L::default();
5201            let relay = Arc::new(mocks::relay::Relay::new());
5202            let mut reporters = BTreeMap::new();
5203            let mut engine_handlers = BTreeMap::new();
5204            for (idx, validator) in participants.iter().enumerate() {
5205                // Create scheme context
5206                let context = context.with_label(&format!("validator_{}", *validator));
5207
5208                // Configure engine
5209                let reporter_config = mocks::reporter::Config {
5210                    participants: participants.clone().try_into().unwrap(),
5211                    scheme: schemes[idx].clone(),
5212                    elector: elector.clone(),
5213                };
5214                let reporter =
5215                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5216                reporters.insert(idx, reporter.clone());
5217                let application_cfg = mocks::application::Config {
5218                    hasher: Sha256::default(),
5219                    relay: relay.clone(),
5220                    me: validator.clone(),
5221                    propose_latency: (10.0, 5.0),
5222                    verify_latency: (10.0, 5.0),
5223                    certify_latency: (10.0, 5.0),
5224                    should_certify: mocks::application::Certifier::Sometimes,
5225                };
5226                let (actor, application) = mocks::application::Application::new(
5227                    context.with_label("application"),
5228                    application_cfg,
5229                );
5230                actor.start();
5231                let blocker = oracle.control(validator.clone());
5232                let cfg = config::Config {
5233                    scheme: schemes[idx].clone(),
5234                    elector: elector.clone(),
5235                    blocker,
5236                    automaton: application.clone(),
5237                    relay: application.clone(),
5238                    reporter: reporter.clone(),
5239                    strategy: Sequential,
5240                    partition: validator.to_string(),
5241                    mailbox_size: 1024,
5242                    epoch: Epoch::new(333),
5243                    leader_timeout: Duration::from_secs(1),
5244                    certification_timeout: Duration::from_secs(2),
5245                    timeout_retry: Duration::from_secs(10),
5246                    fetch_timeout: Duration::from_secs(1),
5247                    activity_timeout,
5248                    skip_timeout,
5249                    fetch_concurrent: 4,
5250                    replay_buffer: NZUsize!(1024 * 1024),
5251                    write_buffer: NZUsize!(1024 * 1024),
5252                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5253                    forwarding: ForwardingPolicy::Disabled,
5254                };
5255                let engine = Engine::new(context.with_label("engine"), cfg);
5256
5257                // Start engine
5258                let (pending, recovered, resolver) = registrations
5259                    .remove(validator)
5260                    .expect("validator should be registered");
5261                engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5262            }
5263
5264            // Run shutdowns
5265            let mut target = View::zero();
5266            for i in 0..shutdowns {
5267                // Update target
5268                target = target.saturating_add(interval);
5269
5270                // Wait for all engines to finish
5271                let mut finalizers = Vec::new();
5272                for (_, reporter) in reporters.iter_mut() {
5273                    let (mut latest, mut monitor) = reporter.subscribe().await;
5274                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5275                        while latest < target {
5276                            latest = monitor.recv().await.expect("event missing");
5277                        }
5278                    }));
5279                }
5280                join_all(finalizers).await;
5281                target = target.saturating_add(interval);
5282
5283                // Select a random engine to shutdown
5284                let idx = context.gen_range(0..engine_handlers.len());
5285                let validator = &participants[idx];
5286                let handle = engine_handlers.remove(&idx).unwrap();
5287                handle.abort();
5288                let _ = handle.await;
5289                let selected_reporter = reporters.remove(&idx).unwrap();
5290                info!(idx, ?validator, "shutdown validator");
5291
5292                // Wait for all engines to finish
5293                let mut finalizers = Vec::new();
5294                for (_, reporter) in reporters.iter_mut() {
5295                    let (mut latest, mut monitor) = reporter.subscribe().await;
5296                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5297                        while latest < target {
5298                            latest = monitor.recv().await.expect("event missing");
5299                        }
5300                    }));
5301                }
5302                join_all(finalizers).await;
5303                target = target.saturating_add(interval);
5304
5305                // Recreate engine
5306                info!(idx, ?validator, "restarting validator");
5307                let context =
5308                    context.with_label(&format!("validator_{}_restarted_{}", *validator, i));
5309
5310                // Start engine
5311                let (pending, recovered, resolver) =
5312                    register_validator(&mut oracle, validator.clone()).await;
5313                let application_cfg = mocks::application::Config {
5314                    hasher: Sha256::default(),
5315                    relay: relay.clone(),
5316                    me: validator.clone(),
5317                    propose_latency: (10.0, 5.0),
5318                    verify_latency: (10.0, 5.0),
5319                    certify_latency: (10.0, 5.0),
5320                    should_certify: mocks::application::Certifier::Sometimes,
5321                };
5322                let (actor, application) = mocks::application::Application::new(
5323                    context.with_label("application"),
5324                    application_cfg,
5325                );
5326                actor.start();
5327                reporters.insert(idx, selected_reporter.clone());
5328                let blocker = oracle.control(validator.clone());
5329                let cfg = config::Config {
5330                    scheme: schemes[idx].clone(),
5331                    elector: elector.clone(),
5332                    blocker,
5333                    automaton: application.clone(),
5334                    relay: application.clone(),
5335                    reporter: selected_reporter,
5336                    strategy: Sequential,
5337                    partition: validator.to_string(),
5338                    mailbox_size: 1024,
5339                    epoch: Epoch::new(333),
5340                    leader_timeout: Duration::from_secs(1),
5341                    certification_timeout: Duration::from_secs(2),
5342                    timeout_retry: Duration::from_secs(10),
5343                    fetch_timeout: Duration::from_secs(1),
5344                    activity_timeout,
5345                    skip_timeout,
5346                    fetch_concurrent: 4,
5347                    replay_buffer: NZUsize!(1024 * 1024),
5348                    write_buffer: NZUsize!(1024 * 1024),
5349                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5350                    forwarding: ForwardingPolicy::Disabled,
5351                };
5352                let engine = Engine::new(context.with_label("engine"), cfg);
5353                engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5354
5355                // Wait for all engines to hit required containers
5356                let mut finalizers = Vec::new();
5357                for (_, reporter) in reporters.iter_mut() {
5358                    let (mut latest, mut monitor) = reporter.subscribe().await;
5359                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5360                        while latest < target {
5361                            latest = monitor.recv().await.expect("event missing");
5362                        }
5363                    }));
5364                }
5365                join_all(finalizers).await;
5366                info!(idx, ?validator, "validator recovered");
5367            }
5368
5369            // Check reporters for correct activity
5370            let latest_complete = target.saturating_sub(activity_timeout);
5371            for (_, reporter) in reporters.iter() {
5372                // Ensure no faults
5373                reporter.assert_no_faults();
5374
5375                // Ensure no invalid signatures
5376                reporter.assert_no_invalid();
5377
5378                // Ensure no forks
5379                let mut notarized = HashMap::new();
5380                let mut finalized = HashMap::new();
5381                {
5382                    let notarizes = reporter.notarizes.lock();
5383                    for view in View::range(View::new(1), latest_complete) {
5384                        // Ensure only one payload proposed per view
5385                        let Some(payloads) = notarizes.get(&view) else {
5386                            continue;
5387                        };
5388                        if payloads.len() > 1 {
5389                            panic!("view: {view}");
5390                        }
5391                        let (digest, _) = payloads.iter().next().unwrap();
5392                        notarized.insert(view, *digest);
5393                    }
5394                }
5395                {
5396                    let notarizations = reporter.notarizations.lock();
5397                    for view in View::range(View::new(1), latest_complete) {
5398                        // Ensure notarization matches digest from notarizes
5399                        let Some(notarization) = notarizations.get(&view) else {
5400                            continue;
5401                        };
5402                        let Some(digest) = notarized.get(&view) else {
5403                            continue;
5404                        };
5405                        assert_eq!(&notarization.proposal.payload, digest);
5406                    }
5407                }
5408                {
5409                    let finalizes = reporter.finalizes.lock();
5410                    for view in View::range(View::new(1), latest_complete) {
5411                        // Ensure only one payload proposed per view
5412                        let Some(payloads) = finalizes.get(&view) else {
5413                            continue;
5414                        };
5415                        if payloads.len() > 1 {
5416                            panic!("view: {view}");
5417                        }
5418                        let (digest, _) = payloads.iter().next().unwrap();
5419                        finalized.insert(view, *digest);
5420
5421                        // Only check at views below timeout
5422                        if view > latest_complete {
5423                            continue;
5424                        }
5425
5426                        // Ensure no nullifies for any finalizers
5427                        let nullifies = reporter.nullifies.lock();
5428                        let Some(nullifies) = nullifies.get(&view) else {
5429                            continue;
5430                        };
5431                        for (_, finalizers) in payloads.iter() {
5432                            for finalizer in finalizers.iter() {
5433                                if nullifies.contains(finalizer) {
5434                                    panic!("should not nullify and finalize at same view");
5435                                }
5436                            }
5437                        }
5438                    }
5439                }
5440                {
5441                    let finalizations = reporter.finalizations.lock();
5442                    for view in View::range(View::new(1), latest_complete) {
5443                        // Ensure finalization matches digest from finalizes
5444                        let Some(finalization) = finalizations.get(&view) else {
5445                            continue;
5446                        };
5447                        let Some(digest) = finalized.get(&view) else {
5448                            continue;
5449                        };
5450                        assert_eq!(&finalization.proposal.payload, digest);
5451                    }
5452                }
5453            }
5454
5455            // Ensure no blocked connections
5456            let blocked = oracle.blocked().await.unwrap();
5457            assert!(blocked.is_empty());
5458
5459            // Return state for audit
5460            context.auditor().state()
5461        })
5462    }
5463
5464    #[test_group("slow")]
5465    #[test_traced]
5466    fn test_hailstorm_bls12381_threshold_vrf_min_pk() {
5467        assert_eq!(
5468            hailstorm::<_, _, Random>(
5469                0,
5470                10,
5471                ViewDelta::new(15),
5472                bls12381_threshold_vrf::fixture::<MinPk, _>
5473            ),
5474            hailstorm::<_, _, Random>(
5475                0,
5476                10,
5477                ViewDelta::new(15),
5478                bls12381_threshold_vrf::fixture::<MinPk, _>
5479            ),
5480        );
5481    }
5482
5483    #[test_group("slow")]
5484    #[test_traced]
5485    fn test_hailstorm_bls12381_threshold_vrf_min_sig() {
5486        assert_eq!(
5487            hailstorm::<_, _, Random>(
5488                0,
5489                10,
5490                ViewDelta::new(15),
5491                bls12381_threshold_vrf::fixture::<MinSig, _>
5492            ),
5493            hailstorm::<_, _, Random>(
5494                0,
5495                10,
5496                ViewDelta::new(15),
5497                bls12381_threshold_vrf::fixture::<MinSig, _>
5498            ),
5499        );
5500    }
5501
5502    #[test_group("slow")]
5503    #[test_traced]
5504    fn test_hailstorm_bls12381_threshold_std_min_pk() {
5505        assert_eq!(
5506            hailstorm::<_, _, RoundRobin>(
5507                0,
5508                10,
5509                ViewDelta::new(15),
5510                bls12381_threshold_std::fixture::<MinPk, _>
5511            ),
5512            hailstorm::<_, _, RoundRobin>(
5513                0,
5514                10,
5515                ViewDelta::new(15),
5516                bls12381_threshold_std::fixture::<MinPk, _>
5517            ),
5518        );
5519    }
5520
5521    #[test_group("slow")]
5522    #[test_traced]
5523    fn test_hailstorm_bls12381_threshold_std_min_sig() {
5524        assert_eq!(
5525            hailstorm::<_, _, RoundRobin>(
5526                0,
5527                10,
5528                ViewDelta::new(15),
5529                bls12381_threshold_std::fixture::<MinSig, _>
5530            ),
5531            hailstorm::<_, _, RoundRobin>(
5532                0,
5533                10,
5534                ViewDelta::new(15),
5535                bls12381_threshold_std::fixture::<MinSig, _>
5536            ),
5537        );
5538    }
5539
5540    #[test_group("slow")]
5541    #[test_traced]
5542    fn test_hailstorm_bls12381_multisig_min_pk() {
5543        assert_eq!(
5544            hailstorm::<_, _, RoundRobin>(
5545                0,
5546                10,
5547                ViewDelta::new(15),
5548                bls12381_multisig::fixture::<MinPk, _>
5549            ),
5550            hailstorm::<_, _, RoundRobin>(
5551                0,
5552                10,
5553                ViewDelta::new(15),
5554                bls12381_multisig::fixture::<MinPk, _>
5555            ),
5556        );
5557    }
5558
5559    #[test_group("slow")]
5560    #[test_traced]
5561    fn test_hailstorm_bls12381_multisig_min_sig() {
5562        assert_eq!(
5563            hailstorm::<_, _, RoundRobin>(
5564                0,
5565                10,
5566                ViewDelta::new(15),
5567                bls12381_multisig::fixture::<MinSig, _>
5568            ),
5569            hailstorm::<_, _, RoundRobin>(
5570                0,
5571                10,
5572                ViewDelta::new(15),
5573                bls12381_multisig::fixture::<MinSig, _>
5574            ),
5575        );
5576    }
5577
5578    #[test_group("slow")]
5579    #[test_traced]
5580    fn test_hailstorm_ed25519() {
5581        assert_eq!(
5582            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture),
5583            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture)
5584        );
5585    }
5586
5587    #[test_group("slow")]
5588    #[test_traced]
5589    fn test_hailstorm_secp256r1() {
5590        assert_eq!(
5591            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture),
5592            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture)
5593        );
5594    }
5595
5596    /// Configuration for a Twins testing campaign.
5597    ///
5598    /// A campaign generates adversarial primary/secondary recipient-set
5599    /// scenarios, splits Byzantine participants into twin halves, and verifies
5600    /// that honest nodes still finalize blocks after the adversarial prefix
5601    /// ends.
5602    ///
5603    /// # Fields
5604    ///
5605    /// - `n`: Total participants. The number of faults is derived as
5606    ///   `N3f1::max_faults(n)`. Larger `n` increases the per-scenario
5607    ///   compromised-set space but also makes each case slower to execute.
5608    ///
5609    /// - `rounds`: Number of adversarial rounds that form the attack prefix.
5610    ///   Each round independently places participants relative to the primary
5611    ///   and secondary recipient sets: outside both, both-halves,
5612    ///   primary-only, or secondary-only. The two recipient sets may overlap;
5613    ///   a participant in `both-halves` is visible to both twins in that view.
5614    ///   After these rounds, the network becomes fully synchronous. More
5615    ///   rounds exponentially increase the canonical scenario space.
5616    ///
5617    /// - `mode`: How multi-round scenarios are constructed. `Sampled` picks
5618    ///   independent recipient sets per round; `Sustained` repeats a single
5619    ///   recipient-set pattern across all rounds (modeling a persistent
5620    ///   adversarial split).
5621    ///
5622    /// - `max_cases`: Upper bound on the total emitted cases. Each case is a
5623    ///   (scenario, compromised-assignment) pair. Also caps scenario
5624    ///   enumeration (sampling uniformly when the space is larger). Cases
5625    ///   are shuffled and truncated to this budget.
5626    ///
5627    /// - `trailing_finalizations`: Number of finalizations each honest node
5628    ///   must produce *after* the adversarial prefix before the case is
5629    ///   considered successful. This is the liveness assertion -- it ensures
5630    ///   the protocol actually commits blocks under synchrony, not just
5631    ///   reaches a high view via nullifications.
5632    #[derive(Clone, Copy, Debug)]
5633    struct TwinsCampaign {
5634        n: u32,
5635        rounds: usize,
5636        mode: twins::Mode,
5637        max_cases: usize,
5638        trailing_finalizations: usize,
5639    }
5640
5641    fn twins_campaign<S, F, L>(
5642        rng: &mut StdRng,
5643        campaign: TwinsCampaign,
5644        link: Link,
5645        mut fixture: F,
5646    ) where
5647        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5648        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5649        L: Elector<S>,
5650    {
5651        let n = campaign.n;
5652        let faults = N3f1::max_faults(n) as usize;
5653        let cases = twins::cases(
5654            rng,
5655            twins::Framework {
5656                participants: n as usize,
5657                faults,
5658                rounds: campaign.rounds,
5659                mode: campaign.mode,
5660                max_cases: campaign.max_cases,
5661            },
5662        );
5663        assert!(
5664            !cases.is_empty(),
5665            "twins campaign should generate at least one case"
5666        );
5667        for case in cases {
5668            let scenario = case.scenario.clone();
5669            let twin_indices = case.compromised.clone();
5670            assert_eq!(
5671                twin_indices.len(),
5672                faults,
5673                "unexpected twins count for n={n} (expected f={faults})",
5674            );
5675
5676            let activity_timeout = ViewDelta::new(10);
5677            let skip_timeout = ViewDelta::new(5);
5678            let namespace = b"consensus".to_vec();
5679            let link = link.clone();
5680            let trailing_finalizations = campaign.trailing_finalizations;
5681            let mut case_fixture =
5682                |ctx: &mut deterministic::Context, ns: &[u8], n: u32| fixture(ctx, ns, n);
5683            let cfg = deterministic::Config::new()
5684                .with_rng(Box::new(StdRng::from_rng(&mut *rng).unwrap()));
5685            let executor = deterministic::Runner::new(cfg);
5686            executor.start(|mut context| async move {
5687                let Fixture {
5688                    participants,
5689                    schemes,
5690                    ..
5691                } = case_fixture(&mut context, &namespace, n);
5692                let participants: Arc<[_]> = participants.into();
5693                let mut oracle = start_test_network_with_peers(
5694                    context.clone(),
5695                    participants.iter().cloned(),
5696                    false,
5697                )
5698                .await;
5699                let mut registrations = register_validators(&mut oracle, &participants).await;
5700                link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5701
5702                let elector = TwinsElector::new(L::default(), &scenario, n as usize);
5703                let relay = Arc::new(mocks::relay::Relay::new());
5704                let mut reporters = Vec::new();
5705                let mut engine_handlers = Vec::new();
5706                let twin_index_set: HashSet<usize> = twin_indices.iter().copied().collect();
5707
5708                // Create twin engines (f Byzantine twins).
5709                for idx in twin_indices.iter().copied() {
5710                    let validator = &participants[idx];
5711                    let (
5712                        (vote_sender, vote_receiver),
5713                        (certificate_sender, certificate_receiver),
5714                        (_resolver_sender, _resolver_receiver),
5715                    ) = registrations
5716                        .remove(validator)
5717                        .expect("validator should be registered");
5718
5719                    let make_vote_forwarder = || {
5720                        let participants = participants.clone();
5721                        let scenario = scenario.clone();
5722                        move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
5723                            let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5724                            let (primary, secondary) =
5725                                scenario.partitions(msg.view(), participants.as_ref());
5726                            match origin {
5727                                SplitOrigin::Primary => Some(Recipients::Some(primary)),
5728                                SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5729                            }
5730                        }
5731                    };
5732                    let make_certificate_forwarder = || {
5733                        let codec = schemes[idx].certificate_codec_config();
5734                        let participants = participants.clone();
5735                        let scenario = scenario.clone();
5736                        move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
5737                            let msg: Certificate<S, D> =
5738                                Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5739                            let (primary, secondary) =
5740                                scenario.partitions(msg.view(), participants.as_ref());
5741                            match origin {
5742                                SplitOrigin::Primary => Some(Recipients::Some(primary)),
5743                                SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5744                            }
5745                        }
5746                    };
5747                    let make_vote_router = || {
5748                        let participants = participants.clone();
5749                        let scenario = scenario.clone();
5750                        move |(sender, message): &(_, IoBuf)| {
5751                            let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5752                            scenario.route(msg.view(), sender, participants.as_ref())
5753                        }
5754                    };
5755                    let make_certificate_router = || {
5756                        let codec = schemes[idx].certificate_codec_config();
5757                        let participants = participants.clone();
5758                        let scenario = scenario.clone();
5759                        move |(sender, message): &(_, IoBuf)| {
5760                            let msg: Certificate<S, D> =
5761                                Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5762                            scenario.route(msg.view(), sender, participants.as_ref())
5763                        }
5764                    };
5765                    let (vote_sender_primary, vote_sender_secondary) =
5766                        vote_sender.split_with(make_vote_forwarder());
5767                    let (vote_receiver_primary, vote_receiver_secondary) = vote_receiver
5768                        .split_with(
5769                            context.with_label(&format!("pending_split_{idx}")),
5770                            make_vote_router(),
5771                        );
5772                    let (certificate_sender_primary, certificate_sender_secondary) =
5773                        certificate_sender.split_with(make_certificate_forwarder());
5774                    let (certificate_receiver_primary, certificate_receiver_secondary) =
5775                        certificate_receiver.split_with(
5776                            context.with_label(&format!("recovered_split_{idx}")),
5777                            make_certificate_router(),
5778                        );
5779
5780                    for (twin_label, pending, recovered) in [
5781                        (
5782                            "primary",
5783                            (vote_sender_primary, vote_receiver_primary),
5784                            (certificate_sender_primary, certificate_receiver_primary),
5785                        ),
5786                        (
5787                            "secondary",
5788                            (vote_sender_secondary, vote_receiver_secondary),
5789                            (certificate_sender_secondary, certificate_receiver_secondary),
5790                        ),
5791                    ] {
5792                        let label = format!("twin_{idx}_{twin_label}");
5793                        let context = context.with_label(&label);
5794
5795                        let reporter_config = mocks::reporter::Config {
5796                            participants: participants.as_ref().try_into().unwrap(),
5797                            scheme: schemes[idx].clone(),
5798                            elector: elector.clone(),
5799                        };
5800                        let reporter = mocks::reporter::Reporter::new(
5801                            context.with_label("reporter"),
5802                            reporter_config,
5803                        );
5804                        reporters.push(reporter.clone());
5805
5806                        let application_cfg = mocks::application::Config {
5807                            hasher: Sha256::default(),
5808                            relay: relay.clone(),
5809                            me: validator.clone(),
5810                            propose_latency: (10.0, 5.0),
5811                            verify_latency: (10.0, 5.0),
5812                            certify_latency: (10.0, 5.0),
5813                            should_certify: mocks::application::Certifier::Sometimes,
5814                        };
5815                        let (actor, application) = mocks::application::Application::new(
5816                            context.with_label("application"),
5817                            application_cfg,
5818                        );
5819                        actor.start();
5820
5821                        let blocker = oracle.control(validator.clone());
5822                        let cfg = config::Config {
5823                            scheme: schemes[idx].clone(),
5824                            elector: elector.clone(),
5825                            blocker,
5826                            automaton: application.clone(),
5827                            relay: application.clone(),
5828                            reporter: reporter.clone(),
5829                            strategy: Sequential,
5830                            partition: label,
5831                            mailbox_size: 1024,
5832                            epoch: Epoch::new(333),
5833                            leader_timeout: Duration::from_secs(1),
5834                            certification_timeout: Duration::from_millis(1_500),
5835                            timeout_retry: Duration::from_secs(10),
5836                            fetch_timeout: Duration::from_secs(1),
5837                            activity_timeout,
5838                            skip_timeout,
5839                            fetch_concurrent: 4,
5840                            replay_buffer: NZUsize!(1024 * 1024),
5841                            write_buffer: NZUsize!(1024 * 1024),
5842                            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5843                            forwarding: ForwardingPolicy::Disabled,
5844                        };
5845                        let engine = Engine::new(context.with_label("engine"), cfg);
5846                        engine_handlers.push(engine.start(
5847                            pending,
5848                            recovered,
5849                            inert_channel(participants.as_ref()),
5850                        ));
5851                    }
5852                }
5853
5854                // Create honest engines.
5855                let honest_start = reporters.len();
5856                for (idx, validator) in participants.iter().enumerate() {
5857                    if twin_index_set.contains(&idx) {
5858                        continue;
5859                    }
5860
5861                    let label = format!("honest_{idx}");
5862                    let context = context.with_label(&label);
5863
5864                    let reporter_config = mocks::reporter::Config {
5865                        participants: participants.as_ref().try_into().unwrap(),
5866                        scheme: schemes[idx].clone(),
5867                        elector: elector.clone(),
5868                    };
5869                    let reporter = mocks::reporter::Reporter::new(
5870                        context.with_label("reporter"),
5871                        reporter_config,
5872                    );
5873                    reporters.push(reporter.clone());
5874
5875                    let application_cfg = mocks::application::Config {
5876                        hasher: Sha256::default(),
5877                        relay: relay.clone(),
5878                        me: validator.clone(),
5879                        propose_latency: (10.0, 5.0),
5880                        verify_latency: (10.0, 5.0),
5881                        certify_latency: (10.0, 5.0),
5882                        should_certify: mocks::application::Certifier::Sometimes,
5883                    };
5884                    let (actor, application) = mocks::application::Application::new(
5885                        context.with_label("application"),
5886                        application_cfg,
5887                    );
5888                    actor.start();
5889
5890                    let blocker = oracle.control(validator.clone());
5891                    let cfg = config::Config {
5892                        scheme: schemes[idx].clone(),
5893                        elector: elector.clone(),
5894                        blocker,
5895                        automaton: application.clone(),
5896                        relay: application.clone(),
5897                        reporter: reporter.clone(),
5898                        strategy: Sequential,
5899                        partition: label,
5900                        mailbox_size: 1024,
5901                        epoch: Epoch::new(333),
5902                        leader_timeout: Duration::from_secs(1),
5903                        certification_timeout: Duration::from_millis(1_500),
5904                        timeout_retry: Duration::from_secs(10),
5905                        fetch_timeout: Duration::from_secs(1),
5906                        activity_timeout,
5907                        skip_timeout,
5908                        fetch_concurrent: 4,
5909                        replay_buffer: NZUsize!(1024 * 1024),
5910                        write_buffer: NZUsize!(1024 * 1024),
5911                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5912                        forwarding: ForwardingPolicy::Disabled,
5913                    };
5914                    let engine = Engine::new(context.with_label("engine"), cfg);
5915
5916                    let (
5917                        (pending_sender, pending_receiver),
5918                        (recovered_sender, recovered_receiver),
5919                        _,
5920                    ) = registrations
5921                        .remove(validator)
5922                        .expect("validator should be registered");
5923                    engine_handlers.push(engine.start(
5924                        (pending_sender, pending_receiver),
5925                        (recovered_sender, recovered_receiver),
5926                        inert_channel(participants.as_ref()),
5927                    ));
5928                }
5929
5930                // Wait for progress (liveness check) across honest replicas only.
5931                //
5932                // Only count finalizations after the adversarial prefix so we
5933                // verify the protocol actually recovers and makes progress under
5934                // synchrony. Finalizations during the prefix may be artifacts of
5935                // the attack setup and do not demonstrate liveness.
5936                //
5937                // Twin halves are Byzantine test machinery and are not required to
5938                // make progress for the campaign to establish honest-node liveness.
5939                let prefix_end = View::new(scenario.rounds().len() as u64);
5940                let mut finalizers = Vec::new();
5941                for (i, reporter) in reporters.iter_mut().skip(honest_start).enumerate() {
5942                    let (_latest, mut monitor) = reporter.subscribe().await;
5943                    let required = trailing_finalizations;
5944                    let label = format!("finalizer_{i}");
5945                    finalizers.push(context.with_label(&label).spawn(move |_| async move {
5946                        let mut count = 0usize;
5947                        while count < required {
5948                            let view = monitor.recv().await.expect("event missing");
5949                            if view > prefix_end {
5950                                count += 1;
5951                            }
5952                        }
5953                    }));
5954                }
5955                join_all(finalizers).await;
5956
5957                // Verify safety: no conflicting finalizations across honest reporters.
5958                let mut finalized_at_view: BTreeMap<View, D> = BTreeMap::new();
5959                for reporter in reporters.iter().skip(honest_start) {
5960                    let finalizations = reporter.finalizations.lock();
5961                    for (view, finalization) in finalizations.iter() {
5962                        let digest = finalization.proposal.payload;
5963                        if let Some(existing) = finalized_at_view.get(view) {
5964                            assert_eq!(
5965                                existing, &digest,
5966                                "safety violation: conflicting finalizations at view {view}"
5967                            );
5968                        } else {
5969                            finalized_at_view.insert(*view, digest);
5970                        }
5971                    }
5972                }
5973
5974                // Verify no invalid signatures were observed by honest replicas.
5975                for reporter in reporters.iter().skip(honest_start) {
5976                    reporter.assert_no_invalid();
5977                }
5978
5979                // Ensure no honest signer appears under multiple payloads for the same view.
5980                let twin_identities: HashSet<_> = twin_indices
5981                    .iter()
5982                    .map(|idx| participants[*idx].clone())
5983                    .collect();
5984                let mut notarized_by_honest_signer: BTreeMap<View, HashMap<PublicKey, D>> =
5985                    BTreeMap::new();
5986                let mut finalized_by_honest_signer: BTreeMap<View, HashMap<PublicKey, D>> =
5987                    BTreeMap::new();
5988                for reporter in reporters.iter().skip(honest_start) {
5989                    let notarizes = reporter.notarizes.lock();
5990                    for (view, payloads) in notarizes.iter() {
5991                        let signers = notarized_by_honest_signer.entry(*view).or_default();
5992                        for (digest, payload_signers) in payloads.iter() {
5993                            for signer in payload_signers.iter() {
5994                                if twin_identities.contains(signer) {
5995                                    continue;
5996                                }
5997                                if let Some(existing) = signers.insert(signer.clone(), *digest) {
5998                                    assert_eq!(
5999                                    existing, *digest,
6000                                    "honest signer produced conflicting notarizes at view {view}"
6001                                );
6002                                }
6003                            }
6004                        }
6005                    }
6006
6007                    let finalizes = reporter.finalizes.lock();
6008                    for (view, payloads) in finalizes.iter() {
6009                        let signers = finalized_by_honest_signer.entry(*view).or_default();
6010                        for (digest, payload_signers) in payloads.iter() {
6011                            for signer in payload_signers.iter() {
6012                                if twin_identities.contains(signer) {
6013                                    continue;
6014                                }
6015                                if let Some(existing) = signers.insert(signer.clone(), *digest) {
6016                                    assert_eq!(
6017                                    existing, *digest,
6018                                    "honest signer produced conflicting finalizes at view {view}"
6019                                );
6020                                }
6021                            }
6022                        }
6023                    }
6024                }
6025
6026                // Ensure faults are attributable to twins.
6027                for reporter in reporters.iter().skip(honest_start) {
6028                    let faults = reporter.faults.lock();
6029                    for (faulter, _) in faults.iter() {
6030                        assert!(
6031                            twin_identities.contains(faulter),
6032                            "fault from non-twin participant"
6033                        );
6034                    }
6035                }
6036
6037                let blocked = oracle.blocked().await.unwrap();
6038                for (_, faulter) in blocked {
6039                    assert!(
6040                        twin_identities.contains(&faulter),
6041                        "blocked peer attributed to non-twin participant"
6042                    );
6043                }
6044            });
6045        }
6046    }
6047
6048    const TWINS_CAMPAIGN: TwinsCampaign = TwinsCampaign {
6049        n: 5,
6050        rounds: 3,
6051        mode: twins::Mode::Sampled,
6052        max_cases: 20,
6053        trailing_finalizations: 10,
6054    };
6055
6056    const TWINS_LINK: Link = Link {
6057        latency: Duration::from_millis(500),
6058        jitter: Duration::from_millis(500),
6059        success_rate: 1.0,
6060    };
6061
6062    #[test_group("slow")]
6063    #[test_traced("INFO")]
6064    fn test_twins_sampled() {
6065        for link in [
6066            Link {
6067                latency: Duration::from_millis(10),
6068                jitter: Duration::from_millis(10),
6069                success_rate: 1.0,
6070            },
6071            TWINS_LINK,
6072        ] {
6073            twins_campaign::<_, _, RoundRobin>(
6074                &mut test_rng(),
6075                TWINS_CAMPAIGN,
6076                link,
6077                scheme_mocks::fixture,
6078            );
6079        }
6080    }
6081
6082    #[test_group("slow")]
6083    #[test_traced("INFO")]
6084    fn test_twins_sustained() {
6085        let campaign = TwinsCampaign {
6086            mode: twins::Mode::Sustained,
6087            ..TWINS_CAMPAIGN
6088        };
6089        for link in [
6090            Link {
6091                latency: Duration::from_millis(10),
6092                jitter: Duration::from_millis(10),
6093                success_rate: 1.0,
6094            },
6095            TWINS_LINK,
6096        ] {
6097            twins_campaign::<_, _, RoundRobin>(
6098                &mut test_rng(),
6099                campaign,
6100                link,
6101                scheme_mocks::fixture,
6102            );
6103        }
6104    }
6105
6106    #[test_group("slow")]
6107    #[test_traced("INFO")]
6108    fn test_twins_large_sampled() {
6109        let campaign = TwinsCampaign {
6110            n: 10,
6111            rounds: 5,
6112            ..TWINS_CAMPAIGN
6113        };
6114        twins_campaign::<_, _, RoundRobin>(
6115            &mut test_rng(),
6116            campaign,
6117            TWINS_LINK,
6118            scheme_mocks::fixture,
6119        );
6120    }
6121
6122    #[test_group("slow")]
6123    #[test_traced("INFO")]
6124    fn test_twins_large_sustained() {
6125        let campaign = TwinsCampaign {
6126            n: 10,
6127            rounds: 5,
6128            mode: twins::Mode::Sustained,
6129            ..TWINS_CAMPAIGN
6130        };
6131        twins_campaign::<_, _, RoundRobin>(
6132            &mut test_rng(),
6133            campaign,
6134            TWINS_LINK,
6135            scheme_mocks::fixture,
6136        );
6137    }
6138
6139    #[test_group("slow")]
6140    #[test_traced("INFO")]
6141    fn test_twins_multisig_min_pk() {
6142        twins_campaign::<_, _, RoundRobin>(
6143            &mut test_rng(),
6144            TWINS_CAMPAIGN,
6145            TWINS_LINK,
6146            bls12381_multisig::fixture::<MinPk, _>,
6147        );
6148    }
6149
6150    #[test_group("slow")]
6151    #[test_traced("INFO")]
6152    fn test_twins_multisig_min_sig() {
6153        twins_campaign::<_, _, RoundRobin>(
6154            &mut test_rng(),
6155            TWINS_CAMPAIGN,
6156            TWINS_LINK,
6157            bls12381_multisig::fixture::<MinSig, _>,
6158        );
6159    }
6160
6161    #[test_group("slow")]
6162    #[test_traced("INFO")]
6163    fn test_twins_threshold_vrf_min_pk() {
6164        twins_campaign::<_, _, Random>(
6165            &mut test_rng(),
6166            TWINS_CAMPAIGN,
6167            TWINS_LINK,
6168            bls12381_threshold_vrf::fixture::<MinPk, _>,
6169        );
6170    }
6171
6172    #[test_group("slow")]
6173    #[test_traced("INFO")]
6174    fn test_twins_threshold_vrf_min_sig() {
6175        twins_campaign::<_, _, Random>(
6176            &mut test_rng(),
6177            TWINS_CAMPAIGN,
6178            TWINS_LINK,
6179            bls12381_threshold_vrf::fixture::<MinSig, _>,
6180        );
6181    }
6182
6183    #[test_group("slow")]
6184    #[test_traced("INFO")]
6185    fn test_twins_threshold_std_min_pk() {
6186        twins_campaign::<_, _, RoundRobin>(
6187            &mut test_rng(),
6188            TWINS_CAMPAIGN,
6189            TWINS_LINK,
6190            bls12381_threshold_std::fixture::<MinPk, _>,
6191        );
6192    }
6193
6194    #[test_group("slow")]
6195    #[test_traced("INFO")]
6196    fn test_twins_threshold_std_min_sig() {
6197        twins_campaign::<_, _, RoundRobin>(
6198            &mut test_rng(),
6199            TWINS_CAMPAIGN,
6200            TWINS_LINK,
6201            bls12381_threshold_std::fixture::<MinSig, _>,
6202        );
6203    }
6204
6205    #[test_group("slow")]
6206    #[test_traced("INFO")]
6207    fn test_twins_ed25519() {
6208        twins_campaign::<_, _, RoundRobin>(
6209            &mut test_rng(),
6210            TWINS_CAMPAIGN,
6211            TWINS_LINK,
6212            ed25519::fixture,
6213        );
6214    }
6215
6216    #[test_group("slow")]
6217    #[test_traced("INFO")]
6218    fn test_twins_secp256r1() {
6219        twins_campaign::<_, _, RoundRobin>(
6220            &mut test_rng(),
6221            TWINS_CAMPAIGN,
6222            TWINS_LINK,
6223            secp256r1::fixture,
6224        );
6225    }
6226}