commonware_consensus/simplex/
mod.rs

//! Simple and fast BFT agreement inspired by Simplex Consensus.
//!
//! Inspired by [Simplex Consensus](https://eprint.iacr.org/2023/463), `simplex` provides simple and fast BFT
//! agreement with network-speed view (i.e. block time) latency and optimal finalization latency in a
//! partially synchronous setting.
//!
//! # Features
//!
//! * Wicked Fast Block Times (2 Network Hops)
//! * Optimal Finalization Latency (3 Network Hops)
//! * Externalized Uptime and Fault Proofs
//! * Require Certification Before Finalization
//! * Decoupled Block Broadcast and Sync
//! * Lazy Message Verification
//! * Application-Defined Block Format
//! * Pluggable Hashing and Cryptography
//! * Embedded VRF (via [scheme::bls12381_threshold::vrf])
//!
//! # Design
//!
//! ## Protocol Description
//!
//! ### Genesis
//!
//! Genesis (view 0) is implicitly finalized. There is no finalization certificate for genesis;
//! [`Config::floor`](config::Config::floor) supplies the initial finalized state. Voting begins
//! at view 1, with the first proposal referencing genesis as its parent.
//!
//! ### Specification for View `v`
//!
//! Upon entering view `v`:
//! * Determine leader `l` for view `v`
//! * Set timer for leader proposal `t_l = 2Δ` and advance `t_a = 3Δ`
//!     * If leader `l` has not been active in last `r` views, set `t_l` to 0.
//! * If leader `l`, broadcast `notarize(c,v)`
//!   * If can't propose container in view `v` because missing notarization/nullification for a
//!     previous view `v_m`, request `v_m`
//!
//! Upon receiving first `notarize(c,v)` from `l`:
//! * Cancel `t_l`
//! * If the container's parent `c_parent` is finalized (or both notarized and certified) at `v_parent`
//!   and we have nullifications for all views between `v` and `v_parent`, verify `c` and broadcast `notarize(c,v)`
//!     * If verification of `c` fails, immediately broadcast `nullify(v)`
//!
//! Upon receiving `2f+1` `notarize(c,v)`:
//! * Cancel `t_a`
//! * Mark `c` as notarized
//! * Broadcast `notarization(c,v)` (even if we have not verified `c`)
//! * Attempt to certify `c` (see [Certification](#certification))
//!     * On success: broadcast `finalize(c,v)` (if have not broadcast `nullify(v)`) and enter `v+1`
//!     * On failure: broadcast `nullify(v)`
//!
//! Upon receiving `2f+1` `nullify(v)`:
//! * Broadcast `nullification(v)`
//! * Enter `v+1`
//!
//! Upon receiving `2f+1` `finalize(c,v)`:
//! * Mark `c` as finalized (and recursively finalize its parents)
//! * Broadcast `finalization(c,v)` (even if we have not verified `c`)
//!
//! Upon `t_l` or `t_a` firing:
//! * Broadcast `nullify(v)`
//! * Every `t_r` after `nullify(v)` broadcast that we are still in view `v`:
//!    * Rebroadcast `nullify(v)` and either `notarization(v-1)` or `nullification(v-1)`
//!
//! _When `2f+1` votes of a given type (`notarize(c,v)`, `nullify(v)`, or `finalize(c,v)`) have been collected
//! from unique participants, a certificate (`notarization(c,v)`, `nullification(v)`, or `finalization(c,v)`) can be assembled.
//! These certificates serve as a standalone proof of consensus progress that downstream systems can ingest without executing
//! the protocol._
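//!
//! As a rough illustration of the `2f+1` threshold (a minimal sketch, not this crate's API: `max_faults`
//! and `quorum_size` are hypothetical helpers that assume `n >= 3f+1` participants):
//!
//! ```rust
//! /// Largest `f` such that `n >= 3f + 1` (hypothetical helper for illustration).
//! fn max_faults(n: u32) -> u32 {
//!     n.saturating_sub(1) / 3
//! }
//!
//! /// Votes needed to assemble a certificate: `n - f`, which equals `2f + 1` when `n = 3f + 1`.
//! fn quorum_size(n: u32) -> u32 {
//!     n - max_faults(n)
//! }
//! ```
//!
//! Under these assumptions, `quorum_size(4)` is `3` and `quorum_size(7)` is `5`.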
//!
//! ### Joining Consensus
//!
//! As soon as `2f+1` nullifies or finalizes are observed for some view `v`, the `Voter` will
//! enter `v+1`. Notarizations advance the view if-and-only-if the application certifies them.
//! This means that a new participant joining consensus will immediately jump ahead on the previous
//! view's nullification or finalization and begin participating in consensus at the current view.
//!
//! ### Certification
//!
//! After a payload is notarized, the application can optionally delay or prevent finalization via the
//! [`CertifiableAutomaton::certify`](crate::CertifiableAutomaton::certify) method. By default, `certify`
//! returns `true` for all payloads, meaning finalization proceeds immediately after notarization.
//!
//! Customizing `certify` is useful for systems that employ erasure coding, where participants may want
//! to wait until they have received enough shards to reconstruct and validate the full block before
//! voting to finalize.
//!
//! If `certify` returns `true`, the participant broadcasts a `finalize` vote for the payload and enters the
//! next view. If `certify` returns `false`, the participant broadcasts `nullify` for the view instead (treating
//! it as an immediate timeout), and will refuse to build upon the proposal or notarize proposals that build upon it.
//! Thus, a payload can only be finalized if a quorum of participants certify it.
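//!
//! The outcome handling described above can be sketched as follows (a minimal illustration, not this crate's
//! implementation: the `Action` enum and the `after_certify` function are hypothetical names):
//!
//! ```rust
//! /// Hypothetical actions a participant may take once `certify` resolves for view `v`.
//! #[derive(Debug, PartialEq, Eq)]
//! enum Action {
//!     BroadcastFinalize,
//!     EnterNextView,
//!     BroadcastNullify,
//! }
//!
//! /// Maps the `certify` verdict to actions. `already_nullified` records whether
//! /// `nullify(v)` was broadcast earlier (for example, after a timeout).
//! fn after_certify(certified: bool, already_nullified: bool) -> Vec<Action> {
//!     if !certified {
//!         // Treated as an immediate timeout for the view.
//!         return vec![Action::BroadcastNullify];
//!     }
//!     let mut actions = Vec::new();
//!     if !already_nullified {
//!         // Only vote to finalize if we have not voted to nullify this view.
//!         actions.push(Action::BroadcastFinalize);
//!     }
//!     actions.push(Action::EnterNextView);
//!     actions
//! }
//! ```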
//!
//! Certification of some notarization should only be abandoned once a finalization at the same or higher view is observed.
//! Until then (say a nullification certificate for a view arrives before certification completes), the application should continue
//! attempting to complete certification. This increases the likelihood that we can vote on the next honest proposer's block (which
//! may build on our in-flight certification or the nullification). If we did not do this, it is possible that different parts of
//! the network (neither with quorum) would refuse to vote on each other's blocks (halting consensus).
//!
//! _The decision returned by `certify` must be deterministic and consistent across all honest participants to ensure
//! liveness._
//!
//! ### Deviations from Simplex Consensus
//!
//! * Fetch missing notarizations/nullifications as needed rather than assuming each proposal contains
//!   a set of all notarizations/nullifications for all historical blocks.
//! * Introduce distinct messages for `notarize` and `nullify` rather than referring to both as a `vote` for
//!   either a "block" or a "dummy block", respectively.
//! * Introduce a "leader timeout" to trigger early view transitions for unresponsive leaders.
//! * Skip "leader timeout" and "certification timeout" if a designated leader hasn't participated in
//!   some number of views (again to trigger early view transition for an unresponsive leader).
//! * Introduce message rebroadcast to continue making progress if messages from a given view are dropped (only way
//!   to ensure messages are reliably delivered is with a heavyweight reliable broadcast protocol).
//! * Treat local proposal failure as immediate timeout expiry and broadcast `nullify(v)`.
//! * Treat local verification failure as immediate timeout expiry and broadcast `nullify(v)`.
//! * Consider the current leader's `nullify(v)` as immediate timeout expiry and broadcast `nullify(v)`.
//! * Upon seeing `notarization(c,v)`, instead of moving to the view `v+1` immediately, request certification from
//!   the application (see [Certification](#certification)). Only move to view `v+1` and broadcast `finalize(c,v)`
//!   if certification succeeds, otherwise broadcast `nullify(v)` and refuse to build upon `c`.
//!
//! ## Protocol Properties
//!
//! ### Forced Inclusion (Tail-Forking Resistance)
//!
//! A notarized payload in view `v` must appear in the canonical chain if no nullification
//! certificate exists for `v`. This follows directly from the protocol rules:
//!
//! 1. To propose in view `v+k`, the leader must reference a certified parent in some view `v_p`
//!    and possess nullification certificates for every view between `v_p` and `v+k`.
//! 2. A nullification certificate for view `v` requires `2f+1` `nullify(v)` votes.
//! 3. An honest participant only broadcasts `nullify(v)` when a timeout fires (`t_l` or `t_a`)
//!    or when certification fails.
//!
//! Therefore, if view `v` completes without timeout and certification succeeds, no honest
//! participant has broadcast `nullify(v)`. With at most `f` Byzantine participants, at most `f`
//! `nullify(v)` votes exist, which is insufficient to form a nullification certificate. Without
//! that certificate, no future leader can skip view `v`, and the notarized payload must be
//! included as an ancestor in all subsequent proposals.
//!
//! ### Optimistic Finality
//!
//! The forced inclusion property provides a weaker but faster form of finality: once a
//! notarization certificate is observed for view `v` (without any timeout having fired),
//! the notarized payload can be treated as speculatively final. No future sequence of
//! proposals can exclude it from the canonical chain.
//!
//! This "speculative finality" is available after just 2 network hops (proposal + notarization),
//! compared to the 3 hops required for full finalization (proposal + notarization + finalization).
//! A notarized-but-not-yet-finalized payload can only be excluded in two scenarios:
//! `f+1` or more honest participants timed out, or certification failed. Because
//! certification is deterministic, it either fails for all honest participants or none,
//! so a certification failure always produces a nullification. In the common case
//! (no faults, no timeouts), exclusion cannot happen.
//!
//! ### Unchained Finalization
//!
//! Finalization does not require consecutive honest views. When a participant certifies
//! `notarization(c,v)`, it broadcasts `finalize(c,v)` and immediately enters `v+1`,
//! regardless of what happens in subsequent views. These `finalize(c,v)` votes accumulate
//! independently of the current view: even if views `v+1` through `v+k` all time out
//! (producing nullifications), the `finalize(c,v)` votes still count toward the `2f+1`
//! threshold needed to form `finalization(c,v)`.
//!
//! This means a payload notarized in view `v` can be finalized while the network is
//! in view `v+k` for any `k >= 1`. There is no requirement that a particular view
//! after `v` succeeds or that any subsequent leader cooperates. As long as `2f+1`
//! participants eventually certify and broadcast `finalize(c,v)`, the finalization
//! certificate will form.
//!
//! ## Architecture
//!
//! All logic is split into four components: the `Batcher`, the `Voter`, the `Resolver`, and the `Application` (provided by the user).
//! The `Batcher` is responsible for collecting messages from peers and lazily verifying them when a quorum is met. The `Voter`
//! is responsible for directing participation in the current view. The `Resolver` is responsible for
//! fetching artifacts from previous views required to verify proposed blocks in the latest view. Lastly, the `Application`
//! is responsible for proposing new blocks and indicating whether some block is valid.
//!
//! To drive great performance, all interactions between `Batcher`, `Voter`, `Resolver`, and `Application` are
//! non-blocking. This means that, for example, the `Voter` can continue processing messages while the
//! `Application` verifies a proposed block or the `Resolver` fetches a notarization.
//!
//! ```txt
//!                            +------------+          +++++++++++++++
//!                            |            +--------->+             +
//!                            |  Batcher   |          +    Peers    +
//!                            |            |<---------+             +
//!                            +-------+----+          +++++++++++++++
//!                                |   ^
//!                                |   |
//!                                |   |
//!                                |   |
//!                                v   |
//! +---------------+           +---------+            +++++++++++++++
//! |               |<----------+         +----------->+             +
//! |  Application  |           |  Voter  |            +    Peers    +
//! |               +---------->|         |<-----------+             +
//! +---------------+           +--+------+            +++++++++++++++
//!                                |   ^
//!                                |   |
//!                                |   |
//!                                |   |
//!                                v   |
//!                            +-------+----+          +++++++++++++++
//!                            |            +--------->+             +
//!                            |  Resolver  |          +    Peers    +
//!                            |            |<---------+             +
//!                            +------------+          +++++++++++++++
//! ```
//!
//! ### Batched Verification
//!
//! Unlike other consensus constructions that verify all incoming messages received from peers, for schemes
//! where [`Scheme::is_batchable()`](commonware_cryptography::certificate::Scheme::is_batchable) returns `true`
//! (such as [scheme::ed25519], [scheme::bls12381_multisig] and [scheme::bls12381_threshold]), `simplex` lazily
//! verifies messages (only when a quorum is met), enabling efficient batch verification. For schemes where
//! `is_batchable()` returns `false` (such as [scheme::secp256r1]), signatures are verified eagerly as they
//! arrive since there is no batching benefit.
//!
//! If an invalid signature is detected, the `Batcher` will perform repeated bisections over collected
//! messages to find the offending message (and block the peer(s) that sent it via [commonware_p2p::Blocker]).
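//!
//! The bisection idea can be sketched as follows (a minimal illustration under assumed names, not the `Batcher`'s
//! actual code: `find_invalid` is hypothetical and `batch_ok` stands in for a batch signature check):
//!
//! ```rust
//! /// Returns the indices of items that fail verification, using only a batch predicate.
//! fn find_invalid<T, F>(items: &[T], batch_ok: &F) -> Vec<usize>
//! where
//!     F: Fn(&[T]) -> bool,
//! {
//!     fn bisect<T, F>(items: &[T], offset: usize, batch_ok: &F, invalid: &mut Vec<usize>)
//!     where
//!         F: Fn(&[T]) -> bool,
//!     {
//!         // An empty or fully valid range needs no further work.
//!         if items.is_empty() || batch_ok(items) {
//!             return;
//!         }
//!         // A failing range of one item pinpoints an offender.
//!         if items.len() == 1 {
//!             invalid.push(offset);
//!             return;
//!         }
//!         // Otherwise split the range and check each half.
//!         let mid = items.len() / 2;
//!         bisect(&items[..mid], offset, batch_ok, invalid);
//!         bisect(&items[mid..], offset + mid, batch_ok, invalid);
//!     }
//!
//!     let mut invalid = Vec::new();
//!     bisect(items, 0, batch_ok, &mut invalid);
//!     invalid
//! }
//! ```
//!
//! In this sketch, a fully valid batch is checked once, and each invalid message adds a number of
//! extra batch checks that grows logarithmically with the batch size.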
//!
//! _If using a p2p implementation that is not authenticated, it is not safe to employ this optimization
//! as any attacking peer could simply reconnect from a different address. We recommend [commonware_p2p::authenticated]._
//!
//! ### Fetching Missing Certificates
//!
//! Instead of trying to fetch all possible certificates above the floor, we only attempt to fetch
//! nullifications for all views from the floor (last certified notarization or finalization) to the current view.
//! This technique, however, is not sufficient to guarantee progress.
//!
//! Consider the case where `f` honest participants have seen a finalization for a given view `v` (and nullifications only
//! from `v` to the current view `c`) but the remaining `f+1` honest participants have not (they have exclusively seen
//! nullifications from some view `o < v` to `c`). Neither partition of participants will vote for the other's proposals.
//!
//! To ensure progress is eventually made, leaders with nullified proposals directly broadcast the best finalization
//! certificate they are aware of to ensure all honest participants eventually consider the same proposal ancestry valid.
//!
//! _While a more aggressive recovery mechanism could be employed, like requiring all participants to broadcast their highest
//! finalization certificate after nullification, it would impose significant overhead under normal network
//! conditions (whereas the approach described incurs no overhead under normal network conditions). Recall, honest participants
//! already broadcast observed certificates to all other participants in each view (and misaligned participants should only ever
//! be observed following severe network degradation)._
//!
//! ## Pluggable Hashing and Cryptography
//!
//! Hashing is abstracted via the [commonware_cryptography::Hasher] trait and cryptography is abstracted via
//! the [commonware_cryptography::certificate::Scheme] trait, allowing deployments to employ approaches that best match their
//! requirements (or to provide their own without modifying any consensus logic). The following schemes
//! are supported out-of-the-box:
//!
//! ### [scheme::ed25519]
//!
//! [commonware_cryptography::ed25519] signatures are ["High-speed high-security signatures"](https://eprint.iacr.org/2011/368)
//! with 32 byte public keys and 64 byte signatures. While they are well-supported by commercial HSMs and offer efficient batch
//! verification, the signatures are not aggregatable (and certificates grow linearly with the quorum size).
//!
//! ### [scheme::bls12381_multisig]
//!
//! [commonware_cryptography::bls12381] is a ["digital signature scheme with aggregation properties"](https://www.ietf.org/archive/id/draft-irtf-cfrg-bls-signature-05.txt).
//! Unlike [commonware_cryptography::ed25519], signatures from multiple participants (say the signers in a certificate) can be aggregated
//! into a single signature (reducing bandwidth usage per broadcast). That being said, [commonware_cryptography::bls12381] is much slower
//! to verify than [commonware_cryptography::ed25519] and isn't supported by most HSMs (a standardization effort expired in 2022).
//!
//! ### [scheme::secp256r1]
//!
//! [commonware_cryptography::secp256r1] signatures use the NIST P-256 elliptic curve (also known as prime256v1), which is widely
//! supported by commercial HSMs. Unlike [commonware_cryptography::ed25519], Secp256r1 does not
//! benefit from batch verification, so signatures are verified individually. Certificates grow linearly with quorum size
//! (similar to ed25519).
//!
//! ### [scheme::bls12381_threshold]
//!
//! [scheme::bls12381_threshold] employs threshold cryptography (BLS12-381 threshold signatures with a `2f+1` of `3f+1` quorum)
//! to generate succinct consensus certificates (verifiable with just the static public key). This scheme requires instantiating
//! the shared secret via [commonware_cryptography::bls12381::dkg] and resharing whenever participants change.
//!
//! Two (non-attributable) variants are provided:
//!
//! - [scheme::bls12381_threshold::standard]: Certificates contain only a vote signature.
//!
//! - [scheme::bls12381_threshold::vrf]: Certificates contain a vote signature and a view signature (i.e. a seed that can be used
//!   as a VRF). This variant can be configured for random leader election (via [elector::Random]) and/or incorporate this randomness
//!   into execution.
//!
//! #### Embedded VRF ([scheme::bls12381_threshold::vrf])
//!
//! Every `notarize(c,v)` or `nullify(v)` message includes an `attestation(v)` (a partial signature over the view `v`). After `2f+1`
//! `notarize(c,v)` or `nullify(v)` messages are collected from unique participants, `seed(v)` can be recovered. Because `attestation(v)` is
//! only over the view `v`, the seed derived for a given view `v` is the same regardless of whether or not a block was notarized in said
//! view `v`.
//!
//! Because the value of `seed(v)` cannot be known prior to message broadcast by any participant (including the leader) in view `v`
//! and cannot be manipulated by any participant (deterministic for any `2f+1` signers at a given view `v`), it can be used both as a beacon
//! for leader election (where `seed(v)` determines the leader for `v+1`) and a source of randomness in execution (where `seed(v)`
//! is used as a seed in `v`).
//!
//! #### Succinct Certificates
//!
//! All broadcast consensus messages (`notarize(c,v)`, `nullify(v)`, `finalize(c,v)`) contain attestations (partial signatures) for a static
//! public key (derived from a group polynomial that can be recomputed during reconfiguration using [dkg](commonware_cryptography::bls12381::dkg)).
//! As soon as `2f+1` messages are collected, a threshold signature over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)`
//! can be recovered, respectively. Because the public key is static, any of these certificates can be verified by an external
//! process without following the consensus instance and/or tracking the current set of participants (as is typically required
//! to operate a lite client).
//!
//! These threshold signatures over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)` (i.e. the consensus certificates)
//! can be used to secure interoperability between different consensus instances and user interactions with an infrastructure provider
//! (where any data served can be proven to derive from some finalized block of some consensus instance with a known static public key).
//!
//! ## Persistence
//!
//! The `Voter` caches all data required to participate in consensus to avoid any disk reads
//! on the critical path. To enable recovery, the `Voter` writes valid messages it receives from
//! consensus and messages it generates to a write-ahead log (WAL) implemented by [commonware_storage::journal::segmented::variable::Journal].
//! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior
//! on restart (especially in the case of unclean shutdown).
//!
//! ## Automaton Failure Semantics
//!
//! If a validator is the leader for a view but cannot build a valid payload yet (for example because
//! it is still syncing), it should decline the [`Automaton::propose`](crate::Automaton::propose)
//! request by dropping the response channel. Simplex treats this as a missing proposal, broadcasts
//! `nullify(v)`, and other validators can use the leader-nullify fast path to skip the view.
//!
//! Once `propose` returns a payload, the local proposer is committed to that payload for verification
//! and certification. [`Automaton::verify`](crate::Automaton::verify) and
//! [`CertifiableAutomaton::certify`](crate::CertifiableAutomaton::certify) are stable verdict APIs,
//! not backpressure or syncing signals. While missing data may still arrive (and/or a validator cannot
//! immediately determine if a payload is valid), implementations should keep these requests pending rather
//! than returning `false` or closing the channel.
//!
//! Returning `false` from `verify` means the proposal is permanently invalid and causes a local
//! nullify. Returning `false` from `certify` means the notarized payload is permanently
//! uncertifiable for that round and also causes a local nullify. Closing `certify` does not provide
//! a fast-skip signal and can halt progress because certification requests are not retried. The safe
//! way to stop working on certification is to keep the request pending until Simplex drops it after
//! finalizing the block or a descendant.

pub mod elector;
pub mod scheme;
pub mod types;

cfg_if::cfg_if! {
    if #[cfg(not(target_arch = "wasm32"))] {
        use crate::types::{Round, View, ViewDelta};
        use commonware_cryptography::PublicKey;
        use commonware_p2p::Recipients;

        mod actors;
        pub mod config;
        pub use config::{Config, Floor, ForwardingPolicy};
        mod engine;
        pub use engine::Engine;
        mod metrics;

        /// The minimum view we are tracking both in-memory and on-disk.
        pub(crate) const fn min_active(activity_timeout: ViewDelta, last_finalized: View) -> View {
            last_finalized.saturating_sub(activity_timeout)
        }

        /// Whether or not a view is interesting to us. This is a function
        /// of both `min_active` and whether or not the view is too far
        /// in the future (based on the view we are currently in).
        pub(crate) fn interesting(
            activity_timeout: ViewDelta,
            last_finalized: View,
            current: View,
            pending: View,
            allow_future: bool,
        ) -> bool {
            // If the view is genesis, skip it, genesis doesn't have votes
            if pending.is_zero() {
                return false;
            }
            if pending < min_active(activity_timeout, last_finalized) {
                return false;
            }
            if !allow_future && pending > current.next() {
                return false;
            }
            true
        }

        /// Describes how a payload should be broadcast to the network.
        pub enum Plan<P: PublicKey> {
            /// Initial broadcast of a newly proposed block to all participants.
            Propose {
                /// The round in which the block was proposed.
                round: Round,
            },
            /// Forward a block to a specific set of peers.
            Forward {
                /// The round in which the forwarded block was proposed.
                round: Round,
                /// The recipients to forward the block to.
                recipients: Recipients<P>,
            },
        }
    }
}

#[cfg(any(test, feature = "mocks"))]
pub mod mocks;

/// Convenience alias for [`N3f1::quorum`].
#[cfg(test)]
pub(crate) fn quorum(n: u32) -> u32 {
    use commonware_utils::{Faults, N3f1};

    N3f1::quorum(n)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        simplex::{
            elector::{Config as Elector, Elector as ElectorTrait, Random, RoundRobin},
            mocks::{
                scheme as scheme_mocks,
                twins::{self, Elector as TwinsElector},
                wrapped,
            },
            scheme::{
                bls12381_multisig,
                bls12381_threshold::{
                    standard as bls12381_threshold_std,
                    vrf::{self as bls12381_threshold_vrf, Seedable},
                },
                ed25519, secp256r1, Scheme,
            },
            types::{
                Certificate, Finalization as TFinalization, Finalize as TFinalize,
                Notarization as TNotarization, Notarize as TNotarize,
                Nullification as TNullification, Nullify as TNullify, Proposal, Vote,
            },
        },
        types::{Epoch, Participant, Round},
        Monitor, Viewable,
    };
    use commonware_codec::{Decode, DecodeExt, Encode};
    use commonware_cryptography::{
        bls12381::primitives::variant::{MinPk, MinSig, Variant},
        certificate::mocks::Fixture,
        ed25519::{PrivateKey, PublicKey},
        sha256::{Digest as Sha256Digest, Digest as D},
        Hasher as _, Sha256, Signer as _,
    };
    use commonware_macros::{select, test_group, test_traced};
    use commonware_p2p::{
        simulated::{Config, Link, Network, Oracle, Receiver, Sender, SplitOrigin},
        utils::mocks::inert_channel,
        Manager as _, Recipients, Sender as _, TrackedPeers,
    };
    use commonware_parallel::Sequential;
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, telemetry::metrics::count_running_tasks, Clock,
        IoBuf, Metrics as _, Quota, Runner, Spawner, Supervisor as _,
    };
    use commonware_utils::{ordered::Set, sync::Mutex, test_rng, Faults, N3f1, NZUsize, NZU16};
    use engine::Engine;
    use futures::future::join_all;
    use rand::{rngs::StdRng, Rng as _, SeedableRng};
    use std::{
        collections::{BTreeMap, HashMap, HashSet},
        num::{NonZeroU16, NonZeroU32, NonZeroUsize},
        sync::Arc,
        time::Duration,
    };
    use tracing::{debug, info, warn};
    use types::Activity;

    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);

    #[test]
    fn test_interesting() {
        let activity_timeout = ViewDelta::new(10);

        // Genesis view is never interesting
        assert!(!interesting(
            activity_timeout,
            View::zero(),
            View::zero(),
            View::zero(),
            false
        ));
        assert!(!interesting(
            activity_timeout,
            View::zero(),
            View::new(1),
            View::zero(),
            true
        ));

        // View below min_active is not interesting
        assert!(!interesting(
            activity_timeout,
            View::new(20),
            View::new(25),
            View::new(5), // below min_active (10)
            false
        ));

        // View at min_active boundary is interesting
        assert!(interesting(
            activity_timeout,
            View::new(20),
            View::new(25),
            View::new(10), // exactly min_active
            false
        ));

        // Future view beyond current.next() is not interesting when allow_future is false
        assert!(!interesting(
            activity_timeout,
            View::new(20),
            View::new(25),
            View::new(27),
            false
        ));

        // Future view beyond current.next() is interesting when allow_future is true
        assert!(interesting(
            activity_timeout,
            View::new(20),
            View::new(25),
            View::new(27),
            true
        ));

        // View at current.next() is interesting
        assert!(interesting(
            activity_timeout,
            View::new(20),
            View::new(25),
            View::new(26),
            false
        ));

        // View within valid range is interesting
        assert!(interesting(
            activity_timeout,
            View::new(20),
            View::new(25),
            View::new(22),
            false
        ));

        // When last_finalized is 0 and activity_timeout would underflow
        // min_active saturates at 0, so view 1 should still be interesting
        assert!(interesting(
            activity_timeout,
            View::zero(),
            View::new(5),
            View::new(1),
            false
        ));
    }

    /// Register a validator with the oracle.
    async fn register_validator(
        oracle: &mut Oracle<PublicKey, deterministic::Context>,
        validator: PublicKey,
    ) -> (
        (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
        (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
        (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
    ) {
        let control = oracle.control(validator.clone());
        let (vote_sender, vote_receiver) = control.register(0, TEST_QUOTA).await.unwrap();
        let (certificate_sender, certificate_receiver) =
            control.register(1, TEST_QUOTA).await.unwrap();
        let (resolver_sender, resolver_receiver) = control.register(2, TEST_QUOTA).await.unwrap();
        (
            (vote_sender, vote_receiver),
            (certificate_sender, certificate_receiver),
            (resolver_sender, resolver_receiver),
        )
    }

    /// Registers all validators using the oracle.
    async fn register_validators(
        oracle: &mut Oracle<PublicKey, deterministic::Context>,
        validators: &[PublicKey],
    ) -> HashMap<
        PublicKey,
        (
            (
                Sender<PublicKey, deterministic::Context>,
                Receiver<PublicKey>,
            ),
            (
                Sender<PublicKey, deterministic::Context>,
                Receiver<PublicKey>,
            ),
            (
                Sender<PublicKey, deterministic::Context>,
                Receiver<PublicKey>,
            ),
        ),
    > {
        let mut registrations = HashMap::new();
        for validator in validators.iter() {
            let registration = register_validator(oracle, validator.clone()).await;
            registrations.insert(validator.clone(), registration);
        }
        registrations
    }

620    async fn start_test_network_with_peers<I>(
621        context: deterministic::Context,
622        peers: I,
623        disconnect_on_block: bool,
624    ) -> Oracle<PublicKey, deterministic::Context>
625    where
626        I: IntoIterator<Item = PublicKey>,
627    {
628        let (network, oracle) = Network::new_with_peers(
629            context.child("network"),
630            Config {
631                max_size: 1024 * 1024,
632                disconnect_on_block,
633                tracked_peer_sets: NZUsize!(1),
634            },
635            peers,
636        )
637        .await;
638        network.start();
639        oracle
640    }
641
642    async fn start_test_network_with_split_peers<I, J>(
643        context: deterministic::Context,
644        primary: I,
645        secondary: J,
646        disconnect_on_block: bool,
647    ) -> Oracle<PublicKey, deterministic::Context>
648    where
649        I: IntoIterator<Item = PublicKey>,
650        J: IntoIterator<Item = PublicKey>,
651    {
652        let (network, oracle) = Network::new_with_split_peers(
653            context.child("network"),
654            Config {
655                max_size: 1024 * 1024,
656                disconnect_on_block,
657                tracked_peer_sets: NZUsize!(1),
658            },
659            primary,
660            secondary,
661        )
662        .await;
663        network.start();
664        oracle
665    }
666
667    /// Enum to describe the action to take when linking validators.
668    enum Action {
669        Link(Link),
670        Update(Link), // Unlink and then link
671        Unlink,
672    }
673
674    /// Links (or unlinks) validators using the oracle.
675    ///
676    /// The `action` parameter determines the action (e.g. link, unlink) to take.
677    /// The `restrict_to` function, called as `f(validators.len(), i1, i2)`, restricts the action to
678    /// pairs where it returns `true`. If `None`, all validators are linked to all other validators.
679    async fn link_validators(
680        oracle: &mut Oracle<PublicKey, deterministic::Context>,
681        validators: &[PublicKey],
682        action: Action,
683        restrict_to: Option<fn(usize, usize, usize) -> bool>,
684    ) {
685        for (i1, v1) in validators.iter().enumerate() {
686            for (i2, v2) in validators.iter().enumerate() {
687                // Ignore self
688                if v2 == v1 {
689                    continue;
690                }
691
692                // Restrict to certain connections
693                if let Some(f) = restrict_to {
694                    if !f(validators.len(), i1, i2) {
695                        continue;
696                    }
697                }
698
699                // Do any unlinking first
700                match action {
701                    Action::Update(_) | Action::Unlink => {
702                        oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
703                    }
704                    _ => {}
705                }
706
707                // Do any linking after
708                match action {
709                    Action::Link(ref link) | Action::Update(ref link) => {
710                        oracle
711                            .add_link(v1.clone(), v2.clone(), link.clone())
712                            .await
713                            .unwrap();
714                    }
715                    _ => {}
716                }
717            }
718        }
719    }
720
721    /// Counts lines where all patterns match and the trailing value is non-zero.
722    fn count_nonzero_metric_lines(encoded: &str, patterns: &[&str]) -> u32 {
723        encoded
724            .lines()
725            .filter(|line| patterns.iter().all(|p| line.contains(p)))
726            .filter(|line| {
727                line.split_whitespace()
728                    .last()
729                    .and_then(|s| s.parse::<u64>().ok())
730                    .is_some_and(|n| n > 0)
731            })
732            .count() as u32
733    }
734
735    fn all_online<S, F, L>(mut fixture: F)
736    where
737        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
738        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
739        L: Elector<S>,
740    {
741        // Configure test parameters
742        let n = 5;
743        let quorum = quorum(n) as usize;
744        let required_containers = View::new(100);
745        let activity_timeout = ViewDelta::new(10);
746        let skip_timeout = ViewDelta::new(5);
747        let namespace = b"consensus".to_vec();
748        let executor = deterministic::Runner::timed(Duration::from_secs(300));
749        executor.start(|mut context| async move {
750            // Register participants
751            let Fixture {
752                participants,
753                schemes,
754                ..
755            } = fixture(&mut context, &namespace, n);
756            let mut oracle =
757                start_test_network_with_peers(context.child("network"), participants.clone(), true)
758                    .await;
759            let mut registrations = register_validators(&mut oracle, &participants).await;
760
761            // Link all validators
762            let link = Link {
763                latency: Duration::from_millis(10),
764                jitter: Duration::from_millis(1),
765                success_rate: 1.0,
766            };
767            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
768
769            // Create engines
770            let elector = L::default();
771            let relay = Arc::new(mocks::relay::Relay::new());
772            let mut reporters = Vec::new();
773            let mut engine_handlers = Vec::new();
774            for (idx, validator) in participants.iter().enumerate() {
775                // Create scheme context
776                let context = context
777                    .child("validator")
778                    .with_attribute("public_key", validator);
779
780                // Configure engine
781                let reporter_config = mocks::reporter::Config {
782                    participants: participants.clone().try_into().unwrap(),
783                    scheme: schemes[idx].clone(),
784                    elector: elector.clone(),
785                };
786                let reporter =
787                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
788                reporters.push(reporter.clone());
789                let application_cfg = mocks::application::Config {
790                    hasher: Sha256::default(),
791                    relay: relay.clone(),
792                    me: validator.clone(),
793                    propose_latency: (10.0, 5.0),
794                    verify_latency: (10.0, 5.0),
795                    certify_latency: (10.0, 5.0),
796                    should_certify: mocks::application::Certifier::Always,
797                };
798                let (actor, application) = mocks::application::Application::new(
799                    context.child("application"),
800                    application_cfg,
801                );
802                actor.start();
803                let blocker = oracle.control(validator.clone());
804                let cfg = config::Config {
805                    scheme: schemes[idx].clone(),
806                    elector: elector.clone(),
807                    blocker,
808                    automaton: application.clone(),
809                    relay: application.clone(),
810                    reporter: reporter.clone(),
811                    strategy: Sequential,
812                    partition: validator.to_string(),
813                    mailbox_size: NZUsize!(1024),
814                    epoch: Epoch::new(333),
815                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
816                        Epoch::new(333),
817                    )),
818                    leader_timeout: Duration::from_secs(1),
819                    certification_timeout: Duration::from_secs(2),
820                    timeout_retry: Duration::from_secs(10),
821                    fetch_timeout: Duration::from_secs(1),
822                    activity_timeout,
823                    skip_timeout,
824                    fetch_concurrent: NZUsize!(4),
825                    replay_buffer: NZUsize!(1024 * 1024),
826                    write_buffer: NZUsize!(1024 * 1024),
827                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
828                    forwarding: ForwardingPolicy::Disabled,
829                };
830                let engine = Engine::new(context.child("engine"), cfg);
831
832                // Start engine
833                let (pending, recovered, resolver) = registrations
834                    .remove(validator)
835                    .expect("validator should be registered");
836                engine_handlers.push(engine.start(pending, recovered, resolver));
837            }
838
839            // Wait for every reporter to reach `required_containers`
840            let mut finalizers = Vec::new();
841            for reporter in reporters.iter_mut() {
842                let (mut latest, mut monitor) = reporter.subscribe().await;
843                finalizers.push(context.child("finalizer").spawn(move |_| async move {
844                    while latest < required_containers {
845                        latest = monitor.recv().await.expect("event missing");
846                    }
847                }));
848            }
849            join_all(finalizers).await;
850
851            // Check reporters for correct activity
852            let latest_complete = required_containers.saturating_sub(activity_timeout);
853            for reporter in reporters.iter() {
854                // Ensure no faults
855                reporter.assert_no_faults();
856
857                // Ensure no invalid signatures
858                reporter.assert_no_invalid();
859
860                // Ensure certificates for all views
861                {
862                    let certified = reporter.certified.lock();
863                    for view in View::range(View::new(1), latest_complete) {
864                        // Ensure certificate for every view
865                        if !certified.contains(&view) {
866                            panic!("missing certificate for view: {view}");
867                        }
868                    }
869                }
870
871                // Ensure no forks
872                let mut notarized = HashMap::new();
873                let mut finalized = HashMap::new();
874                {
875                    let notarizes = reporter.notarizes.lock();
876                    for view in View::range(View::new(1), latest_complete) {
877                        // Ensure only one payload proposed per view
878                        let Some(payloads) = notarizes.get(&view) else {
879                            continue;
880                        };
881                        if payloads.len() > 1 {
882                            panic!("conflicting notarize payloads in view: {view}");
883                        }
884                        let (digest, notarizers) = payloads.iter().next().unwrap();
885                        notarized.insert(view, *digest);
886
887                        if notarizers.len() < quorum {
888                            // Only require a quorum: we can't verify that everyone participated at
889                            // every view because some nodes may have started later.
890                            panic!("fewer than quorum notarizes in view: {view}");
891                        }
892                    }
893                }
894                {
895                    let notarizations = reporter.notarizations.lock();
896                    for view in View::range(View::new(1), latest_complete) {
897                        // Ensure notarization matches digest from notarizes
898                        let Some(notarization) = notarizations.get(&view) else {
899                            continue;
900                        };
901                        let Some(digest) = notarized.get(&view) else {
902                            continue;
903                        };
904                        assert_eq!(&notarization.proposal.payload, digest);
905                    }
906                }
907                {
908                    let finalizes = reporter.finalizes.lock();
909                    for view in View::range(View::new(1), latest_complete) {
910                        // Ensure only one payload finalized per view
911                        let Some(payloads) = finalizes.get(&view) else {
912                            continue;
913                        };
914                        if payloads.len() > 1 {
915                            panic!("conflicting finalize payloads in view: {view}");
916                        }
917                        let (digest, finalizers) = payloads.iter().next().unwrap();
918                        finalized.insert(view, *digest);
919
920                        // Only check at views below timeout
921                        if view > latest_complete {
922                            continue;
923                        }
924
925                        // Ensure a quorum participated
926                        if finalizers.len() < quorum {
927                            // We can't verify that everyone participated at every view because some nodes may
928                            // have started later.
929                            panic!("fewer than quorum finalizes in view: {view}");
930                        }
931
932                        // Ensure no nullifies for any finalizers
933                        let nullifies = reporter.nullifies.lock();
934                        let Some(nullifies) = nullifies.get(&view) else {
935                            continue;
936                        };
937                        for (_, finalizers) in payloads.iter() {
938                            for finalizer in finalizers.iter() {
939                                if nullifies.contains(finalizer) {
940                                    panic!("should not nullify and finalize at same view");
941                                }
942                            }
943                        }
944                    }
945                }
946                {
947                    let finalizations = reporter.finalizations.lock();
948                    for view in View::range(View::new(1), latest_complete) {
949                        // Ensure finalization matches digest from finalizes
950                        let Some(finalization) = finalizations.get(&view) else {
951                            continue;
952                        };
953                        let Some(digest) = finalized.get(&view) else {
954                            continue;
955                        };
956                        assert_eq!(&finalization.proposal.payload, digest);
957                    }
958                }
959            }
960
961            // Ensure no blocked connections
962            let blocked = oracle.blocked().await.unwrap();
963            assert!(blocked.is_empty());
964        });
965    }
966
967    #[test_group("slow")]
968    #[test_traced]
969    fn test_all_online() {
970        all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
971        all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
972        all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
973        all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
974        all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
975        all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
976        all_online::<_, _, RoundRobin>(ed25519::fixture);
977        all_online::<_, _, RoundRobin>(secp256r1::fixture);
978    }
979
980    #[test_group("slow")]
981    #[test_traced]
982    fn test_non_genesis_floor_joiner_catches_tip() {
983        // First let a quorum finalize beyond genesis so the joiner has a real
984        // floor certificate and existing tip to catch.
985        let n = 5;
986        let active_count = quorum(n) as usize;
987        let initial_tip_target = View::new(15);
988        let activity_timeout = ViewDelta::new(10);
989        let skip_timeout = ViewDelta::new(5);
990        let namespace = b"consensus".to_vec();
991        let executor = deterministic::Runner::timed(Duration::from_secs(300));
992        executor.start(|mut context| async move {
993            let Fixture {
994                participants,
995                schemes,
996                ..
997            } = ed25519::fixture(&mut context, &namespace, n);
998            let mut oracle =
999                start_test_network_with_peers(context.child("network"), participants.clone(), true)
1000                    .await;
1001
1002            let active = &participants[..active_count];
1003            let joiner_idx = active_count;
1004            let joiner = participants[joiner_idx].clone();
1005
1006            let link = Link {
1007                latency: Duration::from_millis(10),
1008                jitter: Duration::from_millis(1),
1009                success_rate: 1.0,
1010            };
1011            link_validators(&mut oracle, active, Action::Link(link.clone()), None).await;
1012
1013            let elector = RoundRobin::<Sha256>::default();
1014            let relay = Arc::new(mocks::relay::Relay::new());
1015            let mut reporters = Vec::new();
1016            let mut engine_handlers = Vec::new();
1017
1018            for (idx, validator) in active.iter().enumerate() {
1019                let validator_context = context
1020                    .child("validator")
1021                    .with_attribute("public_key", validator);
1022
1023                let reporter_config = mocks::reporter::Config {
1024                    participants: participants.clone().try_into().unwrap(),
1025                    scheme: schemes[idx].clone(),
1026                    elector: elector.clone(),
1027                };
1028                let reporter = mocks::reporter::Reporter::new(
1029                    validator_context.child("reporter"),
1030                    reporter_config,
1031                );
1032                reporters.push(reporter.clone());
1033
1034                let application_cfg = mocks::application::Config {
1035                    hasher: Sha256::default(),
1036                    relay: relay.clone(),
1037                    me: validator.clone(),
1038                    propose_latency: (10.0, 5.0),
1039                    verify_latency: (10.0, 5.0),
1040                    certify_latency: (10.0, 5.0),
1041                    should_certify: mocks::application::Certifier::Always,
1042                };
1043                let (actor, application) = mocks::application::Application::new(
1044                    validator_context.child("application"),
1045                    application_cfg,
1046                );
1047                actor.start();
1048
1049                let cfg = config::Config {
1050                    scheme: schemes[idx].clone(),
1051                    elector: elector.clone(),
1052                    blocker: oracle.control(validator.clone()),
1053                    automaton: application.clone(),
1054                    relay: application.clone(),
1055                    reporter: reporter.clone(),
1056                    strategy: Sequential,
1057                    partition: format!("joiner_catches_tip_{validator}"),
1058                    mailbox_size: NZUsize!(1024),
1059                    epoch: Epoch::new(333),
1060                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
1061                        Epoch::new(333),
1062                    )),
1063                    leader_timeout: Duration::from_secs(1),
1064                    certification_timeout: Duration::from_secs(2),
1065                    timeout_retry: Duration::from_secs(10),
1066                    fetch_timeout: Duration::from_secs(1),
1067                    activity_timeout,
1068                    skip_timeout,
1069                    fetch_concurrent: NZUsize!(4),
1070                    replay_buffer: NZUsize!(1024 * 1024),
1071                    write_buffer: NZUsize!(1024 * 1024),
1072                    page_cache: CacheRef::from_pooler(
1073                        &validator_context,
1074                        PAGE_SIZE,
1075                        PAGE_CACHE_SIZE,
1076                    ),
1077                    forwarding: ForwardingPolicy::Disabled,
1078                };
1079                let engine = Engine::new(validator_context.child("engine"), cfg);
1080                let (pending, recovered, resolver) =
1081                    register_validator(&mut oracle, validator.clone()).await;
1082                engine_handlers.push(engine.start(pending, recovered, resolver));
1083            }
1084
1085            let mut finalizers = Vec::new();
1086            for reporter in reporters.iter_mut() {
1087                let (mut latest, mut monitor) = reporter.subscribe().await;
1088                finalizers.push(
1089                    context
1090                        .child("initial_finalizer")
1091                        .spawn(move |_| async move {
1092                            while latest < initial_tip_target {
1093                                latest = monitor.recv().await.expect("event missing");
1094                            }
1095                            latest
1096                        }),
1097                );
1098            }
1099            let tip_at_join = join_all(finalizers)
1100                .await
1101                .into_iter()
1102                .map(|result| result.expect("initial finalizer failed"))
1103                .min()
1104                .expect("initial validators missing");
1105
1106            let (floor_view, floor_finalization) = {
1107                let finalizations = reporters[0].finalizations.lock();
1108                finalizations
1109                    .iter()
1110                    .filter(|(view, _)| **view > View::zero() && **view < tip_at_join)
1111                    .min_by_key(|(view, _)| view.get())
1112                    .map(|(view, finalization)| (*view, finalization.clone()))
1113                    .expect("non-genesis floor finalization missing")
1114            };
1115            assert!(floor_view > View::zero());
1116            assert!(floor_view < tip_at_join);
1117
1118            // Start the extra validator from the non-genesis floor and require
1119            // it to catch both the existing tip and later cluster progress.
1120            for validator in active.iter() {
1121                oracle
1122                    .add_link(joiner.clone(), validator.clone(), link.clone())
1123                    .await
1124                    .unwrap();
1125                oracle
1126                    .add_link(validator.clone(), joiner.clone(), link.clone())
1127                    .await
1128                    .unwrap();
1129            }
1130
1131            let joiner_context = context
1132                .child("validator")
1133                .with_attribute("public_key", &joiner);
1134            let reporter_config = mocks::reporter::Config {
1135                participants: participants.clone().try_into().unwrap(),
1136                scheme: schemes[joiner_idx].clone(),
1137                elector: elector.clone(),
1138            };
1139            let mut joiner_reporter =
1140                mocks::reporter::Reporter::new(joiner_context.child("reporter"), reporter_config);
1141            reporters.push(joiner_reporter.clone());
1142
1143            let application_cfg = mocks::application::Config {
1144                hasher: Sha256::default(),
1145                relay: relay.clone(),
1146                me: joiner.clone(),
1147                propose_latency: (10.0, 5.0),
1148                verify_latency: (10.0, 5.0),
1149                certify_latency: (10.0, 5.0),
1150                should_certify: mocks::application::Certifier::Always,
1151            };
1152            let (actor, application) = mocks::application::Application::new(
1153                joiner_context.child("application"),
1154                application_cfg,
1155            );
1156            actor.start();
1157
1158            let cfg = config::Config {
1159                scheme: schemes[joiner_idx].clone(),
1160                elector,
1161                blocker: oracle.control(joiner.clone()),
1162                automaton: application.clone(),
1163                relay: application.clone(),
1164                reporter: joiner_reporter.clone(),
1165                strategy: Sequential,
1166                partition: format!("joiner_catches_tip_{joiner}"),
1167                mailbox_size: NZUsize!(1024),
1168                epoch: Epoch::new(333),
1169                floor: config::Floor::Finalized(floor_finalization),
1170                leader_timeout: Duration::from_secs(1),
1171                certification_timeout: Duration::from_secs(2),
1172                timeout_retry: Duration::from_secs(10),
1173                fetch_timeout: Duration::from_secs(1),
1174                activity_timeout,
1175                skip_timeout,
1176                fetch_concurrent: NZUsize!(4),
1177                replay_buffer: NZUsize!(1024 * 1024),
1178                write_buffer: NZUsize!(1024 * 1024),
1179                page_cache: CacheRef::from_pooler(&joiner_context, PAGE_SIZE, PAGE_CACHE_SIZE),
1180                forwarding: ForwardingPolicy::Disabled,
1181            };
1182            let engine = Engine::new(joiner_context.child("engine"), cfg);
1183            let (pending, recovered, resolver) = register_validator(&mut oracle, joiner).await;
1184            engine_handlers.push(engine.start(pending, recovered, resolver));
1185
1186            let (mut joiner_latest, mut joiner_monitor) = joiner_reporter.subscribe().await;
1187            while joiner_latest < tip_at_join {
1188                joiner_latest = joiner_monitor.recv().await.expect("event missing");
1189            }
1190
1191            let post_join_target = tip_at_join.saturating_add(ViewDelta::new(5));
1192            while joiner_latest < post_join_target {
1193                joiner_latest = joiner_monitor.recv().await.expect("event missing");
1194            }
1195
1196            for reporter in reporters.iter() {
1197                reporter.assert_no_faults();
1198                reporter.assert_no_invalid();
1199            }
1200
1201            let blocked = oracle.blocked().await.unwrap();
1202            assert!(blocked.is_empty());
1203        });
1204    }
1205
1206    /// A dishonest leader (validator 0) proposes payloads that all honest peers
1207    /// refuse to certify.
1208    ///
1209    /// All n validators use the honest Application, but every peer's certifier
1210    /// rejects proposals from views where validator 0 is the elected leader.
1211    /// When validator 0 IS the leader, it short-circuits certification locally
1212    /// (it built the proposal) and votes finalize, but every other peer
1213    /// rejects via the Custom predicate and nullifies. The lone finalize vote
1214    /// cannot form a certificate (quorum=4). The nullification cert (4 honest
1215    /// peers) advances everyone.
1216    ///
1217    /// When an honest validator leads, all peers (including validator 0)
1218    /// certify normally and finalize. The cluster makes progress on honest
1219    /// leader views and nullifies dishonest leader views.
1220    fn dishonest_leader_certification_rejected<S, F>(mut fixture: F)
1221    where
1222        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1223        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1224        RoundRobin: Elector<S>,
1225    {
1226        let n = 5;
1227        let required_containers = View::new(50);
1228        let activity_timeout = ViewDelta::new(10);
1229        let skip_timeout = ViewDelta::new(5);
1230        let namespace = b"consensus".to_vec();
1231        let executor = deterministic::Runner::timed(Duration::from_secs(300));
1232        executor.start(|mut context| async move {
1233            let Fixture {
1234                participants,
1235                schemes,
1236                ..
1237            } = fixture(&mut context, &namespace, n);
1238            let mut oracle =
1239                start_test_network_with_peers(context.child("network"), participants.clone(), true)
1240                    .await;
1241            let mut registrations = register_validators(&mut oracle, &participants).await;
1242
1243            let link = Link {
1244                latency: Duration::from_millis(10),
1245                jitter: Duration::from_millis(1),
1246                success_rate: 1.0,
1247            };
1248            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1249
1250            let elector = RoundRobin::default();
1251            let participants_set: Set<S::PublicKey> = participants.clone().try_into().unwrap();
1252            let built_elector = elector.clone().build(&participants_set);
1253            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            let mut engine_handlers = Vec::new();
            let dishonest = Participant::new(0);
            for (idx, validator) in participants.iter().enumerate() {
                let context = context
                    .child("validator")
                    .with_attribute("public_key", validator);
                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
                reporters.push(reporter.clone());

                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                    certify_latency: (10.0, 5.0),
                    should_certify: mocks::application::Certifier::Custom(Box::new({
                        let built_elector_clone = built_elector.clone();
                        move |round, _| built_elector_clone.elect(round, None) != dishonest
                    })),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.child("application"),
                    application_cfg,
                );
                actor.start();

                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    strategy: Sequential,
                    partition: validator.to_string(),
                    mailbox_size: NZUsize!(1024),
                    epoch: Epoch::new(333),
                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
                        Epoch::new(333),
                    )),
                    leader_timeout: Duration::from_secs(1),
                    certification_timeout: Duration::from_secs(2),
                    timeout_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    fetch_concurrent: NZUsize!(4),
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                    forwarding: ForwardingPolicy::Disabled,
                };
                let engine = Engine::new(context.child("engine"), cfg);
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.child("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            for reporter in reporters.iter() {
                reporter.assert_no_faults();
                reporter.assert_no_invalid();
            }
        });
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_dishonest_leader_certification_rejected() {
        dishonest_leader_certification_rejected::<_, _>(
            bls12381_threshold_vrf::fixture::<MinPk, _>,
        );
        dishonest_leader_certification_rejected::<_, _>(
            bls12381_threshold_vrf::fixture::<MinSig, _>,
        );
        dishonest_leader_certification_rejected::<_, _>(
            bls12381_threshold_std::fixture::<MinPk, _>,
        );
        dishonest_leader_certification_rejected::<_, _>(
            bls12381_threshold_std::fixture::<MinSig, _>,
        );
        dishonest_leader_certification_rejected::<_, _>(bls12381_multisig::fixture::<MinPk, _>);
        dishonest_leader_certification_rejected::<_, _>(bls12381_multisig::fixture::<MinSig, _>);
        dishonest_leader_certification_rejected::<_, _>(ed25519::fixture);
        dishonest_leader_certification_rejected::<_, _>(secp256r1::fixture);
    }

    fn observer<S, F, L>(mut fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Create context
        let n_active = 5;
        let required_containers = View::new(100);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(300));
        executor.start(|mut context| async move {
            // Register participants (active)
            let Fixture {
                participants,
                schemes,
                verifier,
                ..
            } = fixture(&mut context, &namespace, n_active);

            // Add observer (no share)
            let private_key_observer = PrivateKey::from_seed(n_active as u64);
            let public_key_observer = private_key_observer.public_key();

            let mut oracle = start_test_network_with_split_peers(
                context.child("network"),
                participants.clone(),
                [public_key_observer.clone()],
                true,
            )
            .await;

            // Register all (including observer) with the network
            let mut all_validators = participants.clone();
            all_validators.push(public_key_observer.clone());
            all_validators.sort();
            let mut registrations = register_validators(&mut oracle, &all_validators).await;

            // Link all peers (including observer)
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;

            // Create engines
            let elector = L::default();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();

            for (idx, validator) in participants.iter().enumerate() {
                let is_observer = *validator == public_key_observer;

                // Create scheme context
                let context = context
                    .child("validator")
                    .with_attribute("public_key", validator);

                // Configure engine
                let signing = if is_observer {
                    verifier.clone()
                } else {
                    schemes[idx].clone()
                };
                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: signing.clone(),
                    elector: elector.clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
                reporters.push(reporter.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                    certify_latency: (10.0, 5.0),
                    should_certify: mocks::application::Certifier::Always,
                };
                let (actor, application) = mocks::application::Application::new(
                    context.child("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: signing.clone(),
                    elector: elector.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    strategy: Sequential,
                    partition: validator.to_string(),
                    mailbox_size: NZUsize!(1024),
                    epoch: Epoch::new(333),
                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
                        Epoch::new(333),
                    )),
                    leader_timeout: Duration::from_secs(1),
                    certification_timeout: Duration::from_secs(2),
                    timeout_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    fetch_concurrent: NZUsize!(4),
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                    forwarding: ForwardingPolicy::Disabled,
                };
                let engine = Engine::new(context.child("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine.start(pending, recovered, resolver);
            }

            // Wait for all engines to finish
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.child("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Sanity check. The standalone secondary observer should still
            // process the chain to the same progress threshold as validators.
            for reporter in reporters.iter() {
                // Ensure no faults or invalid signatures
                reporter.assert_no_faults();
                reporter.assert_no_invalid();

                // Ensure no blocked connections
                let blocked = oracle.blocked().await.unwrap();
                assert!(blocked.is_empty());
            }
        });
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_observer() {
        observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
        observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
        observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
        observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
        observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
        observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
        observer::<_, _, RoundRobin>(ed25519::fixture);
        observer::<_, _, RoundRobin>(secp256r1::fixture);
    }

    fn unclean_shutdown<S, F, L>(mut fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut StdRng, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Create context
        let n = 5;
        let required_containers = View::new(100);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();

        // Random restarts every x seconds
        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
        let supervised = Arc::new(Mutex::new(Vec::new()));
        let mut prev_checkpoint = None;

        // Create validator keys
        let mut rng = test_rng();
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut rng, &namespace, n);

        // Create block relay, shared across restarts.
        let relay = Arc::new(mocks::relay::Relay::<Sha256Digest, S::PublicKey>::new());

        loop {
            let rng = rng.clone();
            let participants = participants.clone();
            let schemes = schemes.clone();
            let shutdowns = shutdowns.clone();
            let supervised = supervised.clone();
            let relay = relay.clone();
            relay.deregister_all(); // Clear all recipients from previous restart.

            let f = |mut context: deterministic::Context| async move {
                // Register participants
                let mut oracle = start_test_network_with_peers(
                    context.child("network"),
                    participants.clone(),
                    true,
                )
                .await;
                let mut registrations = register_validators(&mut oracle, &participants).await;

                // Link all validators
                let link = Link {
                    latency: Duration::from_millis(50),
                    jitter: Duration::from_millis(50),
                    success_rate: 1.0,
                };
                link_validators(&mut oracle, &participants, Action::Link(link), None).await;

                // Create engines
                let elector = L::default();
                let mut reporters = HashMap::new();
                let mut engine_handlers = Vec::new();
                for (idx, validator) in participants.iter().enumerate() {
                    // Create scheme context
                    let context = context
                        .child("validator")
                        .with_attribute("public_key", validator);

                    // Configure engine
                    let reporter_config = mocks::reporter::Config {
                        participants: participants.clone().try_into().unwrap(),
                        scheme: schemes[idx].clone(),
                        elector: elector.clone(),
                    };
                    let reporter = mocks::reporter::Reporter::new(rng.clone(), reporter_config);
                    reporters.insert(validator.clone(), reporter.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                        certify_latency: (10.0, 5.0),
                        should_certify: mocks::application::Certifier::Always,
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.child("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(validator.clone());
                    let cfg = config::Config {
                        scheme: schemes[idx].clone(),
                        elector: elector.clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        strategy: Sequential,
                        partition: validator.to_string(),
                        mailbox_size: NZUsize!(1024),
                        epoch: Epoch::new(333),
                        floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
                            Epoch::new(333),
                        )),
                        leader_timeout: Duration::from_secs(1),
                        certification_timeout: Duration::from_secs(2),
                        timeout_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        fetch_concurrent: NZUsize!(4),
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                        forwarding: ForwardingPolicy::Disabled,
                    };
                    let engine = Engine::new(context.child("engine"), cfg);

                    // Start engine
                    let (pending, recovered, resolver) = registrations
                        .remove(validator)
                        .expect("validator should be registered");
                    engine_handlers.push(engine.start(pending, recovered, resolver));
                }

                // Store all finalizer handles
                let mut finalizers = Vec::new();
                for (_, reporter) in reporters.iter_mut() {
                    let (mut latest, mut monitor) = reporter.subscribe().await;
                    finalizers.push(context.child("finalizer").spawn(move |_| async move {
                        while latest < required_containers {
                            latest = monitor.recv().await.expect("event missing");
                        }
                    }));
                }

                // Exit at random points for unclean shutdown of entire set
                let wait =
                    context.gen_range(Duration::from_millis(100)..Duration::from_millis(2_000));
                let result = select! {
                    _ = context.sleep(wait) => {
                        // Collect reporters to check faults
                        {
                            let mut shutdowns = shutdowns.lock();
                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
                            *shutdowns += 1;
                        }
                        supervised.lock().push(reporters);
                        false
                    },
                    _ = join_all(finalizers) => {
                        // Check reporters from earlier restarts for fault activity
                        let supervised = supervised.lock();
                        for reporters in supervised.iter() {
                            for (_, reporter) in reporters.iter() {
                                reporter.assert_no_faults();
                            }
                        }
                        true
                    },
                };

                // Ensure no blocked connections
                let blocked = oracle.blocked().await.unwrap();
                assert!(blocked.is_empty());

                result
            };

            let (complete, checkpoint) = prev_checkpoint
                .map_or_else(
                    || deterministic::Runner::timed(Duration::from_secs(180)),
                    deterministic::Runner::from,
                )
                .start_and_recover(f);

            // Check if we should exit
            if complete {
                break;
            }

            prev_checkpoint = Some(checkpoint);
        }
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_unclean_shutdown() {
        unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
        unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
        unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
        unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
        unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
        unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
        unclean_shutdown::<_, _, RoundRobin>(ed25519::fixture);
        unclean_shutdown::<_, _, RoundRobin>(secp256r1::fixture);
    }

    fn backfill<S, F, L>(mut fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Create context
        let n = 4;
        let required_containers = View::new(100);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(240));
        executor.start(|mut context| async move {
            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, &namespace, n);
            let mut oracle =
                start_test_network_with_peers(context.child("network"), participants.clone(), true)
                    .await;
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators except first
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &participants,
                Action::Link(link),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Create engines
            let elector = L::default();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx_scheme, validator) in participants.iter().enumerate() {
                // Skip first peer
                if idx_scheme == 0 {
                    continue;
                }

                // Create scheme context
                let context = context
                    .child("validator")
                    .with_attribute("public_key", validator);

                // Configure engine
                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: schemes[idx_scheme].clone(),
                    elector: elector.clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
                reporters.push(reporter.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                    certify_latency: (10.0, 5.0),
                    should_certify: mocks::application::Certifier::Always,
                };
                let (actor, application) = mocks::application::Application::new(
                    context.child("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx_scheme].clone(),
                    elector: elector.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    strategy: Sequential,
                    partition: validator.to_string(),
                    mailbox_size: NZUsize!(1024),
                    epoch: Epoch::new(333),
                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
                        Epoch::new(333),
                    )),
                    leader_timeout: Duration::from_secs(1),
                    certification_timeout: Duration::from_secs(2),
                    timeout_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    fetch_concurrent: NZUsize!(4),
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                    forwarding: ForwardingPolicy::Disabled,
                };
                let engine = Engine::new(context.child("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait for all engines to finish
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.child("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Degrade network connections for online peers
            let link = Link {
                latency: Duration::from_secs(3),
                jitter: Duration::from_millis(0),
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &participants,
                Action::Update(link.clone()),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Wait for nullifications to accrue
            context.sleep(Duration::from_secs(60)).await;

            // Unlink second peer from all (except first)
            link_validators(
                &mut oracle,
                &participants,
                Action::Unlink,
                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
            )
            .await;

            // Configure engine for first peer
            let me = participants[0].clone();
            let context = context.child("validator").with_attribute("public_key", &me);

            // Link first peer to all (except second)
            link_validators(
                &mut oracle,
                &participants,
                Action::Link(link),
                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
            )
            .await;

            // Restore network connections for all online peers
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(3),
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &participants,
                Action::Update(link),
                Some(|_, i, j| ![i, j].contains(&1usize)),
            )
            .await;

            // Configure engine
            let reporter_config = mocks::reporter::Config {
                participants: participants.clone().try_into().unwrap(),
                scheme: schemes[0].clone(),
                elector: elector.clone(),
            };
            let mut reporter =
                mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
            reporters.push(reporter.clone());
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                me: me.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
                certify_latency: (10.0, 5.0),
                should_certify: mocks::application::Certifier::Always,
            };
            let (actor, application) =
                mocks::application::Application::new(context.child("application"), application_cfg);
            actor.start();
            let blocker = oracle.control(me.clone());
            let cfg = config::Config {
                scheme: schemes[0].clone(),
                elector: elector.clone(),
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: reporter.clone(),
                strategy: Sequential,
                partition: me.to_string(),
                mailbox_size: NZUsize!(1024),
                epoch: Epoch::new(333),
                floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(Epoch::new(
                    333,
                ))),
                leader_timeout: Duration::from_secs(1),
                certification_timeout: Duration::from_secs(2),
                timeout_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                fetch_concurrent: NZUsize!(4),
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                forwarding: ForwardingPolicy::Disabled,
            };
            let engine = Engine::new(context.child("engine"), cfg);

            // Start engine
            let (pending, recovered, resolver) = registrations
                .remove(&me)
                .expect("validator should be registered");
            engine_handlers.push(engine.start(pending, recovered, resolver));

            // Wait for the new engine to finalize the required containers
            let (mut latest, mut monitor) = reporter.subscribe().await;
            while latest < required_containers {
                latest = monitor.recv().await.expect("event missing");
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_backfill() {
        backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
        backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
        backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
        backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
        backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
        backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
        backfill::<_, _, RoundRobin>(ed25519::fixture);
        backfill::<_, _, RoundRobin>(secp256r1::fixture);
    }

    fn one_offline<S, F, L>(mut fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Create context
        let n = 5;
        let quorum = quorum(n) as usize;
        let required_containers = View::new(100);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let max_exceptions = 10;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(300));
        executor.start(|mut context| async move {
            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, &namespace, n);
            let mut oracle =
                start_test_network_with_peers(context.child("network"), participants.clone(), true)
2007                    .await;
2008            let mut registrations = register_validators(&mut oracle, &participants).await;
2009
2010            // Link all validators except first
2011            let link = Link {
2012                latency: Duration::from_millis(10),
2013                jitter: Duration::from_millis(1),
2014                success_rate: 1.0,
2015            };
2016            link_validators(
2017                &mut oracle,
2018                &participants,
2019                Action::Link(link),
2020                Some(|_, i, j| ![i, j].contains(&0usize)),
2021            )
2022            .await;
2023
2024            // Create engines
2025            let elector = L::default();
2026            let relay = Arc::new(mocks::relay::Relay::new());
2027            let mut reporters = Vec::new();
2028            let mut engine_handlers = Vec::new();
2029            for (idx_scheme, validator) in participants.iter().enumerate() {
2030                // Skip first peer
2031                if idx_scheme == 0 {
2032                    continue;
2033                }
2034
2035                // Create scheme context
2036                let context = context
2037                    .child("validator")
2038                    .with_attribute("public_key", validator);
2039
2040                // Configure engine
2041                let reporter_config = mocks::reporter::Config {
2042                    participants: participants.clone().try_into().unwrap(),
2043                    scheme: schemes[idx_scheme].clone(),
2044                    elector: elector.clone(),
2045                };
2046                let reporter =
2047                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
2048                reporters.push(reporter.clone());
2049                let application_cfg = mocks::application::Config {
2050                    hasher: Sha256::default(),
2051                    relay: relay.clone(),
2052                    me: validator.clone(),
2053                    propose_latency: (10.0, 5.0),
2054                    verify_latency: (10.0, 5.0),
2055                    certify_latency: (10.0, 5.0),
2056                    should_certify: mocks::application::Certifier::Always,
2057                };
2058                let (actor, application) = mocks::application::Application::new(
2059                    context.child("application"),
2060                    application_cfg,
2061                );
2062                actor.start();
2063                let blocker = oracle.control(validator.clone());
2064                let cfg = config::Config {
2065                    scheme: schemes[idx_scheme].clone(),
2066                    elector: elector.clone(),
2067                    blocker,
2068                    automaton: application.clone(),
2069                    relay: application.clone(),
2070                    reporter: reporter.clone(),
2071                    strategy: Sequential,
2072                    partition: validator.to_string(),
2073                    mailbox_size: NZUsize!(1024),
2074                    epoch: Epoch::new(333),
2075                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
2076                        Epoch::new(333),
2077                    )),
2078                    leader_timeout: Duration::from_secs(1),
2079                    certification_timeout: Duration::from_secs(2),
2080                    timeout_retry: Duration::from_secs(10),
2081                    fetch_timeout: Duration::from_secs(1),
2082                    activity_timeout,
2083                    skip_timeout,
2084                    fetch_concurrent: NZUsize!(4),
2085                    replay_buffer: NZUsize!(1024 * 1024),
2086                    write_buffer: NZUsize!(1024 * 1024),
2087                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2088                    forwarding: ForwardingPolicy::Disabled,
2089                };
2090                let engine = Engine::new(context.child("engine"), cfg);
2091
2092                // Start engine
2093                let (pending, recovered, resolver) = registrations
2094                    .remove(validator)
2095                    .expect("validator should be registered");
2096                engine_handlers.push(engine.start(pending, recovered, resolver));
2097            }
2098
2099            // Wait for all engines to finalize the required containers
2100            let mut finalizers = Vec::new();
2101            for reporter in reporters.iter_mut() {
2102                let (mut latest, mut monitor) = reporter.subscribe().await;
2103                finalizers.push(context.child("finalizer").spawn(move |_| async move {
2104                    while latest < required_containers {
2105                        latest = monitor.recv().await.expect("event missing");
2106                    }
2107                }));
2108            }
2109            join_all(finalizers).await;
2110
2111            // Check reporters for correct activity
2112            let exceptions = 0;
2113            let offline = &participants[0];
2114            for reporter in reporters.iter() {
2115                // Ensure no faults
2116                reporter.assert_no_faults();
2117
2118                // Ensure no invalid signatures
2119                reporter.assert_no_invalid();
2120
2121                // Ensure offline node is never active
2122                let mut exceptions = 0;
2123                {
2124                    let notarizes = reporter.notarizes.lock();
2125                    for (view, payloads) in notarizes.iter() {
2126                        for (_, participants) in payloads.iter() {
2127                            if participants.contains(offline) {
2128                                panic!("view: {view}");
2129                            }
2130                        }
2131                    }
2132                }
2133                {
2134                    let nullifies = reporter.nullifies.lock();
2135                    for (view, participants) in nullifies.iter() {
2136                        if participants.contains(offline) {
2137                            panic!("view: {view}");
2138                        }
2139                    }
2140                }
2141                {
2142                    let finalizes = reporter.finalizes.lock();
2143                    for (view, payloads) in finalizes.iter() {
2144                        for (_, finalizers) in payloads.iter() {
2145                            if finalizers.contains(offline) {
2146                                panic!("view: {view}");
2147                            }
2148                        }
2149                    }
2150                }
2151
2152                // Identify views led by the offline node
2153                let mut offline_views = Vec::new();
2154                {
2155                    let leaders = reporter.leaders.lock();
2156                    for (view, leader) in leaders.iter() {
2157                        if leader == offline {
2158                            offline_views.push(*view);
2159                        }
2160                    }
2161                }
2162                assert!(!offline_views.is_empty());
2163
2164                // Ensure nullifies/nullifications were collected for views led by the offline node
2165                {
2166                    let nullifies = reporter.nullifies.lock();
2167                    for view in offline_views.iter() {
2168                        let nullifies = nullifies.get(view).map_or(0, |n| n.len());
2169                        if nullifies < quorum {
2170                            warn!("missing expected view nullifies: {}", view);
2171                            exceptions += 1;
2172                        }
2173                    }
2174                }
2175                {
2176                    let nullifications = reporter.nullifications.lock();
2177                    for view in offline_views.iter() {
2178                        if !nullifications.contains_key(view) {
2179                            warn!("missing expected view nullification: {}", view);
2180                            exceptions += 1;
2181                        }
2182                    }
2183                }
2184
2185                // Ensure exceptions are within the allowed maximum
2186                assert!(exceptions <= max_exceptions);
2187            }
2188            assert!(exceptions <= max_exceptions);
2189
2190            // Ensure no blocked connections
2191            let blocked = oracle.blocked().await.unwrap();
2192            assert!(blocked.is_empty());
2193
2194            // Ensure online nodes are recording timeouts/nullifications for the offline leader
2195            let encoded = context.encode();
2196            let leader_label = format!("leader=\"{}\"", offline);
2197            assert!(
2198                count_nonzero_metric_lines(&encoded, &["_timeouts", &leader_label]) >= n - 1,
2199                "expected timeout metrics for offline leader"
2200            );
2201            assert_eq!(
2202                count_nonzero_metric_lines(&encoded, &["_nullifications", &leader_label]),
2203                n - 1,
2204                "expected all online nodes to record _nullifications for offline leader"
2205            );
2206        });
2207    }
2208
2209    #[test_group("slow")]
2210    #[test_traced]
2211    fn test_one_offline() {
2212        one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2213        one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2214        one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2215        one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2216        one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2217        one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2218        one_offline::<_, _, RoundRobin>(ed25519::fixture);
2219        one_offline::<_, _, RoundRobin>(secp256r1::fixture);
2220    }
2221
2222    fn slow_validator<S, F, L>(mut fixture: F)
2223    where
2224        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2225        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2226        L: Elector<S>,
2227    {
2228        // Create context
2229        let n = 5;
2230        let required_containers = View::new(50);
2231        let activity_timeout = ViewDelta::new(10);
2232        let skip_timeout = ViewDelta::new(5);
2233        let namespace = b"consensus".to_vec();
2234        let executor = deterministic::Runner::timed(Duration::from_secs(300));
2235        executor.start(|mut context| async move {
2236            // Register participants
2237            let Fixture {
2238                participants,
2239                schemes,
2240                ..
2241            } = fixture(&mut context, &namespace, n);
2242            let mut oracle =
2243                start_test_network_with_peers(context.child("network"), participants.clone(), true)
2244                    .await;
2245            let mut registrations = register_validators(&mut oracle, &participants).await;
2246
2247            // Link all validators
2248            let link = Link {
2249                latency: Duration::from_millis(10),
2250                jitter: Duration::from_millis(1),
2251                success_rate: 1.0,
2252            };
2253            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2254
2255            // Create engines
2256            let elector = L::default();
2257            let relay = Arc::new(mocks::relay::Relay::new());
2258            let mut reporters = Vec::new();
2259            let mut engine_handlers = Vec::new();
2260            for (idx_scheme, validator) in participants.iter().enumerate() {
2261                // Create scheme context
2262                let context = context
2263                    .child("validator")
2264                    .with_attribute("public_key", validator);
2265
2266                // Configure engine
2267                let reporter_config = mocks::reporter::Config {
2268                    participants: participants.clone().try_into().unwrap(),
2269                    scheme: schemes[idx_scheme].clone(),
2270                    elector: elector.clone(),
2271                };
2272                let reporter =
2273                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
2274                reporters.push(reporter.clone());
2275                let application_cfg = if idx_scheme == 0 {
2276                    mocks::application::Config {
2277                        hasher: Sha256::default(),
2278                        relay: relay.clone(),
2279                        me: validator.clone(),
2280                        propose_latency: (10_000.0, 0.0),
2281                        verify_latency: (10_000.0, 5.0),
2282                        certify_latency: (10_000.0, 5.0),
2283                        should_certify: mocks::application::Certifier::Always,
2284                    }
2285                } else {
2286                    mocks::application::Config {
2287                        hasher: Sha256::default(),
2288                        relay: relay.clone(),
2289                        me: validator.clone(),
2290                        propose_latency: (10.0, 5.0),
2291                        verify_latency: (10.0, 5.0),
2292                        certify_latency: (10.0, 5.0),
2293                        should_certify: mocks::application::Certifier::Always,
2294                    }
2295                };
2296                let (actor, application) = mocks::application::Application::new(
2297                    context.child("application"),
2298                    application_cfg,
2299                );
2300                actor.start();
2301                let blocker = oracle.control(validator.clone());
2302                let cfg = config::Config {
2303                    scheme: schemes[idx_scheme].clone(),
2304                    elector: elector.clone(),
2305                    blocker,
2306                    automaton: application.clone(),
2307                    relay: application.clone(),
2308                    reporter: reporter.clone(),
2309                    strategy: Sequential,
2310                    partition: validator.to_string(),
2311                    mailbox_size: NZUsize!(1024),
2312                    epoch: Epoch::new(333),
2313                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
2314                        Epoch::new(333),
2315                    )),
2316                    leader_timeout: Duration::from_secs(1),
2317                    certification_timeout: Duration::from_secs(2),
2318                    timeout_retry: Duration::from_secs(10),
2319                    fetch_timeout: Duration::from_secs(1),
2320                    activity_timeout,
2321                    skip_timeout,
2322                    fetch_concurrent: NZUsize!(4),
2323                    replay_buffer: NZUsize!(1024 * 1024),
2324                    write_buffer: NZUsize!(1024 * 1024),
2325                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2326                    forwarding: ForwardingPolicy::Disabled,
2327                };
2328                let engine = Engine::new(context.child("engine"), cfg);
2329
2330                // Start engine
2331                let (pending, recovered, resolver) = registrations
2332                    .remove(validator)
2333                    .expect("validator should be registered");
2334                engine_handlers.push(engine.start(pending, recovered, resolver));
2335            }
2336
2337            // Wait for all engines to finalize the required containers
2338            let mut finalizers = Vec::new();
2339            for reporter in reporters.iter_mut() {
2340                let (mut latest, mut monitor) = reporter.subscribe().await;
2341                finalizers.push(context.child("finalizer").spawn(move |_| async move {
2342                    while latest < required_containers {
2343                        latest = monitor.recv().await.expect("event missing");
2344                    }
2345                }));
2346            }
2347            join_all(finalizers).await;
2348
2349            // Check reporters for correct activity
2350            let slow = &participants[0];
2351            for reporter in reporters.iter() {
2352                // Ensure no faults
2353                reporter.assert_no_faults();
2354
2355                // Ensure no invalid signatures
2356                reporter.assert_no_invalid();
2357
2358                // Ensure the slow validator never emits notarize or finalize
2359                // votes. It may still emit nullifies after timing out.
2360                {
2361                    let notarizes = reporter.notarizes.lock();
2362                    assert!(notarizes.values().all(|payloads| {
2363                        payloads
2364                            .values()
2365                            .all(|participants| !participants.contains(slow))
2366                    }));
2367                }
2368                {
2369                    let finalizes = reporter.finalizes.lock();
2370                    assert!(finalizes.values().all(|payloads| {
2371                        payloads
2372                            .values()
2373                            .all(|participants| !participants.contains(slow))
2374                    }));
2375                }
2376
2377                // Ensure every reporter observes finalization progress to at least the target view.
2378                {
2379                    let finalizations = reporter.finalizations.lock();
2380                    assert!(finalizations
2381                        .keys()
2382                        .any(|view| *view >= required_containers));
2383                }
2384            }
2385
2386            // Ensure no blocked connections
2387            let blocked = oracle.blocked().await.unwrap();
2388            assert!(blocked.is_empty());
2389        });
2390    }
2391
2392    #[test_group("slow")]
2393    #[test_traced]
2394    fn test_slow_validator() {
2395        slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2396        slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2397        slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2398        slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2399        slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2400        slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2401        slow_validator::<_, _, RoundRobin>(ed25519::fixture);
2402        slow_validator::<_, _, RoundRobin>(secp256r1::fixture);
2403    }
2404
2405    fn all_recovery<S, F, L>(mut fixture: F)
2406    where
2407        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2408        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2409        L: Elector<S>,
2410    {
2411        // Create context
2412        let n = 5;
2413        let required_containers = View::new(100);
2414        let activity_timeout = ViewDelta::new(10);
2415        let skip_timeout = ViewDelta::new(2);
2416        let namespace = b"consensus".to_vec();
2417        let executor = deterministic::Runner::timed(Duration::from_secs(1800));
2418        executor.start(|mut context| async move {
2419            // Register participants
2420            let Fixture {
2421                participants,
2422                schemes,
2423                ..
2424            } = fixture(&mut context, &namespace, n);
2425            let mut oracle =
2426                start_test_network_with_peers(context.child("network"), participants.clone(), true)
2427                    .await;
2428            let mut registrations = register_validators(&mut oracle, &participants).await;
2429
2430            // Link all validators
2431            let link = Link {
2432                latency: Duration::from_secs(3),
2433                jitter: Duration::from_millis(0),
2434                success_rate: 1.0,
2435            };
2436            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2437
2438            // Create engines
2439            let elector = L::default();
2440            let relay = Arc::new(mocks::relay::Relay::new());
2441            let mut reporters = Vec::new();
2442            let mut engine_handlers = Vec::new();
2443            for (idx, validator) in participants.iter().enumerate() {
2444                // Create scheme context
2445                let context = context
2446                    .child("validator")
2447                    .with_attribute("public_key", validator);
2448
2449                // Configure engine
2450                let reporter_config = mocks::reporter::Config {
2451                    participants: participants.clone().try_into().unwrap(),
2452                    scheme: schemes[idx].clone(),
2453                    elector: elector.clone(),
2454                };
2455                let reporter =
2456                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
2457                reporters.push(reporter.clone());
2458                let application_cfg = mocks::application::Config {
2459                    hasher: Sha256::default(),
2460                    relay: relay.clone(),
2461                    me: validator.clone(),
2462                    propose_latency: (10.0, 5.0),
2463                    verify_latency: (10.0, 5.0),
2464                    certify_latency: (10.0, 5.0),
2465                    should_certify: mocks::application::Certifier::Always,
2466                };
2467                let (actor, application) = mocks::application::Application::new(
2468                    context.child("application"),
2469                    application_cfg,
2470                );
2471                actor.start();
2472                let blocker = oracle.control(validator.clone());
2473                let cfg = config::Config {
2474                    scheme: schemes[idx].clone(),
2475                    elector: elector.clone(),
2476                    blocker,
2477                    automaton: application.clone(),
2478                    relay: application.clone(),
2479                    reporter: reporter.clone(),
2480                    strategy: Sequential,
2481                    partition: validator.to_string(),
2482                    mailbox_size: NZUsize!(1024),
2483                    epoch: Epoch::new(333),
2484                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
2485                        Epoch::new(333),
2486                    )),
2487                    leader_timeout: Duration::from_secs(1),
2488                    certification_timeout: Duration::from_secs(2),
2489                    timeout_retry: Duration::from_secs(10),
2490                    fetch_timeout: Duration::from_secs(1),
2491                    activity_timeout,
2492                    skip_timeout,
2493                    fetch_concurrent: NZUsize!(4),
2494                    replay_buffer: NZUsize!(1024 * 1024),
2495                    write_buffer: NZUsize!(1024 * 1024),
2496                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2497                    forwarding: ForwardingPolicy::Disabled,
2498                };
2499                let engine = Engine::new(context.child("engine"), cfg);
2500
2501                // Start engine
2502                let (pending, recovered, resolver) = registrations
2503                    .remove(validator)
2504                    .expect("validator should be registered");
2505                engine_handlers.push(engine.start(pending, recovered, resolver));
2506            }
2507
2508            // Wait for a virtual minute (shouldn't finalize anything)
2509            let mut finalizers = Vec::new();
2510            for reporter in reporters.iter_mut() {
2511                let (_, mut monitor) = reporter.subscribe().await;
2512                finalizers.push(context.child("finalizer").spawn(move |context| async move {
2513                    select! {
2514                        _timeout = context.sleep(Duration::from_secs(60)) => {},
2515                        _done = monitor.recv() => {
2516                            panic!("engine should not notarize or finalize anything");
2517                        },
2518                    }
2519                }));
2520            }
2521            join_all(finalizers).await;
2522
2523            // Unlink all validators to get latest view
2524            link_validators(&mut oracle, &participants, Action::Unlink, None).await;
2525
2526            // Wait for a virtual minute (nothing should happen)
2527            context.sleep(Duration::from_secs(60)).await;
2528
2529            // Get latest view
2530            let mut latest = View::zero();
2531            for reporter in reporters.iter() {
2532                let nullifies = reporter.nullifies.lock();
2533                let max = nullifies.keys().max().unwrap();
2534                if *max > latest {
2535                    latest = *max;
2536                }
2537            }
2538
2539            // Update links
2540            let link = Link {
2541                latency: Duration::from_millis(10),
2542                jitter: Duration::from_millis(1),
2543                success_rate: 1.0,
2544            };
2545            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2546
2547            // Wait for all engines to finalize the required containers
2548            let mut finalizers = Vec::new();
2549            for reporter in reporters.iter_mut() {
2550                let (mut latest, mut monitor) = reporter.subscribe().await;
2551                finalizers.push(context.child("finalizer").spawn(move |_| async move {
2552                    while latest < required_containers {
2553                        latest = monitor.recv().await.expect("event missing");
2554                    }
2555                }));
2556            }
2557            join_all(finalizers).await;
2558
2559            // Check reporters for correct activity
2560            for reporter in reporters.iter() {
2561                // Ensure no faults
2562                reporter.assert_no_faults();
2563
2564                // Ensure no invalid signatures
2565                reporter.assert_no_invalid();
2566
2567                // Ensure quick recovery.
2568                //
2569                // If the skip timeout isn't implemented correctly, we may go many views before participants
2570                // start to notarize a validator's proposal.
2571                {
2572                    // Ensure nearly all views around latest are notarized.
2573                    // We don't check for finalization since some of the blocks may fail to be
2574                    // certified for the purposes of testing.
2575                    let mut found = 0;
2576                    let notarizations = reporter.notarizations.lock();
2577                    for view in View::range(latest, latest.saturating_add(activity_timeout)) {
2578                        if notarizations.contains_key(&view) {
2579                            found += 1;
2580                        }
2581                    }
2582                    let tolerated_missing = skip_timeout.get().saturating_add(1);
2583                    assert!(
2584                        found >= activity_timeout.get().saturating_sub(tolerated_missing),
2585                        "found: {found}"
2586                    );
2587                }
2588            }
2589
2590            // Ensure no blocked connections
2591            let blocked = oracle.blocked().await.unwrap();
2592            assert!(blocked.is_empty());
2593        });
2594    }
2595
2596    #[test_group("slow")]
2597    #[test_traced]
2598    fn test_all_recovery() {
2599        all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2600        all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2601        all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2602        all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2603        all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2604        all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2605        all_recovery::<_, _, RoundRobin>(ed25519::fixture);
2606        all_recovery::<_, _, RoundRobin>(secp256r1::fixture);
2607    }
2608
2609    fn partition<S, F, L>(mut fixture: F)
2610    where
2611        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2612        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2613        L: Elector<S>,
2614    {
2615        // Create context
2616        let n = 10;
2617        let required_containers = View::new(50);
2618        let activity_timeout = ViewDelta::new(10);
2619        let skip_timeout = ViewDelta::new(5);
2620        let namespace = b"consensus".to_vec();
2621        let executor = deterministic::Runner::timed(Duration::from_secs(900));
2622        executor.start(|mut context| async move {
2623            // Register participants
2624            let Fixture {
2625                participants,
2626                schemes,
2627                ..
2628            } = fixture(&mut context, &namespace, n);
2629            let mut oracle =
2630                start_test_network_with_peers(context.child("network"), participants.clone(), true)
2631                    .await;
2632            let mut registrations = register_validators(&mut oracle, &participants).await;
2633
2634            // Link all validators
2635            let link = Link {
2636                latency: Duration::from_millis(10),
2637                jitter: Duration::from_millis(1),
2638                success_rate: 1.0,
2639            };
2640            link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
2641
2642            // Create engines
2643            let elector = L::default();
2644            let relay = Arc::new(mocks::relay::Relay::new());
2645            let mut reporters = Vec::new();
2646            let mut engine_handlers = Vec::new();
2647            for (idx, validator) in participants.iter().enumerate() {
2648                // Create scheme context
2649                let context = context
2650                    .child("validator")
2651                    .with_attribute("public_key", validator);
2652
2653                // Configure engine
2654                let reporter_config = mocks::reporter::Config {
2655                    participants: participants.clone().try_into().unwrap(),
2656                    scheme: schemes[idx].clone(),
2657                    elector: elector.clone(),
2658                };
2659                let reporter =
2660                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
2661                reporters.push(reporter.clone());
2662                let application_cfg = mocks::application::Config {
2663                    hasher: Sha256::default(),
2664                    relay: relay.clone(),
2665                    me: validator.clone(),
2666                    propose_latency: (10.0, 5.0),
2667                    verify_latency: (10.0, 5.0),
2668                    certify_latency: (10.0, 5.0),
2669                    should_certify: mocks::application::Certifier::Always,
2670                };
2671                let (actor, application) = mocks::application::Application::new(
2672                    context.child("application"),
2673                    application_cfg,
2674                );
2675                actor.start();
2676                let blocker = oracle.control(validator.clone());
2677                let cfg = config::Config {
2678                    scheme: schemes[idx].clone(),
2679                    elector: elector.clone(),
2680                    blocker,
2681                    automaton: application.clone(),
2682                    relay: application.clone(),
2683                    reporter: reporter.clone(),
2684                    strategy: Sequential,
2685                    partition: validator.to_string(),
2686                    mailbox_size: NZUsize!(1024),
2687                    epoch: Epoch::new(333),
2688                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
2689                        Epoch::new(333),
2690                    )),
2691                    leader_timeout: Duration::from_secs(1),
2692                    certification_timeout: Duration::from_secs(2),
2693                    timeout_retry: Duration::from_secs(10),
2694                    fetch_timeout: Duration::from_secs(1),
2695                    activity_timeout,
2696                    skip_timeout,
2697                    fetch_concurrent: NZUsize!(4),
2698                    replay_buffer: NZUsize!(1024 * 1024),
2699                    write_buffer: NZUsize!(1024 * 1024),
2700                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2701                    forwarding: ForwardingPolicy::Disabled,
2702                };
2703                let engine = Engine::new(context.child("engine"), cfg);
2704
2705                // Start engine
2706                let (pending, recovered, resolver) = registrations
2707                    .remove(validator)
2708                    .expect("validator should be registered");
2709                engine_handlers.push(engine.start(pending, recovered, resolver));
2710            }
2711
2712            // Wait until every reporter has observed view `required_containers`
2713            let mut finalizers = Vec::new();
2714            for reporter in reporters.iter_mut() {
2715                let (mut latest, mut monitor) = reporter.subscribe().await;
2716                finalizers.push(context.child("finalizer").spawn(move |_| async move {
2717                    while latest < required_containers {
2718                        latest = monitor.recv().await.expect("event missing");
2719                    }
2720                }));
2721            }
2722            join_all(finalizers).await;
2723
2724            // Cut all links between validator halves
2725            fn separated(n: usize, a: usize, b: usize) -> bool {
2726                let m = n / 2;
2727                (a < m && b >= m) || (a >= m && b < m)
2728            }
2729            link_validators(&mut oracle, &participants, Action::Unlink, Some(separated)).await;
2730
2731            // Wait for any in-progress notarizations/finalizations to finish
2732            context.sleep(Duration::from_secs(10)).await;
2733
2734            // Wait for one virtual minute (nothing should finalize while partitioned)
2735            let mut finalizers = Vec::new();
2736            for reporter in reporters.iter_mut() {
2737                let (_, mut monitor) = reporter.subscribe().await;
2738                finalizers.push(context.child("finalizer").spawn(move |context| async move {
2739                    select! {
2740                        _timeout = context.sleep(Duration::from_secs(60)) => {},
2741                        _done = monitor.recv() => {
2742                            panic!("engine should not notarize or finalize anything");
2743                        },
2744                    }
2745                }));
2746            }
2747            join_all(finalizers).await;
2748
2749            // Restore links
2750            link_validators(
2751                &mut oracle,
2752                &participants,
2753                Action::Link(link),
2754                Some(separated),
2755            )
2756            .await;
2757
2758            // Wait until every reporter advances `required_containers` views past its current view
2759            let mut finalizers = Vec::new();
2760            for reporter in reporters.iter_mut() {
2761                let (mut latest, mut monitor) = reporter.subscribe().await;
2762                let required = latest.saturating_add(ViewDelta::new(required_containers.get()));
2763                finalizers.push(context.child("finalizer").spawn(move |_| async move {
2764                    while latest < required {
2765                        latest = monitor.recv().await.expect("event missing");
2766                    }
2767                }));
2768            }
2769            join_all(finalizers).await;
2770
2771            // Check reporters for correct activity
2772            for reporter in reporters.iter() {
2773                // Ensure no faults
2774                reporter.assert_no_faults();
2775
2776                // Ensure no invalid signatures
2777                reporter.assert_no_invalid();
2778            }
2779
2780            // Ensure no blocked connections
2781            let blocked = oracle.blocked().await.unwrap();
2782            assert!(blocked.is_empty());
2783        });
2784    }
2785
2786    #[test_group("slow")]
2787    #[test_traced]
2788    fn test_partition() {
2789        partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2790        partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2791        partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2792        partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2793        partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2794        partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2795        partition::<_, _, RoundRobin>(ed25519::fixture);
2796        partition::<_, _, RoundRobin>(secp256r1::fixture);
2797    }
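
    // Illustrative sketch (not part of the original suite): the nested `separated`
    // predicate in `partition` cuts exactly the links that cross the midpoint `n / 2`.
    // The names `crosses_midpoint` and `sketch_partition_predicate` are hypothetical;
    // the predicate body is copied from `separated` above. Assuming the usual BFT
    // quorum of more than two thirds of participants, neither half of 10 validators
    // can make progress alone, which is what the "shouldn't finalize" phase relies on.
    fn crosses_midpoint(n: usize, a: usize, b: usize) -> bool {
        let m = n / 2;
        (a < m && b >= m) || (a >= m && b < m)
    }

    #[test]
    fn sketch_partition_predicate() {
        let n = 10;

        // Links within the same half survive the partition.
        assert!(!crosses_midpoint(n, 0, 4));
        assert!(!crosses_midpoint(n, 5, 9));

        // Links that cross the midpoint are cut in both directions.
        assert!(crosses_midpoint(n, 4, 5));
        assert!(crosses_midpoint(n, 9, 0));

        // 5 x 5 ordered pairs in each direction are cut.
        let cut = (0..n)
            .flat_map(|a| (0..n).map(move |b| (a, b)))
            .filter(|&(a, b)| crosses_midpoint(n, a, b))
            .count();
        assert_eq!(cut, 50);
    }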
2798
2799    fn slow_and_lossy_links<S, F, L>(seed: u64, mut fixture: F) -> String
2800    where
2801        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2802        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2803        L: Elector<S>,
2804    {
2805        // Create context
2806        let n = 5;
2807        let required_containers = View::new(50);
2808        let activity_timeout = ViewDelta::new(10);
2809        let skip_timeout = ViewDelta::new(5);
2810        let namespace = b"consensus".to_vec();
2811        let cfg = deterministic::Config::new()
2812            .with_seed(seed)
2813            .with_timeout(Some(Duration::from_secs(5_000)));
2814        let executor = deterministic::Runner::new(cfg);
2815        executor.start(|mut context| async move {
2816            // Register participants
2817            let Fixture {
2818                participants,
2819                schemes,
2820                ..
2821            } = fixture(&mut context, &namespace, n);
2822            let mut oracle =
2823                start_test_network_with_peers(context.child("network"), participants.clone(), true)
2824                    .await;
2825            let mut registrations = register_validators(&mut oracle, &participants).await;
2826
2827            // Link all validators
2828            let degraded_link = Link {
2829                latency: Duration::from_millis(200),
2830                jitter: Duration::from_millis(150),
2831                success_rate: 0.5,
2832            };
2833            link_validators(
2834                &mut oracle,
2835                &participants,
2836                Action::Link(degraded_link),
2837                None,
2838            )
2839            .await;
2840
2841            // Create engines
2842            let elector = L::default();
2843            let relay = Arc::new(mocks::relay::Relay::new());
2844            let mut reporters = Vec::new();
2845            let mut engine_handlers = Vec::new();
2846            for (idx, validator) in participants.iter().enumerate() {
2847                // Create scheme context
2848                let context = context
2849                    .child("validator")
2850                    .with_attribute("public_key", validator);
2851
2852                // Configure engine
2853                let reporter_config = mocks::reporter::Config {
2854                    participants: participants.clone().try_into().unwrap(),
2855                    scheme: schemes[idx].clone(),
2856                    elector: elector.clone(),
2857                };
2858                let reporter =
2859                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
2860                reporters.push(reporter.clone());
2861                let application_cfg = mocks::application::Config {
2862                    hasher: Sha256::default(),
2863                    relay: relay.clone(),
2864                    me: validator.clone(),
2865                    propose_latency: (10.0, 5.0),
2866                    verify_latency: (10.0, 5.0),
2867                    certify_latency: (10.0, 5.0),
2868                    should_certify: mocks::application::Certifier::Always,
2869                };
2870                let (actor, application) = mocks::application::Application::new(
2871                    context.child("application"),
2872                    application_cfg,
2873                );
2874                actor.start();
2875                let blocker = oracle.control(validator.clone());
2876                let cfg = config::Config {
2877                    scheme: schemes[idx].clone(),
2878                    elector: elector.clone(),
2879                    blocker,
2880                    automaton: application.clone(),
2881                    relay: application.clone(),
2882                    reporter: reporter.clone(),
2883                    strategy: Sequential,
2884                    partition: validator.to_string(),
2885                    mailbox_size: NZUsize!(1024),
2886                    epoch: Epoch::new(333),
2887                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
2888                        Epoch::new(333),
2889                    )),
2890                    leader_timeout: Duration::from_secs(1),
2891                    certification_timeout: Duration::from_secs(2),
2892                    timeout_retry: Duration::from_secs(10),
2893                    fetch_timeout: Duration::from_secs(1),
2894                    activity_timeout,
2895                    skip_timeout,
2896                    fetch_concurrent: NZUsize!(4),
2897                    replay_buffer: NZUsize!(1024 * 1024),
2898                    write_buffer: NZUsize!(1024 * 1024),
2899                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2900                    forwarding: ForwardingPolicy::Disabled,
2901                };
2902                let engine = Engine::new(context.child("engine"), cfg);
2903
2904                // Start engine
2905                let (pending, recovered, resolver) = registrations
2906                    .remove(validator)
2907                    .expect("validator should be registered");
2908                engine_handlers.push(engine.start(pending, recovered, resolver));
2909            }
2910
2911            // Wait until every reporter has observed view `required_containers`
2912            let mut finalizers = Vec::new();
2913            for reporter in reporters.iter_mut() {
2914                let (mut latest, mut monitor) = reporter.subscribe().await;
2915                finalizers.push(context.child("finalizer").spawn(move |_| async move {
2916                    while latest < required_containers {
2917                        latest = monitor.recv().await.expect("event missing");
2918                    }
2919                }));
2920            }
2921            join_all(finalizers).await;
2922
2923            // Check reporters for correct activity
2924            for reporter in reporters.iter() {
2925                // Ensure no faults
2926                reporter.assert_no_faults();
2927
2928                // Ensure no invalid signatures
2929                reporter.assert_no_invalid();
2930            }
2931
2932            // Ensure no blocked connections
2933            let blocked = oracle.blocked().await.unwrap();
2934            assert!(blocked.is_empty());
2935
2936            context.auditor().state()
2937        })
2938    }
2939
2940    #[test_group("slow")]
2941    #[test_traced]
2942    fn test_slow_and_lossy_links() {
2943        slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinPk, _>);
2944        slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinSig, _>);
2945        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinPk, _>);
2946        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinSig, _>);
2947        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinPk, _>);
2948        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinSig, _>);
2949        slow_and_lossy_links::<_, _, RoundRobin>(0, ed25519::fixture);
2950        slow_and_lossy_links::<_, _, RoundRobin>(0, secp256r1::fixture);
2951    }
2952
2953    #[test_group("slow")]
2954    #[test_traced]
2955    fn test_determinism() {
2956        // We use the slow and lossy links scenario for the determinism check
2957        // because it is the most complex test.
2958        for seed in 1..6 {
2959            let ts_vrf_pk_state_1 = slow_and_lossy_links::<_, _, Random>(
2960                seed,
2961                bls12381_threshold_vrf::fixture::<MinPk, _>,
2962            );
2963            let ts_vrf_pk_state_2 = slow_and_lossy_links::<_, _, Random>(
2964                seed,
2965                bls12381_threshold_vrf::fixture::<MinPk, _>,
2966            );
2967            assert_eq!(ts_vrf_pk_state_1, ts_vrf_pk_state_2);
2968
2969            let ts_vrf_sig_state_1 = slow_and_lossy_links::<_, _, Random>(
2970                seed,
2971                bls12381_threshold_vrf::fixture::<MinSig, _>,
2972            );
2973            let ts_vrf_sig_state_2 = slow_and_lossy_links::<_, _, Random>(
2974                seed,
2975                bls12381_threshold_vrf::fixture::<MinSig, _>,
2976            );
2977            assert_eq!(ts_vrf_sig_state_1, ts_vrf_sig_state_2);
2978
2979            let ts_std_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2980                seed,
2981                bls12381_threshold_std::fixture::<MinPk, _>,
2982            );
2983            let ts_std_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2984                seed,
2985                bls12381_threshold_std::fixture::<MinPk, _>,
2986            );
2987            assert_eq!(ts_std_pk_state_1, ts_std_pk_state_2);
2988
2989            let ts_std_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2990                seed,
2991                bls12381_threshold_std::fixture::<MinSig, _>,
2992            );
2993            let ts_std_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2994                seed,
2995                bls12381_threshold_std::fixture::<MinSig, _>,
2996            );
2997            assert_eq!(ts_std_sig_state_1, ts_std_sig_state_2);
2998
2999            let ms_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
3000                seed,
3001                bls12381_multisig::fixture::<MinPk, _>,
3002            );
3003            let ms_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
3004                seed,
3005                bls12381_multisig::fixture::<MinPk, _>,
3006            );
3007            assert_eq!(ms_pk_state_1, ms_pk_state_2);
3008
3009            let ms_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
3010                seed,
3011                bls12381_multisig::fixture::<MinSig, _>,
3012            );
3013            let ms_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
3014                seed,
3015                bls12381_multisig::fixture::<MinSig, _>,
3016            );
3017            assert_eq!(ms_sig_state_1, ms_sig_state_2);
3018
3019            let ed_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
3020            let ed_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
3021            assert_eq!(ed_state_1, ed_state_2);
3022
3023            let secp_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
3024            let secp_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
3025            assert_eq!(secp_state_1, secp_state_2);
3026
3027            let states = [
3028                ("threshold-vrf-minpk", ts_vrf_pk_state_1),
3029                ("threshold-vrf-minsig", ts_vrf_sig_state_1),
3030                ("threshold-std-minpk", ts_std_pk_state_1),
3031                ("threshold-std-minsig", ts_std_sig_state_1),
3032                ("multisig-minpk", ms_pk_state_1),
3033                ("multisig-minsig", ms_sig_state_1),
3034                ("ed25519", ed_state_1),
3035                ("secp256r1", secp_state_1),
3036            ];
3037
3038            // Sanity check that adjacent scheme types (in the order listed above) don't yield identical state
3039            for pair in states.windows(2) {
3040                assert_ne!(
3041                    pair[0].1, pair[1].1,
3042                    "state {} equals state {}",
3043                    pair[0].0, pair[1].0
3044                );
3045            }
3046        }
3047    }
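
    // Illustrative sketch (not part of the original suite): `windows(2)` in
    // `test_determinism` compares only neighboring entries, so two non-adjacent
    // schemes with equal auditor state would go unnoticed. The helper below is a
    // hypothetical all-pairs alternative; its name and the toy states are assumptions
    // made for illustration only.
    fn first_duplicate_state<'a>(states: &[(&'a str, String)]) -> Option<(&'a str, &'a str)> {
        for (i, (name_a, state_a)) in states.iter().enumerate() {
            // Compare each entry against every later entry.
            for (name_b, state_b) in &states[i + 1..] {
                if state_a == state_b {
                    return Some((*name_a, *name_b));
                }
            }
        }
        None
    }

    #[test]
    fn sketch_all_pairs_state_check() {
        let states = [
            ("a", "x".to_string()),
            ("b", "y".to_string()),
            ("c", "x".to_string()),
        ];

        // The adjacent comparison passes even though "a" and "c" share a state...
        assert!(states.windows(2).all(|pair| pair[0].1 != pair[1].1));

        // ...while the all-pairs scan reports the duplicate.
        assert_eq!(first_duplicate_state(&states), Some(("a", "c")));
    }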
3048
3049    fn conflicter<S, F, L>(seed: u64, mut fixture: F)
3050    where
3051        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3052        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3053        L: Elector<S>,
3054    {
3055        // Create context
3056        let n = 4;
3057        let required_containers = View::new(50);
3058        let activity_timeout = ViewDelta::new(10);
3059        let skip_timeout = ViewDelta::new(5);
3060        let namespace = b"consensus".to_vec();
3061        let cfg = deterministic::Config::new()
3062            .with_seed(seed)
3063            .with_timeout(Some(Duration::from_secs(30)));
3064        let executor = deterministic::Runner::new(cfg);
3065        executor.start(|mut context| async move {
3066            // Register participants
3067            let Fixture {
3068                participants,
3069                schemes,
3070                ..
3071            } = fixture(&mut context, &namespace, n);
3072            let mut oracle =
3073                start_test_network_with_peers(context.child("network"), participants.clone(), true)
3074                    .await;
3075            let mut registrations = register_validators(&mut oracle, &participants).await;
3076
3077            // Link all validators
3078            let link = Link {
3079                latency: Duration::from_millis(10),
3080                jitter: Duration::from_millis(1),
3081                success_rate: 1.0,
3082            };
3083            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3084
3085            // Create engines
3086            let elector = L::default();
3087            let relay = Arc::new(mocks::relay::Relay::new());
3088            let mut reporters = Vec::new();
3089            for (idx_scheme, validator) in participants.iter().enumerate() {
3090                // Create scheme context
3091                let context = context
3092                    .child("validator")
3093                    .with_attribute("public_key", validator);
3094
3095                // Configure reporter and take this validator's registration
3096                let reporter_config = mocks::reporter::Config {
3097                    participants: participants.clone().try_into().unwrap(),
3098                    scheme: schemes[idx_scheme].clone(),
3099                    elector: elector.clone(),
3100                };
3101                let reporter =
3102                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
3103                let (pending, recovered, resolver) = registrations
3104                    .remove(validator)
3105                    .expect("validator should be registered");
3106                if idx_scheme == 0 {
3107                    let cfg = mocks::conflicter::Config {
3108                        scheme: schemes[idx_scheme].clone(),
3109                    };
3110
3111                    let engine: mocks::conflicter::Conflicter<_, _, Sha256> =
3112                        mocks::conflicter::Conflicter::new(context.child("byzantine_engine"), cfg);
3113                    engine.start(pending);
3114                } else {
3115                    reporters.push(reporter.clone());
3116                    let application_cfg = mocks::application::Config {
3117                        hasher: Sha256::default(),
3118                        relay: relay.clone(),
3119                        me: validator.clone(),
3120                        propose_latency: (10.0, 5.0),
3121                        verify_latency: (10.0, 5.0),
3122                        certify_latency: (10.0, 5.0),
3123                        should_certify: mocks::application::Certifier::Always,
3124                    };
3125                    let (actor, application) = mocks::application::Application::new(
3126                        context.child("application"),
3127                        application_cfg,
3128                    );
3129                    actor.start();
3130                    let blocker = oracle.control(validator.clone());
3131                    let cfg = config::Config {
3132                        scheme: schemes[idx_scheme].clone(),
3133                        elector: elector.clone(),
3134                        blocker,
3135                        automaton: application.clone(),
3136                        relay: application.clone(),
3137                        reporter: reporter.clone(),
3138                        strategy: Sequential,
3139                        partition: validator.to_string(),
3140                        mailbox_size: NZUsize!(1024),
3141                        epoch: Epoch::new(333),
3142                        floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
3143                            Epoch::new(333),
3144                        )),
3145                        leader_timeout: Duration::from_secs(1),
3146                        certification_timeout: Duration::from_secs(2),
3147                        timeout_retry: Duration::from_secs(10),
3148                        fetch_timeout: Duration::from_secs(1),
3149                        activity_timeout,
3150                        skip_timeout,
3151                        fetch_concurrent: NZUsize!(4),
3152                        replay_buffer: NZUsize!(1024 * 1024),
3153                        write_buffer: NZUsize!(1024 * 1024),
3154                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3155                        forwarding: ForwardingPolicy::Disabled,
3156                    };
3157                    let engine = Engine::new(context.child("engine"), cfg);
3158                    engine.start(pending, recovered, resolver);
3159                }
3160            }
3161
3162            // Wait until every honest reporter has observed view `required_containers`
3163            let mut finalizers = Vec::new();
3164            for reporter in reporters.iter_mut() {
3165                let (mut latest, mut monitor) = reporter.subscribe().await;
3166                finalizers.push(context.child("finalizer").spawn(move |_| async move {
3167                    while latest < required_containers {
3168                        latest = monitor.recv().await.expect("event missing");
3169                    }
3170                }));
3171            }
3172            join_all(finalizers).await;
3173
3174            // Check reporters for correct activity
3175            let byz = &participants[0];
3176            let mut count_conflicting = 0;
3177            for reporter in reporters.iter() {
3178                // Ensure only faults for byz
3179                {
3180                    let faults = reporter.faults.lock();
3181                    assert_eq!(faults.len(), 1);
3182                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
3183                    for (_, faults) in faulter.iter() {
3184                        for fault in faults.iter() {
3185                            match fault {
3186                                Activity::ConflictingNotarize(_) => {
3187                                    count_conflicting += 1;
3188                                }
3189                                Activity::ConflictingFinalize(_) => {
3190                                    count_conflicting += 1;
3191                                }
3192                                _ => panic!("unexpected fault: {fault:?}"),
3193                            }
3194                        }
3195                    }
3196                }
3197
3198                // Ensure no invalid signatures
3199                reporter.assert_no_invalid();
3200            }
3201            assert!(count_conflicting > 0);
3202
3203            // Ensure conflicter is blocked
3204            let blocked = oracle.blocked().await.unwrap();
3205            assert!(!blocked.is_empty());
3206            for (a, b) in blocked {
3207                assert_ne!(&a, byz);
3208                assert_eq!(&b, byz);
3209            }
3210        });
3211    }
3212
3213    #[test_group("slow")]
3214    #[test_traced]
3215    fn test_conflicter() {
3216        for seed in 0..5 {
3217            conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3218            conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3219            conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3220            conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3221            conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3222            conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3223            conflicter::<_, _, RoundRobin>(seed, ed25519::fixture);
3224            conflicter::<_, _, RoundRobin>(seed, secp256r1::fixture);
3225        }
3226    }
3227
3228    fn invalid<S, F, L>(seed: u64, mut fixture: F)
3229    where
3230        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3231        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3232        L: Elector<S>,
3233    {
3234        // Create context
3235        let n = 4;
3236        let required_containers = View::new(50);
3237        let activity_timeout = ViewDelta::new(10);
3238        let skip_timeout = ViewDelta::new(5);
3239        let namespace = b"consensus".to_vec();
3240        let cfg = deterministic::Config::new()
3241            .with_seed(seed)
3242            .with_timeout(Some(Duration::from_secs(30)));
3243        let executor = deterministic::Runner::new(cfg);
3244        executor.start(|mut context| async move {
3245            // Register participants
3246            let Fixture {
3247                participants,
3248                schemes,
3249                ..
3250            } = fixture(&mut context, &namespace, n);
3251
3252            let schemes: Vec<_> = schemes
3253                .into_iter()
3254                .enumerate()
3255                .map(|(idx, scheme)| {
3256                    let is_byzantine = idx == 0;
3257                    let behavior = if is_byzantine {
3258                        wrapped::Behavior::CorruptSignature
3259                    } else {
3260                        wrapped::Behavior::Honest
3261                    };
3262                    wrapped::Scheme::new(scheme, behavior)
3263                })
3264                .collect();
3265
3266            let mut oracle =
3267                start_test_network_with_peers(context.child("network"), participants.clone(), true)
3268                    .await;
3269            let mut registrations = register_validators(&mut oracle, &participants).await;
3270
3271            // Link all validators
3272            let link = Link {
3273                latency: Duration::from_millis(10),
3274                jitter: Duration::from_millis(1),
3275                success_rate: 1.0,
3276            };
3277            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3278
3279            // Create engines
3280            let elector = wrapped::Config(L::default());
3281            let relay = Arc::new(mocks::relay::Relay::new());
3282            let mut reporters = Vec::new();
3283            for (idx_scheme, validator) in participants.iter().enumerate() {
3284                // Create scheme context
3285                let context = context
3286                    .child("validator")
3287                    .with_attribute("public_key", validator);
3288
3289                let reporter_config = mocks::reporter::Config {
3290                    participants: participants.clone().try_into().unwrap(),
3291                    scheme: schemes[idx_scheme].clone(),
3292                    elector: elector.clone(),
3293                };
3294                let reporter =
3295                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
3296                reporters.push(reporter.clone());
3297
3298                let application_cfg = mocks::application::Config {
3299                    hasher: Sha256::default(),
3300                    relay: relay.clone(),
3301                    me: validator.clone(),
3302                    propose_latency: (10.0, 5.0),
3303                    verify_latency: (10.0, 5.0),
3304                    certify_latency: (10.0, 5.0),
3305                    should_certify: mocks::application::Certifier::Always,
3306                };
3307                let (actor, application) = mocks::application::Application::new(
3308                    context.child("application"),
3309                    application_cfg,
3310                );
3311                actor.start();
3312                let blocker = oracle.control(validator.clone());
3313                let cfg = config::Config {
3314                    scheme: schemes[idx_scheme].clone(),
3315                    elector: elector.clone(),
3316                    blocker,
3317                    automaton: application.clone(),
3318                    relay: application.clone(),
3319                    reporter: reporter.clone(),
3320                    strategy: Sequential,
3321                    partition: validator.clone().to_string(),
3322                    mailbox_size: NZUsize!(1024),
3323                    epoch: Epoch::new(333),
3324                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
3325                        Epoch::new(333),
3326                    )),
3327                    leader_timeout: Duration::from_secs(1),
3328                    certification_timeout: Duration::from_secs(2),
3329                    timeout_retry: Duration::from_secs(10),
3330                    fetch_timeout: Duration::from_secs(1),
3331                    activity_timeout,
3332                    skip_timeout,
3333                    fetch_concurrent: NZUsize!(4),
3334                    replay_buffer: NZUsize!(1024 * 1024),
3335                    write_buffer: NZUsize!(1024 * 1024),
3336                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3337                    forwarding: ForwardingPolicy::Disabled,
3338                };
3339                let engine = Engine::new(context.child("engine"), cfg);
3340                let (pending, recovered, resolver) = registrations
3341                    .remove(validator)
3342                    .expect("validator should be registered");
3343                engine.start(pending, recovered, resolver);
3344            }
3345
3346            // Wait for all engines to finish.
3347            // The byzantine node will not finish since it will mark any finalization
3348            // certificates it creates (using its own invalid signature) as invalid.
3349            let mut finalizers = Vec::new();
3350            for reporter in reporters.iter_mut().skip(1) {
3351                let (mut latest, mut monitor) = reporter.subscribe().await;
3352                finalizers.push(context.child("finalizer").spawn(move |_| async move {
3353                    while latest < required_containers {
3354                        latest = monitor.recv().await.expect("event missing");
3355                    }
3356                }));
3357            }
3358            join_all(finalizers).await;
3359
3360            // Check all reporters for activity
3361            for (i, reporter) in reporters.iter().enumerate() {
3362                // Ensure no faults
3363                reporter.assert_no_faults();
3364
3365                // All nodes see invalid signatures since the honest reporters get unfiltered votes
3366                // once they pass the view.
3367                assert!(*reporter.invalid_votes.lock() > 0);
3368
3369                // Only the byzantine node sees invalid certificates since it constructs them from
3370                // its own invalid vote. The honest nodes reject them before reaching the reporter.
3371                let is_byzantine = i == 0;
3372                if is_byzantine {
3373                    assert!(*reporter.invalid_certificates.lock() > 0);
3374                } else {
3375                    assert_eq!(*reporter.invalid_certificates.lock(), 0);
3376                }
3377            }
3378
3379            // Ensure byzantine node is blocked by honest nodes.
3380            // The ">=" accounts for the byzantine node possibly blocking itself.
3381            let blocked = oracle.blocked().await.unwrap();
3382            assert!(blocked.len() >= participants.len() - 1);
3383            let byz = &participants[0];
3384            for (_, b) in blocked {
3385                // Assert only the byzantine node is blocked.
3386                assert_eq!(&b, byz);
3387            }
3388        });
3389    }
3390
3391    #[test_group("slow")]
3392    #[test_traced]
3393    fn test_invalid() {
3394        for seed in 0..5 {
3395            invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3396            invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3397            invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3398            invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3399            invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3400            invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3401            invalid::<_, _, RoundRobin>(seed, ed25519::fixture);
3402            invalid::<_, _, RoundRobin>(seed, secp256r1::fixture);
3403        }
3404    }
3405
3406    fn received_certificates_are_reported<S, F, L>(seed: u64, mut fixture: F)
3407    where
3408        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3409        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3410        L: Elector<S>,
3411    {
3412        let n = 4;
3413        let required_containers = View::new(10);
3414        let activity_timeout = ViewDelta::new(10);
3415        let skip_timeout = ViewDelta::new(5);
3416        let namespace = b"consensus".to_vec();
3417        let cfg = deterministic::Config::new()
3418            .with_seed(seed)
3419            .with_timeout(Some(Duration::from_secs(30)));
3420        let executor = deterministic::Runner::new(cfg);
3421        executor.start(|mut context| async move {
3422            let Fixture {
3423                participants,
3424                schemes,
3425                ..
3426            } = fixture(&mut context, &namespace, n);
3427
3428            let mut oracle = start_test_network_with_peers(
3429                context.child("network"),
3430                participants.clone(),
3431                false,
3432            )
3433            .await;
3434            let mut registrations = register_validators(&mut oracle, &participants).await;
3435
3436            // Link all honest nodes. Only link node 0 to node 1.
3437            //
3438            // Node 0 cannot locally form a certificate because it only sees itself plus one honest
3439            // peer, but it should still receive the certificates relayed by that peer.
3440            let link = Link {
3441                latency: Duration::from_millis(100),
3442                jitter: Duration::from_millis(1),
3443                success_rate: 1.0,
3444            };
3445            fn link_graph(_: usize, i: usize, j: usize) -> bool {
3446                if i == 0 || j == 0 {
3447                    return i == 1 || j == 1;
3448                }
3449                true
3450            }
3451            link_validators(
3452                &mut oracle,
3453                &participants,
3454                Action::Link(link),
3455                Some(link_graph),
3456            )
3457            .await;
3458
3459            let elector = L::default();
3460            let relay = Arc::new(mocks::relay::Relay::new());
3461            let mut reporters = Vec::new();
3462            for (idx_scheme, validator) in participants.iter().enumerate() {
3463                let context = context
3464                    .child("validator")
3465                    .with_attribute("public_key", validator);
3466                let reporter_config = mocks::reporter::Config {
3467                    participants: participants.clone().try_into().unwrap(),
3468                    scheme: schemes[idx_scheme].clone(),
3469                    elector: elector.clone(),
3470                };
3471                let reporter =
3472                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
3473                reporters.push(reporter.clone());
3474
3475                let application_cfg = mocks::application::Config {
3476                    hasher: Sha256::default(),
3477                    relay: relay.clone(),
3478                    me: validator.clone(),
3479                    propose_latency: (10.0, 5.0),
3480                    verify_latency: (10.0, 5.0),
3481                    certify_latency: (10.0, 5.0),
3482                    should_certify: mocks::application::Certifier::Always,
3483                };
3484                let (actor, application) = mocks::application::Application::new(
3485                    context.child("application"),
3486                    application_cfg,
3487                );
3488                actor.start();
3489                let blocker = oracle.control(validator.clone());
3490                let cfg = config::Config {
3491                    scheme: schemes[idx_scheme].clone(),
3492                    elector: elector.clone(),
3493                    blocker,
3494                    automaton: application.clone(),
3495                    relay: application.clone(),
3496                    reporter: reporter.clone(),
3497                    strategy: Sequential,
3498                    partition: validator.clone().to_string(),
3499                    mailbox_size: NZUsize!(1024),
3500                    epoch: Epoch::new(333),
3501                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
3502                        Epoch::new(333),
3503                    )),
3504                    leader_timeout: Duration::from_secs(1),
3505                    certification_timeout: Duration::from_secs(2),
3506                    timeout_retry: Duration::from_secs(10),
3507                    fetch_timeout: Duration::from_secs(1),
3508                    activity_timeout,
3509                    skip_timeout,
3510                    fetch_concurrent: NZUsize!(4),
3511                    replay_buffer: NZUsize!(1024 * 1024),
3512                    write_buffer: NZUsize!(1024 * 1024),
3513                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3514                    forwarding: ForwardingPolicy::Disabled,
3515                };
3516                let engine = Engine::new(context.child("engine"), cfg);
3517                let (pending, recovered, resolver) = registrations
3518                    .remove(validator)
3519                    .expect("validator should be registered");
3520                engine.start(pending, recovered, resolver);
3521            }
3522            // Wait for an honest node to observe the finalizations
3523            let excluded_reporter = reporters[0].clone();
3524            let mut honest_reporter = reporters[1].clone();
3525            let (mut honest_latest, mut honest_monitor) = honest_reporter.subscribe().await;
3526            while honest_latest < required_containers {
3527                honest_latest = honest_monitor.recv().await.expect("event missing");
3528            }
3529
3530            // Wait for all in-flight certificates to arrive at excluded node and be reported.
3531            context.sleep(Duration::from_secs(1)).await;
3532
3533            // It should have received similar certificates to the honest node (with some
3534            // tolerance for initial views in which it may not yet have been connected).
3535            let honest_notarized = {
3536                let notarizations = honest_reporter.notarizations.lock();
3537                View::range(View::new(1), required_containers.next())
3538                    .filter(|view| notarizations.contains_key(view))
3539                    .count()
3540            };
3541            let excluded_notarized = {
3542                let notarizations = excluded_reporter.notarizations.lock();
3543                View::range(View::new(1), required_containers.next())
3544                    .filter(|view| notarizations.contains_key(view))
3545                    .count()
3546            };
3547            assert!(
3548                excluded_notarized >= honest_notarized.saturating_sub(2),
3549                "honest_notarized: {honest_notarized}, excluded_notarized: {excluded_notarized}"
3550            );
3551
3552            let honest_finalized = {
3553                let finalizations = honest_reporter.finalizations.lock();
3554                View::range(View::new(1), required_containers.next())
3555                    .filter(|view| finalizations.contains_key(view))
3556                    .count()
3557            };
3558            let excluded_finalized = {
3559                let finalizations = excluded_reporter.finalizations.lock();
3560                View::range(View::new(1), required_containers.next())
3561                    .filter(|view| finalizations.contains_key(view))
3562                    .count()
3563            };
3564            assert!(
3565                excluded_finalized >= honest_finalized.saturating_sub(2),
3566                "honest_finalized: {honest_finalized}, excluded_finalized: {excluded_finalized}"
3567            );
3568        });
3569    }
3570
3571    // Test that a node reports the certificates (notarizations and finalizations) it receives
3571    // from a peer, even when it cannot form them locally.
3572    #[test_group("slow")]
3573    #[test_traced]
3574    fn test_received_certificates_are_reported() {
3575        received_certificates_are_reported::<_, _, Random>(
3576            0,
3577            bls12381_threshold_vrf::fixture::<MinPk, _>,
3578        );
3579        received_certificates_are_reported::<_, _, Random>(
3580            0,
3581            bls12381_threshold_vrf::fixture::<MinSig, _>,
3582        );
3583        received_certificates_are_reported::<_, _, RoundRobin>(
3584            0,
3585            bls12381_threshold_std::fixture::<MinPk, _>,
3586        );
3587        received_certificates_are_reported::<_, _, RoundRobin>(
3588            0,
3589            bls12381_threshold_std::fixture::<MinSig, _>,
3590        );
3591        received_certificates_are_reported::<_, _, RoundRobin>(
3592            0,
3593            bls12381_multisig::fixture::<MinPk, _>,
3594        );
3595        received_certificates_are_reported::<_, _, RoundRobin>(
3596            0,
3597            bls12381_multisig::fixture::<MinSig, _>,
3598        );
3599        received_certificates_are_reported::<_, _, RoundRobin>(0, ed25519::fixture);
3600        received_certificates_are_reported::<_, _, RoundRobin>(0, secp256r1::fixture);
3601    }
3602
3603    #[test_traced]
3604    fn test_survives_burst() {
3605        let n = 4;
3606        let epoch = Epoch::new(333);
3607        let namespace = b"mailbox_size_one_certificate_burst".to_vec();
3608        let executor = deterministic::Runner::default();
3609        executor.start(|mut context| async move {
3610            let Fixture {
3611                participants,
3612                schemes,
3613                ..
3614            } = scheme_mocks::fixture(&mut context, &namespace, n);
3615            let me = participants[0].clone();
3616            let mut oracle =
3617                start_test_network_with_peers(context.child("network"), participants.clone(), true)
3618                    .await;
3619            let (pending, recovered, resolver) = register_validator(&mut oracle, me.clone()).await;
3620
3621            let injector_pk = PrivateKey::from_seed(9_000_000).public_key();
3622            let (mut injector_sender, _injector_receiver) = oracle
3623                .control(injector_pk.clone())
3624                .register(1, TEST_QUOTA)
3625                .await
3626                .unwrap();
3627            let link = Link {
3628                latency: Duration::from_millis(0),
3629                jitter: Duration::from_millis(0),
3630                success_rate: 1.0,
3631            };
3632            oracle
3633                .add_link(injector_pk.clone(), me.clone(), link)
3634                .await
3635                .unwrap();
3636            oracle.manager().track(
3637                1,
3638                TrackedPeers::new(
3639                    Set::from_iter_dedup(std::iter::once(me.clone())),
3640                    Set::from_iter_dedup(std::iter::once(injector_pk.clone())),
3641                ),
3642            );
3643            context.sleep(Duration::from_millis(1)).await;
3644
3645            let quorum = quorum(n) as usize;
3646            let notarization = |view: View, parent: View, payload: &[u8]| {
3647                let proposal =
3648                    Proposal::new(Round::new(epoch, view), parent, Sha256::hash(payload));
3649                let votes: Vec<_> = schemes
3650                    .iter()
3651                    .take(quorum)
3652                    .map(|scheme| TNotarize::sign(scheme, proposal.clone()).unwrap())
3653                    .collect();
3654                TNotarization::from_notarizes(&schemes[0], &votes, &Sequential)
3655                    .expect("notarization requires quorum")
3656            };
3657            let finalization = |view: View, parent: View, payload: &[u8]| {
3658                let proposal =
3659                    Proposal::new(Round::new(epoch, view), parent, Sha256::hash(payload));
3660                let votes: Vec<_> = schemes
3661                    .iter()
3662                    .take(quorum)
3663                    .map(|scheme| TFinalize::sign(scheme, proposal.clone()).unwrap())
3664                    .collect();
3665                TFinalization::from_finalizes(&schemes[0], &votes, &Sequential)
3666                    .expect("finalization requires quorum")
3667            };
3668
3669            // Load the network with certificates that the batcher will want to pass to the voter
3670            for certificate in [
3671                Certificate::Notarization(notarization(View::new(1), View::zero(), b"payload-1")),
3672                Certificate::Notarization(notarization(View::new(2), View::new(1), b"payload-2")),
3673                Certificate::Notarization(notarization(View::new(3), View::new(2), b"payload-3")),
3674                Certificate::Finalization(finalization(View::new(3), View::new(2), b"payload-3")),
3675            ] {
3676                injector_sender.send(Recipients::One(me.clone()), certificate.encode(), true);
3677            }
3678
3679            let elector = RoundRobin::<Sha256>::default();
3680            let reporter_config = mocks::reporter::Config {
3681                participants: participants.clone().try_into().unwrap(),
3682                scheme: schemes[0].clone(),
3683                elector: elector.clone(),
3684            };
3685            let reporter =
3686                mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
3687            let mut monitor_reporter = reporter.clone();
3688            let (mut latest, mut monitor) = monitor_reporter.subscribe().await;
3689
3690            let relay = Arc::new(mocks::relay::Relay::new());
3691            let application_cfg = mocks::application::Config {
3692                hasher: Sha256::default(),
3693                relay: relay.clone(),
3694                me: me.clone(),
3695                propose_latency: (0.0, 0.0),
3696                verify_latency: (0.0, 0.0),
3697                certify_latency: (0.0, 0.0),
3698                should_certify: mocks::application::Certifier::Always,
3699            };
3700            let (mut application_actor, application) =
3701                mocks::application::Application::new(context.child("application"), application_cfg);
3702            application_actor.set_stall_proposals(true);
3703            application_actor.start();
3704
3705            let cfg = config::Config {
3706                scheme: schemes[0].clone(),
3707                elector,
3708                blocker: oracle.control(me.clone()),
3709                automaton: application.clone(),
3710                relay: application,
3711                reporter,
3712                strategy: Sequential,
3713                partition: me.to_string(),
3714                mailbox_size: NZUsize!(1),
3715                epoch,
3716                floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(epoch)),
3717                leader_timeout: Duration::from_secs(1),
3718                certification_timeout: Duration::from_secs(2),
3719                timeout_retry: Duration::from_secs(10),
3720                fetch_timeout: Duration::from_secs(1),
3721                activity_timeout: ViewDelta::new(10),
3722                skip_timeout: ViewDelta::new(5),
3723                fetch_concurrent: NZUsize!(4),
3724                replay_buffer: NZUsize!(1024 * 1024),
3725                write_buffer: NZUsize!(1024 * 1024),
3726                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3727                forwarding: ForwardingPolicy::Disabled,
3728            };
3729            let engine = Engine::new(context.child("engine"), cfg);
3730            engine.start(pending, recovered, resolver);
3731
3732            while latest < View::new(3) {
3733                latest = monitor.recv().await.expect("finalization event missing");
3734            }
3735        });
3736    }
3737
3738    fn impersonator<S, F, L>(seed: u64, mut fixture: F)
3739    where
3740        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3741        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3742        L: Elector<S>,
3743    {
3744        // Create context
3745        let n = 4;
3746        let required_containers = View::new(50);
3747        let activity_timeout = ViewDelta::new(10);
3748        let skip_timeout = ViewDelta::new(5);
3749        let namespace = b"consensus".to_vec();
3750        let cfg = deterministic::Config::new()
3751            .with_seed(seed)
3752            .with_timeout(Some(Duration::from_secs(30)));
3753        let executor = deterministic::Runner::new(cfg);
3754        executor.start(|mut context| async move {
3755            // Register participants
3756            let Fixture {
3757                participants,
3758                schemes,
3759                ..
3760            } = fixture(&mut context, &namespace, n);
3761            let mut oracle =
3762                start_test_network_with_peers(context.child("network"), participants.clone(), true)
3763                    .await;
3764            let mut registrations = register_validators(&mut oracle, &participants).await;
3765
3766            // Link all validators
3767            let link = Link {
3768                latency: Duration::from_millis(10),
3769                jitter: Duration::from_millis(1),
3770                success_rate: 1.0,
3771            };
3772            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3773
3774            // Create engines
3775            let elector = L::default();
3776            let relay = Arc::new(mocks::relay::Relay::new());
3777            let mut reporters = Vec::new();
3778            for (idx_scheme, validator) in participants.iter().enumerate() {
3779                // Create scheme context
3780                let context = context
3781                    .child("validator")
3782                    .with_attribute("public_key", validator);
3783
3784                // Start engine
3785                let reporter_config = mocks::reporter::Config {
3786                    participants: participants.clone().try_into().unwrap(),
3787                    scheme: schemes[idx_scheme].clone(),
3788                    elector: elector.clone(),
3789                };
3790                let reporter =
3791                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
3792                let (pending, recovered, resolver) = registrations
3793                    .remove(validator)
3794                    .expect("validator should be registered");
3795                if idx_scheme == 0 {
3796                    let cfg = mocks::impersonator::Config {
3797                        scheme: schemes[idx_scheme].clone(),
3798                    };
3799
3800                    let engine: mocks::impersonator::Impersonator<_, _, Sha256> =
3801                        mocks::impersonator::Impersonator::new(
3802                            context.child("byzantine_engine"),
3803                            cfg,
3804                        );
3805                    engine.start(pending);
3806                } else {
3807                    reporters.push(reporter.clone());
3808                    let application_cfg = mocks::application::Config {
3809                        hasher: Sha256::default(),
3810                        relay: relay.clone(),
3811                        me: validator.clone(),
3812                        propose_latency: (10.0, 5.0),
3813                        verify_latency: (10.0, 5.0),
3814                        certify_latency: (10.0, 5.0),
3815                        should_certify: mocks::application::Certifier::Always,
3816                    };
3817                    let (actor, application) = mocks::application::Application::new(
3818                        context.child("application"),
3819                        application_cfg,
3820                    );
3821                    actor.start();
3822                    let blocker = oracle.control(validator.clone());
3823                    let cfg = config::Config {
3824                        scheme: schemes[idx_scheme].clone(),
3825                        elector: elector.clone(),
3826                        blocker,
3827                        automaton: application.clone(),
3828                        relay: application.clone(),
3829                        reporter: reporter.clone(),
3830                        strategy: Sequential,
3831                        partition: validator.clone().to_string(),
3832                        mailbox_size: NZUsize!(1024),
3833                        epoch: Epoch::new(333),
3834                        floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
3835                            Epoch::new(333),
3836                        )),
3837                        leader_timeout: Duration::from_secs(1),
3838                        certification_timeout: Duration::from_secs(2),
3839                        timeout_retry: Duration::from_secs(10),
3840                        fetch_timeout: Duration::from_secs(1),
3841                        activity_timeout,
3842                        skip_timeout,
3843                        fetch_concurrent: NZUsize!(4),
3844                        replay_buffer: NZUsize!(1024 * 1024),
3845                        write_buffer: NZUsize!(1024 * 1024),
3846                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3847                        forwarding: ForwardingPolicy::Disabled,
3848                    };
3849                    let engine = Engine::new(context.child("engine"), cfg);
3850                    engine.start(pending, recovered, resolver);
3851                }
3852            }
3853
3854            // Wait for all honest engines to finalize the required containers
3855            let mut finalizers = Vec::new();
3856            for reporter in reporters.iter_mut() {
3857                let (mut latest, mut monitor) = reporter.subscribe().await;
3858                finalizers.push(context.child("finalizer").spawn(move |_| async move {
3859                    while latest < required_containers {
3860                        latest = monitor.recv().await.expect("event missing");
3861                    }
3862                }));
3863            }
3864            join_all(finalizers).await;
3865
3866            // Check reporters for correct activity
3867            let byz = &participants[0];
3868            for reporter in reporters.iter() {
3869                // Ensure no faults
3870                reporter.assert_no_faults();
3871
3872                // Ensure no invalid signatures
3873                reporter.assert_no_invalid();
3874            }
3875
3876            // Ensure only the byzantine node is blocked, and only by honest nodes
3877            let blocked = oracle.blocked().await.unwrap();
3878            assert!(!blocked.is_empty());
3879            for (a, b) in blocked {
3880                assert_ne!(&a, byz);
3881                assert_eq!(&b, byz);
3882            }
3883        });
3884    }
3885
3886    #[test_group("slow")]
3887    #[test_traced]
3888    fn test_impersonator() {
3889        for seed in 0..5 {
3890            impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3891            impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3892            impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3893            impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3894            impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3895            impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3896            impersonator::<_, _, RoundRobin>(seed, ed25519::fixture);
3897            impersonator::<_, _, RoundRobin>(seed, secp256r1::fixture);
3898        }
3899    }
3900
    /// Runs a 7-validator network in which validator 0 runs the `Equivocator` mock,
    /// restarts one honest validator mid-run, and returns whether any honest
    /// validator blocked the equivocator.
    fn equivocator<S, F, L>(seed: u64, mut fixture: F) -> bool
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Create context
        let n = 7;
        let required_containers = View::new(50);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(60)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, &namespace, n);
            let mut oracle =
                start_test_network_with_peers(context.child("network"), participants.clone(), true)
                    .await;
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines
            let elector = L::default();
            let mut engines = Vec::new();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            for (idx_scheme, validator) in participants.iter().enumerate() {
                // Create scheme context
                let context = context
                    .child("validator")
                    .with_attribute("public_key", validator);

                // Start engine
                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: schemes[idx_scheme].clone(),
                    elector: elector.clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
                reporters.push(reporter.clone());
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                if idx_scheme == 0 {
                    let cfg = mocks::equivocator::Config {
                        scheme: schemes[idx_scheme].clone(),
                        epoch: Epoch::new(333),
                        relay: relay.clone(),
                        hasher: Sha256::default(),
                        elector: elector.clone(),
                    };

                    let engine = mocks::equivocator::Equivocator::new(
                        context.child("byzantine_engine"),
                        cfg,
                    );
                    engines.push(engine.start(pending, recovered));
                } else {
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                        certify_latency: (10.0, 5.0),
                        should_certify: mocks::application::Certifier::Always,
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.child("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(validator.clone());
                    let cfg = config::Config {
                        scheme: schemes[idx_scheme].clone(),
                        elector: elector.clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        strategy: Sequential,
                        partition: validator.to_string(),
                        mailbox_size: NZUsize!(1024),
                        epoch: Epoch::new(333),
                        floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
                            Epoch::new(333),
                        )),
                        leader_timeout: Duration::from_secs(1),
                        certification_timeout: Duration::from_secs(2),
                        timeout_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        fetch_concurrent: NZUsize!(4),
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                        forwarding: ForwardingPolicy::Disabled,
                    };
                    let engine = Engine::new(context.child("engine"), cfg);
                    engines.push(engine.start(pending, recovered, resolver));
                }
            }

            // Wait for all honest engines to hit required containers
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut().skip(1) {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.child("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Abort a validator
            let idx = context.gen_range(1..engines.len()); // skip byzantine validator
            let validator = &participants[idx];
            let handle = engines.remove(idx);
            handle.abort();
            let _ = handle.await;
            reporters.remove(idx);
            info!(idx, ?validator, "aborted validator");

            // Wait for the remaining honest engines to hit 2x required containers
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut().skip(1) {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.child("finalizer").spawn(move |_| async move {
                    while latest < View::new(required_containers.get() * 2) {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Recreate engine
            info!(idx, ?validator, "restarting validator");
            let context = context
                .child("validator_restarted")
                .with_attribute("public_key", validator);

            // Start engine
            let reporter_config = mocks::reporter::Config {
                participants: participants.clone().try_into().unwrap(),
                scheme: schemes[idx].clone(),
                elector: elector.clone(),
            };
            let reporter =
                mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
            let (pending, recovered, resolver) =
                register_validator(&mut oracle, validator.clone()).await;
            reporters.push(reporter.clone());
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                me: validator.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
                certify_latency: (10.0, 5.0),
                should_certify: mocks::application::Certifier::Always,
            };
            let (actor, application) =
                mocks::application::Application::new(context.child("application"), application_cfg);
            actor.start();
            let blocker = oracle.control(validator.clone());
            let cfg = config::Config {
                scheme: schemes[idx].clone(),
                elector: elector.clone(),
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: reporter.clone(),
                strategy: Sequential,
                partition: validator.to_string(),
                mailbox_size: NZUsize!(1024),
                epoch: Epoch::new(333),
                floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(Epoch::new(
                    333,
                ))),
                leader_timeout: Duration::from_secs(1),
                certification_timeout: Duration::from_secs(2),
                timeout_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                fetch_concurrent: NZUsize!(4),
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                forwarding: ForwardingPolicy::Disabled,
            };
            let engine = Engine::new(context.child("engine"), cfg);
            engine.start(pending, recovered, resolver);

            // Wait for all honest engines (including the restarted one) to hit
            // 3x required containers
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut().skip(1) {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.child("finalizer").spawn(move |_| async move {
                    while latest < View::new(required_containers.get() * 3) {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check equivocator blocking (we aren't guaranteed a fault will be produced
            // because it may not be possible to extract a conflicting vote from the certificate
            // we receive)
            let byz = &participants[0];
            let blocked = oracle.blocked().await.unwrap();
            for (a, b) in &blocked {
                assert_ne!(a, byz);
                assert_eq!(b, byz);
            }
            !blocked.is_empty()
        })
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_equivocator_bls12381_threshold_vrf_min_pk() {
        let detected = (0..5).any(|seed| {
            equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>)
        });
        assert!(
            detected,
            "expected at least one seed to detect equivocation"
        );
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_equivocator_bls12381_threshold_vrf_min_sig() {
        let detected = (0..5).any(|seed| {
            equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>)
        });
        assert!(
            detected,
            "expected at least one seed to detect equivocation"
        );
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_equivocator_bls12381_threshold_std_min_pk() {
        let detected = (0..5).any(|seed| {
            equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>)
        });
        assert!(
            detected,
            "expected at least one seed to detect equivocation"
        );
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_equivocator_bls12381_threshold_std_min_sig() {
        let detected = (0..5).any(|seed| {
            equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>)
        });
        assert!(
            detected,
            "expected at least one seed to detect equivocation"
        );
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_equivocator_bls12381_multisig_min_pk() {
        let detected = (0..5).any(|seed| {
            equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>)
        });
        assert!(
            detected,
            "expected at least one seed to detect equivocation"
        );
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_equivocator_bls12381_multisig_min_sig() {
        let detected = (0..5).any(|seed| {
            equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>)
        });
        assert!(
            detected,
            "expected at least one seed to detect equivocation"
        );
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_equivocator_ed25519() {
        let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, ed25519::fixture));
        assert!(
            detected,
            "expected at least one seed to detect equivocation"
        );
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_equivocator_secp256r1() {
        let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, secp256r1::fixture));
        assert!(
            detected,
            "expected at least one seed to detect equivocation"
        );
    }

    /// Runs a 4-validator network in which validator 0 runs the `Reconfigurer` mock,
    /// then asserts that honest validators report no faults or invalid signatures
    /// and block it.
    fn reconfigurer<S, F, L>(seed: u64, mut fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Create context
        let n = 4;
        let required_containers = View::new(50);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, &namespace, n);
            let mut oracle =
                start_test_network_with_peers(context.child("network"), participants.clone(), true)
                    .await;
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines
            let elector = L::default();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            for (idx_scheme, validator) in participants.iter().enumerate() {
                // Create scheme context
                let context = context
                    .child("validator")
                    .with_attribute("public_key", validator);

                // Start engine
                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: schemes[idx_scheme].clone(),
                    elector: elector.clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                if idx_scheme == 0 {
                    let cfg = mocks::reconfigurer::Config {
                        scheme: schemes[idx_scheme].clone(),
                    };
                    let engine: mocks::reconfigurer::Reconfigurer<_, _, Sha256> =
                        mocks::reconfigurer::Reconfigurer::new(
                            context.child("byzantine_engine"),
                            cfg,
                        );
                    engine.start(pending);
                } else {
                    reporters.push(reporter.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                        certify_latency: (10.0, 5.0),
                        should_certify: mocks::application::Certifier::Always,
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.child("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(validator.clone());
                    let cfg = config::Config {
                        scheme: schemes[idx_scheme].clone(),
                        elector: elector.clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        strategy: Sequential,
                        partition: validator.to_string(),
                        mailbox_size: NZUsize!(1024),
                        epoch: Epoch::new(333),
                        floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
                            Epoch::new(333),
                        )),
                        leader_timeout: Duration::from_secs(1),
                        certification_timeout: Duration::from_secs(2),
                        timeout_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        fetch_concurrent: NZUsize!(4),
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                        forwarding: ForwardingPolicy::Disabled,
                    };
                    let engine = Engine::new(context.child("engine"), cfg);
                    engine.start(pending, recovered, resolver);
                }
            }

            // Wait for all honest engines to hit required containers
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.child("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check reporters for correct activity
            let byz = &participants[0];
            for reporter in reporters.iter() {
                // Ensure no faults
                reporter.assert_no_faults();

                // Ensure no invalid signatures
                reporter.assert_no_invalid();
            }

            // Ensure reconfigurer is blocked (epoch mismatch)
            let blocked = oracle.blocked().await.unwrap();
            assert!(!blocked.is_empty());
            for (a, b) in blocked {
                assert_ne!(&a, byz);
                assert_eq!(&b, byz);
            }
        });
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_reconfigurer() {
        for seed in 0..5 {
            reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
            reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
            reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
            reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
            reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
            reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
            reconfigurer::<_, _, RoundRobin>(seed, ed25519::fixture);
            reconfigurer::<_, _, RoundRobin>(seed, secp256r1::fixture);
        }
    }

    /// Runs a 4-validator network in which validator 0 runs the `Nuller` mock, then
    /// asserts that honest validators report only `NullifyFinalize` faults against
    /// it and block it.
    fn nuller<S, F, L>(seed: u64, mut fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Create context
        let n = 4;
        let required_containers = View::new(50);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, &namespace, n);
            let mut oracle =
                start_test_network_with_peers(context.child("network"), participants.clone(), true)
                    .await;
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines
            let elector = L::default();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            for (idx_scheme, validator) in participants.iter().enumerate() {
                // Create scheme context
                let context = context
                    .child("validator")
                    .with_attribute("public_key", validator);

                // Start engine
                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: schemes[idx_scheme].clone(),
                    elector: elector.clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                if idx_scheme == 0 {
                    let cfg = mocks::nuller::Config {
                        scheme: schemes[idx_scheme].clone(),
                    };
                    let engine: mocks::nuller::Nuller<_, _, Sha256> =
                        mocks::nuller::Nuller::new(context.child("byzantine_engine"), cfg);
                    engine.start(pending);
                } else {
                    reporters.push(reporter.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                        certify_latency: (10.0, 5.0),
                        should_certify: mocks::application::Certifier::Always,
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.child("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(validator.clone());
                    let cfg = config::Config {
                        scheme: schemes[idx_scheme].clone(),
                        elector: elector.clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        strategy: Sequential,
                        partition: validator.to_string(),
                        mailbox_size: NZUsize!(1024),
                        epoch: Epoch::new(333),
                        floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
                            Epoch::new(333),
                        )),
                        leader_timeout: Duration::from_secs(1),
                        certification_timeout: Duration::from_secs(2),
                        timeout_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        fetch_concurrent: NZUsize!(4),
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                        forwarding: ForwardingPolicy::Disabled,
                    };
                    let engine = Engine::new(context.child("engine"), cfg);
                    engine.start(pending, recovered, resolver);
                }
            }

            // Wait for all honest engines to hit required containers
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.child("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check reporters for correct activity
            let byz = &participants[0];
            let mut count_nullify_and_finalize = 0;
            for reporter in reporters.iter() {
                // Ensure only faults for byz
                {
                    let faults = reporter.faults.lock();
                    assert_eq!(faults.len(), 1);
                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
                    for (_, faults) in faulter.iter() {
                        for fault in faults.iter() {
                            match fault {
                                Activity::NullifyFinalize(_) => {
                                    count_nullify_and_finalize += 1;
                                }
                                _ => panic!("unexpected fault: {fault:?}"),
                            }
                        }
                    }
                }

                // Ensure no invalid signatures
                reporter.assert_no_invalid();
            }
            assert!(count_nullify_and_finalize > 0);

            // Ensure nuller is blocked
            let blocked = oracle.blocked().await.unwrap();
            assert!(!blocked.is_empty());
            for (a, b) in blocked {
                assert_ne!(&a, byz);
                assert_eq!(&b, byz);
            }
        });
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_nuller() {
        for seed in 0..5 {
            nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
            nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
            nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
            nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
            nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
            nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
            nuller::<_, _, RoundRobin>(seed, ed25519::fixture);
            nuller::<_, _, RoundRobin>(seed, secp256r1::fixture);
        }
    }

    fn outdated<S, F, L>(seed: u64, mut fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Create context
        let n = 4;
        let required_containers = View::new(100);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, &namespace, n);
            let mut oracle =
                start_test_network_with_peers(context.child("network"), participants.clone(), true)
                    .await;
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines
            let elector = L::default();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            for (idx_scheme, validator) in participants.iter().enumerate() {
                // Create scheme context
                let context = context
                    .child("validator")
                    .with_attribute("public_key", validator);

                // Start engine
                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: schemes[idx_scheme].clone(),
                    elector: elector.clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                if idx_scheme == 0 {
                    let cfg = mocks::outdated::Config {
                        scheme: schemes[idx_scheme].clone(),
                        view_delta: ViewDelta::new(activity_timeout.get().saturating_mul(4)),
                    };
                    let engine: mocks::outdated::Outdated<_, _, Sha256> =
                        mocks::outdated::Outdated::new(context.child("byzantine_engine"), cfg);
                    engine.start(pending);
                } else {
                    reporters.push(reporter.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                        certify_latency: (10.0, 5.0),
                        should_certify: mocks::application::Certifier::Always,
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.child("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(validator.clone());
                    let cfg = config::Config {
4650                        scheme: schemes[idx_scheme].clone(),
4651                        elector: elector.clone(),
4652                        blocker,
4653                        automaton: application.clone(),
4654                        relay: application.clone(),
4655                        reporter: reporter.clone(),
4656                        strategy: Sequential,
4657                        partition: validator.clone().to_string(),
4658                        mailbox_size: NZUsize!(1024),
4659                        epoch: Epoch::new(333),
4660                        floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
4661                            Epoch::new(333),
4662                        )),
4663                        leader_timeout: Duration::from_secs(1),
4664                        certification_timeout: Duration::from_secs(2),
4665                        timeout_retry: Duration::from_secs(10),
4666                        fetch_timeout: Duration::from_secs(1),
4667                        activity_timeout,
4668                        skip_timeout,
4669                        fetch_concurrent: NZUsize!(4),
4670                        replay_buffer: NZUsize!(1024 * 1024),
4671                        write_buffer: NZUsize!(1024 * 1024),
4672                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4673                        forwarding: ForwardingPolicy::Disabled,
4674                    };
4675                    let engine = Engine::new(context.child("engine"), cfg);
4676                    engine.start(pending, recovered, resolver);
4677                }
4678            }
4679
4680            // Wait for all engines to finish
4681            let mut finalizers = Vec::new();
4682            for reporter in reporters.iter_mut() {
4683                let (mut latest, mut monitor) = reporter.subscribe().await;
4684                finalizers.push(context.child("finalizer").spawn(move |_| async move {
4685                    while latest < required_containers {
4686                        latest = monitor.recv().await.expect("event missing");
4687                    }
4688                }));
4689            }
4690            join_all(finalizers).await;
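
            // Illustrative sketch only (not used by this test): the wait loop above keeps
            // reading progress updates until the latest observed view reaches the target.
            // The same shape is shown here with a blocking `std::sync::mpsc` channel and
            // plain `u64` views. `wait_for_view_sketch` is a hypothetical name, and the
            // real monitor returned by `reporter.subscribe()` is asynchronous and may
            // behave differently in ways this sketch does not capture.
            #[allow(dead_code)]
            fn wait_for_view_sketch(
                mut latest: u64,
                required: u64,
                updates: std::sync::mpsc::Receiver<u64>,
            ) -> u64 {
                // Stop as soon as the target is reached; a closed channel before that
                // point is treated as a test failure, as in the loop above.
                while latest < required {
                    latest = updates.recv().expect("event missing");
                }
                latest
            }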

            // Check reporters for correct activity
            for reporter in reporters.iter() {
                // Ensure no faults
                reporter.assert_no_faults();

                // Ensure no invalid signatures
                reporter.assert_no_invalid();
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_outdated() {
        for seed in 0..5 {
            outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
            outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
            outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
            outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
            outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
            outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
            outdated::<_, _, RoundRobin>(seed, ed25519::fixture);
            outdated::<_, _, RoundRobin>(seed, secp256r1::fixture);
        }
    }

    fn run_1k<S, F, L>(mut fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Create context
        let n = 10;
        let required_containers = View::new(1_000);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new();
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, &namespace, n);
            let mut oracle =
                start_test_network_with_peers(context.child("network"), participants.clone(), true)
                    .await;
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(80),
                jitter: Duration::from_millis(10),
                success_rate: 0.98,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines
            let elector = L::default();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx, validator) in participants.iter().enumerate() {
                // Create validator context
                let context = context
                    .child("validator")
                    .with_attribute("public_key", validator);

                // Configure engine
                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
                reporters.push(reporter.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (100.0, 50.0),
                    verify_latency: (50.0, 40.0),
                    certify_latency: (50.0, 40.0),
                    should_certify: mocks::application::Certifier::Always,
                };
                let (actor, application) = mocks::application::Application::new(
                    context.child("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    strategy: Sequential,
                    partition: validator.to_string(),
                    mailbox_size: NZUsize!(1024),
                    epoch: Epoch::new(333),
                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
                        Epoch::new(333),
                    )),
                    leader_timeout: Duration::from_secs(1),
                    certification_timeout: Duration::from_secs(2),
                    timeout_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    fetch_concurrent: NZUsize!(4),
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                    forwarding: ForwardingPolicy::Disabled,
                };
                let engine = Engine::new(context.child("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait for every reporter's latest view to reach `required_containers`
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.child("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check reporters for correct activity
            for reporter in reporters.iter() {
                // Ensure no faults
                reporter.assert_no_faults();

                // Ensure no invalid signatures
                reporter.assert_no_invalid();
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        })
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_1k_bls12381_threshold_vrf_min_pk() {
        run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_1k_bls12381_threshold_vrf_min_sig() {
        run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_1k_bls12381_threshold_std_min_pk() {
        run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_1k_bls12381_threshold_std_min_sig() {
        run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_1k_bls12381_multisig_min_pk() {
        run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_1k_bls12381_multisig_min_sig() {
        run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_1k_ed25519() {
        run_1k::<_, _, RoundRobin>(ed25519::fixture);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_1k_secp256r1() {
        run_1k::<_, _, RoundRobin>(secp256r1::fixture);
    }

    fn engine_shutdown<S, F, L>(seed: u64, mut fixture: F, graceful: bool)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        let n = 1;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::default()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(10)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Register a single participant
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, &namespace, n);
            let mut oracle =
                start_test_network_with_peers(context.child("network"), participants.clone(), true)
                    .await;
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link the single validator to itself (a no-op, included for completeness)
            let link = Link {
                latency: Duration::from_millis(1),
                jitter: Duration::from_millis(0),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engine
            let elector = L::default();
            let reporter_config = mocks::reporter::Config {
                participants: participants.clone().try_into().unwrap(),
                scheme: schemes[0].clone(),
                elector: elector.clone(),
            };
            let reporter =
                mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
            let relay = Arc::new(mocks::relay::Relay::new());
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                me: participants[0].clone(),
                propose_latency: (1.0, 0.0),
                verify_latency: (1.0, 0.0),
                certify_latency: (1.0, 0.0),
                should_certify: mocks::application::Certifier::Always,
            };
            let (actor, application) =
                mocks::application::Application::new(context.child("application"), application_cfg);
            actor.start();
            let blocker = oracle.control(participants[0].clone());
            let cfg = config::Config {
                scheme: schemes[0].clone(),
                elector: elector.clone(),
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: reporter.clone(),
                strategy: Sequential,
                partition: participants[0].clone().to_string(),
                mailbox_size: NZUsize!(64),
                epoch: Epoch::new(333),
                floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(Epoch::new(
                    333,
                ))),
                leader_timeout: Duration::from_millis(50),
                certification_timeout: Duration::from_millis(100),
                timeout_retry: Duration::from_millis(250),
                fetch_timeout: Duration::from_millis(50),
                activity_timeout: ViewDelta::new(4),
                skip_timeout: ViewDelta::new(2),
                fetch_concurrent: NZUsize!(4),
                replay_buffer: NZUsize!(1024 * 16),
                write_buffer: NZUsize!(1024 * 16),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                forwarding: ForwardingPolicy::Disabled,
            };
            let engine = Engine::new(context.child("engine"), cfg);

            // Start engine
            let (pending, recovered, resolver) = registrations
                .remove(&participants[0])
                .expect("validator should be registered");
            let handle = engine.start(pending, recovered, resolver);

            // Allow tasks to start
            context.sleep(Duration::from_millis(1000)).await;

            // Count running tasks under the engine prefix
            let running_before = count_running_tasks(&context, "engine");
            assert!(
                running_before > 0,
                "at least one engine task should be running"
            );

            // Make sure the engine is still running after some time
            context.sleep(Duration::from_millis(1500)).await;
            assert!(
                count_running_tasks(&context, "engine") > 0,
                "engine tasks should still be running"
            );

            // Shut down the engine and ensure its children stop
            let running_after = if graceful {
                let result = context
                    .child("stop")
                    .stop(0, Some(Duration::from_secs(5)))
                    .await;
                assert!(
                    result.is_ok(),
                    "graceful shutdown should complete: {result:?}"
                );
                count_running_tasks(&context, "engine")
            } else {
                handle.abort();
                let _ = handle.await; // ensure parent tear-down runs

                // Give the runtime time to process aborts
                context.sleep(Duration::from_millis(1000)).await;
                count_running_tasks(&context, "engine")
            };
            assert_eq!(
                running_after, 0,
                "all engine tasks should be stopped, but {running_after} still running"
            );
        });
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_children_shutdown_on_engine_abort() {
        for seed in 0..10 {
            engine_shutdown::<_, _, Random>(
                seed,
                bls12381_threshold_vrf::fixture::<MinPk, _>,
                false,
            );
            engine_shutdown::<_, _, Random>(
                seed,
                bls12381_threshold_vrf::fixture::<MinSig, _>,
                false,
            );
            engine_shutdown::<_, _, RoundRobin>(
                seed,
                bls12381_threshold_std::fixture::<MinPk, _>,
                false,
            );
            engine_shutdown::<_, _, RoundRobin>(
                seed,
                bls12381_threshold_std::fixture::<MinSig, _>,
                false,
            );
            engine_shutdown::<_, _, RoundRobin>(
                seed,
                bls12381_multisig::fixture::<MinPk, _>,
                false,
            );
            engine_shutdown::<_, _, RoundRobin>(
                seed,
                bls12381_multisig::fixture::<MinSig, _>,
                false,
            );
            engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, false);
            engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, false);
        }
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_graceful_shutdown() {
        for seed in 0..10 {
            engine_shutdown::<_, _, Random>(
                seed,
                bls12381_threshold_vrf::fixture::<MinPk, _>,
                true,
            );
            engine_shutdown::<_, _, Random>(
                seed,
                bls12381_threshold_vrf::fixture::<MinSig, _>,
                true,
            );
            engine_shutdown::<_, _, RoundRobin>(
                seed,
                bls12381_threshold_std::fixture::<MinPk, _>,
                true,
            );
            engine_shutdown::<_, _, RoundRobin>(
                seed,
                bls12381_threshold_std::fixture::<MinSig, _>,
                true,
            );
            engine_shutdown::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>, true);
            engine_shutdown::<_, _, RoundRobin>(
                seed,
                bls12381_multisig::fixture::<MinSig, _>,
                true,
            );
            engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, true);
            engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, true);
        }
    }

    fn attributable_reporter_filtering<S, F, L>(mut fixture: F)
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        let n = 3;
        let required_containers = View::new(10);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut context| async move {
            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, &namespace, n);
            let mut oracle = start_test_network_with_peers(
                context.child("network"),
                participants.clone(),
                false,
            )
            .await;
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines with `AttributableReporter` wrapper
            let elector = L::default();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            for (idx, validator) in participants.iter().enumerate() {
                let context = context
                    .child("validator")
                    .with_attribute("public_key", validator);

                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                };
                let mock_reporter =
                    mocks::reporter::Reporter::new(context.child("mock_reporter"), reporter_config);

                // Wrap with `AttributableReporter`
                let attributable_reporter = scheme::reporter::AttributableReporter::new(
                    context.child("rng"),
                    schemes[idx].clone(),
                    mock_reporter.clone(),
                    Sequential,
                    true, // Enable verification
                );
                reporters.push(mock_reporter.clone());

                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                    certify_latency: (10.0, 5.0),
                    should_certify: mocks::application::Certifier::Always,
                };
                let (actor, application) = mocks::application::Application::new(
                    context.child("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: attributable_reporter,
                    strategy: Sequential,
                    partition: validator.to_string(),
                    mailbox_size: NZUsize!(1024),
                    epoch: Epoch::new(333),
                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
                        Epoch::new(333),
                    )),
                    leader_timeout: Duration::from_secs(1),
                    certification_timeout: Duration::from_secs(2),
                    timeout_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    fetch_concurrent: NZUsize!(4),
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                    forwarding: ForwardingPolicy::Disabled,
                };
                let engine = Engine::new(context.child("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine.start(pending, recovered, resolver);
            }

            // Wait for every reporter's latest view to reach `required_containers`
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.child("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.recv().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Verify filtering behavior based on scheme attributability
            for reporter in reporters.iter() {
                // Ensure no faults (normal operation)
                reporter.assert_no_faults();

                // Ensure no invalid signatures
                reporter.assert_no_invalid();

                // Check that we have certificates reported
                {
                    let notarizations = reporter.notarizations.lock();
                    let finalizations = reporter.finalizations.lock();
                    assert!(
                        !notarizations.is_empty() || !finalizations.is_empty(),
                        "Certificates should be reported"
                    );
                }

                // Check notarizes
                let notarizes = reporter.notarizes.lock();
                let last_view = notarizes.keys().max().cloned().unwrap_or_default();
                for (view, payloads) in notarizes.iter() {
                    if *view == last_view {
                        continue; // Skip the last view, which may still be in progress
                    }

                    // Total signers recorded across all payloads in this view
                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();

                    // For attributable schemes, we should see peer activities
                    if S::is_attributable() {
                        assert!(signers > 1, "view {view}: {signers}");
                    } else {
                        // For non-attributable, we shouldn't see any peer activities
                        assert_eq!(signers, 0);
                    }
                }
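
                // Illustrative sketch only (not used by this test): the per-view signer
                // count above sums the number of recorded signers over every payload in
                // the view. The map and set types below are assumptions chosen for the
                // sketch (`String` payload keys, `u32` signer indices); the reporter's
                // actual types are not shown in this file section.
                #[allow(dead_code)]
                fn total_signers_sketch(
                    payloads: &std::collections::HashMap<String, std::collections::HashSet<u32>>,
                ) -> usize {
                    // Each payload maps to the set of participants that signed it.
                    payloads.values().map(|signers| signers.len()).sum()
                }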

                // Check finalizes
                let finalizes = reporter.finalizes.lock();
                for (_, payloads) in finalizes.iter() {
                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();

                    // For attributable schemes, we should see peer activities
                    if S::is_attributable() {
                        assert!(signers > 1);
                    } else {
                        // For non-attributable, we shouldn't see any peer activities
                        assert_eq!(signers, 0);
                    }
                }
            }

            // Ensure no blocked connections (normal operation)
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }

    #[test_traced]
    fn test_attributable_reporter_filtering() {
        attributable_reporter_filtering::<_, _, Random>(
            bls12381_threshold_vrf::fixture::<MinPk, _>,
        );
        attributable_reporter_filtering::<_, _, Random>(
            bls12381_threshold_vrf::fixture::<MinSig, _>,
        );
        attributable_reporter_filtering::<_, _, RoundRobin>(
            bls12381_threshold_std::fixture::<MinPk, _>,
        );
        attributable_reporter_filtering::<_, _, RoundRobin>(
            bls12381_threshold_std::fixture::<MinSig, _>,
        );
        attributable_reporter_filtering::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
        attributable_reporter_filtering::<_, _, RoundRobin>(
            bls12381_multisig::fixture::<MinSig, _>,
        );
        attributable_reporter_filtering::<_, _, RoundRobin>(ed25519::fixture);
        attributable_reporter_filtering::<_, _, RoundRobin>(secp256r1::fixture);
    }

5311    fn split_views_no_lockup<S, F, L>(mut fixture: F)
5312    where
5313        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5314        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5315        L: Elector<S>,
5316    {
        // Scenario (participants are numbered from 1 here, so participant k has index k-1):
        // - View F: Finalization of B_0 seen by all participants.
        // - View F+1:
        //   - Nullification seen by honest (4..=6,7) and all 3 byzantines
        //   - Notarization of B_1A seen by honest (1..=3)
        // - View F+2:
        //   - Nullification seen by honest (1..=3,7) and all 3 byzantines
        //   - Notarization of B_1B seen by honest (4..=6)
        // - View F+3: Nullification. Seen by all participants.
        // - Then ensure progress resumes beyond F+3 after reconnecting

        // Define participant types
        enum ParticipantType {
            Group1,    // receives notarization for f+1, nullification for f+2
            Group2,    // receives nullification for f+1, notarization for f+2
            Ignorant,  // receives nullification for f+1 and f+2
            Byzantine, // nullify-only
        }
        let get_type = |idx: usize| -> ParticipantType {
            match idx {
                0..3 => ParticipantType::Group1,
                3..6 => ParticipantType::Group2,
                6 => ParticipantType::Ignorant,
                7..10 => ParticipantType::Byzantine,
                _ => unreachable!(),
            }
        };

        // Create context
        let n = 10;
        let quorum = quorum(n) as usize;
        assert_eq!(quorum, 7);
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(300));
        executor.start(|mut context| async move {
            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, &namespace, n);
            let mut oracle = start_test_network_with_peers(
                context.child("network"),
                participants.clone(),
                false,
            )
            .await;
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // ========== Build the certificates manually ==========

            // Helper: assemble a finalization from votes signed by indices 0..=quorum
            let build_finalization = |proposal: &Proposal<D>| -> TFinalization<_, D> {
                let votes: Vec<_> = (0..=quorum)
                    .map(|i| TFinalize::sign(&schemes[i], proposal.clone()).unwrap())
                    .collect();
                TFinalization::from_finalizes(&schemes[0], &votes, &Sequential)
                    .expect("finalization quorum")
            };
            // Helper: assemble a notarization from votes signed by indices 0..=quorum
            let build_notarization = |proposal: &Proposal<D>| -> TNotarization<_, D> {
                let votes: Vec<_> = (0..=quorum)
                    .map(|i| TNotarize::sign(&schemes[i], proposal.clone()).unwrap())
                    .collect();
                TNotarization::from_notarizes(&schemes[0], &votes, &Sequential)
                    .expect("notarization quorum")
            };
            // Helper: assemble a nullification from votes signed by indices 0..=quorum
            let build_nullification = |round: Round| -> TNullification<_> {
                let votes: Vec<_> = (0..=quorum)
                    .map(|i| TNullify::sign::<D>(&schemes[i], round).unwrap())
                    .collect();
                TNullification::from_nullifies(&schemes[0], &votes, &Sequential)
                    .expect("nullification quorum")
            };
            // Choose F=1 and construct B_0, B_1A, B_1B
            let f_view = 1;
            let round_f = Round::new(Epoch::new(333), View::new(f_view));
            let payload_b0 = Sha256::hash(b"B_F");
            let proposal_b0 = Proposal::new(round_f, View::new(f_view - 1), payload_b0);
            let payload_b1a = Sha256::hash(b"B_G1");
            let proposal_b1a = Proposal::new(
                Round::new(Epoch::new(333), View::new(f_view + 1)),
                View::new(f_view),
                payload_b1a,
            );
            let payload_b1b = Sha256::hash(b"B_G2");
            let proposal_b1b = Proposal::new(
                Round::new(Epoch::new(333), View::new(f_view + 2)),
                View::new(f_view),
                payload_b1b,
            );

            // Build notarization and finalization for the first block
            let b0_notarization = build_notarization(&proposal_b0);
            let b0_finalization = build_finalization(&proposal_b0);
            // Build notarizations for F+1 and F+2
            let b1a_notarization = build_notarization(&proposal_b1a);
            let b1b_notarization = build_notarization(&proposal_b1b);
            // Build nullifications for F+1 and F+2
            let null_a = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 1)));
            let null_b = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 2)));

            // Create an 11th non-participant injector and obtain senders
            let injector_pk = PrivateKey::from_seed(1_000_000).public_key();
            let (mut injector_sender, _inj_certificate_receiver) = oracle
                .control(injector_pk.clone())
                .register(1, TEST_QUOTA)
                .await
                .unwrap();

            // Create minimal one-way links from injector to all participants (not full mesh)
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(0),
                success_rate: 1.0,
            };
            for p in participants.iter() {
                oracle
                    .add_link(injector_pk.clone(), p.clone(), link.clone())
                    .await
                    .unwrap();
            }
            oracle.manager().track(
                1,
                TrackedPeers::new(
                    Set::from_iter_dedup(participants.iter().cloned()),
                    Set::from_iter_dedup(std::slice::from_ref(&injector_pk).iter().cloned()),
                ),
            );
            context.sleep(Duration::from_millis(10)).await;

            // ========== Broadcast certificates over recovered network. ==========

            // View F:
            let msg = Certificate::<_, D>::Notarization(b0_notarization).encode();
            injector_sender.send(Recipients::All, msg, true);
            let msg = Certificate::<_, D>::Finalization(b0_finalization).encode();
            injector_sender.send(Recipients::All, msg, true);
            // View F+1:
            let notarization_msg = Certificate::<_, D>::Notarization(b1a_notarization);
            let nullification_msg = Certificate::<_, D>::Nullification(null_a.clone());
            for (i, participant) in participants.iter().enumerate() {
                let recipient = Recipients::One(participant.clone());
                let msg = match get_type(i) {
                    ParticipantType::Group1 => notarization_msg.encode(),
                    _ => nullification_msg.encode(),
                };
                injector_sender.send(recipient, msg, true);
            }
            // View F+2:
            let notarization_msg = Certificate::<_, D>::Notarization(b1b_notarization);
            let nullification_msg = Certificate::<_, D>::Nullification(null_b.clone());
            for (i, participant) in participants.iter().enumerate() {
                let recipient = Recipients::One(participant.clone());
                let msg = match get_type(i) {
                    ParticipantType::Group2 => notarization_msg.encode(),
                    _ => nullification_msg.encode(),
                };
                injector_sender.send(recipient, msg, true);
            }

            // ========== Create engines ==========

            // Start engines after preloading certificates into each participant's
            // recovered channel (ensuring processing before any leader attempts to issue a
            // conflicting vote).
            let elector = L::default();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut honest_reporters = Vec::new();
            for (idx, validator) in participants.iter().enumerate() {
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                let participant_type = get_type(idx);
                if matches!(participant_type, ParticipantType::Byzantine) {
                    // Byzantine engines
                    let cfg = mocks::nullify_only::Config {
                        scheme: schemes[idx].clone(),
                    };
                    let engine: mocks::nullify_only::NullifyOnly<_, _, Sha256> =
                        mocks::nullify_only::NullifyOnly::new(
                            context
                                .child("byzantine")
                                .with_attribute("public_key", validator),
                            cfg,
                        );
                    engine.start(pending);
                    // Recovered/resolver channels are unused for byzantine actors.
                    drop(recovered);
                    drop(resolver);
                } else {
                    // Honest engines
                    let reporter_config = mocks::reporter::Config {
                        participants: participants.clone().try_into().unwrap(),
                        scheme: schemes[idx].clone(),
                        elector: elector.clone(),
                    };
                    let reporter = mocks::reporter::Reporter::new(
                        context
                            .child("reporter")
                            .with_attribute("public_key", validator),
                        reporter_config,
                    );
                    honest_reporters.push(reporter.clone());

                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (250.0, 50.0), // ensure we process certificates first
                        verify_latency: (10.0, 5.0),
                        certify_latency: (10.0, 5.0),
                        should_certify: mocks::application::Certifier::Always,
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context
                            .child("application")
                            .with_attribute("public_key", validator),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(validator.clone());
                    let cfg = config::Config {
                        scheme: schemes[idx].clone(),
                        elector: elector.clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        strategy: Sequential,
                        partition: validator.to_string(),
                        mailbox_size: NZUsize!(1024),
                        epoch: Epoch::new(333),
                        floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
                            Epoch::new(333),
                        )),
                        leader_timeout: Duration::from_secs(10),
                        certification_timeout: Duration::from_secs(10),
                        timeout_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        fetch_concurrent: NZUsize!(4),
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                        forwarding: ForwardingPolicy::Disabled,
                    };
                    let engine = Engine::new(
                        context
                            .child("engine")
                            .with_attribute("public_key", validator),
                        cfg,
                    );
                    engine.start(pending, recovered, resolver);
                }
            }

            // Allow started engines to consume preloaded certificates.
            context.sleep(Duration::from_secs(2)).await;

            // ========== Assert the exact certificates are seen in each view ==========

            // Assert the exact certificates in view F
            // All participants should have finalized B_0
            let view = View::new(f_view);
            for reporter in honest_reporters.iter() {
                let finalizations = reporter.finalizations.lock();
                assert!(finalizations.contains_key(&view));
            }

            // Assert the exact certificates in view F+1
            // Group 1 should have notarized B_1A only
            // All other participants should have nullified F+1
            let view = View::new(f_view + 1);
            for (i, reporter) in honest_reporters.iter().enumerate() {
                let finalizations = reporter.finalizations.lock();
                assert!(!finalizations.contains_key(&view));
                let nullifications = reporter.nullifications.lock();
                let notarizations = reporter.notarizations.lock();
                match get_type(i) {
                    ParticipantType::Group1 => {
                        assert!(notarizations.contains_key(&view));
                        assert!(!nullifications.contains_key(&view));
                    }
                    _ => {
                        assert!(nullifications.contains_key(&view));
                        assert!(!notarizations.contains_key(&view));
                    }
                }
            }

            // Assert the exact certificates in view F+2
            // Group 2 should have notarized B_1B only
            // All other participants should have nullified F+2
            let view = View::new(f_view + 2);
            for (i, reporter) in honest_reporters.iter().enumerate() {
                let finalizations = reporter.finalizations.lock();
                assert!(!finalizations.contains_key(&view));
                let nullifications = reporter.nullifications.lock();
                let notarizations = reporter.notarizations.lock();
                match get_type(i) {
                    ParticipantType::Group2 => {
                        assert!(notarizations.contains_key(&view));
                        assert!(!nullifications.contains_key(&view));
                    }
                    _ => {
                        assert!(nullifications.contains_key(&view));
                        assert!(!notarizations.contains_key(&view));
                    }
                }
            }

            // Assert no members have yet nullified view F+3
            let next_view = View::new(f_view + 3);
            for (i, reporter) in honest_reporters.iter().enumerate() {
                let nullifies = reporter.nullifies.lock();
                assert!(!nullifies.contains_key(&next_view), "reporter {i}");
            }

            // ========== Reconnect all participants ==========

            // Reconnect all participants fully using the helper
            link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;

            // Wait until all honest reporters finalize strictly past F+2 (e.g., at least F+3)
            {
                let target = View::new(f_view + 3);
                let mut finalizers = Vec::new();
                for reporter in honest_reporters.iter_mut() {
                    let (mut latest, mut monitor) = reporter.subscribe().await;
                    finalizers.push(
                        context
                            .child("resume_finalizer")
                            .spawn(move |_| async move {
                                while latest < target {
                                    latest = monitor.recv().await.expect("event missing");
                                }
                            }),
                    );
                }
                join_all(finalizers).await;
            }

            // Sanity checks: no faults/invalid signatures, and no peers blocked
            for reporter in honest_reporters.iter() {
                reporter.assert_no_faults();
                reporter.assert_no_invalid();
            }
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty(), "blocked peers: {blocked:?}");
        });
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_split_views_no_lockup() {
        split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
        split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
        split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
        split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
        split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
        split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
        split_views_no_lockup::<_, _, RoundRobin>(ed25519::fixture);
        split_views_no_lockup::<_, _, RoundRobin>(secp256r1::fixture);
    }

    fn tle<V, L>()
    where
        V: Variant,
        L: Elector<bls12381_threshold_vrf::Scheme<PublicKey, V>>,
    {
        // Create context
        let n = 4;
        let namespace = b"consensus".to_vec();
        let activity_timeout = ViewDelta::new(100);
        let skip_timeout = ViewDelta::new(50);
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut context| async move {
            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, &namespace, n);
            let mut oracle =
                start_test_network_with_peers(context.child("network"), participants.clone(), true)
                    .await;
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(5),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines and reporters
            let elector = L::default();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            let mut engine_handlers = Vec::new();
            let monitor_reporter = Arc::new(Mutex::new(None));
            for (idx, validator) in participants.iter().enumerate() {
                // Create scheme context
                let context = context
                    .child("validator")
                    .with_attribute("public_key", validator);

                // Store first reporter for monitoring
                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
                reporters.push(reporter.clone());
                if idx == 0 {
                    *monitor_reporter.lock() = Some(reporter.clone());
                }

                // Configure application
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                    certify_latency: (10.0, 5.0),
                    should_certify: mocks::application::Certifier::Always,
                };
                let (actor, application) = mocks::application::Application::new(
                    context.child("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    strategy: Sequential,
                    partition: validator.to_string(),
                    mailbox_size: NZUsize!(1024),
                    epoch: Epoch::new(333),
                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
                        Epoch::new(333),
                    )),
                    leader_timeout: Duration::from_millis(100),
                    certification_timeout: Duration::from_millis(200),
                    timeout_retry: Duration::from_millis(500),
                    fetch_timeout: Duration::from_millis(100),
                    activity_timeout,
                    skip_timeout,
                    fetch_concurrent: NZUsize!(4),
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                    forwarding: ForwardingPolicy::Disabled,
                };
                let engine = Engine::new(context.child("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Prepare TLE test data
            let target = Round::new(Epoch::new(333), View::new(10)); // Encrypt for round (epoch 333, view 10)
            let message = b"Secret message for future view10"; // 32 bytes

            // Encrypt message
            let ciphertext = schemes[0].encrypt(&mut context, target, *message);

            // Wait for consensus to reach the target view and then decrypt
            let reporter = monitor_reporter.lock().clone().unwrap();
            loop {
                // Wait for notarization
                context.sleep(Duration::from_millis(100)).await;
                let notarizations = reporter.notarizations.lock();
                let Some(notarization) = notarizations.get(&target.view()) else {
                    continue;
                };

                // Decrypt the message using the seed
                let seed = notarization.seed();
                let decrypted = seed
                    .decrypt(&ciphertext)
                    .expect("Decryption should succeed with valid seed signature");
                assert_eq!(
                    message,
                    decrypted.as_ref(),
                    "Decrypted message should match original message"
                );
                break;
            }
        });
    }

    #[test_traced]
    fn test_tle() {
        tle::<MinPk, Random>();
        tle::<MinSig, Random>();
    }

    fn hailstorm<S, F, L>(
        seed: u64,
        shutdowns: usize,
        interval: ViewDelta,
        mut fixture: F,
    ) -> String
    where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        // Create context
        let n = 5;
        let activity_timeout = ViewDelta::new(10);
        let skip_timeout = ViewDelta::new(5);
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new().with_seed(seed);
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, &namespace, n);
            let mut oracle =
                start_test_network_with_peers(context.child("network"), participants.clone(), true)
                    .await;
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines
            let elector = L::default();
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = BTreeMap::new();
            let mut engine_handlers = BTreeMap::new();
            for (idx, validator) in participants.iter().enumerate() {
                // Create scheme context
                let context = context
                    .child("validator")
                    .with_attribute("public_key", validator);

                // Configure engine
                let reporter_config = mocks::reporter::Config {
                    participants: participants.clone().try_into().unwrap(),
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
                reporters.insert(idx, reporter.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                    certify_latency: (10.0, 5.0),
                    should_certify: mocks::application::Certifier::Always,
                };
                let (actor, application) = mocks::application::Application::new(
                    context.child("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx].clone(),
                    elector: elector.clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    strategy: Sequential,
                    partition: validator.to_string(),
                    mailbox_size: NZUsize!(1024),
                    epoch: Epoch::new(333),
                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
                        Epoch::new(333),
                    )),
                    leader_timeout: Duration::from_secs(1),
                    certification_timeout: Duration::from_secs(2),
                    timeout_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    fetch_concurrent: NZUsize!(4),
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                    forwarding: ForwardingPolicy::Disabled,
                };
                let engine = Engine::new(context.child("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
            }

            // Run shutdowns
            let mut target = View::zero();
            for i in 0..shutdowns {
                // Update target
                target = target.saturating_add(interval);

                // Wait for all engines to reach the target view
                let mut finalizers = Vec::new();
                for (_, reporter) in reporters.iter_mut() {
                    let (mut latest, mut monitor) = reporter.subscribe().await;
                    finalizers.push(context.child("finalizer").spawn(move |_| async move {
                        while latest < target {
                            latest = monitor.recv().await.expect("event missing");
                        }
                    }));
                }
                join_all(finalizers).await;
                target = target.saturating_add(interval);

                // Select a random engine to shut down
                let idx = context.gen_range(0..engine_handlers.len());
                let validator = &participants[idx];
                let handle = engine_handlers.remove(&idx).unwrap();
                handle.abort();
                let _ = handle.await;
                let selected_reporter = reporters.remove(&idx).unwrap();
                info!(idx, ?validator, "shutdown validator");

                // Wait for the remaining engines to reach the new target view
                let mut finalizers = Vec::new();
                for (_, reporter) in reporters.iter_mut() {
                    let (mut latest, mut monitor) = reporter.subscribe().await;
                    finalizers.push(context.child("finalizer").spawn(move |_| async move {
                        while latest < target {
                            latest = monitor.recv().await.expect("event missing");
                        }
                    }));
                }
                join_all(finalizers).await;
                target = target.saturating_add(interval);

                // Recreate engine
                info!(idx, ?validator, "restarting validator");
5982                let context = context
5983                    .child("validator_restarted")
5984                    .with_attribute("public_key", validator)
5985                    .with_attribute("restart", i);
5986
5987                // Start engine
5988                let (pending, recovered, resolver) =
5989                    register_validator(&mut oracle, validator.clone()).await;
5990                let application_cfg = mocks::application::Config {
5991                    hasher: Sha256::default(),
5992                    relay: relay.clone(),
5993                    me: validator.clone(),
5994                    propose_latency: (10.0, 5.0),
5995                    verify_latency: (10.0, 5.0),
5996                    certify_latency: (10.0, 5.0),
5997                    should_certify: mocks::application::Certifier::Always,
5998                };
5999                let (actor, application) = mocks::application::Application::new(
6000                    context.child("application"),
6001                    application_cfg,
6002                );
6003                actor.start();
6004                reporters.insert(idx, selected_reporter.clone());
6005                let blocker = oracle.control(validator.clone());
6006                let cfg = config::Config {
6007                    scheme: schemes[idx].clone(),
6008                    elector: elector.clone(),
6009                    blocker,
6010                    automaton: application.clone(),
6011                    relay: application.clone(),
6012                    reporter: selected_reporter,
6013                    strategy: Sequential,
6014                    partition: validator.to_string(),
6015                    mailbox_size: NZUsize!(1024),
6016                    epoch: Epoch::new(333),
6017                    floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
6018                        Epoch::new(333),
6019                    )),
6020                    leader_timeout: Duration::from_secs(1),
6021                    certification_timeout: Duration::from_secs(2),
6022                    timeout_retry: Duration::from_secs(10),
6023                    fetch_timeout: Duration::from_secs(1),
6024                    activity_timeout,
6025                    skip_timeout,
6026                    fetch_concurrent: NZUsize!(4),
6027                    replay_buffer: NZUsize!(1024 * 1024),
6028                    write_buffer: NZUsize!(1024 * 1024),
6029                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
6030                    forwarding: ForwardingPolicy::Disabled,
6031                };
6032                let engine = Engine::new(context.child("engine"), cfg);
6033                engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
6034
6035                // Wait for all engines to hit required containers
6036                let mut finalizers = Vec::new();
6037                for (_, reporter) in reporters.iter_mut() {
6038                    let (mut latest, mut monitor) = reporter.subscribe().await;
6039                    finalizers.push(context.child("finalizer").spawn(move |_| async move {
6040                        while latest < target {
6041                            latest = monitor.recv().await.expect("event missing");
6042                        }
6043                    }));
6044                }
6045                join_all(finalizers).await;
6046                info!(idx, ?validator, "validator recovered");
6047            }
6048
            // Check reporters for correct activity
            let latest_complete = target.saturating_sub(activity_timeout);
            for (_, reporter) in reporters.iter() {
                // Ensure no faults
                reporter.assert_no_faults();

                // Ensure no invalid signatures
                reporter.assert_no_invalid();

                // Ensure no forks
                let mut notarized = HashMap::new();
                let mut finalized = HashMap::new();
                {
                    let notarizes = reporter.notarizes.lock();
                    for view in View::range(View::new(1), latest_complete) {
                        // Ensure only one payload proposed per view
                        let Some(payloads) = notarizes.get(&view) else {
                            continue;
                        };
                        if payloads.len() > 1 {
                            panic!("multiple payloads notarized at view: {view}");
                        }
                        let (digest, _) = payloads.iter().next().unwrap();
                        notarized.insert(view, *digest);
                    }
                }
                {
                    let notarizations = reporter.notarizations.lock();
                    for view in View::range(View::new(1), latest_complete) {
                        // Ensure notarization matches digest from notarizes
                        let Some(notarization) = notarizations.get(&view) else {
                            continue;
                        };
                        let Some(digest) = notarized.get(&view) else {
                            continue;
                        };
                        assert_eq!(&notarization.proposal.payload, digest);
                    }
                }
                {
                    let finalizes = reporter.finalizes.lock();
                    for view in View::range(View::new(1), latest_complete) {
                        // Ensure only one payload finalized per view
                        let Some(payloads) = finalizes.get(&view) else {
                            continue;
                        };
                        if payloads.len() > 1 {
                            panic!("multiple payloads finalized at view: {view}");
                        }
                        let (digest, _) = payloads.iter().next().unwrap();
                        finalized.insert(view, *digest);

                        // Only check views at or below the last complete view
                        if view > latest_complete {
                            continue;
                        }

                        // Ensure no finalizer also nullified the same view
                        let nullifies = reporter.nullifies.lock();
                        let Some(nullifies) = nullifies.get(&view) else {
                            continue;
                        };
                        for (_, finalizers) in payloads.iter() {
                            for finalizer in finalizers.iter() {
                                if nullifies.contains(finalizer) {
                                    panic!("should not nullify and finalize at same view");
                                }
                            }
                        }
                    }
                }
                {
                    let finalizations = reporter.finalizations.lock();
                    for view in View::range(View::new(1), latest_complete) {
                        // Ensure finalization matches digest from finalizes
                        let Some(finalization) = finalizations.get(&view) else {
                            continue;
                        };
                        let Some(digest) = finalized.get(&view) else {
                            continue;
                        };
                        assert_eq!(&finalization.proposal.payload, digest);
                    }
                }
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());

            // Return state for audit
            context.auditor().state()
        })
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_hailstorm_bls12381_threshold_vrf_min_pk() {
        assert_eq!(
            hailstorm::<_, _, Random>(
                0,
                10,
                ViewDelta::new(15),
                bls12381_threshold_vrf::fixture::<MinPk, _>
            ),
            hailstorm::<_, _, Random>(
                0,
                10,
                ViewDelta::new(15),
                bls12381_threshold_vrf::fixture::<MinPk, _>
            ),
        );
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_hailstorm_bls12381_threshold_vrf_min_sig() {
        assert_eq!(
            hailstorm::<_, _, Random>(
                0,
                10,
                ViewDelta::new(15),
                bls12381_threshold_vrf::fixture::<MinSig, _>
            ),
            hailstorm::<_, _, Random>(
                0,
                10,
                ViewDelta::new(15),
                bls12381_threshold_vrf::fixture::<MinSig, _>
            ),
        );
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_hailstorm_bls12381_threshold_std_min_pk() {
        assert_eq!(
            hailstorm::<_, _, RoundRobin>(
                0,
                10,
                ViewDelta::new(15),
                bls12381_threshold_std::fixture::<MinPk, _>
            ),
            hailstorm::<_, _, RoundRobin>(
                0,
                10,
                ViewDelta::new(15),
                bls12381_threshold_std::fixture::<MinPk, _>
            ),
        );
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_hailstorm_bls12381_threshold_std_min_sig() {
        assert_eq!(
            hailstorm::<_, _, RoundRobin>(
                0,
                10,
                ViewDelta::new(15),
                bls12381_threshold_std::fixture::<MinSig, _>
            ),
            hailstorm::<_, _, RoundRobin>(
                0,
                10,
                ViewDelta::new(15),
                bls12381_threshold_std::fixture::<MinSig, _>
            ),
        );
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_hailstorm_bls12381_multisig_min_pk() {
        assert_eq!(
            hailstorm::<_, _, RoundRobin>(
                0,
                10,
                ViewDelta::new(15),
                bls12381_multisig::fixture::<MinPk, _>
            ),
            hailstorm::<_, _, RoundRobin>(
                0,
                10,
                ViewDelta::new(15),
                bls12381_multisig::fixture::<MinPk, _>
            ),
        );
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_hailstorm_bls12381_multisig_min_sig() {
        assert_eq!(
            hailstorm::<_, _, RoundRobin>(
                0,
                10,
                ViewDelta::new(15),
                bls12381_multisig::fixture::<MinSig, _>
            ),
            hailstorm::<_, _, RoundRobin>(
                0,
                10,
                ViewDelta::new(15),
                bls12381_multisig::fixture::<MinSig, _>
            ),
        );
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_hailstorm_ed25519() {
        assert_eq!(
            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture),
            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture)
        );
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_hailstorm_secp256r1() {
        assert_eq!(
            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture),
            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture)
        );
    }

    /// Configuration for a Twins testing campaign.
    ///
    /// A campaign generates adversarial primary/secondary recipient-set
    /// scenarios, splits Byzantine participants into twin halves, and verifies
    /// that honest nodes still finalize blocks after the adversarial prefix
    /// ends.
    ///
    /// # Fields
    ///
    /// - `n`: Total participants. The number of faults is derived as
    ///   `N3f1::max_faults(n)`. Larger `n` increases the per-scenario
    ///   compromised-set space but also makes each case slower to execute.
    ///
    /// - `rounds`: Number of adversarial rounds that form the attack prefix.
    ///   Each round independently places participants relative to the primary
    ///   and secondary recipient sets: outside both, both-halves,
    ///   primary-only, or secondary-only. The two recipient sets may overlap;
    ///   a participant in both-halves is visible to both twins in that view.
    ///   After these rounds, the network becomes fully synchronous. More
    ///   rounds exponentially increase the canonical scenario space.
    ///
    /// - `mode`: How multi-round scenarios are constructed. `Sampled` picks
    ///   independent recipient sets per round; `Sustained` repeats a single
    ///   recipient-set pattern across all rounds (modeling a persistent
    ///   adversarial split).
    ///
    /// - `max_cases`: Upper bound on the total emitted cases. Each case is a
    ///   (scenario, compromised-assignment) pair. This bound also caps
    ///   scenario enumeration: when the scenario space is larger, scenarios
    ///   are sampled uniformly. Cases are shuffled and truncated to this
    ///   budget.
    ///
    /// - `trailing_finalizations`: Number of finalizations each honest node
    ///   must produce *after* the adversarial prefix before the case is
    ///   considered successful. This is the liveness assertion: it ensures
    ///   the protocol actually commits blocks under synchrony rather than
    ///   merely reaching a high view via nullifications.
    #[derive(Clone, Copy, Debug)]
    struct TwinsCampaign {
        n: u32,
        rounds: usize,
        mode: twins::Mode,
        max_cases: usize,
        trailing_finalizations: usize,
    }

    fn twins_campaign<S, F, L>(
        rng: &mut StdRng,
        campaign: TwinsCampaign,
        link: Link,
        mut fixture: F,
    ) where
        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
        L: Elector<S>,
    {
        let n = campaign.n;
        let faults = N3f1::max_faults(n) as usize;
        let cases = twins::cases(
            rng,
            twins::Framework {
                participants: n as usize,
                faults,
                rounds: campaign.rounds,
                mode: campaign.mode,
                max_cases: campaign.max_cases,
            },
        );
        assert!(
            !cases.is_empty(),
            "twins campaign should generate at least one case"
        );
        for case in cases {
            let scenario = case.scenario.clone();
            let twin_indices = case.compromised.clone();
            assert_eq!(
                twin_indices.len(),
                faults,
                "unexpected twins count for n={n} (expected f={faults})",
            );

            let activity_timeout = ViewDelta::new(10);
            let skip_timeout = ViewDelta::new(5);
            let namespace = b"consensus".to_vec();
            let link = link.clone();
            let trailing_finalizations = campaign.trailing_finalizations;
            let mut case_fixture =
                |ctx: &mut deterministic::Context, ns: &[u8], n: u32| fixture(ctx, ns, n);
            let cfg = deterministic::Config::new()
                .with_rng(Box::new(StdRng::from_rng(&mut *rng).unwrap()));
            let executor = deterministic::Runner::new(cfg);
            executor.start(|mut context| async move {
                let Fixture {
                    participants,
                    schemes,
                    ..
                } = case_fixture(&mut context, &namespace, n);
                let participants: Arc<[_]> = participants.into();
                let mut oracle = start_test_network_with_peers(
                    context.child("network"),
                    participants.iter().cloned(),
                    false,
                )
                .await;
                let mut registrations = register_validators(&mut oracle, &participants).await;
                link_validators(&mut oracle, &participants, Action::Link(link), None).await;

                let elector = TwinsElector::new(L::default(), &scenario, n as usize);
                let relay = Arc::new(mocks::relay::Relay::new());
                let mut reporters = Vec::new();
                let mut engine_handlers = Vec::new();
                let twin_index_set: HashSet<usize> = twin_indices.iter().copied().collect();

                // Create twin engines (f Byzantine twins).
                for idx in twin_indices.iter().copied() {
                    let validator = &participants[idx];
                    let (
                        (vote_sender, vote_receiver),
                        (certificate_sender, certificate_receiver),
                        (_resolver_sender, _resolver_receiver),
                    ) = registrations
                        .remove(validator)
                        .expect("validator should be registered");

                    let make_vote_forwarder = || {
                        let participants = participants.clone();
                        let scenario = scenario.clone();
                        move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
                            let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
                            let (primary, secondary) =
                                scenario.partitions(msg.view(), participants.as_ref());
                            match origin {
                                SplitOrigin::Primary => Some(Recipients::Some(primary)),
                                SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
                            }
                        }
                    };
                    let make_certificate_forwarder = || {
                        let codec = schemes[idx].certificate_codec_config();
                        let participants = participants.clone();
                        let scenario = scenario.clone();
                        move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
                            let msg: Certificate<S, D> =
                                Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
                            let (primary, secondary) =
                                scenario.partitions(msg.view(), participants.as_ref());
                            match origin {
                                SplitOrigin::Primary => Some(Recipients::Some(primary)),
                                SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
                            }
                        }
                    };
                    let make_vote_router = || {
                        let participants = participants.clone();
                        let scenario = scenario.clone();
                        move |(sender, message): &(_, IoBuf)| {
                            let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
                            scenario.route(msg.view(), sender, participants.as_ref())
                        }
                    };
                    let make_certificate_router = || {
                        let codec = schemes[idx].certificate_codec_config();
                        let participants = participants.clone();
                        let scenario = scenario.clone();
                        move |(sender, message): &(_, IoBuf)| {
                            let msg: Certificate<S, D> =
                                Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
                            scenario.route(msg.view(), sender, participants.as_ref())
                        }
                    };
                    let (vote_sender_primary, vote_sender_secondary) =
                        vote_sender.split_with(make_vote_forwarder());
                    let (vote_receiver_primary, vote_receiver_secondary) = vote_receiver
                        .split_with(
                            context.child("pending_split").with_attribute("index", idx),
                            make_vote_router(),
                        );
                    let (certificate_sender_primary, certificate_sender_secondary) =
                        certificate_sender.split_with(make_certificate_forwarder());
                    let (certificate_receiver_primary, certificate_receiver_secondary) =
                        certificate_receiver.split_with(
                            context
                                .child("recovered_split")
                                .with_attribute("index", idx),
                            make_certificate_router(),
                        );

                    for (twin_label, pending, recovered) in [
                        (
                            "primary",
                            (vote_sender_primary, vote_receiver_primary),
                            (certificate_sender_primary, certificate_receiver_primary),
                        ),
                        (
                            "secondary",
                            (vote_sender_secondary, vote_receiver_secondary),
                            (certificate_sender_secondary, certificate_receiver_secondary),
                        ),
                    ] {
                        let partition = format!("twin_{idx}_{twin_label}");
                        let context = context
                            .child("twin")
                            .with_attribute("index", idx)
                            .with_attribute("side", twin_label);

                        let reporter_config = mocks::reporter::Config {
                            participants: participants.as_ref().try_into().unwrap(),
                            scheme: schemes[idx].clone(),
                            elector: elector.clone(),
                        };
                        let reporter = mocks::reporter::Reporter::new(
                            context.child("reporter"),
                            reporter_config,
                        );
                        reporters.push(reporter.clone());

                        let application_cfg = mocks::application::Config {
                            hasher: Sha256::default(),
                            relay: relay.clone(),
                            me: validator.clone(),
                            propose_latency: (10.0, 5.0),
                            verify_latency: (10.0, 5.0),
                            certify_latency: (10.0, 5.0),
                            should_certify: mocks::application::Certifier::Always,
                        };
                        let (actor, application) = mocks::application::Application::new(
                            context.child("application"),
                            application_cfg,
                        );
                        actor.start();

                        let blocker = oracle.control(validator.clone());
                        let cfg = config::Config {
                            scheme: schemes[idx].clone(),
                            elector: elector.clone(),
                            blocker,
                            automaton: application.clone(),
                            relay: application.clone(),
                            reporter: reporter.clone(),
                            strategy: Sequential,
                            partition,
                            mailbox_size: NZUsize!(1024),
                            epoch: Epoch::new(333),
                            floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
                                Epoch::new(333),
                            )),
                            leader_timeout: Duration::from_secs(1),
                            certification_timeout: Duration::from_millis(1_500),
                            timeout_retry: Duration::from_secs(10),
                            fetch_timeout: Duration::from_secs(1),
                            activity_timeout,
                            skip_timeout,
                            fetch_concurrent: NZUsize!(4),
                            replay_buffer: NZUsize!(1024 * 1024),
                            write_buffer: NZUsize!(1024 * 1024),
                            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                            forwarding: ForwardingPolicy::Disabled,
                        };
                        let engine = Engine::new(context.child("engine"), cfg);
                        engine_handlers.push(engine.start(
                            pending,
                            recovered,
                            inert_channel(participants.as_ref()),
                        ));
                    }
                }

                // Create honest engines.
                let honest_start = reporters.len();
                for (idx, validator) in participants.iter().enumerate() {
                    if twin_index_set.contains(&idx) {
                        continue;
                    }

                    let partition = format!("honest_{idx}");
                    let context = context.child("honest").with_attribute("index", idx);

                    let reporter_config = mocks::reporter::Config {
                        participants: participants.as_ref().try_into().unwrap(),
                        scheme: schemes[idx].clone(),
                        elector: elector.clone(),
                    };
                    let reporter =
                        mocks::reporter::Reporter::new(context.child("reporter"), reporter_config);
                    reporters.push(reporter.clone());

                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                        certify_latency: (10.0, 5.0),
                        should_certify: mocks::application::Certifier::Always,
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.child("application"),
                        application_cfg,
                    );
                    actor.start();

                    let blocker = oracle.control(validator.clone());
                    let cfg = config::Config {
                        scheme: schemes[idx].clone(),
                        elector: elector.clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        strategy: Sequential,
                        partition,
                        mailbox_size: NZUsize!(1024),
                        epoch: Epoch::new(333),
                        floor: config::Floor::Genesis(mocks::application::genesis::<Sha256>(
                            Epoch::new(333),
                        )),
                        leader_timeout: Duration::from_secs(1),
                        certification_timeout: Duration::from_millis(1_500),
                        timeout_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        fetch_concurrent: NZUsize!(4),
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                        forwarding: ForwardingPolicy::Disabled,
                    };
                    let engine = Engine::new(context.child("engine"), cfg);

                    let (
                        (pending_sender, pending_receiver),
                        (recovered_sender, recovered_receiver),
                        _,
                    ) = registrations
                        .remove(validator)
                        .expect("validator should be registered");
                    engine_handlers.push(engine.start(
                        (pending_sender, pending_receiver),
                        (recovered_sender, recovered_receiver),
                        inert_channel(participants.as_ref()),
                    ));
                }

6619                // Wait for progress (liveness check) across honest replicas only.
6620                //
6621                // Only count finalizations after the adversarial prefix so we
6622                // verify the protocol actually recovers and makes progress under
6623                // synchrony. Finalizations during the prefix may be artifacts of
6624                // the attack setup and do not demonstrate liveness.
6625                //
6626                // Twin halves are Byzantine test machinery and are not required to
6627                // make progress for the campaign to establish honest-node liveness.
                let prefix_end = View::new(scenario.rounds().len() as u64);
                let mut finalizers = Vec::new();
                for (i, reporter) in reporters.iter_mut().skip(honest_start).enumerate() {
                    let (_latest, mut monitor) = reporter.subscribe().await;
                    let required = trailing_finalizations;
                    finalizers.push(context.child("finalizer").with_attribute("index", i).spawn(
                        move |_| async move {
                            let mut count = 0usize;
                            while count < required {
                                let view = monitor.recv().await.expect("event missing");
                                if view > prefix_end {
                                    count += 1;
                                }
                            }
                        },
                    ));
                }
                join_all(finalizers).await;

                // Verify safety: no conflicting finalizations across honest reporters.
                let mut finalized_at_view: BTreeMap<View, D> = BTreeMap::new();
                for reporter in reporters.iter().skip(honest_start) {
                    let finalizations = reporter.finalizations.lock();
                    for (view, finalization) in finalizations.iter() {
                        let digest = finalization.proposal.payload;
                        if let Some(existing) = finalized_at_view.get(view) {
                            assert_eq!(
                                existing, &digest,
                                "safety violation: conflicting finalizations at view {view}"
                            );
                        } else {
                            finalized_at_view.insert(*view, digest);
                        }
                    }
                }

                // Verify no invalid signatures were observed by honest replicas.
                for reporter in reporters.iter().skip(honest_start) {
                    reporter.assert_no_invalid();
                }

                // Ensure no honest signer appears under multiple payloads for the same view.
                let twin_identities: HashSet<_> = twin_indices
                    .iter()
                    .map(|idx| participants[*idx].clone())
                    .collect();
                let mut notarized_by_honest_signer: BTreeMap<View, HashMap<PublicKey, D>> =
                    BTreeMap::new();
                let mut finalized_by_honest_signer: BTreeMap<View, HashMap<PublicKey, D>> =
                    BTreeMap::new();
                for reporter in reporters.iter().skip(honest_start) {
                    let notarizes = reporter.notarizes.lock();
                    for (view, payloads) in notarizes.iter() {
                        let signers = notarized_by_honest_signer.entry(*view).or_default();
                        for (digest, payload_signers) in payloads.iter() {
                            for signer in payload_signers.iter() {
                                if twin_identities.contains(signer) {
                                    continue;
                                }
                                if let Some(existing) = signers.insert(signer.clone(), *digest) {
                                    assert_eq!(
                                    existing, *digest,
                                    "honest signer produced conflicting notarizes at view {view}"
                                );
                                }
                            }
                        }
                    }

                    let finalizes = reporter.finalizes.lock();
                    for (view, payloads) in finalizes.iter() {
                        let signers = finalized_by_honest_signer.entry(*view).or_default();
                        for (digest, payload_signers) in payloads.iter() {
                            for signer in payload_signers.iter() {
                                if twin_identities.contains(signer) {
                                    continue;
                                }
                                if let Some(existing) = signers.insert(signer.clone(), *digest) {
                                    assert_eq!(
                                    existing, *digest,
                                    "honest signer produced conflicting finalizes at view {view}"
                                );
                                }
                            }
                        }
                    }
                }

                // Ensure faults are attributable to twins.
                for reporter in reporters.iter().skip(honest_start) {
                    let faults = reporter.faults.lock();
                    for (faulter, _) in faults.iter() {
                        assert!(
                            twin_identities.contains(faulter),
                            "fault from non-twin participant"
                        );
                    }
                }

                let blocked = oracle.blocked().await.unwrap();
                for (_, faulter) in blocked {
                    assert!(
                        twin_identities.contains(&faulter),
                        "blocked peer attributed to non-twin participant"
                    );
                }
            });
        }
    }

    const TWINS_CAMPAIGN: TwinsCampaign = TwinsCampaign {
        n: 5,
        rounds: 3,
        mode: twins::Mode::Sampled,
        max_cases: 20,
        trailing_finalizations: 10,
    };

    const TWINS_LINK: Link = Link {
        latency: Duration::from_millis(500),
        jitter: Duration::from_millis(500),
        success_rate: 1.0,
    };

    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_twins_sampled() {
        for link in [
            Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(10),
                success_rate: 1.0,
            },
            TWINS_LINK,
        ] {
            twins_campaign::<_, _, RoundRobin>(
                &mut test_rng(),
                TWINS_CAMPAIGN,
                link,
                scheme_mocks::fixture,
            );
        }
    }

    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_twins_sustained() {
        let campaign = TwinsCampaign {
            mode: twins::Mode::Sustained,
            ..TWINS_CAMPAIGN
        };
        for link in [
            Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(10),
                success_rate: 1.0,
            },
            TWINS_LINK,
        ] {
            twins_campaign::<_, _, RoundRobin>(
                &mut test_rng(),
                campaign,
                link,
                scheme_mocks::fixture,
            );
        }
    }

    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_twins_large_sampled() {
        let campaign = TwinsCampaign {
            n: 10,
            rounds: 5,
            ..TWINS_CAMPAIGN
        };
        twins_campaign::<_, _, RoundRobin>(
            &mut test_rng(),
            campaign,
            TWINS_LINK,
            scheme_mocks::fixture,
        );
    }

    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_twins_large_sustained() {
        let campaign = TwinsCampaign {
            n: 10,
            rounds: 5,
            mode: twins::Mode::Sustained,
            ..TWINS_CAMPAIGN
        };
        twins_campaign::<_, _, RoundRobin>(
            &mut test_rng(),
            campaign,
            TWINS_LINK,
            scheme_mocks::fixture,
        );
    }

    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_twins_multisig_min_pk() {
        twins_campaign::<_, _, RoundRobin>(
            &mut test_rng(),
            TWINS_CAMPAIGN,
            TWINS_LINK,
            bls12381_multisig::fixture::<MinPk, _>,
        );
    }

    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_twins_multisig_min_sig() {
        twins_campaign::<_, _, RoundRobin>(
            &mut test_rng(),
            TWINS_CAMPAIGN,
            TWINS_LINK,
            bls12381_multisig::fixture::<MinSig, _>,
        );
    }

    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_twins_threshold_vrf_min_pk() {
        twins_campaign::<_, _, Random>(
            &mut test_rng(),
            TWINS_CAMPAIGN,
            TWINS_LINK,
            bls12381_threshold_vrf::fixture::<MinPk, _>,
        );
    }

    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_twins_threshold_vrf_min_sig() {
        twins_campaign::<_, _, Random>(
            &mut test_rng(),
            TWINS_CAMPAIGN,
            TWINS_LINK,
            bls12381_threshold_vrf::fixture::<MinSig, _>,
        );
    }

    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_twins_threshold_std_min_pk() {
        twins_campaign::<_, _, RoundRobin>(
            &mut test_rng(),
            TWINS_CAMPAIGN,
            TWINS_LINK,
            bls12381_threshold_std::fixture::<MinPk, _>,
        );
    }

    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_twins_threshold_std_min_sig() {
        twins_campaign::<_, _, RoundRobin>(
            &mut test_rng(),
            TWINS_CAMPAIGN,
            TWINS_LINK,
            bls12381_threshold_std::fixture::<MinSig, _>,
        );
    }

    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_twins_ed25519() {
        twins_campaign::<_, _, RoundRobin>(
            &mut test_rng(),
            TWINS_CAMPAIGN,
            TWINS_LINK,
            ed25519::fixture,
        );
    }

    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_twins_secp256r1() {
        twins_campaign::<_, _, RoundRobin>(
            &mut test_rng(),
            TWINS_CAMPAIGN,
            TWINS_LINK,
            secp256r1::fixture,
        );
    }
}