Skip to main content

commonware_consensus/simplex/
mod.rs

1//! Simple and fast BFT agreement inspired by Simplex Consensus.
2//!
3//! Inspired by [Simplex Consensus](https://eprint.iacr.org/2023/463), `simplex` provides simple and fast BFT
4//! agreement with network-speed view (i.e. block time) latency and optimal finalization latency in a
5//! partially synchronous setting.
6//!
7//! # Features
8//!
9//! * Wicked Fast Block Times (2 Network Hops)
10//! * Optimal Finalization Latency (3 Network Hops)
11//! * Externalized Uptime and Fault Proofs
12//! * Require Certification Before Finalization
13//! * Decoupled Block Broadcast and Sync
14//! * Lazy Message Verification
15//! * Application-Defined Block Format
16//! * Pluggable Hashing and Cryptography
17//! * Embedded VRF (via [scheme::bls12381_threshold::vrf])
18//!
19//! # Design
20//!
21//! ## Protocol Description
22//!
23//! ### Genesis
24//!
25//! Genesis (view 0) is implicitly finalized. There is no finalization certificate for genesis;
26//! the digest returned by [`Automaton::genesis`](crate::Automaton::genesis) serves as the initial
27//! finalized state. Voting begins at view 1, with the first proposal referencing genesis as its parent.
28//!
29//! ### Specification for View `v`
30//!
31//! Upon entering view `v`:
32//! * Determine leader `l` for view `v`
33//! * Set timer for leader proposal `t_l = 2Δ` and advance `t_a = 3Δ`
34//!     * If leader `l` has not been active in last `r` views, set `t_l` to 0.
35//! * If leader `l`, broadcast `notarize(c,v)`
36//!   * If can't propose container in view `v` because missing notarization/nullification for a
37//!     previous view `v_m`, request `v_m`
38//!
39//! Upon receiving first `notarize(c,v)` from `l`:
40//! * Cancel `t_l`
41//! * If the container's parent `c_parent` is finalized (or both notarized and certified) at `v_parent`
42//!   and we have nullifications for all views between `v` and `v_parent`, verify `c` and broadcast `notarize(c,v)`
43//!     * If verification of `c` fails, immediately broadcast `nullify(v)`
44//!
45//! Upon receiving `2f+1` `notarize(c,v)`:
46//! * Cancel `t_a`
47//! * Mark `c` as notarized
48//! * Broadcast `notarization(c,v)` (even if we have not verified `c`)
49//! * Attempt to certify `c` (see [Certification](#certification))
50//!     * On success: broadcast `finalize(c,v)` (if have not broadcast `nullify(v)`) and enter `v+1`
51//!     * On failure: broadcast `nullify(v)`
52//!
53//! Upon receiving `2f+1` `nullify(v)`:
54//! * Broadcast `nullification(v)`
55//! * Enter `v+1`
56//!
57//! Upon receiving `2f+1` `finalize(c,v)`:
58//! * Mark `c` as finalized (and recursively finalize its parents)
59//! * Broadcast `finalization(c,v)` (even if we have not verified `c`)
60//!
61//! Upon `t_l` or `t_a` firing:
62//! * Broadcast `nullify(v)`
63//! * Every `t_r` after `nullify(v)` broadcast that we are still in view `v`:
64//!    * Rebroadcast `nullify(v)` and either `notarization(v-1)` or `nullification(v-1)`
65//!
66//! _When `2f+1` votes of a given type (`notarize(c,v)`, `nullify(v)`, or `finalize(c,v)`) have been have been collected
67//! from unique participants, a certificate (`notarization(c,v)`, `nullification(v)`, or `finalization(c,v)`) can be assembled.
68//! These certificates serve as a standalone proof of consensus progress that downstream systems can ingest without executing
69//! the protocol._
70//!
71//! ### Joining Consensus
72//!
73//! As soon as `2f+1` nullifies or finalizes are observed for some view `v`, the `Voter` will
74//! enter `v+1`. Notarizations advance the view if-and-only-if the application certifies them.
75//! This means that a new participant joining consensus will immediately jump ahead on the previous
76//! view's nullification or finalization and begin participating in consensus at the current view.
77//!
78//! ### Vote Broadcast
79//!
80//! Honest participants opportunistically broadcast votes for all views they are actively tracking. This
81//! includes views that are no longer current (i.e. sending a `notarize(c,v)` for view `v-1` while in view `v`).
82//!
83//! This is particularly useful for applications that wish to reward participants for actively participating in consensus
84//! (by including their votes onchain) and want to ensure their reward mechanism doesn't penalize decentralized participants
85//! that may not be able to issue a vote within the latest view consistently in a timely manner (say that there is a quorum
86//! in one data center and one participant is in another data center).
87//!
88//! _We only parse and verify votes necessary to drive consensus forward, so applications that do not require this functionality
89//! will incur negligible overhead._
90//!
91//! ### Certification
92//!
93//! After a payload is notarized, the application can optionally delay or prevent finalization via the
94//! [`CertifiableAutomaton::certify`](crate::CertifiableAutomaton::certify) method. By default, `certify`
95//! returns `true` for all payloads, meaning finalization proceeds immediately after notarization.
96//!
97//! Customizing `certify` is useful for systems that employ erasure coding, where participants may want
98//! to wait until they have received enough shards to reconstruct and validate the full block before
99//! voting to finalize.
100//!
101//! If `certify` returns `true`, the participant broadcasts a `finalize` vote for the payload and enters the
102//! next view. If `certify` returns `false`, the participant broadcasts `nullify` for the view instead (treating
103//! it as an immediate timeout), and will refuse to build upon the proposal or notarize proposals that build upon it.
104//! Thus, a payload can only be finalized if a quorum of participants certify it.
105//!
106//! Certification of some notarization should only be abandoned once a finalization at the same or higher view is observed.
107//! Until then (say a nullification certificate for a view arrives before certification completes), the application should continue
108//! attempting to complete certification. This increases the likelihood that we can vote on the next honest proposer's block (which
109//! may build on our in-flight certification or the nullification). If we did not do this, it is possible that different parts of
110//! the network (neither with quorum) would refuse to vote on each other's blocks (halting consensus).
111//!
112//! _The decision returned by `certify` must be deterministic and consistent across all honest participants to ensure
113//! liveness._
114//!
115//! ### Deviations from Simplex Consensus
116//!
117//! * Fetch missing notarizations/nullifications as needed rather than assuming each proposal contains
118//!   a set of all notarizations/nullifications for all historical blocks.
119//! * Introduce distinct messages for `notarize` and `nullify` rather than referring to both as a `vote` for
120//!   either a "block" or a "dummy block", respectively.
121//! * Introduce a "leader timeout" to trigger early view transitions for unresponsive leaders.
122//! * Skip "leader timeout" and "certification timeout" if a designated leader hasn't participated in
123//!   some number of views (again to trigger early view transition for an unresponsive leader).
124//! * Introduce message rebroadcast to continue making progress if messages from a given view are dropped (only way
125//!   to ensure messages are reliably delivered is with a heavyweight reliable broadcast protocol).
126//! * Treat local proposal failure as immediate timeout expiry and broadcast `nullify(v)`.
127//! * Treat local verification failure as immediate timeout expiry and broadcast `nullify(v)`.
128//! * Consider the current leader's `nullify(v)` as immediate timeout expiry and broadcast `nullify(v)`.
129//! * Upon seeing `notarization(c,v)`, instead of moving to the view `v+1` immediately, request certification from
130//!   the application (see [Certification](#certification)). Only move to view `v+1` and broadcast `finalize(c,v)`
131//!   if certification succeeds, otherwise broadcast `nullify(v)` and refuse to build upon `c`.
132//!
133//! ## Protocol Properties
134//!
135//! ### Forced Inclusion (Tail-Forking Resistance)
136//!
137//! A notarized payload in view `v` must appear in the canonical chain if no nullification
138//! certificate exists for `v`. This follows directly from the protocol rules:
139//!
140//! 1. To propose in view `v+k`, the leader must reference a certified parent in some view `v_p`
141//!    and possess nullification certificates for every view between `v_p` and `v+k`.
142//! 2. A nullification certificate for view `v` requires `2f+1` `nullify(v)` votes.
143//! 3. An honest participant only broadcasts `nullify(v)` when a timeout fires (`t_l` or `t_a`)
144//!    or when certification fails.
145//!
146//! Therefore, if view `v` completes without timeout and certification succeeds, no honest
147//! participant has broadcast `nullify(v)`. With at most `f` Byzantine participants, at most `f`
148//! `nullify(v)` votes exist, which is insufficient to form a nullification certificate. Without
149//! that certificate, no future leader can skip view `v`, and the notarized payload must be
150//! included as an ancestor in all subsequent proposals.
151//!
152//! ### Optimistic Finality
153//!
154//! The forced inclusion property provides a weaker but faster form of finality: once a
155//! notarization certificate is observed for view `v` (without any timeout having fired),
156//! the notarized payload can be treated as speculatively final. No future sequence of
157//! proposals can exclude it from the canonical chain.
158//!
159//! This "speculative finality" is available after just 2 network hops (proposal + notarization),
160//! compared to the 3 hops required for full finalization (proposal + notarization + finalization).
161//! A notarized-but-not-yet-finalized payload can only be excluded in two scenarios:
162//! `f+1` or more honest participants timed out, or certification failed. Because
163//! certification is deterministic, it either fails for all honest participants or none,
164//! so a certification failure always produces a nullification. In the common case
165//! (no faults, no timeouts), exclusion cannot happen.
166//!
167//! ### Unchained Finalization
168//!
169//! Finalization does not require consecutive honest views. When a participant certifies
170//! `notarization(c,v)`, it broadcasts `finalize(c,v)` and immediately enters `v+1`,
171//! regardless of what happens in subsequent views. These `finalize(c,v)` votes accumulate
172//! independently of the current view: even if views `v+1` through `v+k` all time out
173//! (producing nullifications), the `finalize(c,v)` votes still count toward the `2f+1`
174//! threshold needed to form `finalization(c,v)`.
175//!
176//! This means a payload notarized in view `v` can be finalized while the network is
177//! in view `v+k` for any `k >= 1`. There is no requirement that a particular view
178//! after `v` succeeds or that any subsequent leader cooperates. As long as `2f+1`
179//! participants eventually certify and broadcast `finalize(c,v)`, the finalization
180//! certificate will form.
181//!
182//! ## Architecture
183//!
184//! All logic is split into four components: the `Batcher`, the `Voter`, the `Resolver`, and the `Application` (provided by the user).
185//! The `Batcher` is responsible for collecting messages from peers and lazily verifying them when a quorum is met. The `Voter`
186//! is responsible for directing participation in the current view. The `Resolver` is responsible for
187//! fetching artifacts from previous views required to verify proposed blocks in the latest view. Lastly, the `Application`
188//! is responsible for proposing new blocks and indicating whether some block is valid.
189//!
190//! To drive great performance, all interactions between `Batcher`, `Voter`, `Resolver`, and `Application` are
191//! non-blocking. This means that, for example, the `Voter` can continue processing messages while the
192//! `Application` verifies a proposed block or the `Resolver` fetches a notarization.
193//!
194//! ```txt
195//!                            +------------+          +++++++++++++++
196//!                            |            +--------->+             +
197//!                            |  Batcher   |          +    Peers    +
198//!                            |            |<---------+             +
199//!                            +-------+----+          +++++++++++++++
200//!                                |   ^
201//!                                |   |
202//!                                |   |
203//!                                |   |
204//!                                v   |
205//! +---------------+           +---------+            +++++++++++++++
206//! |               |<----------+         +----------->+             +
207//! |  Application  |           |  Voter  |            +    Peers    +
208//! |               +---------->|         |<-----------+             +
209//! +---------------+           +--+------+            +++++++++++++++
210//!                                |   ^
211//!                                |   |
212//!                                |   |
213//!                                |   |
214//!                                v   |
215//!                            +-------+----+          +++++++++++++++
216//!                            |            +--------->+             +
217//!                            |  Resolver  |          +    Peers    +
218//!                            |            |<---------+             +
219//!                            +------------+          +++++++++++++++
220//! ```
221//!
222//! ### Batched Verification
223//!
224//! Unlike other consensus constructions that verify all incoming messages received from peers, for schemes
225//! where [`Scheme::is_batchable()`](commonware_cryptography::certificate::Scheme::is_batchable) returns `true`
226//! (such as [scheme::ed25519], [scheme::bls12381_multisig] and [scheme::bls12381_threshold]), `simplex` lazily
227//! verifies messages (only when a quorum is met), enabling efficient batch verification. For schemes where
228//! `is_batchable()` returns `false` (such as [scheme::secp256r1]), signatures are verified eagerly as they
229//! arrive since there is no batching benefit.
230//!
231//! If an invalid signature is detected, the `Batcher` will perform repeated bisections over collected
232//! messages to find the offending message (and block the peer(s) that sent it via [commonware_p2p::Blocker]).
233//!
234//! _If using a p2p implementation that is not authenticated, it is not safe to employ this optimization
235//! as any attacking peer could simply reconnect from a different address. We recommend [commonware_p2p::authenticated]._
236//!
237//! ### Fetching Missing Certificates
238//!
239//! Instead of trying to fetch all possible certificates above the floor, we only attempt to fetch
240//! nullifications for all views from the floor (last certified notarization or finalization) to the current view.
241//! This technique, however, is not sufficient to guarantee progress.
242//!
243//! Consider the case where `f` honest participants have seen a finalization for a given view `v` (and nullifications only
244//! from `v` to the current view `c`) but the remaining `f+1` honest participants have not (they have exclusively seen
245//! nullifications from some view `o < v` to `c`). Neither partition of participants will vote for the other's proposals.
246//!
247//! To ensure progress is eventually made, leaders with nullified proposals directly broadcast the best finalization
248//! certificate they are aware of to ensure all honest participants eventually consider the same proposal ancestry valid.
249//!
250//! _While a more aggressive recovery mechanism could be employed, like requiring all participants to broadcast their highest
251//! finalization certificate after nullification, it would impose significant overhead under normal network
252//! conditions (whereas the approach described incurs no overhead under normal network conditions). Recall, honest participants
253//! already broadcast observed certificates to all other participants in each view (and misaligned participants should only ever
254//! be observed following severe network degradation)._
255//!
256//! ## Pluggable Hashing and Cryptography
257//!
258//! Hashing is abstracted via the [commonware_cryptography::Hasher] trait and cryptography is abstracted via
259//! the [commonware_cryptography::certificate::Scheme] trait, allowing deployments to employ approaches that best match their
260//! requirements (or to provide their own without modifying any consensus logic). The following schemes
261//! are supported out-of-the-box:
262//!
263//! ### [scheme::ed25519]
264//!
265//! [commonware_cryptography::ed25519] signatures are ["High-speed high-security signatures"](https://eprint.iacr.org/2011/368)
266//! with 32 byte public keys and 64 byte signatures. While they are well-supported by commercial HSMs and offer efficient batch
267//! verification, the signatures are not aggregatable (and certificates grow linearly with the quorum size).
268//!
269//! ### [scheme::bls12381_multisig]
270//!
271//! [commonware_cryptography::bls12381] is a ["digital signature scheme with aggregation properties"](https://www.ietf.org/archive/id/draft-irtf-cfrg-bls-signature-05.txt).
272//! Unlike [commonware_cryptography::ed25519], signatures from multiple participants (say the signers in a certificate) can be aggregated
273//! into a single signature (reducing bandwidth usage per broadcast). That being said, [commonware_cryptography::bls12381] is much slower
274//! to verify than [commonware_cryptography::ed25519] and isn't supported by most HSMs (a standardization effort expired in 2022).
275//!
276//! ### [scheme::secp256r1]
277//!
278//! [commonware_cryptography::secp256r1] signatures use the NIST P-256 elliptic curve (also known as prime256v1), which is widely
279//! supported by commercial HSMs and hardware security modules. Unlike [commonware_cryptography::ed25519], Secp256r1 does not
280//! benefit from batch verification, so signatures are verified individually. Certificates grow linearly with quorum size
281//! (similar to ed25519).
282//!
283//! ### [scheme::bls12381_threshold]
284//!
285//! [scheme::bls12381_threshold] employs threshold cryptography (BLS12-381 threshold signatures with a `2f+1` of `3f+1` quorum)
286//! to generate succinct consensus certificates (verifiable with just the static public key). This scheme requires instantiating
287//! the shared secret via [commonware_cryptography::bls12381::dkg] and resharing whenever participants change.
288//!
289//! Two (non-attributable) variants are provided:
290//!
291//! - [scheme::bls12381_threshold::standard]: Certificates contain only a vote signature.
292//!
293//! - [scheme::bls12381_threshold::vrf]: Certificates contain a vote signature and a view signature (i.e. a seed that can be used
294//!   as a VRF). This variant can be configured for random leader election (via [elector::Random]) and/or incorporate this randomness
295//!   into execution.
296//!
297//! #### Embedded VRF ([scheme::bls12381_threshold::vrf])
298//!
299//! Every `notarize(c,v)` or `nullify(v)` message includes an `attestation(v)` (a partial signature over the view `v`). After `2f+1`
300//! `notarize(c,v)` or `nullify(v)` messages are collected from unique participants, `seed(v)` can be recovered. Because `attestation(v)` is
301//! only over the view `v`, the seed derived for a given view `v` is the same regardless of whether or not a block was notarized in said
302//! view `v`.
303//!
304//! Because the value of `seed(v)` cannot be known prior to message broadcast by any participant (including the leader) in view `v`
305//! and cannot be manipulated by any participant (deterministic for any `2f+1` signers at a given view `v`), it can be used both as a beacon
306//! for leader election (where `seed(v)` determines the leader for `v+1`) and a source of randomness in execution (where `seed(v)`
307//! is used as a seed in `v`).
308//!
309//! #### Succinct Certificates
310//!
311//! All broadcast consensus messages (`notarize(c,v)`, `nullify(v)`, `finalize(c,v)`) contain attestations (partial signatures) for a static
312//! public key (derived from a group polynomial that can be recomputed during reconfiguration using [dkg](commonware_cryptography::bls12381::dkg)).
313//! As soon as `2f+1` messages are collected, a threshold signature over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)`
314//! can be recovered, respectively. Because the public key is static, any of these certificates can be verified by an external
315//! process without following the consensus instance and/or tracking the current set of participants (as is typically required
316//! to operate a lite client).
317//!
318//! These threshold signatures over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)` (i.e. the consensus certificates)
319//! can be used to secure interoperability between different consensus instances and user interactions with an infrastructure provider
320//! (where any data served can be proven to derive from some finalized block of some consensus instance with a known static public key).
321//!
322//! ## Persistence
323//!
324//! The `Voter` caches all data required to participate in consensus to avoid any disk reads on
325//! on the critical path. To enable recovery, the `Voter` writes valid messages it receives from
326//! consensus and messages it generates to a write-ahead log (WAL) implemented by [commonware_storage::journal::segmented::variable::Journal].
327//! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior
328//! on restart (especially in the case of unclean shutdown).
329
330pub mod elector;
331pub mod scheme;
332pub mod types;
333
334cfg_if::cfg_if! {
335    if #[cfg(not(target_arch = "wasm32"))] {
336        mod actors;
337        pub mod config;
338        pub use config::Config;
339        mod engine;
340        pub use engine::Engine;
341        mod metrics;
342    }
343}
344
345#[cfg(any(test, feature = "mocks"))]
346pub mod mocks;
347
348#[cfg(not(target_arch = "wasm32"))]
349use crate::types::{View, ViewDelta};
350
351/// The minimum view we are tracking both in-memory and on-disk.
352#[cfg(not(target_arch = "wasm32"))]
353pub(crate) const fn min_active(activity_timeout: ViewDelta, last_finalized: View) -> View {
354    last_finalized.saturating_sub(activity_timeout)
355}
356
357/// Whether or not a view is interesting to us. This is a function
358/// of both `min_active` and whether or not the view is too far
359/// in the future (based on the view we are currently in).
360#[cfg(not(target_arch = "wasm32"))]
361pub(crate) fn interesting(
362    activity_timeout: ViewDelta,
363    last_finalized: View,
364    current: View,
365    pending: View,
366    allow_future: bool,
367) -> bool {
368    // If the view is genesis, skip it, genesis doesn't have votes
369    if pending.is_zero() {
370        return false;
371    }
372    if pending < min_active(activity_timeout, last_finalized) {
373        return false;
374    }
375    if !allow_future && pending > current.next() {
376        return false;
377    }
378    true
379}
380
381/// Convenience alias for [`N3f1::quorum`].
382#[cfg(test)]
383pub(crate) fn quorum(n: u32) -> u32 {
384    use commonware_utils::{Faults, N3f1};
385
386    N3f1::quorum(n)
387}
388
389#[cfg(test)]
390mod tests {
391    use super::*;
392    use crate::{
393        simplex::{
394            elector::{Config as Elector, Random, RoundRobin},
395            mocks::twins::Strategy,
396            scheme::{
397                bls12381_multisig,
398                bls12381_threshold::{
399                    standard as bls12381_threshold_std,
400                    vrf::{self as bls12381_threshold_vrf, Seedable},
401                },
402                ed25519, secp256r1, Scheme,
403            },
404            types::{
405                Certificate, Finalization as TFinalization, Finalize as TFinalize,
406                Notarization as TNotarization, Notarize as TNotarize,
407                Nullification as TNullification, Nullify as TNullify, Proposal, Vote,
408            },
409        },
410        types::{Epoch, Round},
411        Monitor, Viewable,
412    };
413    use commonware_codec::{Decode, DecodeExt, Encode};
414    use commonware_cryptography::{
415        bls12381::primitives::variant::{MinPk, MinSig, Variant},
416        certificate::mocks::Fixture,
417        ed25519::{PrivateKey, PublicKey},
418        sha256::{Digest as Sha256Digest, Digest as D},
419        Hasher as _, Sha256, Signer as _,
420    };
421    use commonware_macros::{select, test_group, test_traced};
422    use commonware_p2p::{
423        simulated::{Config, Link, Network, Oracle, Receiver, Sender, SplitOrigin, SplitTarget},
424        Recipients, Sender as _,
425    };
426    use commonware_parallel::Sequential;
427    use commonware_runtime::{
428        buffer::paged::CacheRef, count_running_tasks, deterministic, Clock, IoBuf, Metrics, Quota,
429        Runner, Spawner,
430    };
431    use commonware_utils::{sync::Mutex, test_rng, Faults, N3f1, NZUsize, NZU16};
432    use engine::Engine;
433    use futures::future::join_all;
434    use rand::{rngs::StdRng, Rng as _};
435    use std::{
436        collections::{BTreeMap, HashMap},
437        num::{NonZeroU16, NonZeroU32, NonZeroUsize},
438        sync::Arc,
439        time::Duration,
440    };
441    use tracing::{debug, info, warn};
442    use types::Activity;
443
444    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
445    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
446    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
447
448    #[test]
449    fn test_interesting() {
450        let activity_timeout = ViewDelta::new(10);
451
452        // Genesis view is never interesting
453        assert!(!interesting(
454            activity_timeout,
455            View::zero(),
456            View::zero(),
457            View::zero(),
458            false
459        ));
460        assert!(!interesting(
461            activity_timeout,
462            View::zero(),
463            View::new(1),
464            View::zero(),
465            true
466        ));
467
468        // View below min_active is not interesting
469        assert!(!interesting(
470            activity_timeout,
471            View::new(20),
472            View::new(25),
473            View::new(5), // below min_active (10)
474            false
475        ));
476
477        // View at min_active boundary is interesting
478        assert!(interesting(
479            activity_timeout,
480            View::new(20),
481            View::new(25),
482            View::new(10), // exactly min_active
483            false
484        ));
485
486        // Future view beyond current.next() is not interesting when allow_future is false
487        assert!(!interesting(
488            activity_timeout,
489            View::new(20),
490            View::new(25),
491            View::new(27),
492            false
493        ));
494
495        // Future view beyond current.next() is interesting when allow_future is true
496        assert!(interesting(
497            activity_timeout,
498            View::new(20),
499            View::new(25),
500            View::new(27),
501            true
502        ));
503
504        // View at current.next() is interesting
505        assert!(interesting(
506            activity_timeout,
507            View::new(20),
508            View::new(25),
509            View::new(26),
510            false
511        ));
512
513        // View within valid range is interesting
514        assert!(interesting(
515            activity_timeout,
516            View::new(20),
517            View::new(25),
518            View::new(22),
519            false
520        ));
521
522        // When last_finalized is 0 and activity_timeout would underflow
523        // min_active saturates at 0, so view 1 should still be interesting
524        assert!(interesting(
525            activity_timeout,
526            View::zero(),
527            View::new(5),
528            View::new(1),
529            false
530        ));
531    }
532
533    /// Register a validator with the oracle.
534    async fn register_validator(
535        oracle: &mut Oracle<PublicKey, deterministic::Context>,
536        validator: PublicKey,
537    ) -> (
538        (
539            Sender<PublicKey, deterministic::Context>,
540            Receiver<PublicKey>,
541        ),
542        (
543            Sender<PublicKey, deterministic::Context>,
544            Receiver<PublicKey>,
545        ),
546        (
547            Sender<PublicKey, deterministic::Context>,
548            Receiver<PublicKey>,
549        ),
550    ) {
551        let control = oracle.control(validator.clone());
552        let (vote_sender, vote_receiver) = control.register(0, TEST_QUOTA).await.unwrap();
553        let (certificate_sender, certificate_receiver) =
554            control.register(1, TEST_QUOTA).await.unwrap();
555        let (resolver_sender, resolver_receiver) = control.register(2, TEST_QUOTA).await.unwrap();
556        (
557            (vote_sender, vote_receiver),
558            (certificate_sender, certificate_receiver),
559            (resolver_sender, resolver_receiver),
560        )
561    }
562
563    /// Registers all validators using the oracle.
564    async fn register_validators(
565        oracle: &mut Oracle<PublicKey, deterministic::Context>,
566        validators: &[PublicKey],
567    ) -> HashMap<
568        PublicKey,
569        (
570            (
571                Sender<PublicKey, deterministic::Context>,
572                Receiver<PublicKey>,
573            ),
574            (
575                Sender<PublicKey, deterministic::Context>,
576                Receiver<PublicKey>,
577            ),
578            (
579                Sender<PublicKey, deterministic::Context>,
580                Receiver<PublicKey>,
581            ),
582        ),
583    > {
584        let mut registrations = HashMap::new();
585        for validator in validators.iter() {
586            let registration = register_validator(oracle, validator.clone()).await;
587            registrations.insert(validator.clone(), registration);
588        }
589        registrations
590    }
591
592    /// Enum to describe the action to take when linking validators.
593    enum Action {
594        Link(Link),
595        Update(Link), // Unlink and then link
596        Unlink,
597    }
598
599    /// Links (or unlinks) validators using the oracle.
600    ///
601    /// The `action` parameter determines the action (e.g. link, unlink) to take.
602    /// The `restrict_to` function can be used to restrict the linking to certain connections,
603    /// otherwise all validators will be linked to all other validators.
604    async fn link_validators(
605        oracle: &mut Oracle<PublicKey, deterministic::Context>,
606        validators: &[PublicKey],
607        action: Action,
608        restrict_to: Option<fn(usize, usize, usize) -> bool>,
609    ) {
610        for (i1, v1) in validators.iter().enumerate() {
611            for (i2, v2) in validators.iter().enumerate() {
612                // Ignore self
613                if v2 == v1 {
614                    continue;
615                }
616
617                // Restrict to certain connections
618                if let Some(f) = restrict_to {
619                    if !f(validators.len(), i1, i2) {
620                        continue;
621                    }
622                }
623
624                // Do any unlinking first
625                match action {
626                    Action::Update(_) | Action::Unlink => {
627                        oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
628                    }
629                    _ => {}
630                }
631
632                // Do any linking after
633                match action {
634                    Action::Link(ref link) | Action::Update(ref link) => {
635                        oracle
636                            .add_link(v1.clone(), v2.clone(), link.clone())
637                            .await
638                            .unwrap();
639                    }
640                    _ => {}
641                }
642            }
643        }
644    }
645
646    /// Counts lines where all patterns match and the trailing value is non-zero.
647    fn count_nonzero_metric_lines(encoded: &str, patterns: &[&str]) -> u32 {
648        encoded
649            .lines()
650            .filter(|line| patterns.iter().all(|p| line.contains(p)))
651            .filter(|line| {
652                line.split_whitespace()
653                    .last()
654                    .and_then(|s| s.parse::<u64>().ok())
655                    .is_some_and(|n| n > 0)
656            })
657            .count() as u32
658    }
659
660    fn all_online<S, F, L>(mut fixture: F)
661    where
662        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
663        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
664        L: Elector<S>,
665    {
666        // Create context
667        let n = 5;
668        let quorum = quorum(n) as usize;
669        let required_containers = View::new(100);
670        let activity_timeout = ViewDelta::new(10);
671        let skip_timeout = ViewDelta::new(5);
672        let namespace = b"consensus".to_vec();
673        let executor = deterministic::Runner::timed(Duration::from_secs(300));
674        executor.start(|mut context| async move {
675            // Create simulated network
676            let (network, mut oracle) = Network::new(
677                context.with_label("network"),
678                Config {
679                    max_size: 1024 * 1024,
680                    disconnect_on_block: true,
681                    tracked_peer_sets: None,
682                },
683            );
684
685            // Start network
686            network.start();
687
688            // Register participants
689            let Fixture {
690                participants,
691                schemes,
692                ..
693            } = fixture(&mut context, &namespace, n);
694            let mut registrations = register_validators(&mut oracle, &participants).await;
695
696            // Link all validators
697            let link = Link {
698                latency: Duration::from_millis(10),
699                jitter: Duration::from_millis(1),
700                success_rate: 1.0,
701            };
702            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
703
704            // Create engines
705            let elector = L::default();
706            let relay = Arc::new(mocks::relay::Relay::new());
707            let mut reporters = Vec::new();
708            let mut engine_handlers = Vec::new();
709            for (idx, validator) in participants.iter().enumerate() {
710                // Create scheme context
711                let context = context.with_label(&format!("validator_{}", *validator));
712
713                // Configure engine
714                let reporter_config = mocks::reporter::Config {
715                    participants: participants.clone().try_into().unwrap(),
716                    scheme: schemes[idx].clone(),
717                    elector: elector.clone(),
718                };
719                let reporter =
720                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
721                reporters.push(reporter.clone());
722                let application_cfg = mocks::application::Config {
723                    hasher: Sha256::default(),
724                    relay: relay.clone(),
725                    me: validator.clone(),
726                    propose_latency: (10.0, 5.0),
727                    verify_latency: (10.0, 5.0),
728                    certify_latency: (10.0, 5.0),
729                    should_certify: mocks::application::Certifier::Sometimes,
730                };
731                let (actor, application) = mocks::application::Application::new(
732                    context.with_label("application"),
733                    application_cfg,
734                );
735                actor.start();
736                let blocker = oracle.control(validator.clone());
737                let cfg = config::Config {
738                    scheme: schemes[idx].clone(),
739                    elector: elector.clone(),
740                    blocker,
741                    automaton: application.clone(),
742                    relay: application.clone(),
743                    reporter: reporter.clone(),
744                    strategy: Sequential,
745                    partition: validator.to_string(),
746                    mailbox_size: 1024,
747                    epoch: Epoch::new(333),
748                    leader_timeout: Duration::from_secs(1),
749                    certification_timeout: Duration::from_secs(2),
750                    timeout_retry: Duration::from_secs(10),
751                    fetch_timeout: Duration::from_secs(1),
752                    activity_timeout,
753                    skip_timeout,
754                    fetch_concurrent: 4,
755                    replay_buffer: NZUsize!(1024 * 1024),
756                    write_buffer: NZUsize!(1024 * 1024),
757                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
758                };
759                let engine = Engine::new(context.with_label("engine"), cfg);
760
761                // Start engine
762                let (pending, recovered, resolver) = registrations
763                    .remove(validator)
764                    .expect("validator should be registered");
765                engine_handlers.push(engine.start(pending, recovered, resolver));
766            }
767
768            // Wait for all engines to finish
769            let mut finalizers = Vec::new();
770            for reporter in reporters.iter_mut() {
771                let (mut latest, mut monitor) = reporter.subscribe().await;
772                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
773                    while latest < required_containers {
774                        latest = monitor.recv().await.expect("event missing");
775                    }
776                }));
777            }
778            join_all(finalizers).await;
779
780            // Check reporters for correct activity
781            let latest_complete = required_containers.saturating_sub(activity_timeout);
782            for reporter in reporters.iter() {
783                // Ensure no faults
784                {
785                    let faults = reporter.faults.lock();
786                    assert!(faults.is_empty());
787                }
788
789                // Ensure no invalid signatures
790                {
791                    let invalid = reporter.invalid.lock();
792                    assert_eq!(*invalid, 0);
793                }
794
795                // Ensure certificates for all views
796                {
797                    let certified = reporter.certified.lock();
798                    for view in View::range(View::new(1), latest_complete) {
799                        // Ensure certificate for every view
800                        if !certified.contains(&view) {
801                            panic!("view: {view}");
802                        }
803                    }
804                }
805
806                // Ensure no forks
807                let mut notarized = HashMap::new();
808                let mut finalized = HashMap::new();
809                {
810                    let notarizes = reporter.notarizes.lock();
811                    for view in View::range(View::new(1), latest_complete) {
812                        // Ensure only one payload proposed per view
813                        let Some(payloads) = notarizes.get(&view) else {
814                            continue;
815                        };
816                        if payloads.len() > 1 {
817                            panic!("view: {view}");
818                        }
819                        let (digest, notarizers) = payloads.iter().next().unwrap();
820                        notarized.insert(view, *digest);
821
822                        if notarizers.len() < quorum {
823                            // We can't verify that everyone participated at every view because some nodes may
824                            // have started later.
825                            panic!("view: {view}");
826                        }
827                    }
828                }
829                {
830                    let notarizations = reporter.notarizations.lock();
831                    for view in View::range(View::new(1), latest_complete) {
832                        // Ensure notarization matches digest from notarizes
833                        let Some(notarization) = notarizations.get(&view) else {
834                            continue;
835                        };
836                        let Some(digest) = notarized.get(&view) else {
837                            continue;
838                        };
839                        assert_eq!(&notarization.proposal.payload, digest);
840                    }
841                }
842                {
843                    let finalizes = reporter.finalizes.lock();
844                    for view in View::range(View::new(1), latest_complete) {
845                        // Ensure only one payload proposed per view
846                        let Some(payloads) = finalizes.get(&view) else {
847                            continue;
848                        };
849                        if payloads.len() > 1 {
850                            panic!("view: {view}");
851                        }
852                        let (digest, finalizers) = payloads.iter().next().unwrap();
853                        finalized.insert(view, *digest);
854
855                        // Only check at views below timeout
856                        if view > latest_complete {
857                            continue;
858                        }
859
860                        // Ensure everyone participating
861                        if finalizers.len() < quorum {
862                            // We can't verify that everyone participated at every view because some nodes may
863                            // have started later.
864                            panic!("view: {view}");
865                        }
866
867                        // Ensure no nullifies for any finalizers
868                        let nullifies = reporter.nullifies.lock();
869                        let Some(nullifies) = nullifies.get(&view) else {
870                            continue;
871                        };
872                        for (_, finalizers) in payloads.iter() {
873                            for finalizer in finalizers.iter() {
874                                if nullifies.contains(finalizer) {
875                                    panic!("should not nullify and finalize at same view");
876                                }
877                            }
878                        }
879                    }
880                }
881                {
882                    let finalizations = reporter.finalizations.lock();
883                    for view in View::range(View::new(1), latest_complete) {
884                        // Ensure finalization matches digest from finalizes
885                        let Some(finalization) = finalizations.get(&view) else {
886                            continue;
887                        };
888                        let Some(digest) = finalized.get(&view) else {
889                            continue;
890                        };
891                        assert_eq!(&finalization.proposal.payload, digest);
892                    }
893                }
894            }
895
896            // Ensure no blocked connections
897            let blocked = oracle.blocked().await.unwrap();
898            assert!(blocked.is_empty());
899        });
900    }
901
902    #[test_group("slow")]
903    #[test_traced]
904    fn test_all_online() {
905        all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
906        all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
907        all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
908        all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
909        all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
910        all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
911        all_online::<_, _, RoundRobin>(ed25519::fixture);
912        all_online::<_, _, RoundRobin>(secp256r1::fixture);
913    }
914
915    fn observer<S, F, L>(mut fixture: F)
916    where
917        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
918        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
919        L: Elector<S>,
920    {
921        // Create context
922        let n_active = 5;
923        let required_containers = View::new(100);
924        let activity_timeout = ViewDelta::new(10);
925        let skip_timeout = ViewDelta::new(5);
926        let namespace = b"consensus".to_vec();
927        let executor = deterministic::Runner::timed(Duration::from_secs(300));
928        executor.start(|mut context| async move {
929            // Create simulated network
930            let (network, mut oracle) = Network::new(
931                context.with_label("network"),
932                Config {
933                    max_size: 1024 * 1024,
934                    disconnect_on_block: true,
935                    tracked_peer_sets: None,
936                },
937            );
938
939            // Start network
940            network.start();
941
942            // Register participants (active)
943            let Fixture {
944                participants,
945                schemes,
946                verifier,
947                ..
948            } = fixture(&mut context, &namespace, n_active);
949
950            // Add observer (no share)
951            let private_key_observer = PrivateKey::from_seed(n_active as u64);
952            let public_key_observer = private_key_observer.public_key();
953
954            // Register all (including observer) with the network
955            let mut all_validators = participants.clone();
956            all_validators.push(public_key_observer.clone());
957            all_validators.sort();
958            let mut registrations = register_validators(&mut oracle, &all_validators).await;
959
960            // Link all peers (including observer)
961            let link = Link {
962                latency: Duration::from_millis(10),
963                jitter: Duration::from_millis(1),
964                success_rate: 1.0,
965            };
966            link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
967
968            // Create engines
969            let elector = L::default();
970            let relay = Arc::new(mocks::relay::Relay::new());
971            let mut reporters = Vec::new();
972
973            for (idx, validator) in participants.iter().enumerate() {
974                let is_observer = *validator == public_key_observer;
975
976                // Create scheme context
977                let context = context.with_label(&format!("validator_{}", *validator));
978
979                // Configure engine
980                let signing = if is_observer {
981                    verifier.clone()
982                } else {
983                    schemes[idx].clone()
984                };
985                let reporter_config = mocks::reporter::Config {
986                    participants: participants.clone().try_into().unwrap(),
987                    scheme: signing.clone(),
988                    elector: elector.clone(),
989                };
990                let reporter =
991                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
992                reporters.push(reporter.clone());
993                let application_cfg = mocks::application::Config {
994                    hasher: Sha256::default(),
995                    relay: relay.clone(),
996                    me: validator.clone(),
997                    propose_latency: (10.0, 5.0),
998                    verify_latency: (10.0, 5.0),
999                    certify_latency: (10.0, 5.0),
1000                    should_certify: mocks::application::Certifier::Sometimes,
1001                };
1002                let (actor, application) = mocks::application::Application::new(
1003                    context.with_label("application"),
1004                    application_cfg,
1005                );
1006                actor.start();
1007                let blocker = oracle.control(validator.clone());
1008                let cfg = config::Config {
1009                    scheme: signing.clone(),
1010                    elector: elector.clone(),
1011                    blocker,
1012                    automaton: application.clone(),
1013                    relay: application.clone(),
1014                    reporter: reporter.clone(),
1015                    strategy: Sequential,
1016                    partition: validator.to_string(),
1017                    mailbox_size: 1024,
1018                    epoch: Epoch::new(333),
1019                    leader_timeout: Duration::from_secs(1),
1020                    certification_timeout: Duration::from_secs(2),
1021                    timeout_retry: Duration::from_secs(10),
1022                    fetch_timeout: Duration::from_secs(1),
1023                    activity_timeout,
1024                    skip_timeout,
1025                    fetch_concurrent: 4,
1026                    replay_buffer: NZUsize!(1024 * 1024),
1027                    write_buffer: NZUsize!(1024 * 1024),
1028                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1029                };
1030                let engine = Engine::new(context.with_label("engine"), cfg);
1031
1032                // Start engine
1033                let (pending, recovered, resolver) = registrations
1034                    .remove(validator)
1035                    .expect("validator should be registered");
1036                engine.start(pending, recovered, resolver);
1037            }
1038
1039            // Wait for all  engines to finish
1040            let mut finalizers = Vec::new();
1041            for reporter in reporters.iter_mut() {
1042                let (mut latest, mut monitor) = reporter.subscribe().await;
1043                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1044                    while latest < required_containers {
1045                        latest = monitor.recv().await.expect("event missing");
1046                    }
1047                }));
1048            }
1049            join_all(finalizers).await;
1050
1051            // Sanity check
1052            for reporter in reporters.iter() {
1053                // Ensure no faults or invalid signatures
1054                {
1055                    let faults = reporter.faults.lock();
1056                    assert!(faults.is_empty());
1057                }
1058                {
1059                    let invalid = reporter.invalid.lock();
1060                    assert_eq!(*invalid, 0);
1061                }
1062
1063                // Ensure no blocked connections
1064                let blocked = oracle.blocked().await.unwrap();
1065                assert!(blocked.is_empty());
1066            }
1067        });
1068    }
1069
1070    #[test_group("slow")]
1071    #[test_traced]
1072    fn test_observer() {
1073        observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1074        observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1075        observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1076        observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1077        observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1078        observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1079        observer::<_, _, RoundRobin>(ed25519::fixture);
1080        observer::<_, _, RoundRobin>(secp256r1::fixture);
1081    }
1082
1083    fn unclean_shutdown<S, F, L>(mut fixture: F)
1084    where
1085        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1086        F: FnMut(&mut StdRng, &[u8], u32) -> Fixture<S>,
1087        L: Elector<S>,
1088    {
1089        // Create context
1090        let n = 5;
1091        let required_containers = View::new(100);
1092        let activity_timeout = ViewDelta::new(10);
1093        let skip_timeout = ViewDelta::new(5);
1094        let namespace = b"consensus".to_vec();
1095
1096        // Random restarts every x seconds
1097        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
1098        let supervised = Arc::new(Mutex::new(Vec::new()));
1099        let mut prev_checkpoint = None;
1100
1101        // Create validator keys
1102        let mut rng = test_rng();
1103        let Fixture {
1104            participants,
1105            schemes,
1106            ..
1107        } = fixture(&mut rng, &namespace, n);
1108
1109        // Create block relay, shared across restarts.
1110        let relay = Arc::new(mocks::relay::Relay::<Sha256Digest, S::PublicKey>::new());
1111
1112        loop {
1113            let rng = rng.clone();
1114            let participants = participants.clone();
1115            let schemes = schemes.clone();
1116            let shutdowns = shutdowns.clone();
1117            let supervised = supervised.clone();
1118            let relay = relay.clone();
1119            relay.deregister_all(); // Clear all recipients from previous restart.
1120
1121            let f = |mut context: deterministic::Context| async move {
1122                // Create simulated network
1123                let (network, mut oracle) = Network::new(
1124                    context.with_label("network"),
1125                    Config {
1126                        max_size: 1024 * 1024,
1127                        disconnect_on_block: true,
1128                        tracked_peer_sets: None,
1129                    },
1130                );
1131
1132                // Start network
1133                network.start();
1134
1135                // Register participants
1136                let mut registrations = register_validators(&mut oracle, &participants).await;
1137
1138                // Link all validators
1139                let link = Link {
1140                    latency: Duration::from_millis(50),
1141                    jitter: Duration::from_millis(50),
1142                    success_rate: 1.0,
1143                };
1144                link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1145
1146                // Create engines
1147                let elector = L::default();
1148                let relay = Arc::new(mocks::relay::Relay::new());
1149                let mut reporters = HashMap::new();
1150                let mut engine_handlers = Vec::new();
1151                for (idx, validator) in participants.iter().enumerate() {
1152                    // Create scheme context
1153                    let context = context.with_label(&format!("validator_{}", *validator));
1154
1155                    // Configure engine
1156                    let reporter_config = mocks::reporter::Config {
1157                        participants: participants.clone().try_into().unwrap(),
1158                        scheme: schemes[idx].clone(),
1159                        elector: elector.clone(),
1160                    };
1161                    let reporter = mocks::reporter::Reporter::new(rng.clone(), reporter_config);
1162                    reporters.insert(validator.clone(), reporter.clone());
1163                    let application_cfg = mocks::application::Config {
1164                        hasher: Sha256::default(),
1165                        relay: relay.clone(),
1166                        me: validator.clone(),
1167                        propose_latency: (10.0, 5.0),
1168                        verify_latency: (10.0, 5.0),
1169                        certify_latency: (10.0, 5.0),
1170                        should_certify: mocks::application::Certifier::Sometimes,
1171                    };
1172                    let (actor, application) = mocks::application::Application::new(
1173                        context.with_label("application"),
1174                        application_cfg,
1175                    );
1176                    actor.start();
1177                    let blocker = oracle.control(validator.clone());
1178                    let cfg = config::Config {
1179                        scheme: schemes[idx].clone(),
1180                        elector: elector.clone(),
1181                        blocker,
1182                        automaton: application.clone(),
1183                        relay: application.clone(),
1184                        reporter: reporter.clone(),
1185                        strategy: Sequential,
1186                        partition: validator.to_string(),
1187                        mailbox_size: 1024,
1188                        epoch: Epoch::new(333),
1189                        leader_timeout: Duration::from_secs(1),
1190                        certification_timeout: Duration::from_secs(2),
1191                        timeout_retry: Duration::from_secs(10),
1192                        fetch_timeout: Duration::from_secs(1),
1193                        activity_timeout,
1194                        skip_timeout,
1195                        fetch_concurrent: 4,
1196                        replay_buffer: NZUsize!(1024 * 1024),
1197                        write_buffer: NZUsize!(1024 * 1024),
1198                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1199                    };
1200                    let engine = Engine::new(context.with_label("engine"), cfg);
1201
1202                    // Start engine
1203                    let (pending, recovered, resolver) = registrations
1204                        .remove(validator)
1205                        .expect("validator should be registered");
1206                    engine_handlers.push(engine.start(pending, recovered, resolver));
1207                }
1208
1209                // Store all finalizer handles
1210                let mut finalizers = Vec::new();
1211                for (_, reporter) in reporters.iter_mut() {
1212                    let (mut latest, mut monitor) = reporter.subscribe().await;
1213                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1214                        while latest < required_containers {
1215                            latest = monitor.recv().await.expect("event missing");
1216                        }
1217                    }));
1218                }
1219
1220                // Exit at random points for unclean shutdown of entire set
1221                let wait =
1222                    context.gen_range(Duration::from_millis(100)..Duration::from_millis(2_000));
1223                let result = select! {
1224                    _ = context.sleep(wait) => {
1225                        // Collect reporters to check faults
1226                        {
1227                            let mut shutdowns = shutdowns.lock();
1228                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
1229                            *shutdowns += 1;
1230                        }
1231                        supervised.lock().push(reporters);
1232                        false
1233                    },
1234                    _ = join_all(finalizers) => {
1235                        // Check reporters for faults activity
1236                        let supervised = supervised.lock();
1237                        for reporters in supervised.iter() {
1238                            for (_, reporter) in reporters.iter() {
1239                                let faults = reporter.faults.lock();
1240                                assert!(faults.is_empty());
1241                            }
1242                        }
1243                        true
1244                    },
1245                };
1246
1247                // Ensure no blocked connections
1248                let blocked = oracle.blocked().await.unwrap();
1249                assert!(blocked.is_empty());
1250
1251                result
1252            };
1253
1254            let (complete, checkpoint) = prev_checkpoint
1255                .map_or_else(
1256                    || deterministic::Runner::timed(Duration::from_secs(180)),
1257                    deterministic::Runner::from,
1258                )
1259                .start_and_recover(f);
1260
1261            // Check if we should exit
1262            if complete {
1263                break;
1264            }
1265
1266            prev_checkpoint = Some(checkpoint);
1267        }
1268    }
1269
1270    #[test_group("slow")]
1271    #[test_traced]
1272    fn test_unclean_shutdown() {
1273        unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1274        unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1275        unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1276        unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1277        unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1278        unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1279        unclean_shutdown::<_, _, RoundRobin>(ed25519::fixture);
1280        unclean_shutdown::<_, _, RoundRobin>(secp256r1::fixture);
1281    }
1282
1283    fn backfill<S, F, L>(mut fixture: F)
1284    where
1285        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1286        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1287        L: Elector<S>,
1288    {
1289        // Create context
1290        let n = 4;
1291        let required_containers = View::new(100);
1292        let activity_timeout = ViewDelta::new(10);
1293        let skip_timeout = ViewDelta::new(5);
1294        let namespace = b"consensus".to_vec();
1295        let executor = deterministic::Runner::timed(Duration::from_secs(240));
1296        executor.start(|mut context| async move {
1297            // Create simulated network
1298            let (network, mut oracle) = Network::new(
1299                context.with_label("network"),
1300                Config {
1301                    max_size: 1024 * 1024,
1302                    disconnect_on_block: true,
1303                    tracked_peer_sets: None,
1304                },
1305            );
1306
1307            // Start network
1308            network.start();
1309
1310            // Register participants
1311            let Fixture {
1312                participants,
1313                schemes,
1314                ..
1315            } = fixture(&mut context, &namespace, n);
1316            let mut registrations = register_validators(&mut oracle, &participants).await;
1317
1318            // Link all validators except first
1319            let link = Link {
1320                latency: Duration::from_millis(10),
1321                jitter: Duration::from_millis(1),
1322                success_rate: 1.0,
1323            };
1324            link_validators(
1325                &mut oracle,
1326                &participants,
1327                Action::Link(link),
1328                Some(|_, i, j| ![i, j].contains(&0usize)),
1329            )
1330            .await;
1331
1332            // Create engines
1333            let elector = L::default();
1334            let relay = Arc::new(mocks::relay::Relay::new());
1335            let mut reporters = Vec::new();
1336            let mut engine_handlers = Vec::new();
1337            for (idx_scheme, validator) in participants.iter().enumerate() {
1338                // Skip first peer
1339                if idx_scheme == 0 {
1340                    continue;
1341                }
1342
1343                // Create scheme context
1344                let context = context.with_label(&format!("validator_{}", *validator));
1345
1346                // Configure engine
1347                let reporter_config = mocks::reporter::Config {
1348                    participants: participants.clone().try_into().unwrap(),
1349                    scheme: schemes[idx_scheme].clone(),
1350                    elector: elector.clone(),
1351                };
1352                let reporter =
1353                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1354                reporters.push(reporter.clone());
1355                let application_cfg = mocks::application::Config {
1356                    hasher: Sha256::default(),
1357                    relay: relay.clone(),
1358                    me: validator.clone(),
1359                    propose_latency: (10.0, 5.0),
1360                    verify_latency: (10.0, 5.0),
1361                    certify_latency: (10.0, 5.0),
1362                    should_certify: mocks::application::Certifier::Sometimes,
1363                };
1364                let (actor, application) = mocks::application::Application::new(
1365                    context.with_label("application"),
1366                    application_cfg,
1367                );
1368                actor.start();
1369                let blocker = oracle.control(validator.clone());
1370                let cfg = config::Config {
1371                    scheme: schemes[idx_scheme].clone(),
1372                    elector: elector.clone(),
1373                    blocker,
1374                    automaton: application.clone(),
1375                    relay: application.clone(),
1376                    reporter: reporter.clone(),
1377                    strategy: Sequential,
1378                    partition: validator.to_string(),
1379                    mailbox_size: 1024,
1380                    epoch: Epoch::new(333),
1381                    leader_timeout: Duration::from_secs(1),
1382                    certification_timeout: Duration::from_secs(2),
1383                    timeout_retry: Duration::from_secs(10),
1384                    fetch_timeout: Duration::from_secs(1),
1385                    activity_timeout,
1386                    skip_timeout,
1387                    fetch_concurrent: 4,
1388                    replay_buffer: NZUsize!(1024 * 1024),
1389                    write_buffer: NZUsize!(1024 * 1024),
1390                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1391                };
1392                let engine = Engine::new(context.with_label("engine"), cfg);
1393
1394                // Start engine
1395                let (pending, recovered, resolver) = registrations
1396                    .remove(validator)
1397                    .expect("validator should be registered");
1398                engine_handlers.push(engine.start(pending, recovered, resolver));
1399            }
1400
1401            // Wait for all engines to finish
1402            let mut finalizers = Vec::new();
1403            for reporter in reporters.iter_mut() {
1404                let (mut latest, mut monitor) = reporter.subscribe().await;
1405                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1406                    while latest < required_containers {
1407                        latest = monitor.recv().await.expect("event missing");
1408                    }
1409                }));
1410            }
1411            join_all(finalizers).await;
1412
1413            // Degrade network connections for online peers
1414            let link = Link {
1415                latency: Duration::from_secs(3),
1416                jitter: Duration::from_millis(0),
1417                success_rate: 1.0,
1418            };
1419            link_validators(
1420                &mut oracle,
1421                &participants,
1422                Action::Update(link.clone()),
1423                Some(|_, i, j| ![i, j].contains(&0usize)),
1424            )
1425            .await;
1426
1427            // Wait for nullifications to accrue
1428            context.sleep(Duration::from_secs(60)).await;
1429
1430            // Unlink second peer from all (except first)
1431            link_validators(
1432                &mut oracle,
1433                &participants,
1434                Action::Unlink,
1435                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
1436            )
1437            .await;
1438
1439            // Configure engine for first peer
1440            let me = participants[0].clone();
1441            let context = context.with_label(&format!("validator_{me}"));
1442
1443            // Link first peer to all (except second)
1444            link_validators(
1445                &mut oracle,
1446                &participants,
1447                Action::Link(link),
1448                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
1449            )
1450            .await;
1451
1452            // Restore network connections for all online peers
1453            let link = Link {
1454                latency: Duration::from_millis(10),
1455                jitter: Duration::from_millis(3),
1456                success_rate: 1.0,
1457            };
1458            link_validators(
1459                &mut oracle,
1460                &participants,
1461                Action::Update(link),
1462                Some(|_, i, j| ![i, j].contains(&1usize)),
1463            )
1464            .await;
1465
1466            // Configure engine
1467            let reporter_config = mocks::reporter::Config {
1468                participants: participants.clone().try_into().unwrap(),
1469                scheme: schemes[0].clone(),
1470                elector: elector.clone(),
1471            };
1472            let mut reporter =
1473                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1474            reporters.push(reporter.clone());
1475            let application_cfg = mocks::application::Config {
1476                hasher: Sha256::default(),
1477                relay: relay.clone(),
1478                me: me.clone(),
1479                propose_latency: (10.0, 5.0),
1480                verify_latency: (10.0, 5.0),
1481                certify_latency: (10.0, 5.0),
1482                should_certify: mocks::application::Certifier::Sometimes,
1483            };
1484            let (actor, application) = mocks::application::Application::new(
1485                context.with_label("application"),
1486                application_cfg,
1487            );
1488            actor.start();
1489            let blocker = oracle.control(me.clone());
1490            let cfg = config::Config {
1491                scheme: schemes[0].clone(),
1492                elector: elector.clone(),
1493                blocker,
1494                automaton: application.clone(),
1495                relay: application.clone(),
1496                reporter: reporter.clone(),
1497                strategy: Sequential,
1498                partition: me.to_string(),
1499                mailbox_size: 1024,
1500                epoch: Epoch::new(333),
1501                leader_timeout: Duration::from_secs(1),
1502                certification_timeout: Duration::from_secs(2),
1503                timeout_retry: Duration::from_secs(10),
1504                fetch_timeout: Duration::from_secs(1),
1505                activity_timeout,
1506                skip_timeout,
1507                fetch_concurrent: 4,
1508                replay_buffer: NZUsize!(1024 * 1024),
1509                write_buffer: NZUsize!(1024 * 1024),
1510                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1511            };
1512            let engine = Engine::new(context.with_label("engine"), cfg);
1513
1514            // Start engine
1515            let (pending, recovered, resolver) = registrations
1516                .remove(&me)
1517                .expect("validator should be registered");
1518            engine_handlers.push(engine.start(pending, recovered, resolver));
1519
1520            // Wait for new engine to finalize required
1521            let (mut latest, mut monitor) = reporter.subscribe().await;
1522            while latest < required_containers {
1523                latest = monitor.recv().await.expect("event missing");
1524            }
1525
1526            // Ensure no blocked connections
1527            let blocked = oracle.blocked().await.unwrap();
1528            assert!(blocked.is_empty());
1529        });
1530    }
1531
1532    #[test_group("slow")]
1533    #[test_traced]
1534    fn test_backfill() {
1535        backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1536        backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1537        backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1538        backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1539        backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1540        backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1541        backfill::<_, _, RoundRobin>(ed25519::fixture);
1542        backfill::<_, _, RoundRobin>(secp256r1::fixture);
1543    }
1544
1545    fn one_offline<S, F, L>(mut fixture: F)
1546    where
1547        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1548        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1549        L: Elector<S>,
1550    {
1551        // Create context
1552        let n = 5;
1553        let quorum = quorum(n) as usize;
1554        let required_containers = View::new(100);
1555        let activity_timeout = ViewDelta::new(10);
1556        let skip_timeout = ViewDelta::new(5);
1557        let max_exceptions = 10;
1558        let namespace = b"consensus".to_vec();
1559        let executor = deterministic::Runner::timed(Duration::from_secs(300));
1560        executor.start(|mut context| async move {
1561            // Create simulated network
1562            let (network, mut oracle) = Network::new(
1563                context.with_label("network"),
1564                Config {
1565                    max_size: 1024 * 1024,
1566                    disconnect_on_block: true,
1567                    tracked_peer_sets: None,
1568                },
1569            );
1570
1571            // Start network
1572            network.start();
1573
1574            // Register participants
1575            let Fixture {
1576                participants,
1577                schemes,
1578                ..
1579            } = fixture(&mut context, &namespace, n);
1580            let mut registrations = register_validators(&mut oracle, &participants).await;
1581
1582            // Link all validators except first
1583            let link = Link {
1584                latency: Duration::from_millis(10),
1585                jitter: Duration::from_millis(1),
1586                success_rate: 1.0,
1587            };
1588            link_validators(
1589                &mut oracle,
1590                &participants,
1591                Action::Link(link),
1592                Some(|_, i, j| ![i, j].contains(&0usize)),
1593            )
1594            .await;
1595
1596            // Create engines
1597            let elector = L::default();
1598            let relay = Arc::new(mocks::relay::Relay::new());
1599            let mut reporters = Vec::new();
1600            let mut engine_handlers = Vec::new();
1601            for (idx_scheme, validator) in participants.iter().enumerate() {
1602                // Skip first peer
1603                if idx_scheme == 0 {
1604                    continue;
1605                }
1606
1607                // Create scheme context
1608                let context = context.with_label(&format!("validator_{}", *validator));
1609
1610                // Configure engine
1611                let reporter_config = mocks::reporter::Config {
1612                    participants: participants.clone().try_into().unwrap(),
1613                    scheme: schemes[idx_scheme].clone(),
1614                    elector: elector.clone(),
1615                };
1616                let reporter =
1617                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1618                reporters.push(reporter.clone());
1619                let application_cfg = mocks::application::Config {
1620                    hasher: Sha256::default(),
1621                    relay: relay.clone(),
1622                    me: validator.clone(),
1623                    propose_latency: (10.0, 5.0),
1624                    verify_latency: (10.0, 5.0),
1625                    certify_latency: (10.0, 5.0),
1626                    should_certify: mocks::application::Certifier::Sometimes,
1627                };
1628                let (actor, application) = mocks::application::Application::new(
1629                    context.with_label("application"),
1630                    application_cfg,
1631                );
1632                actor.start();
1633                let blocker = oracle.control(validator.clone());
1634                let cfg = config::Config {
1635                    scheme: schemes[idx_scheme].clone(),
1636                    elector: elector.clone(),
1637                    blocker,
1638                    automaton: application.clone(),
1639                    relay: application.clone(),
1640                    reporter: reporter.clone(),
1641                    strategy: Sequential,
1642                    partition: validator.to_string(),
1643                    mailbox_size: 1024,
1644                    epoch: Epoch::new(333),
1645                    leader_timeout: Duration::from_secs(1),
1646                    certification_timeout: Duration::from_secs(2),
1647                    timeout_retry: Duration::from_secs(10),
1648                    fetch_timeout: Duration::from_secs(1),
1649                    activity_timeout,
1650                    skip_timeout,
1651                    fetch_concurrent: 4,
1652                    replay_buffer: NZUsize!(1024 * 1024),
1653                    write_buffer: NZUsize!(1024 * 1024),
1654                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1655                };
1656                let engine = Engine::new(context.with_label("engine"), cfg);
1657
1658                // Start engine
1659                let (pending, recovered, resolver) = registrations
1660                    .remove(validator)
1661                    .expect("validator should be registered");
1662                engine_handlers.push(engine.start(pending, recovered, resolver));
1663            }
1664
1665            // Wait for all engines to finish
1666            let mut finalizers = Vec::new();
1667            for reporter in reporters.iter_mut() {
1668                let (mut latest, mut monitor) = reporter.subscribe().await;
1669                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1670                    while latest < required_containers {
1671                        latest = monitor.recv().await.expect("event missing");
1672                    }
1673                }));
1674            }
1675            join_all(finalizers).await;
1676
1677            // Check reporters for correct activity
1678            let exceptions = 0;
1679            let offline = &participants[0];
1680            for reporter in reporters.iter() {
1681                // Ensure no faults
1682                {
1683                    let faults = reporter.faults.lock();
1684                    assert!(faults.is_empty());
1685                }
1686
1687                // Ensure no invalid signatures
1688                {
1689                    let invalid = reporter.invalid.lock();
1690                    assert_eq!(*invalid, 0);
1691                }
1692
1693                // Ensure offline node is never active
1694                let mut exceptions = 0;
1695                {
1696                    let notarizes = reporter.notarizes.lock();
1697                    for (view, payloads) in notarizes.iter() {
1698                        for (_, participants) in payloads.iter() {
1699                            if participants.contains(offline) {
1700                                panic!("view: {view}");
1701                            }
1702                        }
1703                    }
1704                }
1705                {
1706                    let nullifies = reporter.nullifies.lock();
1707                    for (view, participants) in nullifies.iter() {
1708                        if participants.contains(offline) {
1709                            panic!("view: {view}");
1710                        }
1711                    }
1712                }
1713                {
1714                    let finalizes = reporter.finalizes.lock();
1715                    for (view, payloads) in finalizes.iter() {
1716                        for (_, finalizers) in payloads.iter() {
1717                            if finalizers.contains(offline) {
1718                                panic!("view: {view}");
1719                            }
1720                        }
1721                    }
1722                }
1723
1724                // Identify offline views
1725                let mut offline_views = Vec::new();
1726                {
1727                    let leaders = reporter.leaders.lock();
1728                    for (view, leader) in leaders.iter() {
1729                        if leader == offline {
1730                            offline_views.push(*view);
1731                        }
1732                    }
1733                }
1734                assert!(!offline_views.is_empty());
1735
1736                // Ensure nullifies/nullification collected for offline node
1737                {
1738                    let nullifies = reporter.nullifies.lock();
1739                    for view in offline_views.iter() {
1740                        let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1741                        if nullifies < quorum {
1742                            warn!("missing expected view nullifies: {}", view);
1743                            exceptions += 1;
1744                        }
1745                    }
1746                }
1747                {
1748                    let nullifications = reporter.nullifications.lock();
1749                    for view in offline_views.iter() {
1750                        if !nullifications.contains_key(view) {
1751                            warn!("missing expected view nullifies: {}", view);
1752                            exceptions += 1;
1753                        }
1754                    }
1755                }
1756
1757                // Ensure exceptions within allowed
1758                assert!(exceptions <= max_exceptions);
1759            }
1760            assert!(exceptions <= max_exceptions);
1761
1762            // Ensure no blocked connections
1763            let blocked = oracle.blocked().await.unwrap();
1764            assert!(blocked.is_empty());
1765
1766            // Ensure online nodes are recording timeouts/nullifications for the offline leader
1767            let encoded = context.encode();
1768            let leader_label = format!("leader=\"{}\"", offline);
1769            assert!(
1770                count_nonzero_metric_lines(&encoded, &["_timeouts", &leader_label]) >= n - 1,
1771                "expected timeout metrics for offline leader"
1772            );
1773            assert_eq!(
1774                count_nonzero_metric_lines(&encoded, &["_nullifications", &leader_label]),
1775                n - 1,
1776                "expected all online nodes to record _nullifications for offline leader"
1777            );
1778        });
1779    }
1780
1781    #[test_group("slow")]
1782    #[test_traced]
1783    fn test_one_offline() {
1784        one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1785        one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1786        one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1787        one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1788        one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1789        one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1790        one_offline::<_, _, RoundRobin>(ed25519::fixture);
1791        one_offline::<_, _, RoundRobin>(secp256r1::fixture);
1792    }
1793
1794    fn slow_validator<S, F, L>(mut fixture: F)
1795    where
1796        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1797        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1798        L: Elector<S>,
1799    {
1800        // Create context
1801        let n = 5;
1802        let required_containers = View::new(50);
1803        let activity_timeout = ViewDelta::new(10);
1804        let skip_timeout = ViewDelta::new(5);
1805        let namespace = b"consensus".to_vec();
1806        let executor = deterministic::Runner::timed(Duration::from_secs(300));
1807        executor.start(|mut context| async move {
1808            // Create simulated network
1809            let (network, mut oracle) = Network::new(
1810                context.with_label("network"),
1811                Config {
1812                    max_size: 1024 * 1024,
1813                    disconnect_on_block: true,
1814                    tracked_peer_sets: None,
1815                },
1816            );
1817
1818            // Start network
1819            network.start();
1820
1821            // Register participants
1822            let Fixture {
1823                participants,
1824                schemes,
1825                ..
1826            } = fixture(&mut context, &namespace, n);
1827            let mut registrations = register_validators(&mut oracle, &participants).await;
1828
1829            // Link all validators
1830            let link = Link {
1831                latency: Duration::from_millis(10),
1832                jitter: Duration::from_millis(1),
1833                success_rate: 1.0,
1834            };
1835            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1836
1837            // Create engines
1838            let elector = L::default();
1839            let relay = Arc::new(mocks::relay::Relay::new());
1840            let mut reporters = Vec::new();
1841            let mut engine_handlers = Vec::new();
1842            for (idx_scheme, validator) in participants.iter().enumerate() {
1843                // Create scheme context
1844                let context = context.with_label(&format!("validator_{}", *validator));
1845
1846                // Configure engine
1847                let reporter_config = mocks::reporter::Config {
1848                    participants: participants.clone().try_into().unwrap(),
1849                    scheme: schemes[idx_scheme].clone(),
1850                    elector: elector.clone(),
1851                };
1852                let reporter =
1853                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1854                reporters.push(reporter.clone());
1855                let application_cfg = if idx_scheme == 0 {
1856                    mocks::application::Config {
1857                        hasher: Sha256::default(),
1858                        relay: relay.clone(),
1859                        me: validator.clone(),
1860                        propose_latency: (10_000.0, 0.0),
1861                        verify_latency: (10_000.0, 5.0),
1862                        certify_latency: (10_000.0, 5.0),
1863                        should_certify: mocks::application::Certifier::Sometimes,
1864                    }
1865                } else {
1866                    mocks::application::Config {
1867                        hasher: Sha256::default(),
1868                        relay: relay.clone(),
1869                        me: validator.clone(),
1870                        propose_latency: (10.0, 5.0),
1871                        verify_latency: (10.0, 5.0),
1872                        certify_latency: (10.0, 5.0),
1873                        should_certify: mocks::application::Certifier::Sometimes,
1874                    }
1875                };
1876                let (actor, application) = mocks::application::Application::new(
1877                    context.with_label("application"),
1878                    application_cfg,
1879                );
1880                actor.start();
1881                let blocker = oracle.control(validator.clone());
1882                let cfg = config::Config {
1883                    scheme: schemes[idx_scheme].clone(),
1884                    elector: elector.clone(),
1885                    blocker,
1886                    automaton: application.clone(),
1887                    relay: application.clone(),
1888                    reporter: reporter.clone(),
1889                    strategy: Sequential,
1890                    partition: validator.to_string(),
1891                    mailbox_size: 1024,
1892                    epoch: Epoch::new(333),
1893                    leader_timeout: Duration::from_secs(1),
1894                    certification_timeout: Duration::from_secs(2),
1895                    timeout_retry: Duration::from_secs(10),
1896                    fetch_timeout: Duration::from_secs(1),
1897                    activity_timeout,
1898                    skip_timeout,
1899                    fetch_concurrent: 4,
1900                    replay_buffer: NZUsize!(1024 * 1024),
1901                    write_buffer: NZUsize!(1024 * 1024),
1902                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1903                };
1904                let engine = Engine::new(context.with_label("engine"), cfg);
1905
1906                // Start engine
1907                let (pending, recovered, resolver) = registrations
1908                    .remove(validator)
1909                    .expect("validator should be registered");
1910                engine_handlers.push(engine.start(pending, recovered, resolver));
1911            }
1912
1913            // Wait for all engines to finish
1914            let mut finalizers = Vec::new();
1915            for reporter in reporters.iter_mut() {
1916                let (mut latest, mut monitor) = reporter.subscribe().await;
1917                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1918                    while latest < required_containers {
1919                        latest = monitor.recv().await.expect("event missing");
1920                    }
1921                }));
1922            }
1923            join_all(finalizers).await;
1924
1925            // Check reporters for correct activity
1926            let slow = &participants[0];
1927            for reporter in reporters.iter() {
1928                // Ensure no faults
1929                {
1930                    let faults = reporter.faults.lock();
1931                    assert!(faults.is_empty());
1932                }
1933
1934                // Ensure no invalid signatures
1935                {
1936                    let invalid = reporter.invalid.lock();
1937                    assert_eq!(*invalid, 0);
1938                }
1939
1940                // Ensure slow node still emits notarizes and finalizes (when receiving certificates)
1941                let mut observed = false;
1942                {
1943                    let notarizes = reporter.notarizes.lock();
1944                    for (_, payloads) in notarizes.iter() {
1945                        for (_, participants) in payloads.iter() {
1946                            if participants.contains(slow) {
1947                                observed = true;
1948                                break;
1949                            }
1950                        }
1951                    }
1952                }
1953                {
1954                    let finalizes = reporter.finalizes.lock();
1955                    for (_, payloads) in finalizes.iter() {
1956                        for (_, finalizers) in payloads.iter() {
1957                            if finalizers.contains(slow) {
1958                                observed = true;
1959                                break;
1960                            }
1961                        }
1962                    }
1963                }
1964                assert!(observed);
1965            }
1966
1967            // Ensure no blocked connections
1968            let blocked = oracle.blocked().await.unwrap();
1969            assert!(blocked.is_empty());
1970        });
1971    }
1972
1973    #[test_group("slow")]
1974    #[test_traced]
1975    fn test_slow_validator() {
1976        slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1977        slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1978        slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1979        slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1980        slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1981        slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1982        slow_validator::<_, _, RoundRobin>(ed25519::fixture);
1983        slow_validator::<_, _, RoundRobin>(secp256r1::fixture);
1984    }
1985
1986    fn all_recovery<S, F, L>(mut fixture: F)
1987    where
1988        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1989        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1990        L: Elector<S>,
1991    {
1992        // Create context
1993        let n = 5;
1994        let required_containers = View::new(100);
1995        let activity_timeout = ViewDelta::new(10);
1996        let skip_timeout = ViewDelta::new(2);
1997        let namespace = b"consensus".to_vec();
1998        let executor = deterministic::Runner::timed(Duration::from_secs(1800));
1999        executor.start(|mut context| async move {
2000            // Create simulated network
2001            let (network, mut oracle) = Network::new(
2002                context.with_label("network"),
2003                Config {
2004                    max_size: 1024 * 1024,
2005                    disconnect_on_block: false,
2006                    tracked_peer_sets: None,
2007                },
2008            );
2009
2010            // Start network
2011            network.start();
2012
2013            // Register participants
2014            let Fixture {
2015                participants,
2016                schemes,
2017                ..
2018            } = fixture(&mut context, &namespace, n);
2019            let mut registrations = register_validators(&mut oracle, &participants).await;
2020
2021            // Link all validators
2022            let link = Link {
2023                latency: Duration::from_secs(3),
2024                jitter: Duration::from_millis(0),
2025                success_rate: 1.0,
2026            };
2027            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2028
2029            // Create engines
2030            let elector = L::default();
2031            let relay = Arc::new(mocks::relay::Relay::new());
2032            let mut reporters = Vec::new();
2033            let mut engine_handlers = Vec::new();
2034            for (idx, validator) in participants.iter().enumerate() {
2035                // Create scheme context
2036                let context = context.with_label(&format!("validator_{}", *validator));
2037
2038                // Configure engine
2039                let reporter_config = mocks::reporter::Config {
2040                    participants: participants.clone().try_into().unwrap(),
2041                    scheme: schemes[idx].clone(),
2042                    elector: elector.clone(),
2043                };
2044                let reporter =
2045                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2046                reporters.push(reporter.clone());
2047                let application_cfg = mocks::application::Config {
2048                    hasher: Sha256::default(),
2049                    relay: relay.clone(),
2050                    me: validator.clone(),
2051                    propose_latency: (10.0, 5.0),
2052                    verify_latency: (10.0, 5.0),
2053                    certify_latency: (10.0, 5.0),
2054                    should_certify: mocks::application::Certifier::Sometimes,
2055                };
2056                let (actor, application) = mocks::application::Application::new(
2057                    context.with_label("application"),
2058                    application_cfg,
2059                );
2060                actor.start();
2061                let blocker = oracle.control(validator.clone());
2062                let cfg = config::Config {
2063                    scheme: schemes[idx].clone(),
2064                    elector: elector.clone(),
2065                    blocker,
2066                    automaton: application.clone(),
2067                    relay: application.clone(),
2068                    reporter: reporter.clone(),
2069                    strategy: Sequential,
2070                    partition: validator.to_string(),
2071                    mailbox_size: 1024,
2072                    epoch: Epoch::new(333),
2073                    leader_timeout: Duration::from_secs(1),
2074                    certification_timeout: Duration::from_secs(2),
2075                    timeout_retry: Duration::from_secs(10),
2076                    fetch_timeout: Duration::from_secs(1),
2077                    activity_timeout,
2078                    skip_timeout,
2079                    fetch_concurrent: 4,
2080                    replay_buffer: NZUsize!(1024 * 1024),
2081                    write_buffer: NZUsize!(1024 * 1024),
2082                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2083                };
2084                let engine = Engine::new(context.with_label("engine"), cfg);
2085
2086                // Start engine
2087                let (pending, recovered, resolver) = registrations
2088                    .remove(validator)
2089                    .expect("validator should be registered");
2090                engine_handlers.push(engine.start(pending, recovered, resolver));
2091            }
2092
2093            // Wait for a few virtual minutes (shouldn't finalize anything)
2094            let mut finalizers = Vec::new();
2095            for reporter in reporters.iter_mut() {
2096                let (_, mut monitor) = reporter.subscribe().await;
2097                finalizers.push(
2098                    context
2099                        .with_label("finalizer")
2100                        .spawn(move |context| async move {
2101                            select! {
2102                                _timeout = context.sleep(Duration::from_secs(60)) => {},
2103                                _done = monitor.recv() => {
2104                                    panic!("engine should not notarize or finalize anything");
2105                                },
2106                            }
2107                        }),
2108                );
2109            }
2110            join_all(finalizers).await;
2111
2112            // Unlink all validators to get latest view
2113            link_validators(&mut oracle, &participants, Action::Unlink, None).await;
2114
2115            // Wait for a virtual minute (nothing should happen)
2116            context.sleep(Duration::from_secs(60)).await;
2117
2118            // Get latest view
2119            let mut latest = View::zero();
2120            for reporter in reporters.iter() {
2121                let nullifies = reporter.nullifies.lock();
2122                let max = nullifies.keys().max().unwrap();
2123                if *max > latest {
2124                    latest = *max;
2125                }
2126            }
2127
2128            // Update links
2129            let link = Link {
2130                latency: Duration::from_millis(10),
2131                jitter: Duration::from_millis(1),
2132                success_rate: 1.0,
2133            };
2134            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2135
2136            // Wait for all engines to finish
2137            let mut finalizers = Vec::new();
2138            for reporter in reporters.iter_mut() {
2139                let (mut latest, mut monitor) = reporter.subscribe().await;
2140                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2141                    while latest < required_containers {
2142                        latest = monitor.recv().await.expect("event missing");
2143                    }
2144                }));
2145            }
2146            join_all(finalizers).await;
2147
2148            // Check reporters for correct activity
2149            for reporter in reporters.iter() {
2150                // Ensure no faults
2151                {
2152                    let faults = reporter.faults.lock();
2153                    assert!(faults.is_empty());
2154                }
2155
2156                // Ensure no invalid signatures
2157                {
2158                    let invalid = reporter.invalid.lock();
2159                    assert_eq!(*invalid, 0);
2160                }
2161
2162                // Ensure quick recovery.
2163                //
2164                // If the skip timeout isn't implemented correctly, we may go many views before participants
2165                // start to notarize a validator's proposal.
2166                {
2167                    // Ensure nearly all views around latest are notarized.
2168                    // We don't check for finalization since some of the blocks may fail to be
2169                    // certified for the purposes of testing.
2170                    let mut found = 0;
2171                    let notarizations = reporter.notarizations.lock();
2172                    for view in View::range(latest, latest.saturating_add(activity_timeout)) {
2173                        if notarizations.contains_key(&view) {
2174                            found += 1;
2175                        }
2176                    }
2177                    assert!(
2178                        found >= activity_timeout.get().saturating_sub(2),
2179                        "found: {found}"
2180                    );
2181                }
2182            }
2183
2184            // Ensure no blocked connections
2185            let blocked = oracle.blocked().await.unwrap();
2186            assert!(blocked.is_empty());
2187        });
2188    }
2189
2190    #[test_group("slow")]
2191    #[test_traced]
2192    fn test_all_recovery() {
2193        all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2194        all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2195        all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2196        all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2197        all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2198        all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2199        all_recovery::<_, _, RoundRobin>(ed25519::fixture);
2200        all_recovery::<_, _, RoundRobin>(secp256r1::fixture);
2201    }
2202
2203    fn partition<S, F, L>(mut fixture: F)
2204    where
2205        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2206        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2207        L: Elector<S>,
2208    {
2209        // Create context
2210        let n = 10;
2211        let required_containers = View::new(50);
2212        let activity_timeout = ViewDelta::new(10);
2213        let skip_timeout = ViewDelta::new(5);
2214        let namespace = b"consensus".to_vec();
2215        let executor = deterministic::Runner::timed(Duration::from_secs(900));
2216        executor.start(|mut context| async move {
2217            // Create simulated network
2218            let (network, mut oracle) = Network::new(
2219                context.with_label("network"),
2220                Config {
2221                    max_size: 1024 * 1024,
2222                    disconnect_on_block: false,
2223                    tracked_peer_sets: None,
2224                },
2225            );
2226
2227            // Start network
2228            network.start();
2229
2230            // Register participants
2231            let Fixture {
2232                participants,
2233                schemes,
2234                ..
2235            } = fixture(&mut context, &namespace, n);
2236            let mut registrations = register_validators(&mut oracle, &participants).await;
2237
2238            // Link all validators
2239            let link = Link {
2240                latency: Duration::from_millis(10),
2241                jitter: Duration::from_millis(1),
2242                success_rate: 1.0,
2243            };
2244            link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
2245
2246            // Create engines
2247            let elector = L::default();
2248            let relay = Arc::new(mocks::relay::Relay::new());
2249            let mut reporters = Vec::new();
2250            let mut engine_handlers = Vec::new();
2251            for (idx, validator) in participants.iter().enumerate() {
2252                // Create scheme context
2253                let context = context.with_label(&format!("validator_{}", *validator));
2254
2255                // Configure engine
2256                let reporter_config = mocks::reporter::Config {
2257                    participants: participants.clone().try_into().unwrap(),
2258                    scheme: schemes[idx].clone(),
2259                    elector: elector.clone(),
2260                };
2261                let reporter =
2262                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2263                reporters.push(reporter.clone());
2264                let application_cfg = mocks::application::Config {
2265                    hasher: Sha256::default(),
2266                    relay: relay.clone(),
2267                    me: validator.clone(),
2268                    propose_latency: (10.0, 5.0),
2269                    verify_latency: (10.0, 5.0),
2270                    certify_latency: (10.0, 5.0),
2271                    should_certify: mocks::application::Certifier::Sometimes,
2272                };
2273                let (actor, application) = mocks::application::Application::new(
2274                    context.with_label("application"),
2275                    application_cfg,
2276                );
2277                actor.start();
2278                let blocker = oracle.control(validator.clone());
2279                let cfg = config::Config {
2280                    scheme: schemes[idx].clone(),
2281                    elector: elector.clone(),
2282                    blocker,
2283                    automaton: application.clone(),
2284                    relay: application.clone(),
2285                    reporter: reporter.clone(),
2286                    strategy: Sequential,
2287                    partition: validator.to_string(),
2288                    mailbox_size: 1024,
2289                    epoch: Epoch::new(333),
2290                    leader_timeout: Duration::from_secs(1),
2291                    certification_timeout: Duration::from_secs(2),
2292                    timeout_retry: Duration::from_secs(10),
2293                    fetch_timeout: Duration::from_secs(1),
2294                    activity_timeout,
2295                    skip_timeout,
2296                    fetch_concurrent: 4,
2297                    replay_buffer: NZUsize!(1024 * 1024),
2298                    write_buffer: NZUsize!(1024 * 1024),
2299                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2300                };
2301                let engine = Engine::new(context.with_label("engine"), cfg);
2302
2303                // Start engine
2304                let (pending, recovered, resolver) = registrations
2305                    .remove(validator)
2306                    .expect("validator should be registered");
2307                engine_handlers.push(engine.start(pending, recovered, resolver));
2308            }
2309
2310            // Wait for all engines to finish
2311            let mut finalizers = Vec::new();
2312            for reporter in reporters.iter_mut() {
2313                let (mut latest, mut monitor) = reporter.subscribe().await;
2314                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2315                    while latest < required_containers {
2316                        latest = monitor.recv().await.expect("event missing");
2317                    }
2318                }));
2319            }
2320            join_all(finalizers).await;
2321
2322            // Cut all links between validator halves
2323            fn separated(n: usize, a: usize, b: usize) -> bool {
2324                let m = n / 2;
2325                (a < m && b >= m) || (a >= m && b < m)
2326            }
2327            link_validators(&mut oracle, &participants, Action::Unlink, Some(separated)).await;
2328
2329            // Wait for any in-progress notarizations/finalizations to finish
2330            context.sleep(Duration::from_secs(10)).await;
2331
2332            // Wait for a few virtual minutes (shouldn't finalize anything)
2333            let mut finalizers = Vec::new();
2334            for reporter in reporters.iter_mut() {
2335                let (_, mut monitor) = reporter.subscribe().await;
2336                finalizers.push(
2337                    context
2338                        .with_label("finalizer")
2339                        .spawn(move |context| async move {
2340                            select! {
2341                                _timeout = context.sleep(Duration::from_secs(60)) => {},
2342                                _done = monitor.recv() => {
2343                                    panic!("engine should not notarize or finalize anything");
2344                                },
2345                            }
2346                        }),
2347                );
2348            }
2349            join_all(finalizers).await;
2350
2351            // Restore links
2352            link_validators(
2353                &mut oracle,
2354                &participants,
2355                Action::Link(link),
2356                Some(separated),
2357            )
2358            .await;
2359
2360            // Wait for all engines to finish
2361            let mut finalizers = Vec::new();
2362            for reporter in reporters.iter_mut() {
2363                let (mut latest, mut monitor) = reporter.subscribe().await;
2364                let required = latest.saturating_add(ViewDelta::new(required_containers.get()));
2365                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2366                    while latest < required {
2367                        latest = monitor.recv().await.expect("event missing");
2368                    }
2369                }));
2370            }
2371            join_all(finalizers).await;
2372
2373            // Check reporters for correct activity
2374            for reporter in reporters.iter() {
2375                // Ensure no faults
2376                {
2377                    let faults = reporter.faults.lock();
2378                    assert!(faults.is_empty());
2379                }
2380
2381                // Ensure no invalid signatures
2382                {
2383                    let invalid = reporter.invalid.lock();
2384                    assert_eq!(*invalid, 0);
2385                }
2386            }
2387
2388            // Ensure no blocked connections
2389            let blocked = oracle.blocked().await.unwrap();
2390            assert!(blocked.is_empty());
2391        });
2392    }
2393
2394    #[test_group("slow")]
2395    #[test_traced]
2396    fn test_partition() {
2397        partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2398        partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2399        partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2400        partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2401        partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2402        partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2403        partition::<_, _, RoundRobin>(ed25519::fixture);
2404        partition::<_, _, RoundRobin>(secp256r1::fixture);
2405    }
2406
2407    fn slow_and_lossy_links<S, F, L>(seed: u64, mut fixture: F) -> String
2408    where
2409        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2410        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2411        L: Elector<S>,
2412    {
2413        // Create context
2414        let n = 5;
2415        let required_containers = View::new(50);
2416        let activity_timeout = ViewDelta::new(10);
2417        let skip_timeout = ViewDelta::new(5);
2418        let namespace = b"consensus".to_vec();
2419        let cfg = deterministic::Config::new()
2420            .with_seed(seed)
2421            .with_timeout(Some(Duration::from_secs(5_000)));
2422        let executor = deterministic::Runner::new(cfg);
2423        executor.start(|mut context| async move {
2424            // Create simulated network
2425            let (network, mut oracle) = Network::new(
2426                context.with_label("network"),
2427                Config {
2428                    max_size: 1024 * 1024,
2429                    disconnect_on_block: false,
2430                    tracked_peer_sets: None,
2431                },
2432            );
2433
2434            // Start network
2435            network.start();
2436
2437            // Register participants
2438            let Fixture {
2439                participants,
2440                schemes,
2441                ..
2442            } = fixture(&mut context, &namespace, n);
2443            let mut registrations = register_validators(&mut oracle, &participants).await;
2444
2445            // Link all validators
2446            let degraded_link = Link {
2447                latency: Duration::from_millis(200),
2448                jitter: Duration::from_millis(150),
2449                success_rate: 0.5,
2450            };
2451            link_validators(
2452                &mut oracle,
2453                &participants,
2454                Action::Link(degraded_link),
2455                None,
2456            )
2457            .await;
2458
2459            // Create engines
2460            let elector = L::default();
2461            let relay = Arc::new(mocks::relay::Relay::new());
2462            let mut reporters = Vec::new();
2463            let mut engine_handlers = Vec::new();
2464            for (idx, validator) in participants.iter().enumerate() {
2465                // Create scheme context
2466                let context = context.with_label(&format!("validator_{}", *validator));
2467
2468                // Configure engine
2469                let reporter_config = mocks::reporter::Config {
2470                    participants: participants.clone().try_into().unwrap(),
2471                    scheme: schemes[idx].clone(),
2472                    elector: elector.clone(),
2473                };
2474                let reporter =
2475                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2476                reporters.push(reporter.clone());
2477                let application_cfg = mocks::application::Config {
2478                    hasher: Sha256::default(),
2479                    relay: relay.clone(),
2480                    me: validator.clone(),
2481                    propose_latency: (10.0, 5.0),
2482                    verify_latency: (10.0, 5.0),
2483                    certify_latency: (10.0, 5.0),
2484                    should_certify: mocks::application::Certifier::Sometimes,
2485                };
2486                let (actor, application) = mocks::application::Application::new(
2487                    context.with_label("application"),
2488                    application_cfg,
2489                );
2490                actor.start();
2491                let blocker = oracle.control(validator.clone());
2492                let cfg = config::Config {
2493                    scheme: schemes[idx].clone(),
2494                    elector: elector.clone(),
2495                    blocker,
2496                    automaton: application.clone(),
2497                    relay: application.clone(),
2498                    reporter: reporter.clone(),
2499                    strategy: Sequential,
2500                    partition: validator.to_string(),
2501                    mailbox_size: 1024,
2502                    epoch: Epoch::new(333),
2503                    leader_timeout: Duration::from_secs(1),
2504                    certification_timeout: Duration::from_secs(2),
2505                    timeout_retry: Duration::from_secs(10),
2506                    fetch_timeout: Duration::from_secs(1),
2507                    activity_timeout,
2508                    skip_timeout,
2509                    fetch_concurrent: 4,
2510                    replay_buffer: NZUsize!(1024 * 1024),
2511                    write_buffer: NZUsize!(1024 * 1024),
2512                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2513                };
2514                let engine = Engine::new(context.with_label("engine"), cfg);
2515
2516                // Start engine
2517                let (pending, recovered, resolver) = registrations
2518                    .remove(validator)
2519                    .expect("validator should be registered");
2520                engine_handlers.push(engine.start(pending, recovered, resolver));
2521            }
2522
2523            // Wait for all engines to finish
2524            let mut finalizers = Vec::new();
2525            for reporter in reporters.iter_mut() {
2526                let (mut latest, mut monitor) = reporter.subscribe().await;
2527                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2528                    while latest < required_containers {
2529                        latest = monitor.recv().await.expect("event missing");
2530                    }
2531                }));
2532            }
2533            join_all(finalizers).await;
2534
2535            // Check reporters for correct activity
2536            for reporter in reporters.iter() {
2537                // Ensure no faults
2538                {
2539                    let faults = reporter.faults.lock();
2540                    assert!(faults.is_empty());
2541                }
2542
2543                // Ensure no invalid signatures
2544                {
2545                    let invalid = reporter.invalid.lock();
2546                    assert_eq!(*invalid, 0);
2547                }
2548            }
2549
2550            // Ensure no blocked connections
2551            let blocked = oracle.blocked().await.unwrap();
2552            assert!(blocked.is_empty());
2553
2554            context.auditor().state()
2555        })
2556    }
2557
2558    #[test_group("slow")]
2559    #[test_traced]
2560    fn test_slow_and_lossy_links() {
2561        slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinPk, _>);
2562        slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinSig, _>);
2563        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinPk, _>);
2564        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinSig, _>);
2565        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinPk, _>);
2566        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinSig, _>);
2567        slow_and_lossy_links::<_, _, RoundRobin>(0, ed25519::fixture);
2568        slow_and_lossy_links::<_, _, RoundRobin>(0, secp256r1::fixture);
2569    }
2570
2571    #[test_group("slow")]
2572    #[test_traced]
2573    fn test_determinism() {
2574        // We use slow and lossy links as the deterministic test
2575        // because it is the most complex test.
2576        for seed in 1..6 {
2577            let ts_vrf_pk_state_1 = slow_and_lossy_links::<_, _, Random>(
2578                seed,
2579                bls12381_threshold_vrf::fixture::<MinPk, _>,
2580            );
2581            let ts_vrf_pk_state_2 = slow_and_lossy_links::<_, _, Random>(
2582                seed,
2583                bls12381_threshold_vrf::fixture::<MinPk, _>,
2584            );
2585            assert_eq!(ts_vrf_pk_state_1, ts_vrf_pk_state_2);
2586
2587            let ts_vrf_sig_state_1 = slow_and_lossy_links::<_, _, Random>(
2588                seed,
2589                bls12381_threshold_vrf::fixture::<MinSig, _>,
2590            );
2591            let ts_vrf_sig_state_2 = slow_and_lossy_links::<_, _, Random>(
2592                seed,
2593                bls12381_threshold_vrf::fixture::<MinSig, _>,
2594            );
2595            assert_eq!(ts_vrf_sig_state_1, ts_vrf_sig_state_2);
2596
2597            let ts_std_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2598                seed,
2599                bls12381_threshold_std::fixture::<MinPk, _>,
2600            );
2601            let ts_std_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2602                seed,
2603                bls12381_threshold_std::fixture::<MinPk, _>,
2604            );
2605            assert_eq!(ts_std_pk_state_1, ts_std_pk_state_2);
2606
2607            let ts_std_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2608                seed,
2609                bls12381_threshold_std::fixture::<MinSig, _>,
2610            );
2611            let ts_std_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2612                seed,
2613                bls12381_threshold_std::fixture::<MinSig, _>,
2614            );
2615            assert_eq!(ts_std_sig_state_1, ts_std_sig_state_2);
2616
2617            let ms_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2618                seed,
2619                bls12381_multisig::fixture::<MinPk, _>,
2620            );
2621            let ms_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2622                seed,
2623                bls12381_multisig::fixture::<MinPk, _>,
2624            );
2625            assert_eq!(ms_pk_state_1, ms_pk_state_2);
2626
2627            let ms_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2628                seed,
2629                bls12381_multisig::fixture::<MinSig, _>,
2630            );
2631            let ms_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2632                seed,
2633                bls12381_multisig::fixture::<MinSig, _>,
2634            );
2635            assert_eq!(ms_sig_state_1, ms_sig_state_2);
2636
2637            let ed_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2638            let ed_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2639            assert_eq!(ed_state_1, ed_state_2);
2640
2641            let secp_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2642            let secp_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2643            assert_eq!(secp_state_1, secp_state_2);
2644
2645            let states = [
2646                ("threshold-vrf-minpk", ts_vrf_pk_state_1),
2647                ("threshold-vrf-minsig", ts_vrf_sig_state_1),
2648                ("threshold-std-minpk", ts_std_pk_state_1),
2649                ("threshold-std-minsig", ts_std_sig_state_1),
2650                ("multisig-minpk", ms_pk_state_1),
2651                ("multisig-minsig", ms_sig_state_1),
2652                ("ed25519", ed_state_1),
2653                ("secp256r1", secp_state_1),
2654            ];
2655
2656            // Sanity check that different types can't be identical
2657            for pair in states.windows(2) {
2658                assert_ne!(
2659                    pair[0].1, pair[1].1,
2660                    "state {} equals state {}",
2661                    pair[0].0, pair[1].0
2662                );
2663            }
2664        }
2665    }
2666
2667    fn conflicter<S, F, L>(seed: u64, mut fixture: F)
2668    where
2669        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2670        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2671        L: Elector<S>,
2672    {
2673        // Create context
2674        let n = 4;
2675        let required_containers = View::new(50);
2676        let activity_timeout = ViewDelta::new(10);
2677        let skip_timeout = ViewDelta::new(5);
2678        let namespace = b"consensus".to_vec();
2679        let cfg = deterministic::Config::new()
2680            .with_seed(seed)
2681            .with_timeout(Some(Duration::from_secs(30)));
2682        let executor = deterministic::Runner::new(cfg);
2683        executor.start(|mut context| async move {
2684            // Create simulated network
2685            let (network, mut oracle) = Network::new(
2686                context.with_label("network"),
2687                Config {
2688                    max_size: 1024 * 1024,
2689                    disconnect_on_block: false,
2690                    tracked_peer_sets: None,
2691                },
2692            );
2693
2694            // Start network
2695            network.start();
2696
2697            // Register participants
2698            let Fixture {
2699                participants,
2700                schemes,
2701                ..
2702            } = fixture(&mut context, &namespace, n);
2703            let mut registrations = register_validators(&mut oracle, &participants).await;
2704
2705            // Link all validators
2706            let link = Link {
2707                latency: Duration::from_millis(10),
2708                jitter: Duration::from_millis(1),
2709                success_rate: 1.0,
2710            };
2711            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2712
2713            // Create engines
2714            let elector = L::default();
2715            let relay = Arc::new(mocks::relay::Relay::new());
2716            let mut reporters = Vec::new();
2717            for (idx_scheme, validator) in participants.iter().enumerate() {
2718                // Create scheme context
2719                let context = context.with_label(&format!("validator_{}", *validator));
2720
2721                // Start engine
2722                let reporter_config = mocks::reporter::Config {
2723                    participants: participants.clone().try_into().unwrap(),
2724                    scheme: schemes[idx_scheme].clone(),
2725                    elector: elector.clone(),
2726                };
2727                let reporter =
2728                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2729                let (pending, recovered, resolver) = registrations
2730                    .remove(validator)
2731                    .expect("validator should be registered");
2732                if idx_scheme == 0 {
2733                    let cfg = mocks::conflicter::Config {
2734                        scheme: schemes[idx_scheme].clone(),
2735                    };
2736
2737                    let engine: mocks::conflicter::Conflicter<_, _, Sha256> =
2738                        mocks::conflicter::Conflicter::new(
2739                            context.with_label("byzantine_engine"),
2740                            cfg,
2741                        );
2742                    engine.start(pending);
2743                } else {
2744                    reporters.push(reporter.clone());
2745                    let application_cfg = mocks::application::Config {
2746                        hasher: Sha256::default(),
2747                        relay: relay.clone(),
2748                        me: validator.clone(),
2749                        propose_latency: (10.0, 5.0),
2750                        verify_latency: (10.0, 5.0),
2751                        certify_latency: (10.0, 5.0),
2752                        should_certify: mocks::application::Certifier::Sometimes,
2753                    };
2754                    let (actor, application) = mocks::application::Application::new(
2755                        context.with_label("application"),
2756                        application_cfg,
2757                    );
2758                    actor.start();
2759                    let blocker = oracle.control(validator.clone());
2760                    let cfg = config::Config {
2761                        scheme: schemes[idx_scheme].clone(),
2762                        elector: elector.clone(),
2763                        blocker,
2764                        automaton: application.clone(),
2765                        relay: application.clone(),
2766                        reporter: reporter.clone(),
2767                        strategy: Sequential,
2768                        partition: validator.to_string(),
2769                        mailbox_size: 1024,
2770                        epoch: Epoch::new(333),
2771                        leader_timeout: Duration::from_secs(1),
2772                        certification_timeout: Duration::from_secs(2),
2773                        timeout_retry: Duration::from_secs(10),
2774                        fetch_timeout: Duration::from_secs(1),
2775                        activity_timeout,
2776                        skip_timeout,
2777                        fetch_concurrent: 4,
2778                        replay_buffer: NZUsize!(1024 * 1024),
2779                        write_buffer: NZUsize!(1024 * 1024),
2780                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2781                    };
2782                    let engine = Engine::new(context.with_label("engine"), cfg);
2783                    engine.start(pending, recovered, resolver);
2784                }
2785            }
2786
2787            // Wait for all engines to finish
2788            let mut finalizers = Vec::new();
2789            for reporter in reporters.iter_mut() {
2790                let (mut latest, mut monitor) = reporter.subscribe().await;
2791                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2792                    while latest < required_containers {
2793                        latest = monitor.recv().await.expect("event missing");
2794                    }
2795                }));
2796            }
2797            join_all(finalizers).await;
2798
2799            // Check reporters for correct activity
2800            let byz = &participants[0];
2801            let mut count_conflicting = 0;
2802            for reporter in reporters.iter() {
2803                // Ensure only faults for byz
2804                {
2805                    let faults = reporter.faults.lock();
2806                    assert_eq!(faults.len(), 1);
2807                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
2808                    for (_, faults) in faulter.iter() {
2809                        for fault in faults.iter() {
2810                            match fault {
2811                                Activity::ConflictingNotarize(_) => {
2812                                    count_conflicting += 1;
2813                                }
2814                                Activity::ConflictingFinalize(_) => {
2815                                    count_conflicting += 1;
2816                                }
2817                                _ => panic!("unexpected fault: {fault:?}"),
2818                            }
2819                        }
2820                    }
2821                }
2822
2823                // Ensure no invalid signatures
2824                {
2825                    let invalid = reporter.invalid.lock();
2826                    assert_eq!(*invalid, 0);
2827                }
2828            }
2829            assert!(count_conflicting > 0);
2830
2831            // Ensure conflicter is blocked
2832            let blocked = oracle.blocked().await.unwrap();
2833            assert!(!blocked.is_empty());
2834            for (a, b) in blocked {
2835                assert_ne!(&a, byz);
2836                assert_eq!(&b, byz);
2837            }
2838        });
2839    }
2840
2841    #[test_group("slow")]
2842    #[test_traced]
2843    fn test_conflicter() {
2844        for seed in 0..5 {
2845            conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
2846            conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
2847            conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
2848            conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
2849            conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2850            conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2851            conflicter::<_, _, RoundRobin>(seed, ed25519::fixture);
2852            conflicter::<_, _, RoundRobin>(seed, secp256r1::fixture);
2853        }
2854    }
2855
2856    fn invalid<S, F, L>(seed: u64, mut fixture: F)
2857    where
2858        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2859        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2860        L: Elector<S>,
2861    {
2862        // Create context
2863        let n = 4;
2864        let required_containers = View::new(50);
2865        let activity_timeout = ViewDelta::new(10);
2866        let skip_timeout = ViewDelta::new(5);
2867        let namespace = b"consensus".to_vec();
2868        let cfg = deterministic::Config::new()
2869            .with_seed(seed)
2870            .with_timeout(Some(Duration::from_secs(30)));
2871        let executor = deterministic::Runner::new(cfg);
2872        executor.start(|mut context| async move {
2873            // Create simulated network
2874            let (network, mut oracle) = Network::new(
2875                context.with_label("network"),
2876                Config {
2877                    max_size: 1024 * 1024,
2878                    disconnect_on_block: false,
2879                    tracked_peer_sets: None,
2880                },
2881            );
2882
2883            // Start network
2884            network.start();
2885
2886            // Register participants
2887            let Fixture {
2888                participants,
2889                schemes,
2890                ..
2891            } = fixture(&mut context, &namespace, n);
2892
2893            // Create scheme with wrong namespace for byzantine node (index 0)
2894            let Fixture {
2895                schemes: wrong_schemes,
2896                ..
2897            } = fixture(&mut context, b"wrong-namespace", n);
2898
2899            let mut registrations = register_validators(&mut oracle, &participants).await;
2900
2901            // Link all validators
2902            let link = Link {
2903                latency: Duration::from_millis(10),
2904                jitter: Duration::from_millis(1),
2905                success_rate: 1.0,
2906            };
2907            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2908
2909            // Create engines
2910            let elector = L::default();
2911            let relay = Arc::new(mocks::relay::Relay::new());
2912            let mut reporters = Vec::new();
2913            for (idx_scheme, validator) in participants.iter().enumerate() {
2914                // Create scheme context
2915                let context = context.with_label(&format!("validator_{}", *validator));
2916
2917                // Byzantine node (idx 0) uses wrong namespace scheme to produce invalid signatures
2918                // Honest nodes use correct namespace schemes
2919                let scheme = if idx_scheme == 0 {
2920                    wrong_schemes[idx_scheme].clone()
2921                } else {
2922                    schemes[idx_scheme].clone()
2923                };
2924
2925                let reporter_config = mocks::reporter::Config {
2926                    participants: participants.clone().try_into().unwrap(),
2927                    scheme: schemes[idx_scheme].clone(),
2928                    elector: elector.clone(),
2929                };
2930                let reporter =
2931                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2932                reporters.push(reporter.clone());
2933
2934                let application_cfg = mocks::application::Config {
2935                    hasher: Sha256::default(),
2936                    relay: relay.clone(),
2937                    me: validator.clone(),
2938                    propose_latency: (10.0, 5.0),
2939                    verify_latency: (10.0, 5.0),
2940                    certify_latency: (10.0, 5.0),
2941                    should_certify: mocks::application::Certifier::Sometimes,
2942                };
2943                let (actor, application) = mocks::application::Application::new(
2944                    context.with_label("application"),
2945                    application_cfg,
2946                );
2947                actor.start();
2948                let blocker = oracle.control(validator.clone());
2949                let leader_timeout = if idx_scheme == 0 {
2950                    // Force the wrong-namespace byzantine node to timeout quickly so
2951                    // honest nodes deterministically observe at least one invalid vote.
2952                    Duration::from_millis(100)
2953                } else {
2954                    Duration::from_secs(1)
2955                };
2956                let cfg = config::Config {
2957                    scheme,
2958                    elector: elector.clone(),
2959                    blocker,
2960                    automaton: application.clone(),
2961                    relay: application.clone(),
2962                    reporter: reporter.clone(),
2963                    strategy: Sequential,
2964                    partition: validator.clone().to_string(),
2965                    mailbox_size: 1024,
2966                    epoch: Epoch::new(333),
2967                    leader_timeout,
2968                    certification_timeout: Duration::from_secs(2),
2969                    timeout_retry: Duration::from_secs(10),
2970                    fetch_timeout: Duration::from_secs(1),
2971                    activity_timeout,
2972                    skip_timeout,
2973                    fetch_concurrent: 4,
2974                    replay_buffer: NZUsize!(1024 * 1024),
2975                    write_buffer: NZUsize!(1024 * 1024),
2976                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2977                };
2978                let engine = Engine::new(context.with_label("engine"), cfg);
2979                let (pending, recovered, resolver) = registrations
2980                    .remove(validator)
2981                    .expect("validator should be registered");
2982                engine.start(pending, recovered, resolver);
2983            }
2984
2985            // Wait for honest engines to finish (skip byzantine node at index 0)
2986            let mut finalizers = Vec::new();
2987            for reporter in reporters.iter_mut().skip(1) {
2988                let (mut latest, mut monitor) = reporter.subscribe().await;
2989                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2990                    while latest < required_containers {
2991                        latest = monitor.recv().await.expect("event missing");
2992                    }
2993                }));
2994            }
2995            join_all(finalizers).await;
2996
2997            // Check honest reporters (reporters[1..]) for correct activity
2998            let mut invalid_count = 0;
2999            for reporter in reporters.iter().skip(1) {
3000                // Ensure no faults
3001                {
3002                    let faults = reporter.faults.lock();
3003                    assert!(faults.is_empty());
3004                }
3005
3006                // Count invalid signatures
3007                {
3008                    let invalid = reporter.invalid.lock();
3009                    if *invalid > 0 {
3010                        invalid_count += 1;
3011                    }
3012                }
3013            }
3014
3015            // All honest nodes should see invalid signatures from the byzantine node
3016            assert_eq!(invalid_count, n - 1);
3017
3018            // Ensure byzantine node is blocked by honest nodes
3019            let blocked = oracle.blocked().await.unwrap();
3020            assert!(!blocked.is_empty());
3021            for (a, b) in blocked {
3022                if a != participants[0] {
3023                    assert_eq!(b, participants[0]);
3024                }
3025            }
3026        });
3027    }
3028
3029    #[test_group("slow")]
3030    #[test_traced]
3031    fn test_invalid() {
3032        for seed in 0..5 {
3033            invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3034            invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3035            invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3036            invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3037            invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3038            invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3039            invalid::<_, _, RoundRobin>(seed, ed25519::fixture);
3040            invalid::<_, _, RoundRobin>(seed, secp256r1::fixture);
3041        }
3042    }
3043
3044    fn impersonator<S, F, L>(seed: u64, mut fixture: F)
3045    where
3046        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3047        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3048        L: Elector<S>,
3049    {
3050        // Create context
3051        let n = 4;
3052        let required_containers = View::new(50);
3053        let activity_timeout = ViewDelta::new(10);
3054        let skip_timeout = ViewDelta::new(5);
3055        let namespace = b"consensus".to_vec();
3056        let cfg = deterministic::Config::new()
3057            .with_seed(seed)
3058            .with_timeout(Some(Duration::from_secs(30)));
3059        let executor = deterministic::Runner::new(cfg);
3060        executor.start(|mut context| async move {
3061            // Create simulated network
3062            let (network, mut oracle) = Network::new(
3063                context.with_label("network"),
3064                Config {
3065                    max_size: 1024 * 1024,
3066                    disconnect_on_block: false,
3067                    tracked_peer_sets: None,
3068                },
3069            );
3070
3071            // Start network
3072            network.start();
3073
3074            // Register participants
3075            let Fixture {
3076                participants,
3077                schemes,
3078                ..
3079            } = fixture(&mut context, &namespace, n);
3080            let mut registrations = register_validators(&mut oracle, &participants).await;
3081
3082            // Link all validators
3083            let link = Link {
3084                latency: Duration::from_millis(10),
3085                jitter: Duration::from_millis(1),
3086                success_rate: 1.0,
3087            };
3088            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3089
3090            // Create engines
3091            let elector = L::default();
3092            let relay = Arc::new(mocks::relay::Relay::new());
3093            let mut reporters = Vec::new();
3094            for (idx_scheme, validator) in participants.iter().enumerate() {
3095                // Create scheme context
3096                let context = context.with_label(&format!("validator_{}", *validator));
3097
3098                // Start engine
3099                let reporter_config = mocks::reporter::Config {
3100                    participants: participants.clone().try_into().unwrap(),
3101                    scheme: schemes[idx_scheme].clone(),
3102                    elector: elector.clone(),
3103                };
3104                let reporter =
3105                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3106                let (pending, recovered, resolver) = registrations
3107                    .remove(validator)
3108                    .expect("validator should be registered");
3109                if idx_scheme == 0 {
3110                    let cfg = mocks::impersonator::Config {
3111                        scheme: schemes[idx_scheme].clone(),
3112                    };
3113
3114                    let engine: mocks::impersonator::Impersonator<_, _, Sha256> =
3115                        mocks::impersonator::Impersonator::new(
3116                            context.with_label("byzantine_engine"),
3117                            cfg,
3118                        );
3119                    engine.start(pending);
3120                } else {
3121                    reporters.push(reporter.clone());
3122                    let application_cfg = mocks::application::Config {
3123                        hasher: Sha256::default(),
3124                        relay: relay.clone(),
3125                        me: validator.clone(),
3126                        propose_latency: (10.0, 5.0),
3127                        verify_latency: (10.0, 5.0),
3128                        certify_latency: (10.0, 5.0),
3129                        should_certify: mocks::application::Certifier::Sometimes,
3130                    };
3131                    let (actor, application) = mocks::application::Application::new(
3132                        context.with_label("application"),
3133                        application_cfg,
3134                    );
3135                    actor.start();
3136                    let blocker = oracle.control(validator.clone());
3137                    let cfg = config::Config {
3138                        scheme: schemes[idx_scheme].clone(),
3139                        elector: elector.clone(),
3140                        blocker,
3141                        automaton: application.clone(),
3142                        relay: application.clone(),
3143                        reporter: reporter.clone(),
3144                        strategy: Sequential,
3145                        partition: validator.clone().to_string(),
3146                        mailbox_size: 1024,
3147                        epoch: Epoch::new(333),
3148                        leader_timeout: Duration::from_secs(1),
3149                        certification_timeout: Duration::from_secs(2),
3150                        timeout_retry: Duration::from_secs(10),
3151                        fetch_timeout: Duration::from_secs(1),
3152                        activity_timeout,
3153                        skip_timeout,
3154                        fetch_concurrent: 4,
3155                        replay_buffer: NZUsize!(1024 * 1024),
3156                        write_buffer: NZUsize!(1024 * 1024),
3157                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3158                    };
3159                    let engine = Engine::new(context.with_label("engine"), cfg);
3160                    engine.start(pending, recovered, resolver);
3161                }
3162            }
3163
3164            // Wait for all engines to finish
3165            let mut finalizers = Vec::new();
3166            for reporter in reporters.iter_mut() {
3167                let (mut latest, mut monitor) = reporter.subscribe().await;
3168                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3169                    while latest < required_containers {
3170                        latest = monitor.recv().await.expect("event missing");
3171                    }
3172                }));
3173            }
3174            join_all(finalizers).await;
3175
3176            // Check reporters for correct activity
3177            let byz = &participants[0];
3178            for reporter in reporters.iter() {
3179                // Ensure no faults
3180                {
3181                    let faults = reporter.faults.lock();
3182                    assert!(faults.is_empty());
3183                }
3184
3185                // Ensure no invalid signatures
3186                {
3187                    let invalid = reporter.invalid.lock();
3188                    assert_eq!(*invalid, 0);
3189                }
3190            }
3191
3192            // Ensure invalid is blocked
3193            let blocked = oracle.blocked().await.unwrap();
3194            assert!(!blocked.is_empty());
3195            for (a, b) in blocked {
3196                assert_ne!(&a, byz);
3197                assert_eq!(&b, byz);
3198            }
3199        });
3200    }
3201
3202    #[test_group("slow")]
3203    #[test_traced]
3204    fn test_impersonator() {
3205        for seed in 0..5 {
3206            impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3207            impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3208            impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3209            impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3210            impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3211            impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3212            impersonator::<_, _, RoundRobin>(seed, ed25519::fixture);
3213            impersonator::<_, _, RoundRobin>(seed, secp256r1::fixture);
3214        }
3215    }
3216
3217    fn equivocator<S, F, L>(seed: u64, mut fixture: F) -> bool
3218    where
3219        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3220        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3221        L: Elector<S>,
3222    {
3223        // Create context
3224        let n = 7;
3225        let required_containers = View::new(50);
3226        let activity_timeout = ViewDelta::new(10);
3227        let skip_timeout = ViewDelta::new(5);
3228        let namespace = b"consensus".to_vec();
3229        let cfg = deterministic::Config::new()
3230            .with_seed(seed)
3231            .with_timeout(Some(Duration::from_secs(60)));
3232        let executor = deterministic::Runner::new(cfg);
3233        executor.start(|mut context| async move {
3234            // Create simulated network
3235            let (network, mut oracle) = Network::new(
3236                context.with_label("network"),
3237                Config {
3238                    max_size: 1024 * 1024,
3239                    disconnect_on_block: false,
3240                    tracked_peer_sets: None,
3241                },
3242            );
3243
3244            // Start network
3245            network.start();
3246
3247            // Register participants
3248            let Fixture {
3249                participants,
3250                schemes,
3251                ..
3252            } = fixture(&mut context, &namespace, n);
3253            let mut registrations = register_validators(&mut oracle, &participants).await;
3254
3255            // Link all validators
3256            let link = Link {
3257                latency: Duration::from_millis(10),
3258                jitter: Duration::from_millis(1),
3259                success_rate: 1.0,
3260            };
3261            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3262
3263            // Create engines
3264            let elector = L::default();
3265            let mut engines = Vec::new();
3266            let relay = Arc::new(mocks::relay::Relay::new());
3267            let mut reporters = Vec::new();
3268            for (idx_scheme, validator) in participants.iter().enumerate() {
3269                // Create scheme context
3270                let context = context.with_label(&format!("validator_{}", *validator));
3271
3272                // Start engine
3273                let reporter_config = mocks::reporter::Config {
3274                    participants: participants.clone().try_into().unwrap(),
3275                    scheme: schemes[idx_scheme].clone(),
3276                    elector: elector.clone(),
3277                };
3278                let reporter =
3279                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3280                reporters.push(reporter.clone());
3281                let (pending, recovered, resolver) = registrations
3282                    .remove(validator)
3283                    .expect("validator should be registered");
3284                if idx_scheme == 0 {
3285                    let cfg = mocks::equivocator::Config {
3286                        scheme: schemes[idx_scheme].clone(),
3287                        epoch: Epoch::new(333),
3288                        relay: relay.clone(),
3289                        hasher: Sha256::default(),
3290                        elector: elector.clone(),
3291                    };
3292
3293                    let engine = mocks::equivocator::Equivocator::new(
3294                        context.with_label("byzantine_engine"),
3295                        cfg,
3296                    );
3297                    engines.push(engine.start(pending, recovered));
3298                } else {
3299                    let application_cfg = mocks::application::Config {
3300                        hasher: Sha256::default(),
3301                        relay: relay.clone(),
3302                        me: validator.clone(),
3303                        propose_latency: (10.0, 5.0),
3304                        verify_latency: (10.0, 5.0),
3305                        certify_latency: (10.0, 5.0),
3306                        should_certify: mocks::application::Certifier::Sometimes,
3307                    };
3308                    let (actor, application) = mocks::application::Application::new(
3309                        context.with_label("application"),
3310                        application_cfg,
3311                    );
3312                    actor.start();
3313                    let blocker = oracle.control(validator.clone());
3314                    let cfg = config::Config {
3315                        scheme: schemes[idx_scheme].clone(),
3316                        elector: elector.clone(),
3317                        blocker,
3318                        automaton: application.clone(),
3319                        relay: application.clone(),
3320                        reporter: reporter.clone(),
3321                        strategy: Sequential,
3322                        partition: validator.to_string(),
3323                        mailbox_size: 1024,
3324                        epoch: Epoch::new(333),
3325                        leader_timeout: Duration::from_secs(1),
3326                        certification_timeout: Duration::from_secs(2),
3327                        timeout_retry: Duration::from_secs(10),
3328                        fetch_timeout: Duration::from_secs(1),
3329                        activity_timeout,
3330                        skip_timeout,
3331                        fetch_concurrent: 4,
3332                        replay_buffer: NZUsize!(1024 * 1024),
3333                        write_buffer: NZUsize!(1024 * 1024),
3334                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3335                    };
3336                    let engine = Engine::new(context.with_label("engine"), cfg);
3337                    engines.push(engine.start(pending, recovered, resolver));
3338                }
3339            }
3340
3341            // Wait for all engines to hit required containers
3342            let mut finalizers = Vec::new();
3343            for reporter in reporters.iter_mut().skip(1) {
3344                let (mut latest, mut monitor) = reporter.subscribe().await;
3345                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3346                    while latest < required_containers {
3347                        latest = monitor.recv().await.expect("event missing");
3348                    }
3349                }));
3350            }
3351            join_all(finalizers).await;
3352
3353            // Abort a validator
3354            let idx = context.gen_range(1..engines.len()); // skip byzantine validator
3355            let validator = &participants[idx];
3356            let handle = engines.remove(idx);
3357            handle.abort();
3358            let _ = handle.await;
3359            reporters.remove(idx);
3360            info!(idx, ?validator, "aborted validator");
3361
3362            // Wait for all engines to hit required containers
3363            let mut finalizers = Vec::new();
3364            for reporter in reporters.iter_mut().skip(1) {
3365                let (mut latest, mut monitor) = reporter.subscribe().await;
3366                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3367                    while latest < View::new(required_containers.get() * 2) {
3368                        latest = monitor.recv().await.expect("event missing");
3369                    }
3370                }));
3371            }
3372            join_all(finalizers).await;
3373
3374            // Recreate engine
3375            info!(idx, ?validator, "restarting validator");
3376            let context = context.with_label(&format!("validator_{}_restarted", *validator));
3377
3378            // Start engine
3379            let reporter_config = mocks::reporter::Config {
3380                participants: participants.clone().try_into().unwrap(),
3381                scheme: schemes[idx].clone(),
3382                elector: elector.clone(),
3383            };
3384            let reporter =
3385                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3386            let (pending, recovered, resolver) =
3387                register_validator(&mut oracle, validator.clone()).await;
3388            reporters.push(reporter.clone());
3389            let application_cfg = mocks::application::Config {
3390                hasher: Sha256::default(),
3391                relay: relay.clone(),
3392                me: validator.clone(),
3393                propose_latency: (10.0, 5.0),
3394                verify_latency: (10.0, 5.0),
3395                certify_latency: (10.0, 5.0),
3396                should_certify: mocks::application::Certifier::Sometimes,
3397            };
3398            let (actor, application) = mocks::application::Application::new(
3399                context.with_label("application"),
3400                application_cfg,
3401            );
3402            actor.start();
3403            let blocker = oracle.control(validator.clone());
3404            let cfg = config::Config {
3405                scheme: schemes[idx].clone(),
3406                elector: elector.clone(),
3407                blocker,
3408                automaton: application.clone(),
3409                relay: application.clone(),
3410                reporter: reporter.clone(),
3411                strategy: Sequential,
3412                partition: validator.to_string(),
3413                mailbox_size: 1024,
3414                epoch: Epoch::new(333),
3415                leader_timeout: Duration::from_secs(1),
3416                certification_timeout: Duration::from_secs(2),
3417                timeout_retry: Duration::from_secs(10),
3418                fetch_timeout: Duration::from_secs(1),
3419                activity_timeout,
3420                skip_timeout,
3421                fetch_concurrent: 4,
3422                replay_buffer: NZUsize!(1024 * 1024),
3423                write_buffer: NZUsize!(1024 * 1024),
3424                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3425            };
3426            let engine = Engine::new(context.with_label("engine"), cfg);
3427            engine.start(pending, recovered, resolver);
3428
3429            // Wait for all engines to hit required containers
3430            let mut finalizers = Vec::new();
3431            for reporter in reporters.iter_mut().skip(1) {
3432                let (mut latest, mut monitor) = reporter.subscribe().await;
3433                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3434                    while latest < View::new(required_containers.get() * 3) {
3435                        latest = monitor.recv().await.expect("event missing");
3436                    }
3437                }));
3438            }
3439            join_all(finalizers).await;
3440
3441            // Check equivocator blocking (we aren't guaranteed a fault will be produced
3442            // because it may not be possible to extract a conflicting vote from the certificate
3443            // we receive)
3444            let byz = &participants[0];
3445            let blocked = oracle.blocked().await.unwrap();
3446            for (a, b) in &blocked {
3447                assert_ne!(a, byz);
3448                assert_eq!(b, byz);
3449            }
3450            !blocked.is_empty()
3451        })
3452    }
3453
3454    #[test_group("slow")]
3455    #[test_traced]
3456    fn test_equivocator_bls12381_threshold_vrf_min_pk() {
3457        let detected = (0..5).any(|seed| {
3458            equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>)
3459        });
3460        assert!(
3461            detected,
3462            "expected at least one seed to detect equivocation"
3463        );
3464    }
3465
3466    #[test_group("slow")]
3467    #[test_traced]
3468    fn test_equivocator_bls12381_threshold_vrf_min_sig() {
3469        let detected = (0..5).any(|seed| {
3470            equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>)
3471        });
3472        assert!(
3473            detected,
3474            "expected at least one seed to detect equivocation"
3475        );
3476    }
3477
3478    #[test_group("slow")]
3479    #[test_traced]
3480    fn test_equivocator_bls12381_threshold_std_min_pk() {
3481        let detected = (0..5).any(|seed| {
3482            equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>)
3483        });
3484        assert!(
3485            detected,
3486            "expected at least one seed to detect equivocation"
3487        );
3488    }
3489
3490    #[test_group("slow")]
3491    #[test_traced]
3492    fn test_equivocator_bls12381_threshold_std_min_sig() {
3493        let detected = (0..5).any(|seed| {
3494            equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>)
3495        });
3496        assert!(
3497            detected,
3498            "expected at least one seed to detect equivocation"
3499        );
3500    }
3501
3502    #[test_group("slow")]
3503    #[test_traced]
3504    fn test_equivocator_bls12381_multisig_min_pk() {
3505        let detected = (0..5).any(|seed| {
3506            equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>)
3507        });
3508        assert!(
3509            detected,
3510            "expected at least one seed to detect equivocation"
3511        );
3512    }
3513
3514    #[test_group("slow")]
3515    #[test_traced]
3516    fn test_equivocator_bls12381_multisig_min_sig() {
3517        let detected = (0..5).any(|seed| {
3518            equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>)
3519        });
3520        assert!(
3521            detected,
3522            "expected at least one seed to detect equivocation"
3523        );
3524    }
3525
3526    #[test_group("slow")]
3527    #[test_traced]
3528    fn test_equivocator_ed25519() {
3529        let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, ed25519::fixture));
3530        assert!(
3531            detected,
3532            "expected at least one seed to detect equivocation"
3533        );
3534    }
3535
3536    #[test_group("slow")]
3537    #[test_traced]
3538    fn test_equivocator_secp256r1() {
3539        let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, secp256r1::fixture));
3540        assert!(
3541            detected,
3542            "expected at least one seed to detect equivocation"
3543        );
3544    }
3545
3546    fn reconfigurer<S, F, L>(seed: u64, mut fixture: F)
3547    where
3548        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3549        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3550        L: Elector<S>,
3551    {
3552        // Create context
3553        let n = 4;
3554        let required_containers = View::new(50);
3555        let activity_timeout = ViewDelta::new(10);
3556        let skip_timeout = ViewDelta::new(5);
3557        let namespace = b"consensus".to_vec();
3558        let cfg = deterministic::Config::new()
3559            .with_seed(seed)
3560            .with_timeout(Some(Duration::from_secs(30)));
3561        let executor = deterministic::Runner::new(cfg);
3562        executor.start(|mut context| async move {
3563            // Create simulated network
3564            let (network, mut oracle) = Network::new(
3565                context.with_label("network"),
3566                Config {
3567                    max_size: 1024 * 1024,
3568                    disconnect_on_block: false,
3569                    tracked_peer_sets: None,
3570                },
3571            );
3572
3573            // Start network
3574            network.start();
3575
3576            // Register participants
3577            let Fixture {
3578                participants,
3579                schemes,
3580                ..
3581            } = fixture(&mut context, &namespace, n);
3582            let mut registrations = register_validators(&mut oracle, &participants).await;
3583
3584            // Link all validators
3585            let link = Link {
3586                latency: Duration::from_millis(10),
3587                jitter: Duration::from_millis(1),
3588                success_rate: 1.0,
3589            };
3590            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3591
3592            // Create engines
3593            let elector = L::default();
3594            let relay = Arc::new(mocks::relay::Relay::new());
3595            let mut reporters = Vec::new();
3596            for (idx_scheme, validator) in participants.iter().enumerate() {
3597                // Create scheme context
3598                let context = context.with_label(&format!("validator_{}", *validator));
3599
3600                // Start engine
3601                let reporter_config = mocks::reporter::Config {
3602                    participants: participants.clone().try_into().unwrap(),
3603                    scheme: schemes[idx_scheme].clone(),
3604                    elector: elector.clone(),
3605                };
3606                let reporter =
3607                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3608                let (pending, recovered, resolver) = registrations
3609                    .remove(validator)
3610                    .expect("validator should be registered");
3611                if idx_scheme == 0 {
3612                    let cfg = mocks::reconfigurer::Config {
3613                        scheme: schemes[idx_scheme].clone(),
3614                    };
3615                    let engine: mocks::reconfigurer::Reconfigurer<_, _, Sha256> =
3616                        mocks::reconfigurer::Reconfigurer::new(
3617                            context.with_label("byzantine_engine"),
3618                            cfg,
3619                        );
3620                    engine.start(pending);
3621                } else {
3622                    reporters.push(reporter.clone());
3623                    let application_cfg = mocks::application::Config {
3624                        hasher: Sha256::default(),
3625                        relay: relay.clone(),
3626                        me: validator.clone(),
3627                        propose_latency: (10.0, 5.0),
3628                        verify_latency: (10.0, 5.0),
3629                        certify_latency: (10.0, 5.0),
3630                        should_certify: mocks::application::Certifier::Sometimes,
3631                    };
3632                    let (actor, application) = mocks::application::Application::new(
3633                        context.with_label("application"),
3634                        application_cfg,
3635                    );
3636                    actor.start();
3637                    let blocker = oracle.control(validator.clone());
3638                    let cfg = config::Config {
3639                        scheme: schemes[idx_scheme].clone(),
3640                        elector: elector.clone(),
3641                        blocker,
3642                        automaton: application.clone(),
3643                        relay: application.clone(),
3644                        reporter: reporter.clone(),
3645                        strategy: Sequential,
3646                        partition: validator.to_string(),
3647                        mailbox_size: 1024,
3648                        epoch: Epoch::new(333),
3649                        leader_timeout: Duration::from_secs(1),
3650                        certification_timeout: Duration::from_secs(2),
3651                        timeout_retry: Duration::from_secs(10),
3652                        fetch_timeout: Duration::from_secs(1),
3653                        activity_timeout,
3654                        skip_timeout,
3655                        fetch_concurrent: 4,
3656                        replay_buffer: NZUsize!(1024 * 1024),
3657                        write_buffer: NZUsize!(1024 * 1024),
3658                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3659                    };
3660                    let engine = Engine::new(context.with_label("engine"), cfg);
3661                    engine.start(pending, recovered, resolver);
3662                }
3663            }
3664
3665            // Wait for all engines to finish
3666            let mut finalizers = Vec::new();
3667            for reporter in reporters.iter_mut() {
3668                let (mut latest, mut monitor) = reporter.subscribe().await;
3669                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3670                    while latest < required_containers {
3671                        latest = monitor.recv().await.expect("event missing");
3672                    }
3673                }));
3674            }
3675            join_all(finalizers).await;
3676
3677            // Check reporters for correct activity
3678            let byz = &participants[0];
3679            for reporter in reporters.iter() {
3680                // Ensure no faults
3681                {
3682                    let faults = reporter.faults.lock();
3683                    assert!(faults.is_empty());
3684                }
3685
3686                // Ensure no invalid signatures
3687                {
3688                    let invalid = reporter.invalid.lock();
3689                    assert_eq!(*invalid, 0);
3690                }
3691            }
3692
3693            // Ensure reconfigurer is blocked (epoch mismatch)
3694            let blocked = oracle.blocked().await.unwrap();
3695            assert!(!blocked.is_empty());
3696            for (a, b) in blocked {
3697                assert_ne!(&a, byz);
3698                assert_eq!(&b, byz);
3699            }
3700        });
3701    }
3702
3703    #[test_group("slow")]
3704    #[test_traced]
3705    fn test_reconfigurer() {
3706        for seed in 0..5 {
3707            reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3708            reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3709            reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3710            reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3711            reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3712            reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3713            reconfigurer::<_, _, RoundRobin>(seed, ed25519::fixture);
3714            reconfigurer::<_, _, RoundRobin>(seed, secp256r1::fixture);
3715        }
3716    }
3717
3718    fn nuller<S, F, L>(seed: u64, mut fixture: F)
3719    where
3720        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3721        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3722        L: Elector<S>,
3723    {
3724        // Create context
3725        let n = 4;
3726        let required_containers = View::new(50);
3727        let activity_timeout = ViewDelta::new(10);
3728        let skip_timeout = ViewDelta::new(5);
3729        let namespace = b"consensus".to_vec();
3730        let cfg = deterministic::Config::new()
3731            .with_seed(seed)
3732            .with_timeout(Some(Duration::from_secs(30)));
3733        let executor = deterministic::Runner::new(cfg);
3734        executor.start(|mut context| async move {
3735            // Create simulated network
3736            let (network, mut oracle) = Network::new(
3737                context.with_label("network"),
3738                Config {
3739                    max_size: 1024 * 1024,
3740                    disconnect_on_block: false,
3741                    tracked_peer_sets: None,
3742                },
3743            );
3744
3745            // Start network
3746            network.start();
3747
3748            // Register participants
3749            let Fixture {
3750                participants,
3751                schemes,
3752                ..
3753            } = fixture(&mut context, &namespace, n);
3754            let mut registrations = register_validators(&mut oracle, &participants).await;
3755
3756            // Link all validators
3757            let link = Link {
3758                latency: Duration::from_millis(10),
3759                jitter: Duration::from_millis(1),
3760                success_rate: 1.0,
3761            };
3762            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3763
3764            // Create engines
3765            let elector = L::default();
3766            let relay = Arc::new(mocks::relay::Relay::new());
3767            let mut reporters = Vec::new();
3768            for (idx_scheme, validator) in participants.iter().enumerate() {
3769                // Create scheme context
3770                let context = context.with_label(&format!("validator_{}", *validator));
3771
3772                // Start engine
3773                let reporter_config = mocks::reporter::Config {
3774                    participants: participants.clone().try_into().unwrap(),
3775                    scheme: schemes[idx_scheme].clone(),
3776                    elector: elector.clone(),
3777                };
3778                let reporter =
3779                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3780                let (pending, recovered, resolver) = registrations
3781                    .remove(validator)
3782                    .expect("validator should be registered");
3783                if idx_scheme == 0 {
3784                    let cfg = mocks::nuller::Config {
3785                        scheme: schemes[idx_scheme].clone(),
3786                    };
3787                    let engine: mocks::nuller::Nuller<_, _, Sha256> =
3788                        mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
3789                    engine.start(pending);
3790                } else {
3791                    reporters.push(reporter.clone());
3792                    let application_cfg = mocks::application::Config {
3793                        hasher: Sha256::default(),
3794                        relay: relay.clone(),
3795                        me: validator.clone(),
3796                        propose_latency: (10.0, 5.0),
3797                        verify_latency: (10.0, 5.0),
3798                        certify_latency: (10.0, 5.0),
3799                        should_certify: mocks::application::Certifier::Sometimes,
3800                    };
3801                    let (actor, application) = mocks::application::Application::new(
3802                        context.with_label("application"),
3803                        application_cfg,
3804                    );
3805                    actor.start();
3806                    let blocker = oracle.control(validator.clone());
3807                    let cfg = config::Config {
3808                        scheme: schemes[idx_scheme].clone(),
3809                        elector: elector.clone(),
3810                        blocker,
3811                        automaton: application.clone(),
3812                        relay: application.clone(),
3813                        reporter: reporter.clone(),
3814                        strategy: Sequential,
3815                        partition: validator.clone().to_string(),
3816                        mailbox_size: 1024,
3817                        epoch: Epoch::new(333),
3818                        leader_timeout: Duration::from_secs(1),
3819                        certification_timeout: Duration::from_secs(2),
3820                        timeout_retry: Duration::from_secs(10),
3821                        fetch_timeout: Duration::from_secs(1),
3822                        activity_timeout,
3823                        skip_timeout,
3824                        fetch_concurrent: 4,
3825                        replay_buffer: NZUsize!(1024 * 1024),
3826                        write_buffer: NZUsize!(1024 * 1024),
3827                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
3828                    };
3829                    let engine = Engine::new(context.with_label("engine"), cfg);
3830                    engine.start(pending, recovered, resolver);
3831                }
3832            }
3833
3834            // Wait for all engines to finish
3835            let mut finalizers = Vec::new();
3836            for reporter in reporters.iter_mut() {
3837                let (mut latest, mut monitor) = reporter.subscribe().await;
3838                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3839                    while latest < required_containers {
3840                        latest = monitor.recv().await.expect("event missing");
3841                    }
3842                }));
3843            }
3844            join_all(finalizers).await;
3845
3846            // Check reporters for correct activity
3847            let byz = &participants[0];
3848            let mut count_nullify_and_finalize = 0;
3849            for reporter in reporters.iter() {
3850                // Ensure only faults for byz
3851                {
3852                    let faults = reporter.faults.lock();
3853                    assert_eq!(faults.len(), 1);
3854                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
3855                    for (_, faults) in faulter.iter() {
3856                        for fault in faults.iter() {
3857                            match fault {
3858                                Activity::NullifyFinalize(_) => {
3859                                    count_nullify_and_finalize += 1;
3860                                }
3861                                _ => panic!("unexpected fault: {fault:?}"),
3862                            }
3863                        }
3864                    }
3865                }
3866
3867                // Ensure no invalid signatures
3868                {
3869                    let invalid = reporter.invalid.lock();
3870                    assert_eq!(*invalid, 0);
3871                }
3872            }
3873            assert!(count_nullify_and_finalize > 0);
3874
3875            // Ensure nullifier is blocked
3876            let blocked = oracle.blocked().await.unwrap();
3877            assert!(!blocked.is_empty());
3878            for (a, b) in blocked {
3879                assert_ne!(&a, byz);
3880                assert_eq!(&b, byz);
3881            }
3882        });
3883    }
3884
3885    #[test_group("slow")]
3886    #[test_traced]
3887    fn test_nuller() {
3888        for seed in 0..5 {
3889            nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3890            nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3891            nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3892            nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3893            nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3894            nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3895            nuller::<_, _, RoundRobin>(seed, ed25519::fixture);
3896            nuller::<_, _, RoundRobin>(seed, secp256r1::fixture);
3897        }
3898    }
3899
3900    fn outdated<S, F, L>(seed: u64, mut fixture: F)
3901    where
3902        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3903        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3904        L: Elector<S>,
3905    {
3906        // Create context
3907        let n = 4;
3908        let required_containers = View::new(100);
3909        let activity_timeout = ViewDelta::new(10);
3910        let skip_timeout = ViewDelta::new(5);
3911        let namespace = b"consensus".to_vec();
3912        let cfg = deterministic::Config::new()
3913            .with_seed(seed)
3914            .with_timeout(Some(Duration::from_secs(30)));
3915        let executor = deterministic::Runner::new(cfg);
3916        executor.start(|mut context| async move {
3917            // Create simulated network
3918            let (network, mut oracle) = Network::new(
3919                context.with_label("network"),
3920                Config {
3921                    max_size: 1024 * 1024,
3922                    disconnect_on_block: false,
3923                    tracked_peer_sets: None,
3924                },
3925            );
3926
3927            // Start network
3928            network.start();
3929
3930            // Register participants
3931            let Fixture {
3932                participants,
3933                schemes,
3934                ..
3935            } = fixture(&mut context, &namespace, n);
3936            let mut registrations = register_validators(&mut oracle, &participants).await;
3937
3938            // Link all validators
3939            let link = Link {
3940                latency: Duration::from_millis(10),
3941                jitter: Duration::from_millis(1),
3942                success_rate: 1.0,
3943            };
3944            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3945
3946            // Create engines
3947            let elector = L::default();
3948            let relay = Arc::new(mocks::relay::Relay::new());
3949            let mut reporters = Vec::new();
3950            for (idx_scheme, validator) in participants.iter().enumerate() {
3951                // Create scheme context
3952                let context = context.with_label(&format!("validator_{}", *validator));
3953
3954                // Start engine
3955                let reporter_config = mocks::reporter::Config {
3956                    participants: participants.clone().try_into().unwrap(),
3957                    scheme: schemes[idx_scheme].clone(),
3958                    elector: elector.clone(),
3959                };
3960                let reporter =
3961                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3962                let (pending, recovered, resolver) = registrations
3963                    .remove(validator)
3964                    .expect("validator should be registered");
3965                if idx_scheme == 0 {
3966                    let cfg = mocks::outdated::Config {
3967                        scheme: schemes[idx_scheme].clone(),
3968                        view_delta: ViewDelta::new(activity_timeout.get().saturating_mul(4)),
3969                    };
3970                    let engine: mocks::outdated::Outdated<_, _, Sha256> =
3971                        mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
3972                    engine.start(pending);
3973                } else {
3974                    reporters.push(reporter.clone());
3975                    let application_cfg = mocks::application::Config {
3976                        hasher: Sha256::default(),
3977                        relay: relay.clone(),
3978                        me: validator.clone(),
3979                        propose_latency: (10.0, 5.0),
3980                        verify_latency: (10.0, 5.0),
3981                        certify_latency: (10.0, 5.0),
3982                        should_certify: mocks::application::Certifier::Sometimes,
3983                    };
3984                    let (actor, application) = mocks::application::Application::new(
3985                        context.with_label("application"),
3986                        application_cfg,
3987                    );
3988                    actor.start();
3989                    let blocker = oracle.control(validator.clone());
3990                    let cfg = config::Config {
3991                        scheme: schemes[idx_scheme].clone(),
3992                        elector: elector.clone(),
3993                        blocker,
3994                        automaton: application.clone(),
3995                        relay: application.clone(),
3996                        reporter: reporter.clone(),
3997                        strategy: Sequential,
3998                        partition: validator.clone().to_string(),
3999                        mailbox_size: 1024,
4000                        epoch: Epoch::new(333),
4001                        leader_timeout: Duration::from_secs(1),
4002                        certification_timeout: Duration::from_secs(2),
4003                        timeout_retry: Duration::from_secs(10),
4004                        fetch_timeout: Duration::from_secs(1),
4005                        activity_timeout,
4006                        skip_timeout,
4007                        fetch_concurrent: 4,
4008                        replay_buffer: NZUsize!(1024 * 1024),
4009                        write_buffer: NZUsize!(1024 * 1024),
4010                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4011                    };
4012                    let engine = Engine::new(context.with_label("engine"), cfg);
4013                    engine.start(pending, recovered, resolver);
4014                }
4015            }
4016
4017            // Wait for all engines to finish
4018            let mut finalizers = Vec::new();
4019            for reporter in reporters.iter_mut() {
4020                let (mut latest, mut monitor) = reporter.subscribe().await;
4021                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4022                    while latest < required_containers {
4023                        latest = monitor.recv().await.expect("event missing");
4024                    }
4025                }));
4026            }
4027            join_all(finalizers).await;
4028
4029            // Check reporters for correct activity
4030            for reporter in reporters.iter() {
4031                // Ensure no faults
4032                {
4033                    let faults = reporter.faults.lock();
4034                    assert!(faults.is_empty());
4035                }
4036
4037                // Ensure no invalid signatures
4038                {
4039                    let invalid = reporter.invalid.lock();
4040                    assert_eq!(*invalid, 0);
4041                }
4042            }
4043
4044            // Ensure no blocked connections
4045            let blocked = oracle.blocked().await.unwrap();
4046            assert!(blocked.is_empty());
4047        });
4048    }
4049
4050    #[test_group("slow")]
4051    #[test_traced]
4052    fn test_outdated() {
4053        for seed in 0..5 {
4054            outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
4055            outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
4056            outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
4057            outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
4058            outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
4059            outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
4060            outdated::<_, _, RoundRobin>(seed, ed25519::fixture);
4061            outdated::<_, _, RoundRobin>(seed, secp256r1::fixture);
4062        }
4063    }
4064
4065    fn run_1k<S, F, L>(mut fixture: F)
4066    where
4067        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4068        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4069        L: Elector<S>,
4070    {
4071        // Create context
4072        let n = 10;
4073        let required_containers = View::new(1_000);
4074        let activity_timeout = ViewDelta::new(10);
4075        let skip_timeout = ViewDelta::new(5);
4076        let namespace = b"consensus".to_vec();
4077        let cfg = deterministic::Config::new();
4078        let executor = deterministic::Runner::new(cfg);
4079        executor.start(|mut context| async move {
4080            // Create simulated network
4081            let (network, mut oracle) = Network::new(
4082                context.with_label("network"),
4083                Config {
4084                    max_size: 1024 * 1024,
4085                    disconnect_on_block: false,
4086                    tracked_peer_sets: None,
4087                },
4088            );
4089
4090            // Start network
4091            network.start();
4092
4093            // Register participants
4094            let Fixture {
4095                participants,
4096                schemes,
4097                ..
4098            } = fixture(&mut context, &namespace, n);
4099            let mut registrations = register_validators(&mut oracle, &participants).await;
4100
4101            // Link all validators
4102            let link = Link {
4103                latency: Duration::from_millis(80),
4104                jitter: Duration::from_millis(10),
4105                success_rate: 0.98,
4106            };
4107            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4108
4109            // Create engines
4110            let elector = L::default();
4111            let relay = Arc::new(mocks::relay::Relay::new());
4112            let mut reporters = Vec::new();
4113            let mut engine_handlers = Vec::new();
4114            for (idx, validator) in participants.iter().enumerate() {
4115                // Create scheme context
4116                let context = context.with_label(&format!("validator_{}", *validator));
4117
4118                // Configure engine
4119                let reporter_config = mocks::reporter::Config {
4120                    participants: participants.clone().try_into().unwrap(),
4121                    scheme: schemes[idx].clone(),
4122                    elector: elector.clone(),
4123                };
4124                let reporter =
4125                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4126                reporters.push(reporter.clone());
4127                let application_cfg = mocks::application::Config {
4128                    hasher: Sha256::default(),
4129                    relay: relay.clone(),
4130                    me: validator.clone(),
4131                    propose_latency: (100.0, 50.0),
4132                    verify_latency: (50.0, 40.0),
4133                    certify_latency: (50.0, 40.0),
4134                    should_certify: mocks::application::Certifier::Sometimes,
4135                };
4136                let (actor, application) = mocks::application::Application::new(
4137                    context.with_label("application"),
4138                    application_cfg,
4139                );
4140                actor.start();
4141                let blocker = oracle.control(validator.clone());
4142                let cfg = config::Config {
4143                    scheme: schemes[idx].clone(),
4144                    elector: elector.clone(),
4145                    blocker,
4146                    automaton: application.clone(),
4147                    relay: application.clone(),
4148                    reporter: reporter.clone(),
4149                    strategy: Sequential,
4150                    partition: validator.to_string(),
4151                    mailbox_size: 1024,
4152                    epoch: Epoch::new(333),
4153                    leader_timeout: Duration::from_secs(1),
4154                    certification_timeout: Duration::from_secs(2),
4155                    timeout_retry: Duration::from_secs(10),
4156                    fetch_timeout: Duration::from_secs(1),
4157                    activity_timeout,
4158                    skip_timeout,
4159                    fetch_concurrent: 4,
4160                    replay_buffer: NZUsize!(1024 * 1024),
4161                    write_buffer: NZUsize!(1024 * 1024),
4162                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4163                };
4164                let engine = Engine::new(context.with_label("engine"), cfg);
4165
4166                // Start engine
4167                let (pending, recovered, resolver) = registrations
4168                    .remove(validator)
4169                    .expect("validator should be registered");
4170                engine_handlers.push(engine.start(pending, recovered, resolver));
4171            }
4172
4173            // Wait for all engines to finish
4174            let mut finalizers = Vec::new();
4175            for reporter in reporters.iter_mut() {
4176                let (mut latest, mut monitor) = reporter.subscribe().await;
4177                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4178                    while latest < required_containers {
4179                        latest = monitor.recv().await.expect("event missing");
4180                    }
4181                }));
4182            }
4183            join_all(finalizers).await;
4184
4185            // Check reporters for correct activity
4186            for reporter in reporters.iter() {
4187                // Ensure no faults
4188                {
4189                    let faults = reporter.faults.lock();
4190                    assert!(faults.is_empty());
4191                }
4192
4193                // Ensure no invalid signatures
4194                {
4195                    let invalid = reporter.invalid.lock();
4196                    assert_eq!(*invalid, 0);
4197                }
4198            }
4199
4200            // Ensure no blocked connections
4201            let blocked = oracle.blocked().await.unwrap();
4202            assert!(blocked.is_empty());
4203        })
4204    }
4205
4206    #[test_group("slow")]
4207    #[test_traced]
4208    fn test_1k_bls12381_threshold_vrf_min_pk() {
4209        run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
4210    }
4211
4212    #[test_group("slow")]
4213    #[test_traced]
4214    fn test_1k_bls12381_threshold_vrf_min_sig() {
4215        run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
4216    }
4217
4218    #[test_group("slow")]
4219    #[test_traced]
4220    fn test_1k_bls12381_threshold_std_min_pk() {
4221        run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
4222    }
4223
4224    #[test_group("slow")]
4225    #[test_traced]
4226    fn test_1k_bls12381_threshold_std_min_sig() {
4227        run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
4228    }
4229
4230    #[test_group("slow")]
4231    #[test_traced]
4232    fn test_1k_bls12381_multisig_min_pk() {
4233        run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4234    }
4235
4236    #[test_group("slow")]
4237    #[test_traced]
4238    fn test_1k_bls12381_multisig_min_sig() {
4239        run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
4240    }
4241
4242    #[test_group("slow")]
4243    #[test_traced]
4244    fn test_1k_ed25519() {
4245        run_1k::<_, _, RoundRobin>(ed25519::fixture);
4246    }
4247
4248    #[test_group("slow")]
4249    #[test_traced]
4250    fn test_1k_secp256r1() {
4251        run_1k::<_, _, RoundRobin>(secp256r1::fixture);
4252    }
4253
4254    fn engine_shutdown<S, F, L>(seed: u64, mut fixture: F, graceful: bool)
4255    where
4256        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4257        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4258        L: Elector<S>,
4259    {
4260        let n = 1;
4261        let namespace = b"consensus".to_vec();
4262        let cfg = deterministic::Config::default()
4263            .with_seed(seed)
4264            .with_timeout(Some(Duration::from_secs(10)));
4265        let executor = deterministic::Runner::new(cfg);
4266        executor.start(|mut context| async move {
4267            // Create simulated network
4268            let (network, mut oracle) = Network::new(
4269                context.with_label("network"),
4270                Config {
4271                    max_size: 1024 * 1024,
4272                    disconnect_on_block: true,
4273                    tracked_peer_sets: None,
4274                },
4275            );
4276
4277            // Start network
4278            network.start();
4279
4280            // Register a single participant
4281            let Fixture {
4282                participants,
4283                schemes,
4284                ..
4285            } = fixture(&mut context, &namespace, n);
4286            let mut registrations = register_validators(&mut oracle, &participants).await;
4287
4288            // Link the single validator to itself (no-ops for completeness)
4289            let link = Link {
4290                latency: Duration::from_millis(1),
4291                jitter: Duration::from_millis(0),
4292                success_rate: 1.0,
4293            };
4294            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4295
4296            // Create engine
4297            let elector = L::default();
4298            let reporter_config = mocks::reporter::Config {
4299                participants: participants.clone().try_into().unwrap(),
4300                scheme: schemes[0].clone(),
4301                elector: elector.clone(),
4302            };
4303            let reporter =
4304                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4305            let relay = Arc::new(mocks::relay::Relay::new());
4306            let application_cfg = mocks::application::Config {
4307                hasher: Sha256::default(),
4308                relay: relay.clone(),
4309                me: participants[0].clone(),
4310                propose_latency: (1.0, 0.0),
4311                verify_latency: (1.0, 0.0),
4312                certify_latency: (1.0, 0.0),
4313                should_certify: mocks::application::Certifier::Sometimes,
4314            };
4315            let (actor, application) = mocks::application::Application::new(
4316                context.with_label("application"),
4317                application_cfg,
4318            );
4319            actor.start();
4320            let blocker = oracle.control(participants[0].clone());
4321            let cfg = config::Config {
4322                scheme: schemes[0].clone(),
4323                elector: elector.clone(),
4324                blocker,
4325                automaton: application.clone(),
4326                relay: application.clone(),
4327                reporter: reporter.clone(),
4328                strategy: Sequential,
4329                partition: participants[0].clone().to_string(),
4330                mailbox_size: 64,
4331                epoch: Epoch::new(333),
4332                leader_timeout: Duration::from_millis(50),
4333                certification_timeout: Duration::from_millis(100),
4334                timeout_retry: Duration::from_millis(250),
4335                fetch_timeout: Duration::from_millis(50),
4336                activity_timeout: ViewDelta::new(4),
4337                skip_timeout: ViewDelta::new(2),
4338                fetch_concurrent: 4,
4339                replay_buffer: NZUsize!(1024 * 16),
4340                write_buffer: NZUsize!(1024 * 16),
4341                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4342            };
4343            let engine = Engine::new(context.with_label("engine"), cfg);
4344
4345            // Start engine
4346            let (pending, recovered, resolver) = registrations
4347                .remove(&participants[0])
4348                .expect("validator should be registered");
4349            let handle = engine.start(pending, recovered, resolver);
4350
4351            // Allow tasks to start
4352            context.sleep(Duration::from_millis(1000)).await;
4353
4354            // Count running tasks under the engine prefix
4355            let running_before = count_running_tasks(&context, "engine");
4356            assert!(
4357                running_before > 0,
4358                "at least one engine task should be running"
4359            );
4360
4361            // Make sure the engine is still running after some time
4362            context.sleep(Duration::from_millis(1500)).await;
4363            assert!(
4364                count_running_tasks(&context, "engine") > 0,
4365                "engine tasks should still be running"
4366            );
4367
4368            // Shutdown engine and ensure children stop
4369            let running_after = if graceful {
4370                let metrics_context = context.clone();
4371                let result = context.stop(0, Some(Duration::from_secs(5))).await;
4372                assert!(
4373                    result.is_ok(),
4374                    "graceful shutdown should complete: {result:?}"
4375                );
4376                count_running_tasks(&metrics_context, "engine")
4377            } else {
4378                handle.abort();
4379                let _ = handle.await; // ensure parent tear-down runs
4380
4381                // Give the runtime a tick to process aborts
4382                context.sleep(Duration::from_millis(1000)).await;
4383                count_running_tasks(&context, "engine")
4384            };
4385            assert_eq!(
4386                running_after, 0,
4387                "all engine tasks should be stopped, but {running_after} still running"
4388            );
4389        });
4390    }
4391
4392    #[test_group("slow")]
4393    #[test_traced]
4394    fn test_children_shutdown_on_engine_abort() {
4395        for seed in 0..10 {
4396            engine_shutdown::<_, _, Random>(
4397                seed,
4398                bls12381_threshold_vrf::fixture::<MinPk, _>,
4399                false,
4400            );
4401            engine_shutdown::<_, _, Random>(
4402                seed,
4403                bls12381_threshold_vrf::fixture::<MinSig, _>,
4404                false,
4405            );
4406            engine_shutdown::<_, _, RoundRobin>(
4407                seed,
4408                bls12381_threshold_std::fixture::<MinPk, _>,
4409                false,
4410            );
4411            engine_shutdown::<_, _, RoundRobin>(
4412                seed,
4413                bls12381_threshold_std::fixture::<MinSig, _>,
4414                false,
4415            );
4416            engine_shutdown::<_, _, RoundRobin>(
4417                seed,
4418                bls12381_multisig::fixture::<MinPk, _>,
4419                false,
4420            );
4421            engine_shutdown::<_, _, RoundRobin>(
4422                seed,
4423                bls12381_multisig::fixture::<MinSig, _>,
4424                false,
4425            );
4426            engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, false);
4427            engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, false);
4428        }
4429    }
4430
4431    #[test_group("slow")]
4432    #[test_traced]
4433    fn test_graceful_shutdown() {
4434        for seed in 0..10 {
4435            engine_shutdown::<_, _, Random>(
4436                seed,
4437                bls12381_threshold_vrf::fixture::<MinPk, _>,
4438                true,
4439            );
4440            engine_shutdown::<_, _, Random>(
4441                seed,
4442                bls12381_threshold_vrf::fixture::<MinSig, _>,
4443                true,
4444            );
4445            engine_shutdown::<_, _, RoundRobin>(
4446                seed,
4447                bls12381_threshold_std::fixture::<MinPk, _>,
4448                true,
4449            );
4450            engine_shutdown::<_, _, RoundRobin>(
4451                seed,
4452                bls12381_threshold_std::fixture::<MinSig, _>,
4453                true,
4454            );
4455            engine_shutdown::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>, true);
4456            engine_shutdown::<_, _, RoundRobin>(
4457                seed,
4458                bls12381_multisig::fixture::<MinSig, _>,
4459                true,
4460            );
4461            engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, true);
4462            engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, true);
4463        }
4464    }
4465
4466    fn attributable_reporter_filtering<S, F, L>(mut fixture: F)
4467    where
4468        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4469        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4470        L: Elector<S>,
4471    {
4472        let n = 3;
4473        let required_containers = View::new(10);
4474        let activity_timeout = ViewDelta::new(10);
4475        let skip_timeout = ViewDelta::new(5);
4476        let namespace = b"consensus".to_vec();
4477        let executor = deterministic::Runner::timed(Duration::from_secs(30));
4478        executor.start(|mut context| async move {
4479            // Create simulated network
4480            let (network, mut oracle) = Network::new(
4481                context.with_label("network"),
4482                Config {
4483                    max_size: 1024 * 1024,
4484                    disconnect_on_block: false,
4485                    tracked_peer_sets: None,
4486                },
4487            );
4488            network.start();
4489
4490            // Register participants
4491            let Fixture {
4492                participants,
4493                schemes,
4494                ..
4495            } = fixture(&mut context, &namespace, n);
4496            let mut registrations = register_validators(&mut oracle, &participants).await;
4497
4498            // Link all validators
4499            let link = Link {
4500                latency: Duration::from_millis(10),
4501                jitter: Duration::from_millis(1),
4502                success_rate: 1.0,
4503            };
4504            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4505
4506            // Create engines with `AttributableReporter` wrapper
4507            let elector = L::default();
4508            let relay = Arc::new(mocks::relay::Relay::new());
4509            let mut reporters = Vec::new();
4510            for (idx, validator) in participants.iter().enumerate() {
4511                let context = context.with_label(&format!("validator_{}", *validator));
4512
4513                let reporter_config = mocks::reporter::Config {
4514                    participants: participants.clone().try_into().unwrap(),
4515                    scheme: schemes[idx].clone(),
4516                    elector: elector.clone(),
4517                };
4518                let mock_reporter = mocks::reporter::Reporter::new(
4519                    context.with_label("mock_reporter"),
4520                    reporter_config,
4521                );
4522
4523                // Wrap with `AttributableReporter`
4524                let attributable_reporter = scheme::reporter::AttributableReporter::new(
4525                    context.with_label("rng"),
4526                    schemes[idx].clone(),
4527                    mock_reporter.clone(),
4528                    Sequential,
4529                    true, // Enable verification
4530                );
4531                reporters.push(mock_reporter.clone());
4532
4533                let application_cfg = mocks::application::Config {
4534                    hasher: Sha256::default(),
4535                    relay: relay.clone(),
4536                    me: validator.clone(),
4537                    propose_latency: (10.0, 5.0),
4538                    verify_latency: (10.0, 5.0),
4539                    certify_latency: (10.0, 5.0),
4540                    should_certify: mocks::application::Certifier::Sometimes,
4541                };
4542                let (actor, application) = mocks::application::Application::new(
4543                    context.with_label("application"),
4544                    application_cfg,
4545                );
4546                actor.start();
4547                let blocker = oracle.control(validator.clone());
4548                let cfg = config::Config {
4549                    scheme: schemes[idx].clone(),
4550                    elector: elector.clone(),
4551                    blocker,
4552                    automaton: application.clone(),
4553                    relay: application.clone(),
4554                    reporter: attributable_reporter,
4555                    strategy: Sequential,
4556                    partition: validator.to_string(),
4557                    mailbox_size: 1024,
4558                    epoch: Epoch::new(333),
4559                    leader_timeout: Duration::from_secs(1),
4560                    certification_timeout: Duration::from_secs(2),
4561                    timeout_retry: Duration::from_secs(10),
4562                    fetch_timeout: Duration::from_secs(1),
4563                    activity_timeout,
4564                    skip_timeout,
4565                    fetch_concurrent: 4,
4566                    replay_buffer: NZUsize!(1024 * 1024),
4567                    write_buffer: NZUsize!(1024 * 1024),
4568                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4569                };
4570                let engine = Engine::new(context.with_label("engine"), cfg);
4571
4572                // Start engine
4573                let (pending, recovered, resolver) = registrations
4574                    .remove(validator)
4575                    .expect("validator should be registered");
4576                engine.start(pending, recovered, resolver);
4577            }
4578
4579            // Wait for all engines to finish
4580            let mut finalizers = Vec::new();
4581            for reporter in reporters.iter_mut() {
4582                let (mut latest, mut monitor) = reporter.subscribe().await;
4583                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4584                    while latest < required_containers {
4585                        latest = monitor.recv().await.expect("event missing");
4586                    }
4587                }));
4588            }
4589            join_all(finalizers).await;
4590
4591            // Verify filtering behavior based on scheme attributability
4592            for reporter in reporters.iter() {
4593                // Ensure no faults (normal operation)
4594                {
4595                    let faults = reporter.faults.lock();
4596                    assert!(faults.is_empty(), "No faults should be reported");
4597                }
4598
4599                // Ensure no invalid signatures
4600                {
4601                    let invalid = reporter.invalid.lock();
4602                    assert_eq!(*invalid, 0, "No invalid signatures");
4603                }
4604
4605                // Check that we have certificates reported
4606                {
4607                    let notarizations = reporter.notarizations.lock();
4608                    let finalizations = reporter.finalizations.lock();
4609                    assert!(
4610                        !notarizations.is_empty() || !finalizations.is_empty(),
4611                        "Certificates should be reported"
4612                    );
4613                }
4614
4615                // Check notarizes
4616                let notarizes = reporter.notarizes.lock();
4617                let last_view = notarizes.keys().max().cloned().unwrap_or_default();
4618                for (view, payloads) in notarizes.iter() {
4619                    if *view == last_view {
4620                        continue; // Skip last view
4621                    }
4622
4623                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4624
4625                    // For attributable schemes, we should see peer activities
4626                    if S::is_attributable() {
4627                        assert!(signers > 1, "view {view}: {signers}");
4628                    } else {
4629                        // For non-attributable, we shouldn't see any peer activities
4630                        assert_eq!(signers, 0);
4631                    }
4632                }
4633
4634                // Check finalizes
4635                let finalizes = reporter.finalizes.lock();
4636                for (_, payloads) in finalizes.iter() {
4637                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4638
4639                    // For attributable schemes, we should see peer activities
4640                    if S::is_attributable() {
4641                        assert!(signers > 1);
4642                    } else {
4643                        // For non-attributable, we shouldn't see any peer activities
4644                        assert_eq!(signers, 0);
4645                    }
4646                }
4647            }
4648
4649            // Ensure no blocked connections (normal operation)
4650            let blocked = oracle.blocked().await.unwrap();
4651            assert!(blocked.is_empty());
4652        });
4653    }
4654
4655    #[test_traced]
4656    fn test_attributable_reporter_filtering() {
4657        attributable_reporter_filtering::<_, _, Random>(
4658            bls12381_threshold_vrf::fixture::<MinPk, _>,
4659        );
4660        attributable_reporter_filtering::<_, _, Random>(
4661            bls12381_threshold_vrf::fixture::<MinSig, _>,
4662        );
4663        attributable_reporter_filtering::<_, _, RoundRobin>(
4664            bls12381_threshold_std::fixture::<MinPk, _>,
4665        );
4666        attributable_reporter_filtering::<_, _, RoundRobin>(
4667            bls12381_threshold_std::fixture::<MinSig, _>,
4668        );
4669        attributable_reporter_filtering::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4670        attributable_reporter_filtering::<_, _, RoundRobin>(
4671            bls12381_multisig::fixture::<MinSig, _>,
4672        );
4673        attributable_reporter_filtering::<_, _, RoundRobin>(ed25519::fixture);
4674        attributable_reporter_filtering::<_, _, RoundRobin>(secp256r1::fixture);
4675    }
4676
4677    fn split_views_no_lockup<S, F, L>(mut fixture: F)
4678    where
4679        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4680        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4681        L: Elector<S>,
4682    {
4683        // Scenario:
4684        // - View F: Finalization of B_1 seen by all participants.
4685        // - View F+1:
4686        //   - Nullification seen by honest (4..=6,7) and all 3 byzantines
4687        //   - Notarization of B_2A seen by honest (1..=3)
4688        // - View F+2:
4689        //   - Nullification seen by honest (1..=3,7) and all 3 byzantines
4690        //   - Notarization of B_2B seen by honest (4..=6)
4691        // - View F+3: Nullification. Seen by all participants.
4692        // - Then ensure progress resumes beyond F+3 after reconnecting
4693
4694        // Define participant types
4695        enum ParticipantType {
4696            Group1,    // receives notarization for f+1, nullification for f+2
4697            Group2,    // receives nullification for f+1, notarization for f+2
4698            Ignorant,  // receives nullification for f+1 and f+2
4699            Byzantine, // nullify-only
4700        }
4701        let get_type = |idx: usize| -> ParticipantType {
4702            match idx {
4703                0..3 => ParticipantType::Group1,
4704                3..6 => ParticipantType::Group2,
4705                6 => ParticipantType::Ignorant,
4706                7..10 => ParticipantType::Byzantine,
4707                _ => unreachable!(),
4708            }
4709        };
4710
4711        // Create context
4712        let n = 10;
4713        let quorum = quorum(n) as usize;
4714        assert_eq!(quorum, 7);
4715        let activity_timeout = ViewDelta::new(10);
4716        let skip_timeout = ViewDelta::new(5);
4717        let namespace = b"consensus".to_vec();
4718        let executor = deterministic::Runner::timed(Duration::from_secs(300));
4719        executor.start(|mut context| async move {
4720            // Create simulated network
4721            let (network, mut oracle) = Network::new(
4722                context.with_label("network"),
4723                Config {
4724                    max_size: 1024 * 1024,
4725                    disconnect_on_block: false,
4726                    tracked_peer_sets: None,
4727                },
4728            );
4729            network.start();
4730
4731            // Register participants
4732            let Fixture {
4733                participants,
4734                schemes,
4735                ..
4736            } = fixture(&mut context, &namespace, n);
4737            let mut registrations = register_validators(&mut oracle, &participants).await;
4738
4739            // ========== Build the certificates manually ==========
4740
4741            // Helper: assemble finalization from explicit signer indices
4742            let build_finalization = |proposal: &Proposal<D>| -> TFinalization<_, D> {
4743                let votes: Vec<_> = (0..=quorum)
4744                    .map(|i| TFinalize::sign(&schemes[i], proposal.clone()).unwrap())
4745                    .collect();
4746                TFinalization::from_finalizes(&schemes[0], &votes, &Sequential)
4747                    .expect("finalization quorum")
4748            };
4749            // Helper: assemble notarization from explicit signer indices
4750            let build_notarization = |proposal: &Proposal<D>| -> TNotarization<_, D> {
4751                let votes: Vec<_> = (0..=quorum)
4752                    .map(|i| TNotarize::sign(&schemes[i], proposal.clone()).unwrap())
4753                    .collect();
4754                TNotarization::from_notarizes(&schemes[0], &votes, &Sequential)
4755                    .expect("notarization quorum")
4756            };
4757            let build_nullification = |round: Round| -> TNullification<_> {
4758                let votes: Vec<_> = (0..=quorum)
4759                    .map(|i| TNullify::sign::<D>(&schemes[i], round).unwrap())
4760                    .collect();
4761                TNullification::from_nullifies(&schemes[0], &votes, &Sequential)
4762                    .expect("nullification quorum")
4763            };
4764            // Choose F=1 and construct B_1, B_2A, B_2B
4765            let f_view = 1;
4766            let round_f = Round::new(Epoch::new(333), View::new(f_view));
4767            let payload_b0 = Sha256::hash(b"B_F");
4768            let proposal_b0 = Proposal::new(round_f, View::new(f_view - 1), payload_b0);
4769            let payload_b1a = Sha256::hash(b"B_G1");
4770            let proposal_b1a = Proposal::new(
4771                Round::new(Epoch::new(333), View::new(f_view + 1)),
4772                View::new(f_view),
4773                payload_b1a,
4774            );
4775            let payload_b1b = Sha256::hash(b"B_G2");
4776            let proposal_b1b = Proposal::new(
4777                Round::new(Epoch::new(333), View::new(f_view + 2)),
4778                View::new(f_view),
4779                payload_b1b,
4780            );
4781
4782            // Build notarization and finalization for the first block
4783            let b0_notarization = build_notarization(&proposal_b0);
4784            let b0_finalization = build_finalization(&proposal_b0);
4785            // Build notarizations for F+1 and F+2
4786            let b1a_notarization = build_notarization(&proposal_b1a);
4787            let b1b_notarization = build_notarization(&proposal_b1b);
4788            // Build nullifications for F+1 and F+2
4789            let null_a = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 1)));
4790            let null_b = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 2)));
4791
4792            // Create an 11th non-participant injector and obtain senders
4793            let injector_pk = PrivateKey::from_seed(1_000_000).public_key();
4794            let (mut injector_sender, _inj_certificate_receiver) = oracle
4795                .control(injector_pk.clone())
4796                .register(1, TEST_QUOTA)
4797                .await
4798                .unwrap();
4799
4800            // Create minimal one-way links from injector to all participants (not full mesh)
4801            let link = Link {
4802                latency: Duration::from_millis(10),
4803                jitter: Duration::from_millis(0),
4804                success_rate: 1.0,
4805            };
4806            for p in participants.iter() {
4807                oracle
4808                    .add_link(injector_pk.clone(), p.clone(), link.clone())
4809                    .await
4810                    .unwrap();
4811            }
4812
4813            // ========== Broadcast certificates over recovered network. ==========
4814
4815            // View F:
4816            let msg = Certificate::<_, D>::Notarization(b0_notarization).encode();
4817            injector_sender
4818                .send(Recipients::All, msg, true)
4819                .await
4820                .unwrap();
4821            let msg = Certificate::<_, D>::Finalization(b0_finalization).encode();
4822            injector_sender
4823                .send(Recipients::All, msg, true)
4824                .await
4825                .unwrap();
4826            // View F+1:
4827            let notarization_msg = Certificate::<_, D>::Notarization(b1a_notarization);
4828            let nullification_msg = Certificate::<_, D>::Nullification(null_a.clone());
4829            for (i, participant) in participants.iter().enumerate() {
4830                let recipient = Recipients::One(participant.clone());
4831                let msg = match get_type(i) {
4832                    ParticipantType::Group1 => notarization_msg.encode(),
4833                    _ => nullification_msg.encode(),
4834                };
4835                injector_sender.send(recipient, msg, true).await.unwrap();
4836            }
4837            // View F+2:
4838            let notarization_msg = Certificate::<_, D>::Notarization(b1b_notarization);
4839            let nullification_msg = Certificate::<_, D>::Nullification(null_b.clone());
4840            for (i, participant) in participants.iter().enumerate() {
4841                let recipient = Recipients::One(participant.clone());
4842                let msg = match get_type(i) {
4843                    ParticipantType::Group2 => notarization_msg.encode(),
4844                    _ => nullification_msg.encode(),
4845                };
4846                injector_sender.send(recipient, msg, true).await.unwrap();
4847            }
4848
4849            // ========== Create engines ==========
4850
4851            // Start engines after preloading certificates into each participant's
4852            // recovered channel (ensuring processing before any leader attempts to issue a
4853            // conflicting vote).
4854            let elector = L::default();
4855            let relay = Arc::new(mocks::relay::Relay::new());
4856            let mut honest_reporters = Vec::new();
4857            for (idx, validator) in participants.iter().enumerate() {
4858                let (pending, recovered, resolver) = registrations
4859                    .remove(validator)
4860                    .expect("validator should be registered");
4861                let participant_type = get_type(idx);
4862                if matches!(participant_type, ParticipantType::Byzantine) {
4863                    // Byzantine engines
4864                    let cfg = mocks::nullify_only::Config {
4865                        scheme: schemes[idx].clone(),
4866                    };
4867                    let engine: mocks::nullify_only::NullifyOnly<_, _, Sha256> =
4868                        mocks::nullify_only::NullifyOnly::new(
4869                            context.with_label(&format!("byzantine_{}", *validator)),
4870                            cfg,
4871                        );
4872                    engine.start(pending);
4873                    // Recovered/resolver channels are unused for byzantine actors.
4874                    drop(recovered);
4875                    drop(resolver);
4876                } else {
4877                    // Honest engines
4878                    let reporter_config = mocks::reporter::Config {
4879                        participants: participants.clone().try_into().unwrap(),
4880                        scheme: schemes[idx].clone(),
4881                        elector: elector.clone(),
4882                    };
4883                    let reporter = mocks::reporter::Reporter::new(
4884                        context.with_label(&format!("reporter_{}", *validator)),
4885                        reporter_config,
4886                    );
4887                    honest_reporters.push(reporter.clone());
4888
4889                    let application_cfg = mocks::application::Config {
4890                        hasher: Sha256::default(),
4891                        relay: relay.clone(),
4892                        me: validator.clone(),
4893                        propose_latency: (250.0, 50.0), // ensure we process certificates first
4894                        verify_latency: (10.0, 5.0),
4895                        certify_latency: (10.0, 5.0),
4896                        should_certify: mocks::application::Certifier::Sometimes,
4897                    };
4898                    let (actor, application) = mocks::application::Application::new(
4899                        context.with_label(&format!("application_{}", *validator)),
4900                        application_cfg,
4901                    );
4902                    actor.start();
4903                    let blocker = oracle.control(validator.clone());
4904                    let cfg = config::Config {
4905                        scheme: schemes[idx].clone(),
4906                        elector: elector.clone(),
4907                        blocker,
4908                        automaton: application.clone(),
4909                        relay: application.clone(),
4910                        reporter: reporter.clone(),
4911                        strategy: Sequential,
4912                        partition: validator.to_string(),
4913                        mailbox_size: 1024,
4914                        epoch: Epoch::new(333),
4915                        leader_timeout: Duration::from_secs(10),
4916                        certification_timeout: Duration::from_secs(10),
4917                        timeout_retry: Duration::from_secs(10),
4918                        fetch_timeout: Duration::from_secs(1),
4919                        activity_timeout,
4920                        skip_timeout,
4921                        fetch_concurrent: 4,
4922                        replay_buffer: NZUsize!(1024 * 1024),
4923                        write_buffer: NZUsize!(1024 * 1024),
4924                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
4925                    };
4926                    let engine =
4927                        Engine::new(context.with_label(&format!("engine_{}", *validator)), cfg);
4928                    engine.start(pending, recovered, resolver);
4929                }
4930            }
4931
4932            // Allow started engines to consume preloaded certificates.
4933            context.sleep(Duration::from_secs(2)).await;
4934
4935            // ========== Assert the exact certificates are seen in each view ==========
4936
4937            // Assert the exact certificates in view F
4938            // All participants should have finalized B_0
4939            let view = View::new(f_view);
4940            for reporter in honest_reporters.iter() {
4941                let finalizations = reporter.finalizations.lock();
4942                assert!(finalizations.contains_key(&view));
4943            }
4944
4945            // Assert the exact certificates in view F+1
4946            // Group 1 should have notarized B_1A only
4947            // All other participants should have nullified F+1
4948            let view = View::new(f_view + 1);
4949            for (i, reporter) in honest_reporters.iter().enumerate() {
4950                let finalizations = reporter.finalizations.lock();
4951                assert!(!finalizations.contains_key(&view));
4952                let nullifications = reporter.nullifications.lock();
4953                let notarizations = reporter.notarizations.lock();
4954                match get_type(i) {
4955                    ParticipantType::Group1 => {
4956                        assert!(notarizations.contains_key(&view));
4957                        assert!(!nullifications.contains_key(&view));
4958                    }
4959                    _ => {
4960                        assert!(nullifications.contains_key(&view));
4961                        assert!(!notarizations.contains_key(&view));
4962                    }
4963                }
4964            }
4965
4966            // Assert the exact certificates in view F+2
4967            // Group 2 should have notarized B_1B only
4968            // All other participants should have nullified F+2
4969            let view = View::new(f_view + 2);
4970            for (i, reporter) in honest_reporters.iter().enumerate() {
4971                let finalizations = reporter.finalizations.lock();
4972                assert!(!finalizations.contains_key(&view));
4973                let nullifications = reporter.nullifications.lock();
4974                let notarizations = reporter.notarizations.lock();
4975                match get_type(i) {
4976                    ParticipantType::Group2 => {
4977                        assert!(notarizations.contains_key(&view));
4978                        assert!(!nullifications.contains_key(&view));
4979                    }
4980                    _ => {
4981                        assert!(nullifications.contains_key(&view));
4982                        assert!(!notarizations.contains_key(&view));
4983                    }
4984                }
4985            }
4986
4987            // Assert no members have yet nullified view F+3
4988            let next_view = View::new(f_view + 3);
4989            for (i, reporter) in honest_reporters.iter().enumerate() {
4990                let nullifies = reporter.nullifies.lock();
4991                assert!(!nullifies.contains_key(&next_view), "reporter {i}");
4992            }
4993
4994            // ========== Reconnect all participants ==========
4995
4996            // Reconnect all participants fully using the helper
4997            link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
4998
4999            // Wait until all honest reporters finalize strictly past F+2 (e.g., at least F+3)
5000            {
5001                let target = View::new(f_view + 3);
5002                let mut finalizers = Vec::new();
5003                for reporter in honest_reporters.iter_mut() {
5004                    let (mut latest, mut monitor) = reporter.subscribe().await;
5005                    finalizers.push(context.with_label("resume_finalizer").spawn(
5006                        move |_| async move {
5007                            while latest < target {
5008                                latest = monitor.recv().await.expect("event missing");
5009                            }
5010                        },
5011                    ));
5012                }
5013                join_all(finalizers).await;
5014            }
5015
5016            // Sanity checks: no faults/invalid signatures, and no peers blocked
5017            for reporter in honest_reporters.iter() {
5018                {
5019                    let faults = reporter.faults.lock();
5020                    assert!(faults.is_empty());
5021                }
5022                {
5023                    let invalid = reporter.invalid.lock();
5024                    assert_eq!(*invalid, 0);
5025                }
5026            }
5027            let blocked = oracle.blocked().await.unwrap();
5028            assert!(blocked.is_empty(), "blocked peers: {blocked:?}");
5029        });
5030    }
5031
5032    #[test_group("slow")]
5033    #[test_traced]
5034    fn test_split_views_no_lockup() {
5035        split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
5036        split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
5037        split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
5038        split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
5039        split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
5040        split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
5041        split_views_no_lockup::<_, _, RoundRobin>(ed25519::fixture);
5042        split_views_no_lockup::<_, _, RoundRobin>(secp256r1::fixture);
5043    }
5044
5045    fn tle<V, L>()
5046    where
5047        V: Variant,
5048        L: Elector<bls12381_threshold_vrf::Scheme<PublicKey, V>>,
5049    {
5050        // Create context
5051        let n = 4;
5052        let namespace = b"consensus".to_vec();
5053        let activity_timeout = ViewDelta::new(100);
5054        let skip_timeout = ViewDelta::new(50);
5055        let executor = deterministic::Runner::timed(Duration::from_secs(30));
5056        executor.start(|mut context| async move {
5057            // Create simulated network
5058            let (network, mut oracle) = Network::new(
5059                context.with_label("network"),
5060                Config {
5061                    max_size: 1024 * 1024,
5062                    disconnect_on_block: false,
5063                    tracked_peer_sets: None,
5064                },
5065            );
5066
5067            // Start network
5068            network.start();
5069
5070            // Register participants
5071            let Fixture {
5072                participants,
5073                schemes,
5074                ..
5075            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, &namespace, n);
5076            let mut registrations = register_validators(&mut oracle, &participants).await;
5077
5078            // Link all validators
5079            let link = Link {
5080                latency: Duration::from_millis(10),
5081                jitter: Duration::from_millis(5),
5082                success_rate: 1.0,
5083            };
5084            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5085
5086            // Create engines and reporters
5087            let elector = L::default();
5088            let relay = Arc::new(mocks::relay::Relay::new());
5089            let mut reporters = Vec::new();
5090            let mut engine_handlers = Vec::new();
5091            let monitor_reporter = Arc::new(Mutex::new(None));
5092            for (idx, validator) in participants.iter().enumerate() {
5093                // Create scheme context
5094                let context = context.with_label(&format!("validator_{}", *validator));
5095
5096                // Store first reporter for monitoring
5097                let reporter_config = mocks::reporter::Config {
5098                    participants: participants.clone().try_into().unwrap(),
5099                    scheme: schemes[idx].clone(),
5100                    elector: elector.clone(),
5101                };
5102                let reporter =
5103                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5104                reporters.push(reporter.clone());
5105                if idx == 0 {
5106                    *monitor_reporter.lock() = Some(reporter.clone());
5107                }
5108
5109                // Configure application
5110                let application_cfg = mocks::application::Config {
5111                    hasher: Sha256::default(),
5112                    relay: relay.clone(),
5113                    me: validator.clone(),
5114                    propose_latency: (10.0, 5.0),
5115                    verify_latency: (10.0, 5.0),
5116                    certify_latency: (10.0, 5.0),
5117                    should_certify: mocks::application::Certifier::Sometimes,
5118                };
5119                let (actor, application) = mocks::application::Application::new(
5120                    context.with_label("application"),
5121                    application_cfg,
5122                );
5123                actor.start();
5124                let blocker = oracle.control(validator.clone());
5125                let cfg = config::Config {
5126                    scheme: schemes[idx].clone(),
5127                    elector: elector.clone(),
5128                    blocker,
5129                    automaton: application.clone(),
5130                    relay: application.clone(),
5131                    reporter: reporter.clone(),
5132                    strategy: Sequential,
5133                    partition: validator.to_string(),
5134                    mailbox_size: 1024,
5135                    epoch: Epoch::new(333),
5136                    leader_timeout: Duration::from_millis(100),
5137                    certification_timeout: Duration::from_millis(200),
5138                    timeout_retry: Duration::from_millis(500),
5139                    fetch_timeout: Duration::from_millis(100),
5140                    activity_timeout,
5141                    skip_timeout,
5142                    fetch_concurrent: 4,
5143                    replay_buffer: NZUsize!(1024 * 1024),
5144                    write_buffer: NZUsize!(1024 * 1024),
5145                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5146                };
5147                let engine = Engine::new(context.with_label("engine"), cfg);
5148
5149                // Start engine
5150                let (pending, recovered, resolver) = registrations
5151                    .remove(validator)
5152                    .expect("validator should be registered");
5153                engine_handlers.push(engine.start(pending, recovered, resolver));
5154            }
5155
5156            // Prepare TLE test data
5157            let target = Round::new(Epoch::new(333), View::new(10)); // Encrypt for round (epoch 333, view 10)
5158            let message = b"Secret message for future view10"; // 32 bytes
5159
5160            // Encrypt message
5161            let ciphertext = schemes[0].encrypt(&mut context, target, *message);
5162
5163            // Wait for consensus to reach the target view and then decrypt
5164            let reporter = monitor_reporter.lock().clone().unwrap();
5165            loop {
5166                // Wait for notarization
5167                context.sleep(Duration::from_millis(100)).await;
5168                let notarizations = reporter.notarizations.lock();
5169                let Some(notarization) = notarizations.get(&target.view()) else {
5170                    continue;
5171                };
5172
5173                // Decrypt the message using the seed
5174                let seed = notarization.seed();
5175                let decrypted = seed
5176                    .decrypt(&ciphertext)
5177                    .expect("Decryption should succeed with valid seed signature");
5178                assert_eq!(
5179                    message,
5180                    decrypted.as_ref(),
5181                    "Decrypted message should match original message"
5182                );
5183                break;
5184            }
5185        });
5186    }
5187
5188    #[test_traced]
5189    fn test_tle() {
5190        tle::<MinPk, Random>();
5191        tle::<MinSig, Random>();
5192    }
5193
5194    fn hailstorm<S, F, L>(
5195        seed: u64,
5196        shutdowns: usize,
5197        interval: ViewDelta,
5198        mut fixture: F,
5199    ) -> String
5200    where
5201        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5202        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5203        L: Elector<S>,
5204    {
5205        // Create context
5206        let n = 5;
5207        let activity_timeout = ViewDelta::new(10);
5208        let skip_timeout = ViewDelta::new(5);
5209        let namespace = b"consensus".to_vec();
5210        let cfg = deterministic::Config::new().with_seed(seed);
5211        let executor = deterministic::Runner::new(cfg);
5212        executor.start(|mut context| async move {
5213            // Create simulated network
5214            let (network, mut oracle) = Network::new(
5215                context.with_label("network"),
5216                Config {
5217                    max_size: 1024 * 1024,
5218                    disconnect_on_block: true,
5219                    tracked_peer_sets: None,
5220                },
5221            );
5222
5223            // Start network
5224            network.start();
5225
5226            // Register participants
5227            let Fixture {
5228                participants,
5229                schemes,
5230                ..
5231            } = fixture(&mut context, &namespace, n);
5232            let mut registrations = register_validators(&mut oracle, &participants).await;
5233
5234            // Link all validators
5235            let link = Link {
5236                latency: Duration::from_millis(10),
5237                jitter: Duration::from_millis(1),
5238                success_rate: 1.0,
5239            };
5240            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5241
5242            // Create engines
5243            let elector = L::default();
5244            let relay = Arc::new(mocks::relay::Relay::new());
5245            let mut reporters = BTreeMap::new();
5246            let mut engine_handlers = BTreeMap::new();
5247            for (idx, validator) in participants.iter().enumerate() {
5248                // Create scheme context
5249                let context = context.with_label(&format!("validator_{}", *validator));
5250
5251                // Configure engine
5252                let reporter_config = mocks::reporter::Config {
5253                    participants: participants.clone().try_into().unwrap(),
5254                    scheme: schemes[idx].clone(),
5255                    elector: elector.clone(),
5256                };
5257                let reporter =
5258                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5259                reporters.insert(idx, reporter.clone());
5260                let application_cfg = mocks::application::Config {
5261                    hasher: Sha256::default(),
5262                    relay: relay.clone(),
5263                    me: validator.clone(),
5264                    propose_latency: (10.0, 5.0),
5265                    verify_latency: (10.0, 5.0),
5266                    certify_latency: (10.0, 5.0),
5267                    should_certify: mocks::application::Certifier::Sometimes,
5268                };
5269                let (actor, application) = mocks::application::Application::new(
5270                    context.with_label("application"),
5271                    application_cfg,
5272                );
5273                actor.start();
5274                let blocker = oracle.control(validator.clone());
5275                let cfg = config::Config {
5276                    scheme: schemes[idx].clone(),
5277                    elector: elector.clone(),
5278                    blocker,
5279                    automaton: application.clone(),
5280                    relay: application.clone(),
5281                    reporter: reporter.clone(),
5282                    strategy: Sequential,
5283                    partition: validator.to_string(),
5284                    mailbox_size: 1024,
5285                    epoch: Epoch::new(333),
5286                    leader_timeout: Duration::from_secs(1),
5287                    certification_timeout: Duration::from_secs(2),
5288                    timeout_retry: Duration::from_secs(10),
5289                    fetch_timeout: Duration::from_secs(1),
5290                    activity_timeout,
5291                    skip_timeout,
5292                    fetch_concurrent: 4,
5293                    replay_buffer: NZUsize!(1024 * 1024),
5294                    write_buffer: NZUsize!(1024 * 1024),
5295                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5296                };
5297                let engine = Engine::new(context.with_label("engine"), cfg);
5298
5299                // Start engine
5300                let (pending, recovered, resolver) = registrations
5301                    .remove(validator)
5302                    .expect("validator should be registered");
5303                engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5304            }
5305
5306            // Run shutdowns
5307            let mut target = View::zero();
5308            for i in 0..shutdowns {
5309                // Update target
5310                target = target.saturating_add(interval);
5311
5312                // Wait for all engines to finish
5313                let mut finalizers = Vec::new();
5314                for (_, reporter) in reporters.iter_mut() {
5315                    let (mut latest, mut monitor) = reporter.subscribe().await;
5316                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5317                        while latest < target {
5318                            latest = monitor.recv().await.expect("event missing");
5319                        }
5320                    }));
5321                }
5322                join_all(finalizers).await;
5323                target = target.saturating_add(interval);
5324
5325                // Select a random engine to shutdown
5326                let idx = context.gen_range(0..engine_handlers.len());
5327                let validator = &participants[idx];
5328                let handle = engine_handlers.remove(&idx).unwrap();
5329                handle.abort();
5330                let _ = handle.await;
5331                let selected_reporter = reporters.remove(&idx).unwrap();
5332                info!(idx, ?validator, "shutdown validator");
5333
5334                // Wait for all engines to finish
5335                let mut finalizers = Vec::new();
5336                for (_, reporter) in reporters.iter_mut() {
5337                    let (mut latest, mut monitor) = reporter.subscribe().await;
5338                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5339                        while latest < target {
5340                            latest = monitor.recv().await.expect("event missing");
5341                        }
5342                    }));
5343                }
5344                join_all(finalizers).await;
5345                target = target.saturating_add(interval);
5346
5347                // Recreate engine
5348                info!(idx, ?validator, "restarting validator");
5349                let context =
5350                    context.with_label(&format!("validator_{}_restarted_{}", *validator, i));
5351
5352                // Start engine
5353                let (pending, recovered, resolver) =
5354                    register_validator(&mut oracle, validator.clone()).await;
5355                let application_cfg = mocks::application::Config {
5356                    hasher: Sha256::default(),
5357                    relay: relay.clone(),
5358                    me: validator.clone(),
5359                    propose_latency: (10.0, 5.0),
5360                    verify_latency: (10.0, 5.0),
5361                    certify_latency: (10.0, 5.0),
5362                    should_certify: mocks::application::Certifier::Sometimes,
5363                };
5364                let (actor, application) = mocks::application::Application::new(
5365                    context.with_label("application"),
5366                    application_cfg,
5367                );
5368                actor.start();
5369                reporters.insert(idx, selected_reporter.clone());
5370                let blocker = oracle.control(validator.clone());
5371                let cfg = config::Config {
5372                    scheme: schemes[idx].clone(),
5373                    elector: elector.clone(),
5374                    blocker,
5375                    automaton: application.clone(),
5376                    relay: application.clone(),
5377                    reporter: selected_reporter,
5378                    strategy: Sequential,
5379                    partition: validator.to_string(),
5380                    mailbox_size: 1024,
5381                    epoch: Epoch::new(333),
5382                    leader_timeout: Duration::from_secs(1),
5383                    certification_timeout: Duration::from_secs(2),
5384                    timeout_retry: Duration::from_secs(10),
5385                    fetch_timeout: Duration::from_secs(1),
5386                    activity_timeout,
5387                    skip_timeout,
5388                    fetch_concurrent: 4,
5389                    replay_buffer: NZUsize!(1024 * 1024),
5390                    write_buffer: NZUsize!(1024 * 1024),
5391                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5392                };
5393                let engine = Engine::new(context.with_label("engine"), cfg);
5394                engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5395
5396                // Wait for all engines to hit required containers
5397                let mut finalizers = Vec::new();
5398                for (_, reporter) in reporters.iter_mut() {
5399                    let (mut latest, mut monitor) = reporter.subscribe().await;
5400                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5401                        while latest < target {
5402                            latest = monitor.recv().await.expect("event missing");
5403                        }
5404                    }));
5405                }
5406                join_all(finalizers).await;
5407                info!(idx, ?validator, "validator recovered");
5408            }
5409
5410            // Check reporters for correct activity
5411            let latest_complete = target.saturating_sub(activity_timeout);
5412            for (_, reporter) in reporters.iter() {
5413                // Ensure no faults
5414                {
5415                    let faults = reporter.faults.lock();
5416                    assert!(faults.is_empty());
5417                }
5418
5419                // Ensure no invalid signatures
5420                {
5421                    let invalid = reporter.invalid.lock();
5422                    assert_eq!(*invalid, 0);
5423                }
5424
5425                // Ensure no forks
5426                let mut notarized = HashMap::new();
5427                let mut finalized = HashMap::new();
5428                {
5429                    let notarizes = reporter.notarizes.lock();
5430                    for view in View::range(View::new(1), latest_complete) {
5431                        // Ensure only one payload proposed per view
5432                        let Some(payloads) = notarizes.get(&view) else {
5433                            continue;
5434                        };
5435                        if payloads.len() > 1 {
5436                            panic!("view: {view}");
5437                        }
5438                        let (digest, _) = payloads.iter().next().unwrap();
5439                        notarized.insert(view, *digest);
5440                    }
5441                }
5442                {
5443                    let notarizations = reporter.notarizations.lock();
5444                    for view in View::range(View::new(1), latest_complete) {
5445                        // Ensure notarization matches digest from notarizes
5446                        let Some(notarization) = notarizations.get(&view) else {
5447                            continue;
5448                        };
5449                        let Some(digest) = notarized.get(&view) else {
5450                            continue;
5451                        };
5452                        assert_eq!(&notarization.proposal.payload, digest);
5453                    }
5454                }
5455                {
5456                    let finalizes = reporter.finalizes.lock();
5457                    for view in View::range(View::new(1), latest_complete) {
5458                        // Ensure only one payload proposed per view
5459                        let Some(payloads) = finalizes.get(&view) else {
5460                            continue;
5461                        };
5462                        if payloads.len() > 1 {
5463                            panic!("view: {view}");
5464                        }
5465                        let (digest, _) = payloads.iter().next().unwrap();
5466                        finalized.insert(view, *digest);
5467
5468                        // Only check at views below timeout
5469                        if view > latest_complete {
5470                            continue;
5471                        }
5472
5473                        // Ensure no nullifies for any finalizers
5474                        let nullifies = reporter.nullifies.lock();
5475                        let Some(nullifies) = nullifies.get(&view) else {
5476                            continue;
5477                        };
5478                        for (_, finalizers) in payloads.iter() {
5479                            for finalizer in finalizers.iter() {
5480                                if nullifies.contains(finalizer) {
5481                                    panic!("should not nullify and finalize at same view");
5482                                }
5483                            }
5484                        }
5485                    }
5486                }
5487                {
5488                    let finalizations = reporter.finalizations.lock();
5489                    for view in View::range(View::new(1), latest_complete) {
5490                        // Ensure finalization matches digest from finalizes
5491                        let Some(finalization) = finalizations.get(&view) else {
5492                            continue;
5493                        };
5494                        let Some(digest) = finalized.get(&view) else {
5495                            continue;
5496                        };
5497                        assert_eq!(&finalization.proposal.payload, digest);
5498                    }
5499                }
5500            }
5501
5502            // Ensure no blocked connections
5503            let blocked = oracle.blocked().await.unwrap();
5504            assert!(blocked.is_empty());
5505
5506            // Return state for audit
5507            context.auditor().state()
5508        })
5509    }
5510
5511    #[test_group("slow")]
5512    #[test_traced]
5513    fn test_hailstorm_bls12381_threshold_vrf_min_pk() {
5514        assert_eq!(
5515            hailstorm::<_, _, Random>(
5516                0,
5517                10,
5518                ViewDelta::new(15),
5519                bls12381_threshold_vrf::fixture::<MinPk, _>
5520            ),
5521            hailstorm::<_, _, Random>(
5522                0,
5523                10,
5524                ViewDelta::new(15),
5525                bls12381_threshold_vrf::fixture::<MinPk, _>
5526            ),
5527        );
5528    }
5529
5530    #[test_group("slow")]
5531    #[test_traced]
5532    fn test_hailstorm_bls12381_threshold_vrf_min_sig() {
5533        assert_eq!(
5534            hailstorm::<_, _, Random>(
5535                0,
5536                10,
5537                ViewDelta::new(15),
5538                bls12381_threshold_vrf::fixture::<MinSig, _>
5539            ),
5540            hailstorm::<_, _, Random>(
5541                0,
5542                10,
5543                ViewDelta::new(15),
5544                bls12381_threshold_vrf::fixture::<MinSig, _>
5545            ),
5546        );
5547    }
5548
5549    #[test_group("slow")]
5550    #[test_traced]
5551    fn test_hailstorm_bls12381_threshold_std_min_pk() {
5552        assert_eq!(
5553            hailstorm::<_, _, RoundRobin>(
5554                0,
5555                10,
5556                ViewDelta::new(15),
5557                bls12381_threshold_std::fixture::<MinPk, _>
5558            ),
5559            hailstorm::<_, _, RoundRobin>(
5560                0,
5561                10,
5562                ViewDelta::new(15),
5563                bls12381_threshold_std::fixture::<MinPk, _>
5564            ),
5565        );
5566    }
5567
5568    #[test_group("slow")]
5569    #[test_traced]
5570    fn test_hailstorm_bls12381_threshold_std_min_sig() {
5571        assert_eq!(
5572            hailstorm::<_, _, RoundRobin>(
5573                0,
5574                10,
5575                ViewDelta::new(15),
5576                bls12381_threshold_std::fixture::<MinSig, _>
5577            ),
5578            hailstorm::<_, _, RoundRobin>(
5579                0,
5580                10,
5581                ViewDelta::new(15),
5582                bls12381_threshold_std::fixture::<MinSig, _>
5583            ),
5584        );
5585    }
5586
5587    #[test_group("slow")]
5588    #[test_traced]
5589    fn test_hailstorm_bls12381_multisig_min_pk() {
5590        assert_eq!(
5591            hailstorm::<_, _, RoundRobin>(
5592                0,
5593                10,
5594                ViewDelta::new(15),
5595                bls12381_multisig::fixture::<MinPk, _>
5596            ),
5597            hailstorm::<_, _, RoundRobin>(
5598                0,
5599                10,
5600                ViewDelta::new(15),
5601                bls12381_multisig::fixture::<MinPk, _>
5602            ),
5603        );
5604    }
5605
5606    #[test_group("slow")]
5607    #[test_traced]
5608    fn test_hailstorm_bls12381_multisig_min_sig() {
5609        assert_eq!(
5610            hailstorm::<_, _, RoundRobin>(
5611                0,
5612                10,
5613                ViewDelta::new(15),
5614                bls12381_multisig::fixture::<MinSig, _>
5615            ),
5616            hailstorm::<_, _, RoundRobin>(
5617                0,
5618                10,
5619                ViewDelta::new(15),
5620                bls12381_multisig::fixture::<MinSig, _>
5621            ),
5622        );
5623    }
5624
5625    #[test_group("slow")]
5626    #[test_traced]
5627    fn test_hailstorm_ed25519() {
5628        assert_eq!(
5629            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture),
5630            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture)
5631        );
5632    }
5633
5634    #[test_group("slow")]
5635    #[test_traced]
5636    fn test_hailstorm_secp256r1() {
5637        assert_eq!(
5638            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture),
5639            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture)
5640        );
5641    }
5642
5643    /// Implementation of [Twins: BFT Systems Made Robust](https://arxiv.org/abs/2004.10617).
5644    fn twins<S, F, L>(seed: u64, n: u32, strategy: Strategy, link: Link, mut fixture: F)
5645    where
5646        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5647        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5648        L: Elector<S>,
5649    {
5650        let faults = N3f1::max_faults(n);
5651        let required_containers = View::new(100);
5652        let activity_timeout = ViewDelta::new(10);
5653        let skip_timeout = ViewDelta::new(5);
5654        let namespace = b"consensus".to_vec();
5655        let cfg = deterministic::Config::new()
5656            .with_seed(seed)
5657            .with_timeout(Some(Duration::from_secs(900)));
5658        let executor = deterministic::Runner::new(cfg);
5659        executor.start(|mut context| async move {
5660            let (network, mut oracle) = Network::new(
5661                context.with_label("network"),
5662                Config {
5663                    max_size: 1024 * 1024,
5664                    disconnect_on_block: false,
5665                    tracked_peer_sets: None,
5666                },
5667            );
5668            network.start();
5669
5670            let Fixture {
5671                participants,
5672                schemes,
5673                ..
5674            } = fixture(&mut context, &namespace, n);
5675            let participants: Arc<[_]> = participants.into();
5676            let mut registrations = register_validators(&mut oracle, &participants).await;
5677            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5678
5679            // We don't apply partitioning to the relay explicitly, however, a participant will only query the relay by digest
5680            // after receiving a vote (implicitly respecting the partitioning)
5681            let elector = L::default();
5682            let relay = Arc::new(mocks::relay::Relay::new());
5683            let mut reporters = Vec::new();
5684            let mut engine_handlers = Vec::new();
5685
5686            // Create twin engines (f Byzantine twins)
5687            for (idx, validator) in participants.iter().enumerate().take(faults as usize) {
5688                let (
5689                    (vote_sender, vote_receiver),
5690                    (certificate_sender, certificate_receiver),
5691                    (resolver_sender, resolver_receiver),
5692                ) = registrations
5693                    .remove(validator)
5694                    .expect("validator should be registered");
5695
5696                // Create forwarder closures for votes
5697                let make_vote_forwarder = || {
5698                    let participants = participants.clone();
5699                    move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
5700                        let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5701                        let (primary, secondary) =
5702                            strategy.partitions(msg.view(), participants.as_ref());
5703                        match origin {
5704                            SplitOrigin::Primary => Some(Recipients::Some(primary)),
5705                            SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5706                        }
5707                    }
5708                };
5709                // Create forwarder closures for certificates
5710                let make_certificate_forwarder = || {
5711                    let codec = schemes[idx].certificate_codec_config();
5712                    let participants = participants.clone();
5713                    move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
5714                        let msg: Certificate<S, D> =
5715                            Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5716                        let (primary, secondary) =
5717                            strategy.partitions(msg.view(), participants.as_ref());
5718                        match origin {
5719                            SplitOrigin::Primary => Some(Recipients::Some(primary)),
5720                            SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5721                        }
5722                    }
5723                };
5724                let make_drop_forwarder =
5725                    || move |_: SplitOrigin, _: &Recipients<_>, _: &IoBuf| None;
5726
5727                // Create router closures for votes
5728                let make_vote_router = || {
5729                    let participants = participants.clone();
5730                    move |(sender, message): &(_, IoBuf)| {
5731                        let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5732                        strategy.route(msg.view(), sender, participants.as_ref())
5733                    }
5734                };
5735                // Create router closures for certificates
5736                let make_certificate_router = || {
5737                    let codec = schemes[idx].certificate_codec_config();
5738                    let participants = participants.clone();
5739                    move |(sender, message): &(_, IoBuf)| {
5740                        let msg: Certificate<S, D> =
5741                            Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5742                        strategy.route(msg.view(), sender, participants.as_ref())
5743                    }
5744                };
5745                let make_drop_router = || move |(_, _): &(_, _)| SplitTarget::None;
5746
5747                // Apply view-based forwarder and router to pending and recovered channel
5748                let (vote_sender_primary, vote_sender_secondary) =
5749                    vote_sender.split_with(make_vote_forwarder());
5750                let (vote_receiver_primary, vote_receiver_secondary) = vote_receiver.split_with(
5751                    context.with_label(&format!("pending_split_{idx}")),
5752                    make_vote_router(),
5753                );
5754                let (certificate_sender_primary, certificate_sender_secondary) =
5755                    certificate_sender.split_with(make_certificate_forwarder());
5756                let (certificate_receiver_primary, certificate_receiver_secondary) =
5757                    certificate_receiver.split_with(
5758                        context.with_label(&format!("recovered_split_{idx}")),
5759                        make_certificate_router(),
5760                    );
5761
5762                // Prevent any resolver messages from being sent or received by twins (these messages aren't cleanly mapped to a view and allowing them to bypass partitions seems wrong)
5763                let (resolver_sender_primary, resolver_sender_secondary) =
5764                    resolver_sender.split_with(make_drop_forwarder());
5765                let (resolver_receiver_primary, resolver_receiver_secondary) = resolver_receiver
5766                    .split_with(
5767                        context.with_label(&format!("resolver_split_{idx}")),
5768                        make_drop_router(),
5769                    );
5770
5771                for (twin_label, pending, recovered, resolver) in [
5772                    (
5773                        "primary",
5774                        (vote_sender_primary, vote_receiver_primary),
5775                        (certificate_sender_primary, certificate_receiver_primary),
5776                        (resolver_sender_primary, resolver_receiver_primary),
5777                    ),
5778                    (
5779                        "secondary",
5780                        (vote_sender_secondary, vote_receiver_secondary),
5781                        (certificate_sender_secondary, certificate_receiver_secondary),
5782                        (resolver_sender_secondary, resolver_receiver_secondary),
5783                    ),
5784                ] {
5785                    let label = format!("twin_{idx}_{twin_label}");
5786                    let context = context.with_label(&label);
5787
5788                    let reporter_config = mocks::reporter::Config {
5789                        participants: participants.as_ref().try_into().unwrap(),
5790                        scheme: schemes[idx].clone(),
5791                        elector: elector.clone(),
5792                    };
5793                    let reporter = mocks::reporter::Reporter::new(
5794                        context.with_label("reporter"),
5795                        reporter_config,
5796                    );
5797                    reporters.push(reporter.clone());
5798
5799                    let application_cfg = mocks::application::Config {
5800                        hasher: Sha256::default(),
5801                        relay: relay.clone(),
5802                        me: validator.clone(),
5803                        propose_latency: (10.0, 5.0),
5804                        verify_latency: (10.0, 5.0),
5805                        certify_latency: (10.0, 5.0),
5806                        should_certify: mocks::application::Certifier::Sometimes,
5807                    };
5808                    let (actor, application) = mocks::application::Application::new(
5809                        context.with_label("application"),
5810                        application_cfg,
5811                    );
5812                    actor.start();
5813
5814                    let blocker = oracle.control(validator.clone());
5815                    let cfg = config::Config {
5816                        scheme: schemes[idx].clone(),
5817                        elector: elector.clone(),
5818                        blocker,
5819                        automaton: application.clone(),
5820                        relay: application.clone(),
5821                        reporter: reporter.clone(),
5822                        strategy: Sequential,
5823                        partition: label,
5824                        mailbox_size: 1024,
5825                        epoch: Epoch::new(333),
5826                        leader_timeout: Duration::from_secs(1),
5827                        certification_timeout: Duration::from_secs(2),
5828                        timeout_retry: Duration::from_secs(10),
5829                        fetch_timeout: Duration::from_secs(1),
5830                        activity_timeout,
5831                        skip_timeout,
5832                        fetch_concurrent: 4,
5833                        replay_buffer: NZUsize!(1024 * 1024),
5834                        write_buffer: NZUsize!(1024 * 1024),
5835                        page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5836                    };
5837                    let engine = Engine::new(context.with_label("engine"), cfg);
5838                    engine_handlers.push(engine.start(pending, recovered, resolver));
5839                }
5840            }
5841
5842            // Create honest engines
5843            for (idx, validator) in participants.iter().enumerate().skip(faults as usize) {
5844                let label = format!("honest_{idx}");
5845                let context = context.with_label(&label);
5846
5847                let reporter_config = mocks::reporter::Config {
5848                    participants: participants.as_ref().try_into().unwrap(),
5849                    scheme: schemes[idx].clone(),
5850                    elector: elector.clone(),
5851                };
5852                let reporter =
5853                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5854                reporters.push(reporter.clone());
5855
5856                let application_cfg = mocks::application::Config {
5857                    hasher: Sha256::default(),
5858                    relay: relay.clone(),
5859                    me: validator.clone(),
5860                    propose_latency: (10.0, 5.0),
5861                    verify_latency: (10.0, 5.0),
5862                    certify_latency: (10.0, 5.0),
5863                    should_certify: mocks::application::Certifier::Sometimes,
5864                };
5865                let (actor, application) = mocks::application::Application::new(
5866                    context.with_label("application"),
5867                    application_cfg,
5868                );
5869                actor.start();
5870
5871                let blocker = oracle.control(validator.clone());
5872                let cfg = config::Config {
5873                    scheme: schemes[idx].clone(),
5874                    elector: elector.clone(),
5875                    blocker,
5876                    automaton: application.clone(),
5877                    relay: application.clone(),
5878                    reporter: reporter.clone(),
5879                    strategy: Sequential,
5880                    partition: label,
5881                    mailbox_size: 1024,
5882                    epoch: Epoch::new(333),
5883                    leader_timeout: Duration::from_secs(1),
5884                    certification_timeout: Duration::from_secs(2),
5885                    timeout_retry: Duration::from_secs(10),
5886                    fetch_timeout: Duration::from_secs(1),
5887                    activity_timeout,
5888                    skip_timeout,
5889                    fetch_concurrent: 4,
5890                    replay_buffer: NZUsize!(1024 * 1024),
5891                    write_buffer: NZUsize!(1024 * 1024),
5892                    page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
5893                };
5894                let engine = Engine::new(context.with_label("engine"), cfg);
5895
5896                let (pending, recovered, resolver) = registrations
5897                    .remove(validator)
5898                    .expect("validator should be registered");
5899                engine_handlers.push(engine.start(pending, recovered, resolver));
5900            }
5901
5902            // Wait for progress (liveness check)
5903            let mut finalizers = Vec::new();
5904            for reporter in reporters.iter_mut() {
5905                let (mut latest, mut monitor) = reporter.subscribe().await;
5906                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5907                    while latest < required_containers {
5908                        latest = monitor.recv().await.expect("event missing");
5909                    }
5910                }));
5911            }
5912            join_all(finalizers).await;
5913
5914            // Verify safety: no conflicting finalizations across honest reporters
5915            let honest_start = faults as usize * 2; // Each twin produces 2 reporters
5916            let mut finalized_at_view: BTreeMap<View, D> = BTreeMap::new();
5917            for reporter in reporters.iter().skip(honest_start) {
5918                let finalizations = reporter.finalizations.lock();
5919                for (view, finalization) in finalizations.iter() {
5920                    let digest = finalization.proposal.payload;
5921                    if let Some(existing) = finalized_at_view.get(view) {
5922                        assert_eq!(
5923                            existing, &digest,
5924                            "safety violation: conflicting finalizations at view {view}"
5925                        );
5926                    } else {
5927                        finalized_at_view.insert(*view, digest);
5928                    }
5929                }
5930            }
5931
5932            // Verify no invalid signatures were observed
5933            for reporter in reporters.iter().skip(honest_start) {
5934                let invalid = reporter.invalid.lock();
5935                assert_eq!(*invalid, 0, "invalid signatures detected");
5936            }
5937
5938            // Ensure faults are attributable to twins
5939            let twin_identities: Vec<_> = participants.iter().take(faults as usize).collect();
5940            for reporter in reporters.iter().skip(honest_start) {
5941                let faults = reporter.faults.lock();
5942                for (faulter, _) in faults.iter() {
5943                    assert!(
5944                        twin_identities.contains(&faulter),
5945                        "fault from non-twin participant"
5946                    );
5947                }
5948            }
5949
5950            // Ensure blocked connections are attributable to twins
5951            let blocked = oracle.blocked().await.unwrap();
5952            for (_, faulter) in blocked.iter() {
5953                assert!(
5954                    twin_identities.contains(&faulter),
5955                    "blocked connection from non-twin participant"
5956                );
5957            }
5958        });
5959    }
5960
5961    fn test_twins<S, F, L>(mut fixture: F)
5962    where
5963        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5964        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5965        L: Elector<S>,
5966    {
5967        for strategy in [
5968            Strategy::View,
5969            Strategy::Fixed(3),
5970            Strategy::Isolate(4),
5971            Strategy::Broadcast,
5972            Strategy::Shuffle,
5973        ] {
5974            for link in [
5975                Link {
5976                    latency: Duration::from_millis(10),
5977                    jitter: Duration::from_millis(1),
5978                    success_rate: 1.0,
5979                },
5980                Link {
5981                    latency: Duration::from_millis(200),
5982                    jitter: Duration::from_millis(150),
5983                    success_rate: 0.75,
5984                },
5985            ] {
5986                twins::<S, _, L>(0, 5, strategy, link, |namespace, context, n| {
5987                    fixture(namespace, context, n)
5988                });
5989            }
5990        }
5991    }
5992
5993    #[test_group("slow")]
5994    #[test_traced]
5995    fn test_twins_multisig_min_pk() {
5996        test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
5997    }
5998
5999    #[test_group("slow")]
6000    #[test_traced]
6001    fn test_twins_multisig_min_sig() {
6002        test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
6003    }
6004
6005    #[test_group("slow")]
6006    #[test_traced]
6007    fn test_twins_threshold_vrf_min_pk() {
6008        test_twins::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
6009    }
6010
6011    #[test_group("slow")]
6012    #[test_traced]
6013    fn test_twins_threshold_vrf_min_sig() {
6014        test_twins::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
6015    }
6016
6017    #[test_group("slow")]
6018    #[test_traced]
6019    fn test_twins_threshold_std_min_pk() {
6020        test_twins::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
6021    }
6022
6023    #[test_group("slow")]
6024    #[test_traced]
6025    fn test_twins_threshold_std_min_sig() {
6026        test_twins::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
6027    }
6028
6029    #[test_group("slow")]
6030    #[test_traced]
6031    fn test_twins_ed25519() {
6032        test_twins::<_, _, RoundRobin>(ed25519::fixture);
6033    }
6034
6035    #[test_group("slow")]
6036    #[test_traced]
6037    fn test_twins_secp256r1() {
6038        test_twins::<_, _, RoundRobin>(secp256r1::fixture);
6039    }
6040
6041    #[test_group("slow")]
6042    #[test_traced]
6043    fn test_twins_large_view() {
6044        twins::<_, _, Random>(
6045            0,
6046            10,
6047            Strategy::View,
6048            Link {
6049                latency: Duration::from_millis(200),
6050                jitter: Duration::from_millis(150),
6051                success_rate: 0.75,
6052            },
6053            bls12381_threshold_vrf::fixture::<MinPk, _>,
6054        );
6055    }
6056
6057    #[test_group("slow")]
6058    #[test_traced]
6059    fn test_twins_large_shuffle() {
6060        twins::<_, _, Random>(
6061            0,
6062            10,
6063            Strategy::Shuffle,
6064            Link {
6065                latency: Duration::from_millis(200),
6066                jitter: Duration::from_millis(150),
6067                success_rate: 0.75,
6068            },
6069            bls12381_threshold_vrf::fixture::<MinPk, _>,
6070        );
6071    }
6072}