Skip to main content

commonware_consensus/simplex/
mod.rs

1//! Simple and fast BFT agreement inspired by Simplex Consensus.
2//!
3//! Inspired by [Simplex Consensus](https://eprint.iacr.org/2023/463), `simplex` provides simple and fast BFT
4//! agreement with network-speed view (i.e. block time) latency and optimal finalization latency in a
5//! partially synchronous setting.
6//!
7//! # Features
8//!
9//! * Wicked Fast Block Times (2 Network Hops)
10//! * Optimal Finalization Latency (3 Network Hops)
11//! * Externalized Uptime and Fault Proofs
12//! * Require Certification Before Finalization
13//! * Decoupled Block Broadcast and Sync
14//! * Lazy Message Verification
15//! * Application-Defined Block Format
16//! * Pluggable Hashing and Cryptography
17//! * Embedded VRF (via [scheme::bls12381_threshold::vrf])
18//!
19//! # Design
20//!
21//! ## Protocol Description
22//!
23//! ### Genesis
24//!
25//! Genesis (view 0) is implicitly finalized. There is no finalization certificate for genesis;
26//! the digest returned by [`Automaton::genesis`](crate::Automaton::genesis) serves as the initial
27//! finalized state. Voting begins at view 1, with the first proposal referencing genesis as its parent.
28//!
29//! ### Specification for View `v`
30//!
31//! Upon entering view `v`:
32//! * Determine leader `l` for view `v`
33//! * Set timer for leader proposal `t_l = 2Δ` and advance `t_a = 3Δ`
34//!     * If leader `l` has not been active in last `r` views, set `t_l` to 0.
35//! * If leader `l`, broadcast `notarize(c,v)`
36//!   * If can't propose container in view `v` because missing notarization/nullification for a
37//!     previous view `v_m`, request `v_m`
38//!
39//! Upon receiving first `notarize(c,v)` from `l`:
40//! * Cancel `t_l`
41//! * If the container's parent `c_parent` is finalized (or both notarized and certified) at `v_parent`
42//!   and we have nullifications for all views between `v` and `v_parent`, verify `c` and broadcast `notarize(c,v)`
43//!     * If verification of `c` fails, immediately broadcast `nullify(v)`
44//!
45//! Upon receiving `2f+1` `notarize(c,v)`:
46//! * Cancel `t_a`
47//! * Mark `c` as notarized
48//! * Broadcast `notarization(c,v)` (even if we have not verified `c`)
49//! * Attempt to certify `c` (see [Certification](#certification))
50//!     * On success: broadcast `finalize(c,v)` (if have not broadcast `nullify(v)`) and enter `v+1`
51//!     * On failure: broadcast `nullify(v)`
52//!
53//! Upon receiving `2f+1` `nullify(v)`:
54//! * Broadcast `nullification(v)`
55//! * Enter `v+1`
56//!
57//! Upon receiving `2f+1` `finalize(c,v)`:
58//! * Mark `c` as finalized (and recursively finalize its parents)
59//! * Broadcast `finalization(c,v)` (even if we have not verified `c`)
60//!
61//! Upon `t_l` or `t_a` firing:
62//! * Broadcast `nullify(v)`
63//! * Every `t_r` after `nullify(v)` broadcast that we are still in view `v`:
64//!    * Rebroadcast `nullify(v)` and either `notarization(v-1)` or `nullification(v-1)`
65//!
66//! _When `2f+1` votes of a given type (`notarize(c,v)`, `nullify(v)`, or `finalize(c,v)`) have been have been collected
67//! from unique participants, a certificate (`notarization(c,v)`, `nullification(v)`, or `finalization(c,v)`) can be assembled.
68//! These certificates serve as a standalone proof of consensus progress that downstream systems can ingest without executing
69//! the protocol._
70//!
71//! ### Joining Consensus
72//!
73//! As soon as `2f+1` nullifies or finalizes are observed for some view `v`, the `Voter` will
74//! enter `v+1`. Notarizations advance the view if-and-only-if the application certifies them.
75//! This means that a new participant joining consensus will immediately jump ahead on the previous
76//! view's nullification or finalization and begin participating in consensus at the current view.
77//!
78//! ### Deviations from Simplex Consensus
79//!
80//! * Fetch missing notarizations/nullifications as needed rather than assuming each proposal contains
81//!   a set of all notarizations/nullifications for all historical blocks.
82//! * Introduce distinct messages for `notarize` and `nullify` rather than referring to both as a `vote` for
83//!   either a "block" or a "dummy block", respectively.
84//! * Introduce a "leader timeout" to trigger early view transitions for unresponsive leaders.
85//! * Skip "leader timeout" and "notarization timeout" if a designated leader hasn't participated in
86//!   some number of views (again to trigger early view transition for an unresponsive leader).
87//! * Introduce message rebroadcast to continue making progress if messages from a given view are dropped (only way
88//!   to ensure messages are reliably delivered is with a heavyweight reliable broadcast protocol).
89//! * Immediately broadcast `nullify(v)` if the leader's proposal fails verification (rather than waiting for a
90//!   timeout to fire).
91//! * Upon seeing `notarization(c,v)`, instead of moving to the view `v+1` immediately, request certification from
92//!   the application (see [Certification](#certification)). Only move to view `v+1` and broadcast `finalize(c,v)`
93//!   if certification succeeds, otherwise broadcast `nullify(v)` and refuse to build upon `c`.
94//!
95//! ## Architecture
96//!
97//! All logic is split into four components: the `Batcher`, the `Voter`, the `Resolver`, and the `Application` (provided by the user).
98//! The `Batcher` is responsible for collecting messages from peers and lazily verifying them when a quorum is met. The `Voter`
99//! is responsible for directing participation in the current view. The `Resolver` is responsible for
100//! fetching artifacts from previous views required to verify proposed blocks in the latest view. Lastly, the `Application`
101//! is responsible for proposing new blocks and indicating whether some block is valid.
102//!
103//! To drive great performance, all interactions between `Batcher`, `Voter`, `Resolver`, and `Application` are
104//! non-blocking. This means that, for example, the `Voter` can continue processing messages while the
105//! `Application` verifies a proposed block or the `Resolver` fetches a notarization.
106//!
107//! ```txt
108//!                            +------------+          +++++++++++++++
109//!                            |            +--------->+             +
110//!                            |  Batcher   |          +    Peers    +
111//!                            |            |<---------+             +
112//!                            +-------+----+          +++++++++++++++
113//!                                |   ^
114//!                                |   |
115//!                                |   |
116//!                                |   |
117//!                                v   |
118//! +---------------+           +---------+            +++++++++++++++
119//! |               |<----------+         +----------->+             +
120//! |  Application  |           |  Voter  |            +    Peers    +
121//! |               +---------->|         |<-----------+             +
122//! +---------------+           +--+------+            +++++++++++++++
123//!                                |   ^
124//!                                |   |
125//!                                |   |
126//!                                |   |
127//!                                v   |
128//!                            +-------+----+          +++++++++++++++
129//!                            |            +--------->+             +
130//!                            |  Resolver  |          +    Peers    +
131//!                            |            |<---------+             +
132//!                            +------------+          +++++++++++++++
133//! ```
134//!
135//! ### Batched Verification
136//!
137//! Unlike other consensus constructions that verify all incoming messages received from peers, for schemes
138//! where [`Scheme::is_batchable()`](commonware_cryptography::certificate::Scheme::is_batchable) returns `true`
139//! (such as [scheme::ed25519], [scheme::bls12381_multisig] and [scheme::bls12381_threshold]), `simplex` lazily
140//! verifies messages (only when a quorum is met), enabling efficient batch verification. For schemes where
141//! `is_batchable()` returns `false` (such as [scheme::secp256r1]), signatures are verified eagerly as they
142//! arrive since there is no batching benefit.
143//!
144//! If an invalid signature is detected, the `Batcher` will perform repeated bisections over collected
145//! messages to find the offending message (and block the peer(s) that sent it via [commonware_p2p::Blocker]).
146//!
147//! _If using a p2p implementation that is not authenticated, it is not safe to employ this optimization
148//! as any attacking peer could simply reconnect from a different address. We recommend [commonware_p2p::authenticated]._
149//!
150//! ### Fetching Missing Certificates
151//!
152//! Instead of trying to fetch all possible certificates above the floor, we only attempt to fetch
153//! nullifications for all views from the floor (last certified notarization or finalization) to the current view.
154//! This technique, however, is not sufficient to guarantee progress.
155//!
156//! Consider the case where `f` honest participants have seen a finalization for a given view `v` (and nullifications only
157//! from `v` to the current view `c`) but the remaining `f+1` honest participants have not (they have exclusively seen
158//! nullifications from some view `o < v` to `c`). Neither partition of participants will vote for the other's proposals.
159//!
160//! To ensure progress is eventually made, leaders with nullified proposals directly broadcast the best finalization
161//! certificate they are aware of to ensure all honest participants eventually consider the same proposal ancestry valid.
162//!
163//! _While a more aggressive recovery mechanism could be employed, like requiring all participants to broadcast their highest
164//! finalization certificate after nullification, it would impose significant overhead under normal network
165//! conditions (whereas the approach described incurs no overhead under normal network conditions). Recall, honest participants
166//! already broadcast observed certificates to all other participants in each view (and misaligned participants should only ever
167//! be observed following severe network degradation)._
168//!
169//! ## Pluggable Hashing and Cryptography
170//!
171//! Hashing is abstracted via the [commonware_cryptography::Hasher] trait and cryptography is abstracted via
172//! the [commonware_cryptography::certificate::Scheme] trait, allowing deployments to employ approaches that best match their
173//! requirements (or to provide their own without modifying any consensus logic). The following schemes
174//! are supported out-of-the-box:
175//!
176//! ### [scheme::ed25519]
177//!
178//! [commonware_cryptography::ed25519] signatures are ["High-speed high-security signatures"](https://eprint.iacr.org/2011/368)
179//! with 32 byte public keys and 64 byte signatures. While they are well-supported by commercial HSMs and offer efficient batch
180//! verification, the signatures are not aggregatable (and certificates grow linearly with the quorum size).
181//!
182//! ### [scheme::bls12381_multisig]
183//!
184//! [commonware_cryptography::bls12381] is a ["digital signature scheme with aggregation properties"](https://www.ietf.org/archive/id/draft-irtf-cfrg-bls-signature-05.txt).
185//! Unlike [commonware_cryptography::ed25519], signatures from multiple participants (say the signers in a certificate) can be aggregated
186//! into a single signature (reducing bandwidth usage per broadcast). That being said, [commonware_cryptography::bls12381] is much slower
187//! to verify than [commonware_cryptography::ed25519] and isn't supported by most HSMs (a standardization effort expired in 2022).
188//!
189//! ### [scheme::secp256r1]
190//!
191//! [commonware_cryptography::secp256r1] signatures use the NIST P-256 elliptic curve (also known as prime256v1), which is widely
192//! supported by commercial HSMs and hardware security modules. Unlike [commonware_cryptography::ed25519], Secp256r1 does not
193//! benefit from batch verification, so signatures are verified individually. Certificates grow linearly with quorum size
194//! (similar to ed25519).
195//!
196//! ### [scheme::bls12381_threshold]
197//!
198//! [scheme::bls12381_threshold] employs threshold cryptography (BLS12-381 threshold signatures with a `2f+1` of `3f+1` quorum)
199//! to generate succinct consensus certificates (verifiable with just the static public key). This scheme requires instantiating
200//! the shared secret via [commonware_cryptography::bls12381::dkg] and resharing whenever participants change.
201//!
202//! Two (non-attributable) variants are provided:
203//!
204//! - [scheme::bls12381_threshold::standard]: Certificates contain only a vote signature.
205//!
206//! - [scheme::bls12381_threshold::vrf]: Certificates contain a vote signature and a view signature (i.e. a seed that can be used
207//!   as a VRF). This variant can be configured for random leader election (via [elector::Random]) and/or incorporate this randomness
208//!   into execution.
209//!
210//! #### Embedded VRF ([scheme::bls12381_threshold::vrf])
211//!
212//! Every `notarize(c,v)` or `nullify(v)` message includes an `attestation(v)` (a partial signature over the view `v`). After `2f+1`
213//! `notarize(c,v)` or `nullify(v)` messages are collected from unique participants, `seed(v)` can be recovered. Because `attestation(v)` is
214//! only over the view `v`, the seed derived for a given view `v` is the same regardless of whether or not a block was notarized in said
215//! view `v`.
216//!
217//! Because the value of `seed(v)` cannot be known prior to message broadcast by any participant (including the leader) in view `v`
218//! and cannot be manipulated by any participant (deterministic for any `2f+1` signers at a given view `v`), it can be used both as a beacon
219//! for leader election (where `seed(v)` determines the leader for `v+1`) and a source of randomness in execution (where `seed(v)`
220//! is used as a seed in `v`).
221//!
222//! #### Succinct Certificates
223//!
224//! All broadcast consensus messages (`notarize(c,v)`, `nullify(v)`, `finalize(c,v)`) contain attestations (partial signatures) for a static
225//! public key (derived from a group polynomial that can be recomputed during reconfiguration using [dkg](commonware_cryptography::bls12381::dkg)).
226//! As soon as `2f+1` messages are collected, a threshold signature over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)`
227//! can be recovered, respectively. Because the public key is static, any of these certificates can be verified by an external
228//! process without following the consensus instance and/or tracking the current set of participants (as is typically required
229//! to operate a lite client).
230//!
231//! These threshold signatures over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)` (i.e. the consensus certificates)
232//! can be used to secure interoperability between different consensus instances and user interactions with an infrastructure provider
233//! (where any data served can be proven to derive from some finalized block of some consensus instance with a known static public key).
234//!
235//! ## Certification
236//!
237//! After a payload is notarized, the application can optionally delay or prevent finalization via the
238//! [`CertifiableAutomaton::certify`](crate::CertifiableAutomaton::certify) method. By default, `certify`
239//! returns `true` for all payloads, meaning finalization proceeds immediately after notarization.
240//!
241//! Customizing `certify` is useful for systems that employ erasure coding, where participants may want
242//! to wait until they have received enough shards to reconstruct and validate the full block before
243//! voting to finalize.
244//!
245//! If `certify` returns `true`, the participant broadcasts a `finalize` vote for the payload and enters the
246//! next view. If `certify` returns `false`, the participant broadcasts `nullify` for the view instead (treating
247//! it as an immediate timeout), and will refuse to build upon the proposal or notarize proposals that build upon it.
248//! Thus, a payload can only be finalized if a quorum of participants certify it.
249//!
250//! _The decision returned by `certify` must be deterministic and consistent across all honest participants to ensure
251//! liveness._
252//!
253//! ## Persistence
254//!
255//! The `Voter` caches all data required to participate in consensus to avoid any disk reads on
256//! on the critical path. To enable recovery, the `Voter` writes valid messages it receives from
257//! consensus and messages it generates to a write-ahead log (WAL) implemented by [commonware_storage::journal::segmented::variable::Journal].
258//! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior
259//! on restart (especially in the case of unclean shutdown).
260
261pub mod elector;
262pub mod scheme;
263pub mod types;
264
265cfg_if::cfg_if! {
266    if #[cfg(not(target_arch = "wasm32"))] {
267        mod actors;
268        pub mod config;
269        pub use config::Config;
270        mod engine;
271        pub use engine::Engine;
272        mod metrics;
273    }
274}
275
276#[cfg(any(test, feature = "fuzz"))]
277pub mod mocks;
278
279#[cfg(not(target_arch = "wasm32"))]
280use crate::types::{View, ViewDelta};
281
282/// The minimum view we are tracking both in-memory and on-disk.
283#[cfg(not(target_arch = "wasm32"))]
284pub(crate) const fn min_active(activity_timeout: ViewDelta, last_finalized: View) -> View {
285    last_finalized.saturating_sub(activity_timeout)
286}
287
288/// Whether or not a view is interesting to us. This is a function
289/// of both `min_active` and whether or not the view is too far
290/// in the future (based on the view we are currently in).
291#[cfg(not(target_arch = "wasm32"))]
292pub(crate) fn interesting(
293    activity_timeout: ViewDelta,
294    last_finalized: View,
295    current: View,
296    pending: View,
297    allow_future: bool,
298) -> bool {
299    // If the view is genesis, skip it, genesis doesn't have votes
300    if pending.is_zero() {
301        return false;
302    }
303    if pending < min_active(activity_timeout, last_finalized) {
304        return false;
305    }
306    if !allow_future && pending > current.next() {
307        return false;
308    }
309    true
310}
311
312/// Convenience alias for [`N3f1::quorum`].
313#[cfg(test)]
314pub(crate) fn quorum(n: u32) -> u32 {
315    use commonware_utils::{Faults, N3f1};
316
317    N3f1::quorum(n)
318}
319
320#[cfg(test)]
321mod tests {
322    use super::*;
323    use crate::{
324        simplex::{
325            elector::{Config as Elector, Random, RoundRobin},
326            mocks::twins::Strategy,
327            scheme::{
328                bls12381_multisig,
329                bls12381_threshold::{
330                    standard as bls12381_threshold_std,
331                    vrf::{self as bls12381_threshold_vrf, Seedable},
332                },
333                ed25519, secp256r1, Scheme,
334            },
335            types::{
336                Certificate, Finalization as TFinalization, Finalize as TFinalize,
337                Notarization as TNotarization, Notarize as TNotarize,
338                Nullification as TNullification, Nullify as TNullify, Proposal, Vote,
339            },
340        },
341        types::{Epoch, Round},
342        Monitor, Viewable,
343    };
344    use commonware_codec::{Decode, DecodeExt, Encode};
345    use commonware_cryptography::{
346        bls12381::primitives::variant::{MinPk, MinSig, Variant},
347        certificate::mocks::Fixture,
348        ed25519::{PrivateKey, PublicKey},
349        sha256::{Digest as Sha256Digest, Digest as D},
350        Hasher as _, Sha256, Signer as _,
351    };
352    use commonware_macros::{select, test_group, test_traced};
353    use commonware_p2p::{
354        simulated::{Config, Link, Network, Oracle, Receiver, Sender, SplitOrigin, SplitTarget},
355        Recipients, Sender as _,
356    };
357    use commonware_parallel::Sequential;
358    use commonware_runtime::{
359        buffer::paged::CacheRef, count_running_tasks, deterministic, Clock, IoBuf, Metrics, Quota,
360        Runner, Spawner,
361    };
362    use commonware_utils::{test_rng, Faults, N3f1, NZUsize, NZU16};
363    use engine::Engine;
364    use futures::future::join_all;
365    use rand::{rngs::StdRng, Rng as _};
366    use std::{
367        collections::{BTreeMap, HashMap},
368        num::{NonZeroU16, NonZeroU32, NonZeroUsize},
369        sync::{Arc, Mutex},
370        time::Duration,
371    };
372    use tracing::{debug, info, warn};
373    use types::Activity;
374
375    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
376    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
377    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
378
379    #[test]
380    fn test_interesting() {
381        let activity_timeout = ViewDelta::new(10);
382
383        // Genesis view is never interesting
384        assert!(!interesting(
385            activity_timeout,
386            View::zero(),
387            View::zero(),
388            View::zero(),
389            false
390        ));
391        assert!(!interesting(
392            activity_timeout,
393            View::zero(),
394            View::new(1),
395            View::zero(),
396            true
397        ));
398
399        // View below min_active is not interesting
400        assert!(!interesting(
401            activity_timeout,
402            View::new(20),
403            View::new(25),
404            View::new(5), // below min_active (10)
405            false
406        ));
407
408        // View at min_active boundary is interesting
409        assert!(interesting(
410            activity_timeout,
411            View::new(20),
412            View::new(25),
413            View::new(10), // exactly min_active
414            false
415        ));
416
417        // Future view beyond current.next() is not interesting when allow_future is false
418        assert!(!interesting(
419            activity_timeout,
420            View::new(20),
421            View::new(25),
422            View::new(27),
423            false
424        ));
425
426        // Future view beyond current.next() is interesting when allow_future is true
427        assert!(interesting(
428            activity_timeout,
429            View::new(20),
430            View::new(25),
431            View::new(27),
432            true
433        ));
434
435        // View at current.next() is interesting
436        assert!(interesting(
437            activity_timeout,
438            View::new(20),
439            View::new(25),
440            View::new(26),
441            false
442        ));
443
444        // View within valid range is interesting
445        assert!(interesting(
446            activity_timeout,
447            View::new(20),
448            View::new(25),
449            View::new(22),
450            false
451        ));
452
453        // When last_finalized is 0 and activity_timeout would underflow
454        // min_active saturates at 0, so view 1 should still be interesting
455        assert!(interesting(
456            activity_timeout,
457            View::zero(),
458            View::new(5),
459            View::new(1),
460            false
461        ));
462    }
463
464    /// Register a validator with the oracle.
465    async fn register_validator(
466        oracle: &mut Oracle<PublicKey, deterministic::Context>,
467        validator: PublicKey,
468    ) -> (
469        (
470            Sender<PublicKey, deterministic::Context>,
471            Receiver<PublicKey>,
472        ),
473        (
474            Sender<PublicKey, deterministic::Context>,
475            Receiver<PublicKey>,
476        ),
477        (
478            Sender<PublicKey, deterministic::Context>,
479            Receiver<PublicKey>,
480        ),
481    ) {
482        let control = oracle.control(validator.clone());
483        let (vote_sender, vote_receiver) = control.register(0, TEST_QUOTA).await.unwrap();
484        let (certificate_sender, certificate_receiver) =
485            control.register(1, TEST_QUOTA).await.unwrap();
486        let (resolver_sender, resolver_receiver) = control.register(2, TEST_QUOTA).await.unwrap();
487        (
488            (vote_sender, vote_receiver),
489            (certificate_sender, certificate_receiver),
490            (resolver_sender, resolver_receiver),
491        )
492    }
493
494    /// Registers all validators using the oracle.
495    async fn register_validators(
496        oracle: &mut Oracle<PublicKey, deterministic::Context>,
497        validators: &[PublicKey],
498    ) -> HashMap<
499        PublicKey,
500        (
501            (
502                Sender<PublicKey, deterministic::Context>,
503                Receiver<PublicKey>,
504            ),
505            (
506                Sender<PublicKey, deterministic::Context>,
507                Receiver<PublicKey>,
508            ),
509            (
510                Sender<PublicKey, deterministic::Context>,
511                Receiver<PublicKey>,
512            ),
513        ),
514    > {
515        let mut registrations = HashMap::new();
516        for validator in validators.iter() {
517            let registration = register_validator(oracle, validator.clone()).await;
518            registrations.insert(validator.clone(), registration);
519        }
520        registrations
521    }
522
523    /// Enum to describe the action to take when linking validators.
524    enum Action {
525        Link(Link),
526        Update(Link), // Unlink and then link
527        Unlink,
528    }
529
530    /// Links (or unlinks) validators using the oracle.
531    ///
532    /// The `action` parameter determines the action (e.g. link, unlink) to take.
533    /// The `restrict_to` function can be used to restrict the linking to certain connections,
534    /// otherwise all validators will be linked to all other validators.
535    async fn link_validators(
536        oracle: &mut Oracle<PublicKey, deterministic::Context>,
537        validators: &[PublicKey],
538        action: Action,
539        restrict_to: Option<fn(usize, usize, usize) -> bool>,
540    ) {
541        for (i1, v1) in validators.iter().enumerate() {
542            for (i2, v2) in validators.iter().enumerate() {
543                // Ignore self
544                if v2 == v1 {
545                    continue;
546                }
547
548                // Restrict to certain connections
549                if let Some(f) = restrict_to {
550                    if !f(validators.len(), i1, i2) {
551                        continue;
552                    }
553                }
554
555                // Do any unlinking first
556                match action {
557                    Action::Update(_) | Action::Unlink => {
558                        oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
559                    }
560                    _ => {}
561                }
562
563                // Do any linking after
564                match action {
565                    Action::Link(ref link) | Action::Update(ref link) => {
566                        oracle
567                            .add_link(v1.clone(), v2.clone(), link.clone())
568                            .await
569                            .unwrap();
570                    }
571                    _ => {}
572                }
573            }
574        }
575    }
576
577    /// Counts lines where all patterns match and the trailing value is non-zero.
578    fn count_nonzero_metric_lines(encoded: &str, patterns: &[&str]) -> u32 {
579        encoded
580            .lines()
581            .filter(|line| patterns.iter().all(|p| line.contains(p)))
582            .filter(|line| {
583                line.split_whitespace()
584                    .last()
585                    .and_then(|s| s.parse::<u64>().ok())
586                    .is_some_and(|n| n > 0)
587            })
588            .count() as u32
589    }
590
591    fn all_online<S, F, L>(mut fixture: F)
592    where
593        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
594        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
595        L: Elector<S>,
596    {
597        // Create context
598        let n = 5;
599        let quorum = quorum(n) as usize;
600        let required_containers = View::new(100);
601        let activity_timeout = ViewDelta::new(10);
602        let skip_timeout = ViewDelta::new(5);
603        let namespace = b"consensus".to_vec();
604        let executor = deterministic::Runner::timed(Duration::from_secs(300));
605        executor.start(|mut context| async move {
606            // Create simulated network
607            let (network, mut oracle) = Network::new(
608                context.with_label("network"),
609                Config {
610                    max_size: 1024 * 1024,
611                    disconnect_on_block: true,
612                    tracked_peer_sets: None,
613                },
614            );
615
616            // Start network
617            network.start();
618
619            // Register participants
620            let Fixture {
621                participants,
622                schemes,
623                ..
624            } = fixture(&mut context, &namespace, n);
625            let mut registrations = register_validators(&mut oracle, &participants).await;
626
627            // Link all validators
628            let link = Link {
629                latency: Duration::from_millis(10),
630                jitter: Duration::from_millis(1),
631                success_rate: 1.0,
632            };
633            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
634
635            // Create engines
636            let elector = L::default();
637            let relay = Arc::new(mocks::relay::Relay::new());
638            let mut reporters = Vec::new();
639            let mut engine_handlers = Vec::new();
640            for (idx, validator) in participants.iter().enumerate() {
641                // Create scheme context
642                let context = context.with_label(&format!("validator_{}", *validator));
643
644                // Configure engine
645                let reporter_config = mocks::reporter::Config {
646                    participants: participants.clone().try_into().unwrap(),
647                    scheme: schemes[idx].clone(),
648                    elector: elector.clone(),
649                };
650                let reporter =
651                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
652                reporters.push(reporter.clone());
653                let application_cfg = mocks::application::Config {
654                    hasher: Sha256::default(),
655                    relay: relay.clone(),
656                    me: validator.clone(),
657                    propose_latency: (10.0, 5.0),
658                    verify_latency: (10.0, 5.0),
659                    certify_latency: (10.0, 5.0),
660                    should_certify: mocks::application::Certifier::Sometimes,
661                };
662                let (actor, application) = mocks::application::Application::new(
663                    context.with_label("application"),
664                    application_cfg,
665                );
666                actor.start();
667                let blocker = oracle.control(validator.clone());
668                let cfg = config::Config {
669                    scheme: schemes[idx].clone(),
670                    elector: elector.clone(),
671                    blocker,
672                    automaton: application.clone(),
673                    relay: application.clone(),
674                    reporter: reporter.clone(),
675                    strategy: Sequential,
676                    partition: validator.to_string(),
677                    mailbox_size: 1024,
678                    epoch: Epoch::new(333),
679                    leader_timeout: Duration::from_secs(1),
680                    notarization_timeout: Duration::from_secs(2),
681                    nullify_retry: Duration::from_secs(10),
682                    fetch_timeout: Duration::from_secs(1),
683                    activity_timeout,
684                    skip_timeout,
685                    fetch_concurrent: 4,
686                    replay_buffer: NZUsize!(1024 * 1024),
687                    write_buffer: NZUsize!(1024 * 1024),
688                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
689                };
690                let engine = Engine::new(context.with_label("engine"), cfg);
691
692                // Start engine
693                let (pending, recovered, resolver) = registrations
694                    .remove(validator)
695                    .expect("validator should be registered");
696                engine_handlers.push(engine.start(pending, recovered, resolver));
697            }
698
699            // Wait for all engines to finish
700            let mut finalizers = Vec::new();
701            for reporter in reporters.iter_mut() {
702                let (mut latest, mut monitor) = reporter.subscribe().await;
703                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
704                    while latest < required_containers {
705                        latest = monitor.recv().await.expect("event missing");
706                    }
707                }));
708            }
709            join_all(finalizers).await;
710
711            // Check reporters for correct activity
712            let latest_complete = required_containers.saturating_sub(activity_timeout);
713            for reporter in reporters.iter() {
714                // Ensure no faults
715                {
716                    let faults = reporter.faults.lock().unwrap();
717                    assert!(faults.is_empty());
718                }
719
720                // Ensure no invalid signatures
721                {
722                    let invalid = reporter.invalid.lock().unwrap();
723                    assert_eq!(*invalid, 0);
724                }
725
726                // Ensure certificates for all views
727                {
728                    let certified = reporter.certified.lock().unwrap();
729                    for view in View::range(View::new(1), latest_complete) {
730                        // Ensure certificate for every view
731                        if !certified.contains(&view) {
732                            panic!("view: {view}");
733                        }
734                    }
735                }
736
737                // Ensure no forks
738                let mut notarized = HashMap::new();
739                let mut finalized = HashMap::new();
740                {
741                    let notarizes = reporter.notarizes.lock().unwrap();
742                    for view in View::range(View::new(1), latest_complete) {
743                        // Ensure only one payload proposed per view
744                        let Some(payloads) = notarizes.get(&view) else {
745                            continue;
746                        };
747                        if payloads.len() > 1 {
748                            panic!("view: {view}");
749                        }
750                        let (digest, notarizers) = payloads.iter().next().unwrap();
751                        notarized.insert(view, *digest);
752
753                        if notarizers.len() < quorum {
754                            // We can't verify that everyone participated at every view because some nodes may
755                            // have started later.
756                            panic!("view: {view}");
757                        }
758                    }
759                }
760                {
761                    let notarizations = reporter.notarizations.lock().unwrap();
762                    for view in View::range(View::new(1), latest_complete) {
763                        // Ensure notarization matches digest from notarizes
764                        let Some(notarization) = notarizations.get(&view) else {
765                            continue;
766                        };
767                        let Some(digest) = notarized.get(&view) else {
768                            continue;
769                        };
770                        assert_eq!(&notarization.proposal.payload, digest);
771                    }
772                }
773                {
774                    let finalizes = reporter.finalizes.lock().unwrap();
775                    for view in View::range(View::new(1), latest_complete) {
776                        // Ensure only one payload proposed per view
777                        let Some(payloads) = finalizes.get(&view) else {
778                            continue;
779                        };
780                        if payloads.len() > 1 {
781                            panic!("view: {view}");
782                        }
783                        let (digest, finalizers) = payloads.iter().next().unwrap();
784                        finalized.insert(view, *digest);
785
786                        // Only check at views below timeout
787                        if view > latest_complete {
788                            continue;
789                        }
790
791                        // Ensure everyone participating
792                        if finalizers.len() < quorum {
793                            // We can't verify that everyone participated at every view because some nodes may
794                            // have started later.
795                            panic!("view: {view}");
796                        }
797
798                        // Ensure no nullifies for any finalizers
799                        let nullifies = reporter.nullifies.lock().unwrap();
800                        let Some(nullifies) = nullifies.get(&view) else {
801                            continue;
802                        };
803                        for (_, finalizers) in payloads.iter() {
804                            for finalizer in finalizers.iter() {
805                                if nullifies.contains(finalizer) {
806                                    panic!("should not nullify and finalize at same view");
807                                }
808                            }
809                        }
810                    }
811                }
812                {
813                    let finalizations = reporter.finalizations.lock().unwrap();
814                    for view in View::range(View::new(1), latest_complete) {
815                        // Ensure finalization matches digest from finalizes
816                        let Some(finalization) = finalizations.get(&view) else {
817                            continue;
818                        };
819                        let Some(digest) = finalized.get(&view) else {
820                            continue;
821                        };
822                        assert_eq!(&finalization.proposal.payload, digest);
823                    }
824                }
825            }
826
827            // Ensure no blocked connections
828            let blocked = oracle.blocked().await.unwrap();
829            assert!(blocked.is_empty());
830        });
831    }
832
833    #[test_traced]
834    fn test_all_online() {
835        all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
836        all_online::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
837        all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
838        all_online::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
839        all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
840        all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
841        all_online::<_, _, RoundRobin>(ed25519::fixture);
842        all_online::<_, _, RoundRobin>(secp256r1::fixture);
843    }
844
845    fn observer<S, F, L>(mut fixture: F)
846    where
847        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
848        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
849        L: Elector<S>,
850    {
851        // Create context
852        let n_active = 5;
853        let required_containers = View::new(100);
854        let activity_timeout = ViewDelta::new(10);
855        let skip_timeout = ViewDelta::new(5);
856        let namespace = b"consensus".to_vec();
857        let executor = deterministic::Runner::timed(Duration::from_secs(300));
858        executor.start(|mut context| async move {
859            // Create simulated network
860            let (network, mut oracle) = Network::new(
861                context.with_label("network"),
862                Config {
863                    max_size: 1024 * 1024,
864                    disconnect_on_block: true,
865                    tracked_peer_sets: None,
866                },
867            );
868
869            // Start network
870            network.start();
871
872            // Register participants (active)
873            let Fixture {
874                participants,
875                schemes,
876                verifier,
877                ..
878            } = fixture(&mut context, &namespace, n_active);
879
880            // Add observer (no share)
881            let private_key_observer = PrivateKey::from_seed(n_active as u64);
882            let public_key_observer = private_key_observer.public_key();
883
884            // Register all (including observer) with the network
885            let mut all_validators = participants.clone();
886            all_validators.push(public_key_observer.clone());
887            all_validators.sort();
888            let mut registrations = register_validators(&mut oracle, &all_validators).await;
889
890            // Link all peers (including observer)
891            let link = Link {
892                latency: Duration::from_millis(10),
893                jitter: Duration::from_millis(1),
894                success_rate: 1.0,
895            };
896            link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
897
898            // Create engines
899            let elector = L::default();
900            let relay = Arc::new(mocks::relay::Relay::new());
901            let mut reporters = Vec::new();
902
903            for (idx, validator) in participants.iter().enumerate() {
904                let is_observer = *validator == public_key_observer;
905
906                // Create scheme context
907                let context = context.with_label(&format!("validator_{}", *validator));
908
909                // Configure engine
910                let signing = if is_observer {
911                    verifier.clone()
912                } else {
913                    schemes[idx].clone()
914                };
915                let reporter_config = mocks::reporter::Config {
916                    participants: participants.clone().try_into().unwrap(),
917                    scheme: signing.clone(),
918                    elector: elector.clone(),
919                };
920                let reporter =
921                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
922                reporters.push(reporter.clone());
923                let application_cfg = mocks::application::Config {
924                    hasher: Sha256::default(),
925                    relay: relay.clone(),
926                    me: validator.clone(),
927                    propose_latency: (10.0, 5.0),
928                    verify_latency: (10.0, 5.0),
929                    certify_latency: (10.0, 5.0),
930                    should_certify: mocks::application::Certifier::Sometimes,
931                };
932                let (actor, application) = mocks::application::Application::new(
933                    context.with_label("application"),
934                    application_cfg,
935                );
936                actor.start();
937                let blocker = oracle.control(validator.clone());
938                let cfg = config::Config {
939                    scheme: signing.clone(),
940                    elector: elector.clone(),
941                    blocker,
942                    automaton: application.clone(),
943                    relay: application.clone(),
944                    reporter: reporter.clone(),
945                    strategy: Sequential,
946                    partition: validator.to_string(),
947                    mailbox_size: 1024,
948                    epoch: Epoch::new(333),
949                    leader_timeout: Duration::from_secs(1),
950                    notarization_timeout: Duration::from_secs(2),
951                    nullify_retry: Duration::from_secs(10),
952                    fetch_timeout: Duration::from_secs(1),
953                    activity_timeout,
954                    skip_timeout,
955                    fetch_concurrent: 4,
956                    replay_buffer: NZUsize!(1024 * 1024),
957                    write_buffer: NZUsize!(1024 * 1024),
958                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
959                };
960                let engine = Engine::new(context.with_label("engine"), cfg);
961
962                // Start engine
963                let (pending, recovered, resolver) = registrations
964                    .remove(validator)
965                    .expect("validator should be registered");
966                engine.start(pending, recovered, resolver);
967            }
968
969            // Wait for all  engines to finish
970            let mut finalizers = Vec::new();
971            for reporter in reporters.iter_mut() {
972                let (mut latest, mut monitor) = reporter.subscribe().await;
973                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
974                    while latest < required_containers {
975                        latest = monitor.recv().await.expect("event missing");
976                    }
977                }));
978            }
979            join_all(finalizers).await;
980
981            // Sanity check
982            for reporter in reporters.iter() {
983                // Ensure no faults or invalid signatures
984                {
985                    let faults = reporter.faults.lock().unwrap();
986                    assert!(faults.is_empty());
987                }
988                {
989                    let invalid = reporter.invalid.lock().unwrap();
990                    assert_eq!(*invalid, 0);
991                }
992
993                // Ensure no blocked connections
994                let blocked = oracle.blocked().await.unwrap();
995                assert!(blocked.is_empty());
996            }
997        });
998    }
999
1000    #[test_traced]
1001    fn test_observer() {
1002        observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1003        observer::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1004        observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1005        observer::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1006        observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1007        observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1008        observer::<_, _, RoundRobin>(ed25519::fixture);
1009        observer::<_, _, RoundRobin>(secp256r1::fixture);
1010    }
1011
1012    fn unclean_shutdown<S, F, L>(mut fixture: F)
1013    where
1014        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1015        F: FnMut(&mut StdRng, &[u8], u32) -> Fixture<S>,
1016        L: Elector<S>,
1017    {
1018        // Create context
1019        let n = 5;
1020        let required_containers = View::new(100);
1021        let activity_timeout = ViewDelta::new(10);
1022        let skip_timeout = ViewDelta::new(5);
1023        let namespace = b"consensus".to_vec();
1024
1025        // Random restarts every x seconds
1026        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
1027        let supervised = Arc::new(Mutex::new(Vec::new()));
1028        let mut prev_checkpoint = None;
1029
1030        // Create validator keys
1031        let mut rng = test_rng();
1032        let Fixture {
1033            participants,
1034            schemes,
1035            ..
1036        } = fixture(&mut rng, &namespace, n);
1037
1038        // Create block relay, shared across restarts.
1039        let relay = Arc::new(mocks::relay::Relay::<Sha256Digest, S::PublicKey>::new());
1040
1041        loop {
1042            let rng = rng.clone();
1043            let participants = participants.clone();
1044            let schemes = schemes.clone();
1045            let shutdowns = shutdowns.clone();
1046            let supervised = supervised.clone();
1047            let relay = relay.clone();
1048            relay.deregister_all(); // Clear all recipients from previous restart.
1049
1050            let f = |mut context: deterministic::Context| async move {
1051                // Create simulated network
1052                let (network, mut oracle) = Network::new(
1053                    context.with_label("network"),
1054                    Config {
1055                        max_size: 1024 * 1024,
1056                        disconnect_on_block: true,
1057                        tracked_peer_sets: None,
1058                    },
1059                );
1060
1061                // Start network
1062                network.start();
1063
1064                // Register participants
1065                let mut registrations = register_validators(&mut oracle, &participants).await;
1066
1067                // Link all validators
1068                let link = Link {
1069                    latency: Duration::from_millis(50),
1070                    jitter: Duration::from_millis(50),
1071                    success_rate: 1.0,
1072                };
1073                link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1074
1075                // Create engines
1076                let elector = L::default();
1077                let relay = Arc::new(mocks::relay::Relay::new());
1078                let mut reporters = HashMap::new();
1079                let mut engine_handlers = Vec::new();
1080                for (idx, validator) in participants.iter().enumerate() {
1081                    // Create scheme context
1082                    let context = context.with_label(&format!("validator_{}", *validator));
1083
1084                    // Configure engine
1085                    let reporter_config = mocks::reporter::Config {
1086                        participants: participants.clone().try_into().unwrap(),
1087                        scheme: schemes[idx].clone(),
1088                        elector: elector.clone(),
1089                    };
1090                    let reporter = mocks::reporter::Reporter::new(rng.clone(), reporter_config);
1091                    reporters.insert(validator.clone(), reporter.clone());
1092                    let application_cfg = mocks::application::Config {
1093                        hasher: Sha256::default(),
1094                        relay: relay.clone(),
1095                        me: validator.clone(),
1096                        propose_latency: (10.0, 5.0),
1097                        verify_latency: (10.0, 5.0),
1098                        certify_latency: (10.0, 5.0),
1099                        should_certify: mocks::application::Certifier::Sometimes,
1100                    };
1101                    let (actor, application) = mocks::application::Application::new(
1102                        context.with_label("application"),
1103                        application_cfg,
1104                    );
1105                    actor.start();
1106                    let blocker = oracle.control(validator.clone());
1107                    let cfg = config::Config {
1108                        scheme: schemes[idx].clone(),
1109                        elector: elector.clone(),
1110                        blocker,
1111                        automaton: application.clone(),
1112                        relay: application.clone(),
1113                        reporter: reporter.clone(),
1114                        strategy: Sequential,
1115                        partition: validator.to_string(),
1116                        mailbox_size: 1024,
1117                        epoch: Epoch::new(333),
1118                        leader_timeout: Duration::from_secs(1),
1119                        notarization_timeout: Duration::from_secs(2),
1120                        nullify_retry: Duration::from_secs(10),
1121                        fetch_timeout: Duration::from_secs(1),
1122                        activity_timeout,
1123                        skip_timeout,
1124                        fetch_concurrent: 4,
1125                        replay_buffer: NZUsize!(1024 * 1024),
1126                        write_buffer: NZUsize!(1024 * 1024),
1127                        page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1128                    };
1129                    let engine = Engine::new(context.with_label("engine"), cfg);
1130
1131                    // Start engine
1132                    let (pending, recovered, resolver) = registrations
1133                        .remove(validator)
1134                        .expect("validator should be registered");
1135                    engine_handlers.push(engine.start(pending, recovered, resolver));
1136                }
1137
1138                // Store all finalizer handles
1139                let mut finalizers = Vec::new();
1140                for (_, reporter) in reporters.iter_mut() {
1141                    let (mut latest, mut monitor) = reporter.subscribe().await;
1142                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1143                        while latest < required_containers {
1144                            latest = monitor.recv().await.expect("event missing");
1145                        }
1146                    }));
1147                }
1148
1149                // Exit at random points for unclean shutdown of entire set
1150                let wait =
1151                    context.gen_range(Duration::from_millis(100)..Duration::from_millis(2_000));
1152                let result = select! {
1153                    _ = context.sleep(wait) => {
1154                        // Collect reporters to check faults
1155                        {
1156                            let mut shutdowns = shutdowns.lock().unwrap();
1157                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
1158                            *shutdowns += 1;
1159                        }
1160                        supervised.lock().unwrap().push(reporters);
1161                        false
1162                    },
1163                    _ = join_all(finalizers) => {
1164                        // Check reporters for faults activity
1165                        let supervised = supervised.lock().unwrap();
1166                        for reporters in supervised.iter() {
1167                            for (_, reporter) in reporters.iter() {
1168                                let faults = reporter.faults.lock().unwrap();
1169                                assert!(faults.is_empty());
1170                            }
1171                        }
1172                        true
1173                    },
1174                };
1175
1176                // Ensure no blocked connections
1177                let blocked = oracle.blocked().await.unwrap();
1178                assert!(blocked.is_empty());
1179
1180                result
1181            };
1182
1183            let (complete, checkpoint) = prev_checkpoint
1184                .map_or_else(
1185                    || deterministic::Runner::timed(Duration::from_secs(180)),
1186                    deterministic::Runner::from,
1187                )
1188                .start_and_recover(f);
1189
1190            // Check if we should exit
1191            if complete {
1192                break;
1193            }
1194
1195            prev_checkpoint = Some(checkpoint);
1196        }
1197    }
1198
1199    #[test_group("slow")]
1200    #[test_traced]
1201    fn test_unclean_shutdown() {
1202        unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1203        unclean_shutdown::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1204        unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1205        unclean_shutdown::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1206        unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1207        unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1208        unclean_shutdown::<_, _, RoundRobin>(ed25519::fixture);
1209        unclean_shutdown::<_, _, RoundRobin>(secp256r1::fixture);
1210    }
1211
1212    fn backfill<S, F, L>(mut fixture: F)
1213    where
1214        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1215        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1216        L: Elector<S>,
1217    {
1218        // Create context
1219        let n = 4;
1220        let required_containers = View::new(100);
1221        let activity_timeout = ViewDelta::new(10);
1222        let skip_timeout = ViewDelta::new(5);
1223        let namespace = b"consensus".to_vec();
1224        let executor = deterministic::Runner::timed(Duration::from_secs(240));
1225        executor.start(|mut context| async move {
1226            // Create simulated network
1227            let (network, mut oracle) = Network::new(
1228                context.with_label("network"),
1229                Config {
1230                    max_size: 1024 * 1024,
1231                    disconnect_on_block: true,
1232                    tracked_peer_sets: None,
1233                },
1234            );
1235
1236            // Start network
1237            network.start();
1238
1239            // Register participants
1240            let Fixture {
1241                participants,
1242                schemes,
1243                ..
1244            } = fixture(&mut context, &namespace, n);
1245            let mut registrations = register_validators(&mut oracle, &participants).await;
1246
1247            // Link all validators except first
1248            let link = Link {
1249                latency: Duration::from_millis(10),
1250                jitter: Duration::from_millis(1),
1251                success_rate: 1.0,
1252            };
1253            link_validators(
1254                &mut oracle,
1255                &participants,
1256                Action::Link(link),
1257                Some(|_, i, j| ![i, j].contains(&0usize)),
1258            )
1259            .await;
1260
1261            // Create engines
1262            let elector = L::default();
1263            let relay = Arc::new(mocks::relay::Relay::new());
1264            let mut reporters = Vec::new();
1265            let mut engine_handlers = Vec::new();
1266            for (idx_scheme, validator) in participants.iter().enumerate() {
1267                // Skip first peer
1268                if idx_scheme == 0 {
1269                    continue;
1270                }
1271
1272                // Create scheme context
1273                let context = context.with_label(&format!("validator_{}", *validator));
1274
1275                // Configure engine
1276                let reporter_config = mocks::reporter::Config {
1277                    participants: participants.clone().try_into().unwrap(),
1278                    scheme: schemes[idx_scheme].clone(),
1279                    elector: elector.clone(),
1280                };
1281                let reporter =
1282                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1283                reporters.push(reporter.clone());
1284                let application_cfg = mocks::application::Config {
1285                    hasher: Sha256::default(),
1286                    relay: relay.clone(),
1287                    me: validator.clone(),
1288                    propose_latency: (10.0, 5.0),
1289                    verify_latency: (10.0, 5.0),
1290                    certify_latency: (10.0, 5.0),
1291                    should_certify: mocks::application::Certifier::Sometimes,
1292                };
1293                let (actor, application) = mocks::application::Application::new(
1294                    context.with_label("application"),
1295                    application_cfg,
1296                );
1297                actor.start();
1298                let blocker = oracle.control(validator.clone());
1299                let cfg = config::Config {
1300                    scheme: schemes[idx_scheme].clone(),
1301                    elector: elector.clone(),
1302                    blocker,
1303                    automaton: application.clone(),
1304                    relay: application.clone(),
1305                    reporter: reporter.clone(),
1306                    strategy: Sequential,
1307                    partition: validator.to_string(),
1308                    mailbox_size: 1024,
1309                    epoch: Epoch::new(333),
1310                    leader_timeout: Duration::from_secs(1),
1311                    notarization_timeout: Duration::from_secs(2),
1312                    nullify_retry: Duration::from_secs(10),
1313                    fetch_timeout: Duration::from_secs(1),
1314                    activity_timeout,
1315                    skip_timeout,
1316                    fetch_concurrent: 4,
1317                    replay_buffer: NZUsize!(1024 * 1024),
1318                    write_buffer: NZUsize!(1024 * 1024),
1319                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1320                };
1321                let engine = Engine::new(context.with_label("engine"), cfg);
1322
1323                // Start engine
1324                let (pending, recovered, resolver) = registrations
1325                    .remove(validator)
1326                    .expect("validator should be registered");
1327                engine_handlers.push(engine.start(pending, recovered, resolver));
1328            }
1329
1330            // Wait for all engines to finish
1331            let mut finalizers = Vec::new();
1332            for reporter in reporters.iter_mut() {
1333                let (mut latest, mut monitor) = reporter.subscribe().await;
1334                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1335                    while latest < required_containers {
1336                        latest = monitor.recv().await.expect("event missing");
1337                    }
1338                }));
1339            }
1340            join_all(finalizers).await;
1341
1342            // Degrade network connections for online peers
1343            let link = Link {
1344                latency: Duration::from_secs(3),
1345                jitter: Duration::from_millis(0),
1346                success_rate: 1.0,
1347            };
1348            link_validators(
1349                &mut oracle,
1350                &participants,
1351                Action::Update(link.clone()),
1352                Some(|_, i, j| ![i, j].contains(&0usize)),
1353            )
1354            .await;
1355
1356            // Wait for nullifications to accrue
1357            context.sleep(Duration::from_secs(60)).await;
1358
1359            // Unlink second peer from all (except first)
1360            link_validators(
1361                &mut oracle,
1362                &participants,
1363                Action::Unlink,
1364                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
1365            )
1366            .await;
1367
1368            // Configure engine for first peer
1369            let me = participants[0].clone();
1370            let context = context.with_label(&format!("validator_{me}"));
1371
1372            // Link first peer to all (except second)
1373            link_validators(
1374                &mut oracle,
1375                &participants,
1376                Action::Link(link),
1377                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
1378            )
1379            .await;
1380
1381            // Restore network connections for all online peers
1382            let link = Link {
1383                latency: Duration::from_millis(10),
1384                jitter: Duration::from_millis(3),
1385                success_rate: 1.0,
1386            };
1387            link_validators(
1388                &mut oracle,
1389                &participants,
1390                Action::Update(link),
1391                Some(|_, i, j| ![i, j].contains(&1usize)),
1392            )
1393            .await;
1394
1395            // Configure engine
1396            let reporter_config = mocks::reporter::Config {
1397                participants: participants.clone().try_into().unwrap(),
1398                scheme: schemes[0].clone(),
1399                elector: elector.clone(),
1400            };
1401            let mut reporter =
1402                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1403            reporters.push(reporter.clone());
1404            let application_cfg = mocks::application::Config {
1405                hasher: Sha256::default(),
1406                relay: relay.clone(),
1407                me: me.clone(),
1408                propose_latency: (10.0, 5.0),
1409                verify_latency: (10.0, 5.0),
1410                certify_latency: (10.0, 5.0),
1411                should_certify: mocks::application::Certifier::Sometimes,
1412            };
1413            let (actor, application) = mocks::application::Application::new(
1414                context.with_label("application"),
1415                application_cfg,
1416            );
1417            actor.start();
1418            let blocker = oracle.control(me.clone());
1419            let cfg = config::Config {
1420                scheme: schemes[0].clone(),
1421                elector: elector.clone(),
1422                blocker,
1423                automaton: application.clone(),
1424                relay: application.clone(),
1425                reporter: reporter.clone(),
1426                strategy: Sequential,
1427                partition: me.to_string(),
1428                mailbox_size: 1024,
1429                epoch: Epoch::new(333),
1430                leader_timeout: Duration::from_secs(1),
1431                notarization_timeout: Duration::from_secs(2),
1432                nullify_retry: Duration::from_secs(10),
1433                fetch_timeout: Duration::from_secs(1),
1434                activity_timeout,
1435                skip_timeout,
1436                fetch_concurrent: 4,
1437                replay_buffer: NZUsize!(1024 * 1024),
1438                write_buffer: NZUsize!(1024 * 1024),
1439                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1440            };
1441            let engine = Engine::new(context.with_label("engine"), cfg);
1442
1443            // Start engine
1444            let (pending, recovered, resolver) = registrations
1445                .remove(&me)
1446                .expect("validator should be registered");
1447            engine_handlers.push(engine.start(pending, recovered, resolver));
1448
1449            // Wait for new engine to finalize required
1450            let (mut latest, mut monitor) = reporter.subscribe().await;
1451            while latest < required_containers {
1452                latest = monitor.recv().await.expect("event missing");
1453            }
1454
1455            // Ensure no blocked connections
1456            let blocked = oracle.blocked().await.unwrap();
1457            assert!(blocked.is_empty());
1458        });
1459    }
1460
1461    #[test_traced]
1462    fn test_backfill() {
1463        backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1464        backfill::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1465        backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1466        backfill::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1467        backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1468        backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1469        backfill::<_, _, RoundRobin>(ed25519::fixture);
1470        backfill::<_, _, RoundRobin>(secp256r1::fixture);
1471    }
1472
1473    fn one_offline<S, F, L>(mut fixture: F)
1474    where
1475        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1476        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1477        L: Elector<S>,
1478    {
1479        // Create context
1480        let n = 5;
1481        let quorum = quorum(n) as usize;
1482        let required_containers = View::new(100);
1483        let activity_timeout = ViewDelta::new(10);
1484        let skip_timeout = ViewDelta::new(5);
1485        let max_exceptions = 10;
1486        let namespace = b"consensus".to_vec();
1487        let executor = deterministic::Runner::timed(Duration::from_secs(300));
1488        executor.start(|mut context| async move {
1489            // Create simulated network
1490            let (network, mut oracle) = Network::new(
1491                context.with_label("network"),
1492                Config {
1493                    max_size: 1024 * 1024,
1494                    disconnect_on_block: true,
1495                    tracked_peer_sets: None,
1496                },
1497            );
1498
1499            // Start network
1500            network.start();
1501
1502            // Register participants
1503            let Fixture {
1504                participants,
1505                schemes,
1506                ..
1507            } = fixture(&mut context, &namespace, n);
1508            let mut registrations = register_validators(&mut oracle, &participants).await;
1509
1510            // Link all validators except first
1511            let link = Link {
1512                latency: Duration::from_millis(10),
1513                jitter: Duration::from_millis(1),
1514                success_rate: 1.0,
1515            };
1516            link_validators(
1517                &mut oracle,
1518                &participants,
1519                Action::Link(link),
1520                Some(|_, i, j| ![i, j].contains(&0usize)),
1521            )
1522            .await;
1523
1524            // Create engines
1525            let elector = L::default();
1526            let relay = Arc::new(mocks::relay::Relay::new());
1527            let mut reporters = Vec::new();
1528            let mut engine_handlers = Vec::new();
1529            for (idx_scheme, validator) in participants.iter().enumerate() {
1530                // Skip first peer
1531                if idx_scheme == 0 {
1532                    continue;
1533                }
1534
1535                // Create scheme context
1536                let context = context.with_label(&format!("validator_{}", *validator));
1537
1538                // Configure engine
1539                let reporter_config = mocks::reporter::Config {
1540                    participants: participants.clone().try_into().unwrap(),
1541                    scheme: schemes[idx_scheme].clone(),
1542                    elector: elector.clone(),
1543                };
1544                let reporter =
1545                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1546                reporters.push(reporter.clone());
1547                let application_cfg = mocks::application::Config {
1548                    hasher: Sha256::default(),
1549                    relay: relay.clone(),
1550                    me: validator.clone(),
1551                    propose_latency: (10.0, 5.0),
1552                    verify_latency: (10.0, 5.0),
1553                    certify_latency: (10.0, 5.0),
1554                    should_certify: mocks::application::Certifier::Sometimes,
1555                };
1556                let (actor, application) = mocks::application::Application::new(
1557                    context.with_label("application"),
1558                    application_cfg,
1559                );
1560                actor.start();
1561                let blocker = oracle.control(validator.clone());
1562                let cfg = config::Config {
1563                    scheme: schemes[idx_scheme].clone(),
1564                    elector: elector.clone(),
1565                    blocker,
1566                    automaton: application.clone(),
1567                    relay: application.clone(),
1568                    reporter: reporter.clone(),
1569                    strategy: Sequential,
1570                    partition: validator.to_string(),
1571                    mailbox_size: 1024,
1572                    epoch: Epoch::new(333),
1573                    leader_timeout: Duration::from_secs(1),
1574                    notarization_timeout: Duration::from_secs(2),
1575                    nullify_retry: Duration::from_secs(10),
1576                    fetch_timeout: Duration::from_secs(1),
1577                    activity_timeout,
1578                    skip_timeout,
1579                    fetch_concurrent: 4,
1580                    replay_buffer: NZUsize!(1024 * 1024),
1581                    write_buffer: NZUsize!(1024 * 1024),
1582                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1583                };
1584                let engine = Engine::new(context.with_label("engine"), cfg);
1585
1586                // Start engine
1587                let (pending, recovered, resolver) = registrations
1588                    .remove(validator)
1589                    .expect("validator should be registered");
1590                engine_handlers.push(engine.start(pending, recovered, resolver));
1591            }
1592
1593            // Wait for all engines to finish
1594            let mut finalizers = Vec::new();
1595            for reporter in reporters.iter_mut() {
1596                let (mut latest, mut monitor) = reporter.subscribe().await;
1597                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1598                    while latest < required_containers {
1599                        latest = monitor.recv().await.expect("event missing");
1600                    }
1601                }));
1602            }
1603            join_all(finalizers).await;
1604
1605            // Check reporters for correct activity
1606            let exceptions = 0;
1607            let offline = &participants[0];
1608            for reporter in reporters.iter() {
1609                // Ensure no faults
1610                {
1611                    let faults = reporter.faults.lock().unwrap();
1612                    assert!(faults.is_empty());
1613                }
1614
1615                // Ensure no invalid signatures
1616                {
1617                    let invalid = reporter.invalid.lock().unwrap();
1618                    assert_eq!(*invalid, 0);
1619                }
1620
1621                // Ensure offline node is never active
1622                let mut exceptions = 0;
1623                {
1624                    let notarizes = reporter.notarizes.lock().unwrap();
1625                    for (view, payloads) in notarizes.iter() {
1626                        for (_, participants) in payloads.iter() {
1627                            if participants.contains(offline) {
1628                                panic!("view: {view}");
1629                            }
1630                        }
1631                    }
1632                }
1633                {
1634                    let nullifies = reporter.nullifies.lock().unwrap();
1635                    for (view, participants) in nullifies.iter() {
1636                        if participants.contains(offline) {
1637                            panic!("view: {view}");
1638                        }
1639                    }
1640                }
1641                {
1642                    let finalizes = reporter.finalizes.lock().unwrap();
1643                    for (view, payloads) in finalizes.iter() {
1644                        for (_, finalizers) in payloads.iter() {
1645                            if finalizers.contains(offline) {
1646                                panic!("view: {view}");
1647                            }
1648                        }
1649                    }
1650                }
1651
1652                // Identify offline views
1653                let mut offline_views = Vec::new();
1654                {
1655                    let leaders = reporter.leaders.lock().unwrap();
1656                    for (view, leader) in leaders.iter() {
1657                        if leader == offline {
1658                            offline_views.push(*view);
1659                        }
1660                    }
1661                }
1662                assert!(!offline_views.is_empty());
1663
1664                // Ensure nullifies/nullification collected for offline node
1665                {
1666                    let nullifies = reporter.nullifies.lock().unwrap();
1667                    for view in offline_views.iter() {
1668                        let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1669                        if nullifies < quorum {
1670                            warn!("missing expected view nullifies: {}", view);
1671                            exceptions += 1;
1672                        }
1673                    }
1674                }
1675                {
1676                    let nullifications = reporter.nullifications.lock().unwrap();
1677                    for view in offline_views.iter() {
1678                        if !nullifications.contains_key(view) {
1679                            warn!("missing expected view nullifies: {}", view);
1680                            exceptions += 1;
1681                        }
1682                    }
1683                }
1684
1685                // Ensure exceptions within allowed
1686                assert!(exceptions <= max_exceptions);
1687            }
1688            assert!(exceptions <= max_exceptions);
1689
1690            // Ensure no blocked connections
1691            let blocked = oracle.blocked().await.unwrap();
1692            assert!(blocked.is_empty());
1693
1694            // Ensure online nodes are recording skips/nullifications for the offline leader
1695            let encoded = context.encode();
1696            let peer_label = format!("peer=\"{}\"", offline);
1697            for metric in ["_skips_per_leader", "_nullifications_per_leader"] {
1698                assert_eq!(
1699                    count_nonzero_metric_lines(&encoded, &[metric, &peer_label]),
1700                    n - 1,
1701                    "expected all online nodes to record {metric} for offline leader"
1702                );
1703            }
1704        });
1705    }
1706
1707    #[test_traced]
1708    fn test_one_offline() {
1709        one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1710        one_offline::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1711        one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1712        one_offline::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1713        one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1714        one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1715        one_offline::<_, _, RoundRobin>(ed25519::fixture);
1716        one_offline::<_, _, RoundRobin>(secp256r1::fixture);
1717    }
1718
1719    fn slow_validator<S, F, L>(mut fixture: F)
1720    where
1721        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1722        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1723        L: Elector<S>,
1724    {
1725        // Create context
1726        let n = 5;
1727        let required_containers = View::new(50);
1728        let activity_timeout = ViewDelta::new(10);
1729        let skip_timeout = ViewDelta::new(5);
1730        let namespace = b"consensus".to_vec();
1731        let executor = deterministic::Runner::timed(Duration::from_secs(300));
1732        executor.start(|mut context| async move {
1733            // Create simulated network
1734            let (network, mut oracle) = Network::new(
1735                context.with_label("network"),
1736                Config {
1737                    max_size: 1024 * 1024,
1738                    disconnect_on_block: true,
1739                    tracked_peer_sets: None,
1740                },
1741            );
1742
1743            // Start network
1744            network.start();
1745
1746            // Register participants
1747            let Fixture {
1748                participants,
1749                schemes,
1750                ..
1751            } = fixture(&mut context, &namespace, n);
1752            let mut registrations = register_validators(&mut oracle, &participants).await;
1753
1754            // Link all validators
1755            let link = Link {
1756                latency: Duration::from_millis(10),
1757                jitter: Duration::from_millis(1),
1758                success_rate: 1.0,
1759            };
1760            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1761
1762            // Create engines
1763            let elector = L::default();
1764            let relay = Arc::new(mocks::relay::Relay::new());
1765            let mut reporters = Vec::new();
1766            let mut engine_handlers = Vec::new();
1767            for (idx_scheme, validator) in participants.iter().enumerate() {
1768                // Create scheme context
1769                let context = context.with_label(&format!("validator_{}", *validator));
1770
1771                // Configure engine
1772                let reporter_config = mocks::reporter::Config {
1773                    participants: participants.clone().try_into().unwrap(),
1774                    scheme: schemes[idx_scheme].clone(),
1775                    elector: elector.clone(),
1776                };
1777                let reporter =
1778                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1779                reporters.push(reporter.clone());
1780                let application_cfg = if idx_scheme == 0 {
1781                    mocks::application::Config {
1782                        hasher: Sha256::default(),
1783                        relay: relay.clone(),
1784                        me: validator.clone(),
1785                        propose_latency: (10_000.0, 0.0),
1786                        verify_latency: (10_000.0, 5.0),
1787                        certify_latency: (10_000.0, 5.0),
1788                        should_certify: mocks::application::Certifier::Sometimes,
1789                    }
1790                } else {
1791                    mocks::application::Config {
1792                        hasher: Sha256::default(),
1793                        relay: relay.clone(),
1794                        me: validator.clone(),
1795                        propose_latency: (10.0, 5.0),
1796                        verify_latency: (10.0, 5.0),
1797                        certify_latency: (10.0, 5.0),
1798                        should_certify: mocks::application::Certifier::Sometimes,
1799                    }
1800                };
1801                let (actor, application) = mocks::application::Application::new(
1802                    context.with_label("application"),
1803                    application_cfg,
1804                );
1805                actor.start();
1806                let blocker = oracle.control(validator.clone());
1807                let cfg = config::Config {
1808                    scheme: schemes[idx_scheme].clone(),
1809                    elector: elector.clone(),
1810                    blocker,
1811                    automaton: application.clone(),
1812                    relay: application.clone(),
1813                    reporter: reporter.clone(),
1814                    strategy: Sequential,
1815                    partition: validator.to_string(),
1816                    mailbox_size: 1024,
1817                    epoch: Epoch::new(333),
1818                    leader_timeout: Duration::from_secs(1),
1819                    notarization_timeout: Duration::from_secs(2),
1820                    nullify_retry: Duration::from_secs(10),
1821                    fetch_timeout: Duration::from_secs(1),
1822                    activity_timeout,
1823                    skip_timeout,
1824                    fetch_concurrent: 4,
1825                    replay_buffer: NZUsize!(1024 * 1024),
1826                    write_buffer: NZUsize!(1024 * 1024),
1827                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1828                };
1829                let engine = Engine::new(context.with_label("engine"), cfg);
1830
1831                // Start engine
1832                let (pending, recovered, resolver) = registrations
1833                    .remove(validator)
1834                    .expect("validator should be registered");
1835                engine_handlers.push(engine.start(pending, recovered, resolver));
1836            }
1837
1838            // Wait for all engines to finish
1839            let mut finalizers = Vec::new();
1840            for reporter in reporters.iter_mut() {
1841                let (mut latest, mut monitor) = reporter.subscribe().await;
1842                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1843                    while latest < required_containers {
1844                        latest = monitor.recv().await.expect("event missing");
1845                    }
1846                }));
1847            }
1848            join_all(finalizers).await;
1849
1850            // Check reporters for correct activity
1851            let slow = &participants[0];
1852            for reporter in reporters.iter() {
1853                // Ensure no faults
1854                {
1855                    let faults = reporter.faults.lock().unwrap();
1856                    assert!(faults.is_empty());
1857                }
1858
1859                // Ensure no invalid signatures
1860                {
1861                    let invalid = reporter.invalid.lock().unwrap();
1862                    assert_eq!(*invalid, 0);
1863                }
1864
1865                // Ensure slow node still emits notarizes and finalizes (when receiving certificates)
1866                let mut observed = false;
1867                {
1868                    let notarizes = reporter.notarizes.lock().unwrap();
1869                    for (_, payloads) in notarizes.iter() {
1870                        for (_, participants) in payloads.iter() {
1871                            if participants.contains(slow) {
1872                                observed = true;
1873                                break;
1874                            }
1875                        }
1876                    }
1877                }
1878                {
1879                    let finalizes = reporter.finalizes.lock().unwrap();
1880                    for (_, payloads) in finalizes.iter() {
1881                        for (_, finalizers) in payloads.iter() {
1882                            if finalizers.contains(slow) {
1883                                observed = true;
1884                                break;
1885                            }
1886                        }
1887                    }
1888                }
1889                assert!(observed);
1890            }
1891
1892            // Ensure no blocked connections
1893            let blocked = oracle.blocked().await.unwrap();
1894            assert!(blocked.is_empty());
1895        });
1896    }
1897
1898    #[test_traced]
1899    fn test_slow_validator() {
1900        slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
1901        slow_validator::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
1902        slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
1903        slow_validator::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
1904        slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1905        slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1906        slow_validator::<_, _, RoundRobin>(ed25519::fixture);
1907        slow_validator::<_, _, RoundRobin>(secp256r1::fixture);
1908    }
1909
1910    fn all_recovery<S, F, L>(mut fixture: F)
1911    where
1912        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1913        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
1914        L: Elector<S>,
1915    {
1916        // Create context
1917        let n = 5;
1918        let required_containers = View::new(100);
1919        let activity_timeout = ViewDelta::new(10);
1920        let skip_timeout = ViewDelta::new(2);
1921        let namespace = b"consensus".to_vec();
1922        let executor = deterministic::Runner::timed(Duration::from_secs(1800));
1923        executor.start(|mut context| async move {
1924            // Create simulated network
1925            let (network, mut oracle) = Network::new(
1926                context.with_label("network"),
1927                Config {
1928                    max_size: 1024 * 1024,
1929                    disconnect_on_block: false,
1930                    tracked_peer_sets: None,
1931                },
1932            );
1933
1934            // Start network
1935            network.start();
1936
1937            // Register participants
1938            let Fixture {
1939                participants,
1940                schemes,
1941                ..
1942            } = fixture(&mut context, &namespace, n);
1943            let mut registrations = register_validators(&mut oracle, &participants).await;
1944
1945            // Link all validators
1946            let link = Link {
1947                latency: Duration::from_secs(3),
1948                jitter: Duration::from_millis(0),
1949                success_rate: 1.0,
1950            };
1951            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1952
1953            // Create engines
1954            let elector = L::default();
1955            let relay = Arc::new(mocks::relay::Relay::new());
1956            let mut reporters = Vec::new();
1957            let mut engine_handlers = Vec::new();
1958            for (idx, validator) in participants.iter().enumerate() {
1959                // Create scheme context
1960                let context = context.with_label(&format!("validator_{}", *validator));
1961
1962                // Configure engine
1963                let reporter_config = mocks::reporter::Config {
1964                    participants: participants.clone().try_into().unwrap(),
1965                    scheme: schemes[idx].clone(),
1966                    elector: elector.clone(),
1967                };
1968                let reporter =
1969                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1970                reporters.push(reporter.clone());
1971                let application_cfg = mocks::application::Config {
1972                    hasher: Sha256::default(),
1973                    relay: relay.clone(),
1974                    me: validator.clone(),
1975                    propose_latency: (10.0, 5.0),
1976                    verify_latency: (10.0, 5.0),
1977                    certify_latency: (10.0, 5.0),
1978                    should_certify: mocks::application::Certifier::Sometimes,
1979                };
1980                let (actor, application) = mocks::application::Application::new(
1981                    context.with_label("application"),
1982                    application_cfg,
1983                );
1984                actor.start();
1985                let blocker = oracle.control(validator.clone());
1986                let cfg = config::Config {
1987                    scheme: schemes[idx].clone(),
1988                    elector: elector.clone(),
1989                    blocker,
1990                    automaton: application.clone(),
1991                    relay: application.clone(),
1992                    reporter: reporter.clone(),
1993                    strategy: Sequential,
1994                    partition: validator.to_string(),
1995                    mailbox_size: 1024,
1996                    epoch: Epoch::new(333),
1997                    leader_timeout: Duration::from_secs(1),
1998                    notarization_timeout: Duration::from_secs(2),
1999                    nullify_retry: Duration::from_secs(10),
2000                    fetch_timeout: Duration::from_secs(1),
2001                    activity_timeout,
2002                    skip_timeout,
2003                    fetch_concurrent: 4,
2004                    replay_buffer: NZUsize!(1024 * 1024),
2005                    write_buffer: NZUsize!(1024 * 1024),
2006                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2007                };
2008                let engine = Engine::new(context.with_label("engine"), cfg);
2009
2010                // Start engine
2011                let (pending, recovered, resolver) = registrations
2012                    .remove(validator)
2013                    .expect("validator should be registered");
2014                engine_handlers.push(engine.start(pending, recovered, resolver));
2015            }
2016
2017            // Wait for a few virtual minutes (shouldn't finalize anything)
2018            let mut finalizers = Vec::new();
2019            for reporter in reporters.iter_mut() {
2020                let (_, mut monitor) = reporter.subscribe().await;
2021                finalizers.push(
2022                    context
2023                        .with_label("finalizer")
2024                        .spawn(move |context| async move {
2025                            select! {
2026                                _timeout = context.sleep(Duration::from_secs(60)) => {},
2027                                _done = monitor.recv() => {
2028                                    panic!("engine should not notarize or finalize anything");
2029                                },
2030                            }
2031                        }),
2032                );
2033            }
2034            join_all(finalizers).await;
2035
2036            // Unlink all validators to get latest view
2037            link_validators(&mut oracle, &participants, Action::Unlink, None).await;
2038
2039            // Wait for a virtual minute (nothing should happen)
2040            context.sleep(Duration::from_secs(60)).await;
2041
2042            // Get latest view
2043            let mut latest = View::zero();
2044            for reporter in reporters.iter() {
2045                let nullifies = reporter.nullifies.lock().unwrap();
2046                let max = nullifies.keys().max().unwrap();
2047                if *max > latest {
2048                    latest = *max;
2049                }
2050            }
2051
2052            // Update links
2053            let link = Link {
2054                latency: Duration::from_millis(10),
2055                jitter: Duration::from_millis(1),
2056                success_rate: 1.0,
2057            };
2058            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2059
2060            // Wait for all engines to finish
2061            let mut finalizers = Vec::new();
2062            for reporter in reporters.iter_mut() {
2063                let (mut latest, mut monitor) = reporter.subscribe().await;
2064                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2065                    while latest < required_containers {
2066                        latest = monitor.recv().await.expect("event missing");
2067                    }
2068                }));
2069            }
2070            join_all(finalizers).await;
2071
2072            // Check reporters for correct activity
2073            for reporter in reporters.iter() {
2074                // Ensure no faults
2075                {
2076                    let faults = reporter.faults.lock().unwrap();
2077                    assert!(faults.is_empty());
2078                }
2079
2080                // Ensure no invalid signatures
2081                {
2082                    let invalid = reporter.invalid.lock().unwrap();
2083                    assert_eq!(*invalid, 0);
2084                }
2085
2086                // Ensure quick recovery.
2087                //
2088                // If the skip timeout isn't implemented correctly, we may go many views before participants
2089                // start to notarize a validator's proposal.
2090                {
2091                    // Ensure nearly all views around latest are notarized.
2092                    // We don't check for finalization since some of the blocks may fail to be
2093                    // certified for the purposes of testing.
2094                    let mut found = 0;
2095                    let notarizations = reporter.notarizations.lock().unwrap();
2096                    for view in View::range(latest, latest.saturating_add(activity_timeout)) {
2097                        if notarizations.contains_key(&view) {
2098                            found += 1;
2099                        }
2100                    }
2101                    assert!(
2102                        found >= activity_timeout.get().saturating_sub(2),
2103                        "found: {found}"
2104                    );
2105                }
2106            }
2107
2108            // Ensure no blocked connections
2109            let blocked = oracle.blocked().await.unwrap();
2110            assert!(blocked.is_empty());
2111        });
2112    }
2113
2114    #[test_traced]
2115    fn test_all_recovery() {
2116        all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2117        all_recovery::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2118        all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2119        all_recovery::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2120        all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2121        all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2122        all_recovery::<_, _, RoundRobin>(ed25519::fixture);
2123        all_recovery::<_, _, RoundRobin>(secp256r1::fixture);
2124    }
2125
2126    fn partition<S, F, L>(mut fixture: F)
2127    where
2128        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2129        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2130        L: Elector<S>,
2131    {
2132        // Create context
2133        let n = 10;
2134        let required_containers = View::new(50);
2135        let activity_timeout = ViewDelta::new(10);
2136        let skip_timeout = ViewDelta::new(5);
2137        let namespace = b"consensus".to_vec();
2138        let executor = deterministic::Runner::timed(Duration::from_secs(900));
2139        executor.start(|mut context| async move {
2140            // Create simulated network
2141            let (network, mut oracle) = Network::new(
2142                context.with_label("network"),
2143                Config {
2144                    max_size: 1024 * 1024,
2145                    disconnect_on_block: false,
2146                    tracked_peer_sets: None,
2147                },
2148            );
2149
2150            // Start network
2151            network.start();
2152
2153            // Register participants
2154            let Fixture {
2155                participants,
2156                schemes,
2157                ..
2158            } = fixture(&mut context, &namespace, n);
2159            let mut registrations = register_validators(&mut oracle, &participants).await;
2160
2161            // Link all validators
2162            let link = Link {
2163                latency: Duration::from_millis(10),
2164                jitter: Duration::from_millis(1),
2165                success_rate: 1.0,
2166            };
2167            link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
2168
2169            // Create engines
2170            let elector = L::default();
2171            let relay = Arc::new(mocks::relay::Relay::new());
2172            let mut reporters = Vec::new();
2173            let mut engine_handlers = Vec::new();
2174            for (idx, validator) in participants.iter().enumerate() {
2175                // Create scheme context
2176                let context = context.with_label(&format!("validator_{}", *validator));
2177
2178                // Configure engine
2179                let reporter_config = mocks::reporter::Config {
2180                    participants: participants.clone().try_into().unwrap(),
2181                    scheme: schemes[idx].clone(),
2182                    elector: elector.clone(),
2183                };
2184                let reporter =
2185                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2186                reporters.push(reporter.clone());
2187                let application_cfg = mocks::application::Config {
2188                    hasher: Sha256::default(),
2189                    relay: relay.clone(),
2190                    me: validator.clone(),
2191                    propose_latency: (10.0, 5.0),
2192                    verify_latency: (10.0, 5.0),
2193                    certify_latency: (10.0, 5.0),
2194                    should_certify: mocks::application::Certifier::Sometimes,
2195                };
2196                let (actor, application) = mocks::application::Application::new(
2197                    context.with_label("application"),
2198                    application_cfg,
2199                );
2200                actor.start();
2201                let blocker = oracle.control(validator.clone());
2202                let cfg = config::Config {
2203                    scheme: schemes[idx].clone(),
2204                    elector: elector.clone(),
2205                    blocker,
2206                    automaton: application.clone(),
2207                    relay: application.clone(),
2208                    reporter: reporter.clone(),
2209                    strategy: Sequential,
2210                    partition: validator.to_string(),
2211                    mailbox_size: 1024,
2212                    epoch: Epoch::new(333),
2213                    leader_timeout: Duration::from_secs(1),
2214                    notarization_timeout: Duration::from_secs(2),
2215                    nullify_retry: Duration::from_secs(10),
2216                    fetch_timeout: Duration::from_secs(1),
2217                    activity_timeout,
2218                    skip_timeout,
2219                    fetch_concurrent: 4,
2220                    replay_buffer: NZUsize!(1024 * 1024),
2221                    write_buffer: NZUsize!(1024 * 1024),
2222                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2223                };
2224                let engine = Engine::new(context.with_label("engine"), cfg);
2225
2226                // Start engine
2227                let (pending, recovered, resolver) = registrations
2228                    .remove(validator)
2229                    .expect("validator should be registered");
2230                engine_handlers.push(engine.start(pending, recovered, resolver));
2231            }
2232
2233            // Wait for all engines to finish
2234            let mut finalizers = Vec::new();
2235            for reporter in reporters.iter_mut() {
2236                let (mut latest, mut monitor) = reporter.subscribe().await;
2237                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2238                    while latest < required_containers {
2239                        latest = monitor.recv().await.expect("event missing");
2240                    }
2241                }));
2242            }
2243            join_all(finalizers).await;
2244
2245            // Cut all links between validator halves
2246            fn separated(n: usize, a: usize, b: usize) -> bool {
2247                let m = n / 2;
2248                (a < m && b >= m) || (a >= m && b < m)
2249            }
2250            link_validators(&mut oracle, &participants, Action::Unlink, Some(separated)).await;
2251
2252            // Wait for any in-progress notarizations/finalizations to finish
2253            context.sleep(Duration::from_secs(10)).await;
2254
2255            // Wait for a few virtual minutes (shouldn't finalize anything)
2256            let mut finalizers = Vec::new();
2257            for reporter in reporters.iter_mut() {
2258                let (_, mut monitor) = reporter.subscribe().await;
2259                finalizers.push(
2260                    context
2261                        .with_label("finalizer")
2262                        .spawn(move |context| async move {
2263                            select! {
2264                                _timeout = context.sleep(Duration::from_secs(60)) => {},
2265                                _done = monitor.recv() => {
2266                                    panic!("engine should not notarize or finalize anything");
2267                                },
2268                            }
2269                        }),
2270                );
2271            }
2272            join_all(finalizers).await;
2273
2274            // Restore links
2275            link_validators(
2276                &mut oracle,
2277                &participants,
2278                Action::Link(link),
2279                Some(separated),
2280            )
2281            .await;
2282
2283            // Wait for all engines to finish
2284            let mut finalizers = Vec::new();
2285            for reporter in reporters.iter_mut() {
2286                let (mut latest, mut monitor) = reporter.subscribe().await;
2287                let required = latest.saturating_add(ViewDelta::new(required_containers.get()));
2288                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2289                    while latest < required {
2290                        latest = monitor.recv().await.expect("event missing");
2291                    }
2292                }));
2293            }
2294            join_all(finalizers).await;
2295
2296            // Check reporters for correct activity
2297            for reporter in reporters.iter() {
2298                // Ensure no faults
2299                {
2300                    let faults = reporter.faults.lock().unwrap();
2301                    assert!(faults.is_empty());
2302                }
2303
2304                // Ensure no invalid signatures
2305                {
2306                    let invalid = reporter.invalid.lock().unwrap();
2307                    assert_eq!(*invalid, 0);
2308                }
2309            }
2310
2311            // Ensure no blocked connections
2312            let blocked = oracle.blocked().await.unwrap();
2313            assert!(blocked.is_empty());
2314        });
2315    }
2316
2317    #[test_group("slow")]
2318    #[test_traced]
2319    fn test_partition() {
2320        partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
2321        partition::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
2322        partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
2323        partition::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
2324        partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2325        partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2326        partition::<_, _, RoundRobin>(ed25519::fixture);
2327        partition::<_, _, RoundRobin>(secp256r1::fixture);
2328    }
2329
2330    fn slow_and_lossy_links<S, F, L>(seed: u64, mut fixture: F) -> String
2331    where
2332        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2333        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2334        L: Elector<S>,
2335    {
2336        // Create context
2337        let n = 5;
2338        let required_containers = View::new(50);
2339        let activity_timeout = ViewDelta::new(10);
2340        let skip_timeout = ViewDelta::new(5);
2341        let namespace = b"consensus".to_vec();
2342        let cfg = deterministic::Config::new()
2343            .with_seed(seed)
2344            .with_timeout(Some(Duration::from_secs(5_000)));
2345        let executor = deterministic::Runner::new(cfg);
2346        executor.start(|mut context| async move {
2347            // Create simulated network
2348            let (network, mut oracle) = Network::new(
2349                context.with_label("network"),
2350                Config {
2351                    max_size: 1024 * 1024,
2352                    disconnect_on_block: false,
2353                    tracked_peer_sets: None,
2354                },
2355            );
2356
2357            // Start network
2358            network.start();
2359
2360            // Register participants
2361            let Fixture {
2362                participants,
2363                schemes,
2364                ..
2365            } = fixture(&mut context, &namespace, n);
2366            let mut registrations = register_validators(&mut oracle, &participants).await;
2367
2368            // Link all validators
2369            let degraded_link = Link {
2370                latency: Duration::from_millis(200),
2371                jitter: Duration::from_millis(150),
2372                success_rate: 0.5,
2373            };
2374            link_validators(
2375                &mut oracle,
2376                &participants,
2377                Action::Link(degraded_link),
2378                None,
2379            )
2380            .await;
2381
2382            // Create engines
2383            let elector = L::default();
2384            let relay = Arc::new(mocks::relay::Relay::new());
2385            let mut reporters = Vec::new();
2386            let mut engine_handlers = Vec::new();
2387            for (idx, validator) in participants.iter().enumerate() {
2388                // Create scheme context
2389                let context = context.with_label(&format!("validator_{}", *validator));
2390
2391                // Configure engine
2392                let reporter_config = mocks::reporter::Config {
2393                    participants: participants.clone().try_into().unwrap(),
2394                    scheme: schemes[idx].clone(),
2395                    elector: elector.clone(),
2396                };
2397                let reporter =
2398                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2399                reporters.push(reporter.clone());
2400                let application_cfg = mocks::application::Config {
2401                    hasher: Sha256::default(),
2402                    relay: relay.clone(),
2403                    me: validator.clone(),
2404                    propose_latency: (10.0, 5.0),
2405                    verify_latency: (10.0, 5.0),
2406                    certify_latency: (10.0, 5.0),
2407                    should_certify: mocks::application::Certifier::Sometimes,
2408                };
2409                let (actor, application) = mocks::application::Application::new(
2410                    context.with_label("application"),
2411                    application_cfg,
2412                );
2413                actor.start();
2414                let blocker = oracle.control(validator.clone());
2415                let cfg = config::Config {
2416                    scheme: schemes[idx].clone(),
2417                    elector: elector.clone(),
2418                    blocker,
2419                    automaton: application.clone(),
2420                    relay: application.clone(),
2421                    reporter: reporter.clone(),
2422                    strategy: Sequential,
2423                    partition: validator.to_string(),
2424                    mailbox_size: 1024,
2425                    epoch: Epoch::new(333),
2426                    leader_timeout: Duration::from_secs(1),
2427                    notarization_timeout: Duration::from_secs(2),
2428                    nullify_retry: Duration::from_secs(10),
2429                    fetch_timeout: Duration::from_secs(1),
2430                    activity_timeout,
2431                    skip_timeout,
2432                    fetch_concurrent: 4,
2433                    replay_buffer: NZUsize!(1024 * 1024),
2434                    write_buffer: NZUsize!(1024 * 1024),
2435                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2436                };
2437                let engine = Engine::new(context.with_label("engine"), cfg);
2438
2439                // Start engine
2440                let (pending, recovered, resolver) = registrations
2441                    .remove(validator)
2442                    .expect("validator should be registered");
2443                engine_handlers.push(engine.start(pending, recovered, resolver));
2444            }
2445
2446            // Wait for all engines to finish
2447            let mut finalizers = Vec::new();
2448            for reporter in reporters.iter_mut() {
2449                let (mut latest, mut monitor) = reporter.subscribe().await;
2450                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2451                    while latest < required_containers {
2452                        latest = monitor.recv().await.expect("event missing");
2453                    }
2454                }));
2455            }
2456            join_all(finalizers).await;
2457
2458            // Check reporters for correct activity
2459            for reporter in reporters.iter() {
2460                // Ensure no faults
2461                {
2462                    let faults = reporter.faults.lock().unwrap();
2463                    assert!(faults.is_empty());
2464                }
2465
2466                // Ensure no invalid signatures
2467                {
2468                    let invalid = reporter.invalid.lock().unwrap();
2469                    assert_eq!(*invalid, 0);
2470                }
2471            }
2472
2473            // Ensure no blocked connections
2474            let blocked = oracle.blocked().await.unwrap();
2475            assert!(blocked.is_empty());
2476
2477            context.auditor().state()
2478        })
2479    }
2480
2481    #[test_traced]
2482    fn test_slow_and_lossy_links() {
2483        slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinPk, _>);
2484        slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold_vrf::fixture::<MinSig, _>);
2485        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinPk, _>);
2486        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_threshold_std::fixture::<MinSig, _>);
2487        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinPk, _>);
2488        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinSig, _>);
2489        slow_and_lossy_links::<_, _, RoundRobin>(0, ed25519::fixture);
2490        slow_and_lossy_links::<_, _, RoundRobin>(0, secp256r1::fixture);
2491    }
2492
2493    #[test_group("slow")]
2494    #[test_traced]
2495    fn test_determinism() {
2496        // We use slow and lossy links as the deterministic test
2497        // because it is the most complex test.
2498        for seed in 1..6 {
2499            let ts_vrf_pk_state_1 = slow_and_lossy_links::<_, _, Random>(
2500                seed,
2501                bls12381_threshold_vrf::fixture::<MinPk, _>,
2502            );
2503            let ts_vrf_pk_state_2 = slow_and_lossy_links::<_, _, Random>(
2504                seed,
2505                bls12381_threshold_vrf::fixture::<MinPk, _>,
2506            );
2507            assert_eq!(ts_vrf_pk_state_1, ts_vrf_pk_state_2);
2508
2509            let ts_vrf_sig_state_1 = slow_and_lossy_links::<_, _, Random>(
2510                seed,
2511                bls12381_threshold_vrf::fixture::<MinSig, _>,
2512            );
2513            let ts_vrf_sig_state_2 = slow_and_lossy_links::<_, _, Random>(
2514                seed,
2515                bls12381_threshold_vrf::fixture::<MinSig, _>,
2516            );
2517            assert_eq!(ts_vrf_sig_state_1, ts_vrf_sig_state_2);
2518
2519            let ts_std_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2520                seed,
2521                bls12381_threshold_std::fixture::<MinPk, _>,
2522            );
2523            let ts_std_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2524                seed,
2525                bls12381_threshold_std::fixture::<MinPk, _>,
2526            );
2527            assert_eq!(ts_std_pk_state_1, ts_std_pk_state_2);
2528
2529            let ts_std_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2530                seed,
2531                bls12381_threshold_std::fixture::<MinSig, _>,
2532            );
2533            let ts_std_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2534                seed,
2535                bls12381_threshold_std::fixture::<MinSig, _>,
2536            );
2537            assert_eq!(ts_std_sig_state_1, ts_std_sig_state_2);
2538
2539            let ms_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2540                seed,
2541                bls12381_multisig::fixture::<MinPk, _>,
2542            );
2543            let ms_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2544                seed,
2545                bls12381_multisig::fixture::<MinPk, _>,
2546            );
2547            assert_eq!(ms_pk_state_1, ms_pk_state_2);
2548
2549            let ms_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2550                seed,
2551                bls12381_multisig::fixture::<MinSig, _>,
2552            );
2553            let ms_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2554                seed,
2555                bls12381_multisig::fixture::<MinSig, _>,
2556            );
2557            assert_eq!(ms_sig_state_1, ms_sig_state_2);
2558
2559            let ed_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2560            let ed_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2561            assert_eq!(ed_state_1, ed_state_2);
2562
2563            let secp_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2564            let secp_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, secp256r1::fixture);
2565            assert_eq!(secp_state_1, secp_state_2);
2566
2567            let states = [
2568                ("threshold-vrf-minpk", ts_vrf_pk_state_1),
2569                ("threshold-vrf-minsig", ts_vrf_sig_state_1),
2570                ("threshold-std-minpk", ts_std_pk_state_1),
2571                ("threshold-std-minsig", ts_std_sig_state_1),
2572                ("multisig-minpk", ms_pk_state_1),
2573                ("multisig-minsig", ms_sig_state_1),
2574                ("ed25519", ed_state_1),
2575                ("secp256r1", secp_state_1),
2576            ];
2577
2578            // Sanity check that different types can't be identical
2579            for pair in states.windows(2) {
2580                assert_ne!(
2581                    pair[0].1, pair[1].1,
2582                    "state {} equals state {}",
2583                    pair[0].0, pair[1].0
2584                );
2585            }
2586        }
2587    }
2588
2589    fn conflicter<S, F, L>(seed: u64, mut fixture: F)
2590    where
2591        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2592        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2593        L: Elector<S>,
2594    {
2595        // Create context
2596        let n = 4;
2597        let required_containers = View::new(50);
2598        let activity_timeout = ViewDelta::new(10);
2599        let skip_timeout = ViewDelta::new(5);
2600        let namespace = b"consensus".to_vec();
2601        let cfg = deterministic::Config::new()
2602            .with_seed(seed)
2603            .with_timeout(Some(Duration::from_secs(30)));
2604        let executor = deterministic::Runner::new(cfg);
2605        executor.start(|mut context| async move {
2606            // Create simulated network
2607            let (network, mut oracle) = Network::new(
2608                context.with_label("network"),
2609                Config {
2610                    max_size: 1024 * 1024,
2611                    disconnect_on_block: false,
2612                    tracked_peer_sets: None,
2613                },
2614            );
2615
2616            // Start network
2617            network.start();
2618
2619            // Register participants
2620            let Fixture {
2621                participants,
2622                schemes,
2623                ..
2624            } = fixture(&mut context, &namespace, n);
2625            let mut registrations = register_validators(&mut oracle, &participants).await;
2626
2627            // Link all validators
2628            let link = Link {
2629                latency: Duration::from_millis(10),
2630                jitter: Duration::from_millis(1),
2631                success_rate: 1.0,
2632            };
2633            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2634
2635            // Create engines
2636            let elector = L::default();
2637            let relay = Arc::new(mocks::relay::Relay::new());
2638            let mut reporters = Vec::new();
2639            for (idx_scheme, validator) in participants.iter().enumerate() {
2640                // Create scheme context
2641                let context = context.with_label(&format!("validator_{}", *validator));
2642
2643                // Start engine
2644                let reporter_config = mocks::reporter::Config {
2645                    participants: participants.clone().try_into().unwrap(),
2646                    scheme: schemes[idx_scheme].clone(),
2647                    elector: elector.clone(),
2648                };
2649                let reporter =
2650                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2651                let (pending, recovered, resolver) = registrations
2652                    .remove(validator)
2653                    .expect("validator should be registered");
2654                if idx_scheme == 0 {
2655                    let cfg = mocks::conflicter::Config {
2656                        scheme: schemes[idx_scheme].clone(),
2657                    };
2658
2659                    let engine: mocks::conflicter::Conflicter<_, _, Sha256> =
2660                        mocks::conflicter::Conflicter::new(
2661                            context.with_label("byzantine_engine"),
2662                            cfg,
2663                        );
2664                    engine.start(pending);
2665                } else {
2666                    reporters.push(reporter.clone());
2667                    let application_cfg = mocks::application::Config {
2668                        hasher: Sha256::default(),
2669                        relay: relay.clone(),
2670                        me: validator.clone(),
2671                        propose_latency: (10.0, 5.0),
2672                        verify_latency: (10.0, 5.0),
2673                        certify_latency: (10.0, 5.0),
2674                        should_certify: mocks::application::Certifier::Sometimes,
2675                    };
2676                    let (actor, application) = mocks::application::Application::new(
2677                        context.with_label("application"),
2678                        application_cfg,
2679                    );
2680                    actor.start();
2681                    let blocker = oracle.control(validator.clone());
2682                    let cfg = config::Config {
2683                        scheme: schemes[idx_scheme].clone(),
2684                        elector: elector.clone(),
2685                        blocker,
2686                        automaton: application.clone(),
2687                        relay: application.clone(),
2688                        reporter: reporter.clone(),
2689                        strategy: Sequential,
2690                        partition: validator.to_string(),
2691                        mailbox_size: 1024,
2692                        epoch: Epoch::new(333),
2693                        leader_timeout: Duration::from_secs(1),
2694                        notarization_timeout: Duration::from_secs(2),
2695                        nullify_retry: Duration::from_secs(10),
2696                        fetch_timeout: Duration::from_secs(1),
2697                        activity_timeout,
2698                        skip_timeout,
2699                        fetch_concurrent: 4,
2700                        replay_buffer: NZUsize!(1024 * 1024),
2701                        write_buffer: NZUsize!(1024 * 1024),
2702                        page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2703                    };
2704                    let engine = Engine::new(context.with_label("engine"), cfg);
2705                    engine.start(pending, recovered, resolver);
2706                }
2707            }
2708
2709            // Wait for all engines to finish
2710            let mut finalizers = Vec::new();
2711            for reporter in reporters.iter_mut() {
2712                let (mut latest, mut monitor) = reporter.subscribe().await;
2713                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2714                    while latest < required_containers {
2715                        latest = monitor.recv().await.expect("event missing");
2716                    }
2717                }));
2718            }
2719            join_all(finalizers).await;
2720
2721            // Check reporters for correct activity
2722            let byz = &participants[0];
2723            let mut count_conflicting = 0;
2724            for reporter in reporters.iter() {
2725                // Ensure only faults for byz
2726                {
2727                    let faults = reporter.faults.lock().unwrap();
2728                    assert_eq!(faults.len(), 1);
2729                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
2730                    for (_, faults) in faulter.iter() {
2731                        for fault in faults.iter() {
2732                            match fault {
2733                                Activity::ConflictingNotarize(_) => {
2734                                    count_conflicting += 1;
2735                                }
2736                                Activity::ConflictingFinalize(_) => {
2737                                    count_conflicting += 1;
2738                                }
2739                                _ => panic!("unexpected fault: {fault:?}"),
2740                            }
2741                        }
2742                    }
2743                }
2744
2745                // Ensure no invalid signatures
2746                {
2747                    let invalid = reporter.invalid.lock().unwrap();
2748                    assert_eq!(*invalid, 0);
2749                }
2750            }
2751            assert!(count_conflicting > 0);
2752
2753            // Ensure conflicter is blocked
2754            let blocked = oracle.blocked().await.unwrap();
2755            assert!(!blocked.is_empty());
2756            for (a, b) in blocked {
2757                assert_ne!(&a, byz);
2758                assert_eq!(&b, byz);
2759            }
2760        });
2761    }
2762
2763    #[test_group("slow")]
2764    #[test_traced]
2765    fn test_conflicter() {
2766        for seed in 0..5 {
2767            conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
2768            conflicter::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
2769            conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
2770            conflicter::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
2771            conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2772            conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2773            conflicter::<_, _, RoundRobin>(seed, ed25519::fixture);
2774            conflicter::<_, _, RoundRobin>(seed, secp256r1::fixture);
2775        }
2776    }
2777
2778    fn invalid<S, F, L>(seed: u64, mut fixture: F)
2779    where
2780        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2781        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2782        L: Elector<S>,
2783    {
2784        // Create context
2785        let n = 4;
2786        let required_containers = View::new(50);
2787        let activity_timeout = ViewDelta::new(10);
2788        let skip_timeout = ViewDelta::new(5);
2789        let namespace = b"consensus".to_vec();
2790        let cfg = deterministic::Config::new()
2791            .with_seed(seed)
2792            .with_timeout(Some(Duration::from_secs(30)));
2793        let executor = deterministic::Runner::new(cfg);
2794        executor.start(|mut context| async move {
2795            // Create simulated network
2796            let (network, mut oracle) = Network::new(
2797                context.with_label("network"),
2798                Config {
2799                    max_size: 1024 * 1024,
2800                    disconnect_on_block: false,
2801                    tracked_peer_sets: None,
2802                },
2803            );
2804
2805            // Start network
2806            network.start();
2807
2808            // Register participants
2809            let Fixture {
2810                participants,
2811                schemes,
2812                ..
2813            } = fixture(&mut context, &namespace, n);
2814
2815            // Create scheme with wrong namespace for byzantine node (index 0)
2816            let Fixture {
2817                schemes: wrong_schemes,
2818                ..
2819            } = fixture(&mut context, b"wrong-namespace", n);
2820
2821            let mut registrations = register_validators(&mut oracle, &participants).await;
2822
2823            // Link all validators
2824            let link = Link {
2825                latency: Duration::from_millis(10),
2826                jitter: Duration::from_millis(1),
2827                success_rate: 1.0,
2828            };
2829            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2830
2831            // Create engines
2832            let elector = L::default();
2833            let relay = Arc::new(mocks::relay::Relay::new());
2834            let mut reporters = Vec::new();
2835            for (idx_scheme, validator) in participants.iter().enumerate() {
2836                // Create scheme context
2837                let context = context.with_label(&format!("validator_{}", *validator));
2838
2839                // Byzantine node (idx 0) uses wrong namespace scheme to produce invalid signatures
2840                // Honest nodes use correct namespace schemes
2841                let scheme = if idx_scheme == 0 {
2842                    wrong_schemes[idx_scheme].clone()
2843                } else {
2844                    schemes[idx_scheme].clone()
2845                };
2846
2847                let reporter_config = mocks::reporter::Config {
2848                    participants: participants.clone().try_into().unwrap(),
2849                    scheme: schemes[idx_scheme].clone(),
2850                    elector: elector.clone(),
2851                };
2852                let reporter =
2853                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2854                reporters.push(reporter.clone());
2855
2856                let application_cfg = mocks::application::Config {
2857                    hasher: Sha256::default(),
2858                    relay: relay.clone(),
2859                    me: validator.clone(),
2860                    propose_latency: (10.0, 5.0),
2861                    verify_latency: (10.0, 5.0),
2862                    certify_latency: (10.0, 5.0),
2863                    should_certify: mocks::application::Certifier::Sometimes,
2864                };
2865                let (actor, application) = mocks::application::Application::new(
2866                    context.with_label("application"),
2867                    application_cfg,
2868                );
2869                actor.start();
2870                let blocker = oracle.control(validator.clone());
2871                let cfg = config::Config {
2872                    scheme,
2873                    elector: elector.clone(),
2874                    blocker,
2875                    automaton: application.clone(),
2876                    relay: application.clone(),
2877                    reporter: reporter.clone(),
2878                    strategy: Sequential,
2879                    partition: validator.clone().to_string(),
2880                    mailbox_size: 1024,
2881                    epoch: Epoch::new(333),
2882                    leader_timeout: Duration::from_secs(1),
2883                    notarization_timeout: Duration::from_secs(2),
2884                    nullify_retry: Duration::from_secs(10),
2885                    fetch_timeout: Duration::from_secs(1),
2886                    activity_timeout,
2887                    skip_timeout,
2888                    fetch_concurrent: 4,
2889                    replay_buffer: NZUsize!(1024 * 1024),
2890                    write_buffer: NZUsize!(1024 * 1024),
2891                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2892                };
2893                let engine = Engine::new(context.with_label("engine"), cfg);
2894                let (pending, recovered, resolver) = registrations
2895                    .remove(validator)
2896                    .expect("validator should be registered");
2897                engine.start(pending, recovered, resolver);
2898            }
2899
2900            // Wait for honest engines to finish (skip byzantine node at index 0)
2901            let mut finalizers = Vec::new();
2902            for reporter in reporters.iter_mut().skip(1) {
2903                let (mut latest, mut monitor) = reporter.subscribe().await;
2904                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2905                    while latest < required_containers {
2906                        latest = monitor.recv().await.expect("event missing");
2907                    }
2908                }));
2909            }
2910            join_all(finalizers).await;
2911
2912            // Check honest reporters (reporters[1..]) for correct activity
2913            let mut invalid_count = 0;
2914            for reporter in reporters.iter().skip(1) {
2915                // Ensure no faults
2916                {
2917                    let faults = reporter.faults.lock().unwrap();
2918                    assert!(faults.is_empty());
2919                }
2920
2921                // Count invalid signatures
2922                {
2923                    let invalid = reporter.invalid.lock().unwrap();
2924                    if *invalid > 0 {
2925                        invalid_count += 1;
2926                    }
2927                }
2928            }
2929
2930            // All honest nodes should see invalid signatures from the byzantine node
2931            assert_eq!(invalid_count, n - 1);
2932
2933            // Ensure byzantine node is blocked by honest nodes
2934            let blocked = oracle.blocked().await.unwrap();
2935            assert!(!blocked.is_empty());
2936            for (a, b) in blocked {
2937                if a != participants[0] {
2938                    assert_eq!(b, participants[0]);
2939                }
2940            }
2941        });
2942    }
2943
2944    #[test_group("slow")]
2945    #[test_traced]
2946    fn test_invalid() {
2947        for seed in 0..5 {
2948            invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
2949            invalid::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
2950            invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
2951            invalid::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
2952            invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2953            invalid::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2954            invalid::<_, _, RoundRobin>(seed, ed25519::fixture);
2955            invalid::<_, _, RoundRobin>(seed, secp256r1::fixture);
2956        }
2957    }
2958
2959    fn impersonator<S, F, L>(seed: u64, mut fixture: F)
2960    where
2961        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2962        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
2963        L: Elector<S>,
2964    {
2965        // Create context
2966        let n = 4;
2967        let required_containers = View::new(50);
2968        let activity_timeout = ViewDelta::new(10);
2969        let skip_timeout = ViewDelta::new(5);
2970        let namespace = b"consensus".to_vec();
2971        let cfg = deterministic::Config::new()
2972            .with_seed(seed)
2973            .with_timeout(Some(Duration::from_secs(30)));
2974        let executor = deterministic::Runner::new(cfg);
2975        executor.start(|mut context| async move {
2976            // Create simulated network
2977            let (network, mut oracle) = Network::new(
2978                context.with_label("network"),
2979                Config {
2980                    max_size: 1024 * 1024,
2981                    disconnect_on_block: false,
2982                    tracked_peer_sets: None,
2983                },
2984            );
2985
2986            // Start network
2987            network.start();
2988
2989            // Register participants
2990            let Fixture {
2991                participants,
2992                schemes,
2993                ..
2994            } = fixture(&mut context, &namespace, n);
2995            let mut registrations = register_validators(&mut oracle, &participants).await;
2996
2997            // Link all validators
2998            let link = Link {
2999                latency: Duration::from_millis(10),
3000                jitter: Duration::from_millis(1),
3001                success_rate: 1.0,
3002            };
3003            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3004
3005            // Create engines
3006            let elector = L::default();
3007            let relay = Arc::new(mocks::relay::Relay::new());
3008            let mut reporters = Vec::new();
3009            for (idx_scheme, validator) in participants.iter().enumerate() {
3010                // Create scheme context
3011                let context = context.with_label(&format!("validator_{}", *validator));
3012
3013                // Start engine
3014                let reporter_config = mocks::reporter::Config {
3015                    participants: participants.clone().try_into().unwrap(),
3016                    scheme: schemes[idx_scheme].clone(),
3017                    elector: elector.clone(),
3018                };
3019                let reporter =
3020                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3021                let (pending, recovered, resolver) = registrations
3022                    .remove(validator)
3023                    .expect("validator should be registered");
3024                if idx_scheme == 0 {
3025                    let cfg = mocks::impersonator::Config {
3026                        scheme: schemes[idx_scheme].clone(),
3027                    };
3028
3029                    let engine: mocks::impersonator::Impersonator<_, _, Sha256> =
3030                        mocks::impersonator::Impersonator::new(
3031                            context.with_label("byzantine_engine"),
3032                            cfg,
3033                        );
3034                    engine.start(pending);
3035                } else {
3036                    reporters.push(reporter.clone());
3037                    let application_cfg = mocks::application::Config {
3038                        hasher: Sha256::default(),
3039                        relay: relay.clone(),
3040                        me: validator.clone(),
3041                        propose_latency: (10.0, 5.0),
3042                        verify_latency: (10.0, 5.0),
3043                        certify_latency: (10.0, 5.0),
3044                        should_certify: mocks::application::Certifier::Sometimes,
3045                    };
3046                    let (actor, application) = mocks::application::Application::new(
3047                        context.with_label("application"),
3048                        application_cfg,
3049                    );
3050                    actor.start();
3051                    let blocker = oracle.control(validator.clone());
3052                    let cfg = config::Config {
3053                        scheme: schemes[idx_scheme].clone(),
3054                        elector: elector.clone(),
3055                        blocker,
3056                        automaton: application.clone(),
3057                        relay: application.clone(),
3058                        reporter: reporter.clone(),
3059                        strategy: Sequential,
3060                        partition: validator.clone().to_string(),
3061                        mailbox_size: 1024,
3062                        epoch: Epoch::new(333),
3063                        leader_timeout: Duration::from_secs(1),
3064                        notarization_timeout: Duration::from_secs(2),
3065                        nullify_retry: Duration::from_secs(10),
3066                        fetch_timeout: Duration::from_secs(1),
3067                        activity_timeout,
3068                        skip_timeout,
3069                        fetch_concurrent: 4,
3070                        replay_buffer: NZUsize!(1024 * 1024),
3071                        write_buffer: NZUsize!(1024 * 1024),
3072                        page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3073                    };
3074                    let engine = Engine::new(context.with_label("engine"), cfg);
3075                    engine.start(pending, recovered, resolver);
3076                }
3077            }
3078
3079            // Wait for all engines to finish
3080            let mut finalizers = Vec::new();
3081            for reporter in reporters.iter_mut() {
3082                let (mut latest, mut monitor) = reporter.subscribe().await;
3083                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3084                    while latest < required_containers {
3085                        latest = monitor.recv().await.expect("event missing");
3086                    }
3087                }));
3088            }
3089            join_all(finalizers).await;
3090
3091            // Check reporters for correct activity
3092            let byz = &participants[0];
3093            for reporter in reporters.iter() {
3094                // Ensure no faults
3095                {
3096                    let faults = reporter.faults.lock().unwrap();
3097                    assert!(faults.is_empty());
3098                }
3099
3100                // Ensure no invalid signatures
3101                {
3102                    let invalid = reporter.invalid.lock().unwrap();
3103                    assert_eq!(*invalid, 0);
3104                }
3105            }
3106
3107            // Ensure invalid is blocked
3108            let blocked = oracle.blocked().await.unwrap();
3109            assert!(!blocked.is_empty());
3110            for (a, b) in blocked {
3111                assert_ne!(&a, byz);
3112                assert_eq!(&b, byz);
3113            }
3114        });
3115    }
3116
3117    #[test_group("slow")]
3118    #[test_traced]
3119    fn test_impersonator() {
3120        for seed in 0..5 {
3121            impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3122            impersonator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3123            impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3124            impersonator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3125            impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3126            impersonator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3127            impersonator::<_, _, RoundRobin>(seed, ed25519::fixture);
3128            impersonator::<_, _, RoundRobin>(seed, secp256r1::fixture);
3129        }
3130    }
3131
3132    fn equivocator<S, F, L>(seed: u64, mut fixture: F) -> bool
3133    where
3134        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3135        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3136        L: Elector<S>,
3137    {
3138        // Create context
3139        let n = 7;
3140        let required_containers = View::new(50);
3141        let activity_timeout = ViewDelta::new(10);
3142        let skip_timeout = ViewDelta::new(5);
3143        let namespace = b"consensus".to_vec();
3144        let cfg = deterministic::Config::new()
3145            .with_seed(seed)
3146            .with_timeout(Some(Duration::from_secs(60)));
3147        let executor = deterministic::Runner::new(cfg);
3148        executor.start(|mut context| async move {
3149            // Create simulated network
3150            let (network, mut oracle) = Network::new(
3151                context.with_label("network"),
3152                Config {
3153                    max_size: 1024 * 1024,
3154                    disconnect_on_block: false,
3155                    tracked_peer_sets: None,
3156                },
3157            );
3158
3159            // Start network
3160            network.start();
3161
3162            // Register participants
3163            let Fixture {
3164                participants,
3165                schemes,
3166                ..
3167            } = fixture(&mut context, &namespace, n);
3168            let mut registrations = register_validators(&mut oracle, &participants).await;
3169
3170            // Link all validators
3171            let link = Link {
3172                latency: Duration::from_millis(10),
3173                jitter: Duration::from_millis(1),
3174                success_rate: 1.0,
3175            };
3176            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3177
3178            // Create engines
3179            let elector = L::default();
3180            let mut engines = Vec::new();
3181            let relay = Arc::new(mocks::relay::Relay::new());
3182            let mut reporters = Vec::new();
3183            for (idx_scheme, validator) in participants.iter().enumerate() {
3184                // Create scheme context
3185                let context = context.with_label(&format!("validator_{}", *validator));
3186
3187                // Start engine
3188                let reporter_config = mocks::reporter::Config {
3189                    participants: participants.clone().try_into().unwrap(),
3190                    scheme: schemes[idx_scheme].clone(),
3191                    elector: elector.clone(),
3192                };
3193                let reporter =
3194                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3195                reporters.push(reporter.clone());
3196                let (pending, recovered, resolver) = registrations
3197                    .remove(validator)
3198                    .expect("validator should be registered");
3199                if idx_scheme == 0 {
3200                    let cfg = mocks::equivocator::Config {
3201                        scheme: schemes[idx_scheme].clone(),
3202                        epoch: Epoch::new(333),
3203                        relay: relay.clone(),
3204                        hasher: Sha256::default(),
3205                        elector: elector.clone(),
3206                    };
3207
3208                    let engine = mocks::equivocator::Equivocator::new(
3209                        context.with_label("byzantine_engine"),
3210                        cfg,
3211                    );
3212                    engines.push(engine.start(pending, recovered));
3213                } else {
3214                    let application_cfg = mocks::application::Config {
3215                        hasher: Sha256::default(),
3216                        relay: relay.clone(),
3217                        me: validator.clone(),
3218                        propose_latency: (10.0, 5.0),
3219                        verify_latency: (10.0, 5.0),
3220                        certify_latency: (10.0, 5.0),
3221                        should_certify: mocks::application::Certifier::Sometimes,
3222                    };
3223                    let (actor, application) = mocks::application::Application::new(
3224                        context.with_label("application"),
3225                        application_cfg,
3226                    );
3227                    actor.start();
3228                    let blocker = oracle.control(validator.clone());
3229                    let cfg = config::Config {
3230                        scheme: schemes[idx_scheme].clone(),
3231                        elector: elector.clone(),
3232                        blocker,
3233                        automaton: application.clone(),
3234                        relay: application.clone(),
3235                        reporter: reporter.clone(),
3236                        strategy: Sequential,
3237                        partition: validator.to_string(),
3238                        mailbox_size: 1024,
3239                        epoch: Epoch::new(333),
3240                        leader_timeout: Duration::from_secs(1),
3241                        notarization_timeout: Duration::from_secs(2),
3242                        nullify_retry: Duration::from_secs(10),
3243                        fetch_timeout: Duration::from_secs(1),
3244                        activity_timeout,
3245                        skip_timeout,
3246                        fetch_concurrent: 4,
3247                        replay_buffer: NZUsize!(1024 * 1024),
3248                        write_buffer: NZUsize!(1024 * 1024),
3249                        page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3250                    };
3251                    let engine = Engine::new(context.with_label("engine"), cfg);
3252                    engines.push(engine.start(pending, recovered, resolver));
3253                }
3254            }
3255
3256            // Wait for all engines to hit required containers
3257            let mut finalizers = Vec::new();
3258            for reporter in reporters.iter_mut().skip(1) {
3259                let (mut latest, mut monitor) = reporter.subscribe().await;
3260                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3261                    while latest < required_containers {
3262                        latest = monitor.recv().await.expect("event missing");
3263                    }
3264                }));
3265            }
3266            join_all(finalizers).await;
3267
3268            // Abort a validator
3269            let idx = context.gen_range(1..engines.len()); // skip byzantine validator
3270            let validator = &participants[idx];
3271            let handle = engines.remove(idx);
3272            handle.abort();
3273            let _ = handle.await;
3274            reporters.remove(idx);
3275            info!(idx, ?validator, "aborted validator");
3276
3277            // Wait for all engines to hit required containers
3278            let mut finalizers = Vec::new();
3279            for reporter in reporters.iter_mut().skip(1) {
3280                let (mut latest, mut monitor) = reporter.subscribe().await;
3281                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3282                    while latest < View::new(required_containers.get() * 2) {
3283                        latest = monitor.recv().await.expect("event missing");
3284                    }
3285                }));
3286            }
3287            join_all(finalizers).await;
3288
3289            // Recreate engine
3290            info!(idx, ?validator, "restarting validator");
3291            let context = context.with_label(&format!("validator_{}_restarted", *validator));
3292
3293            // Start engine
3294            let reporter_config = mocks::reporter::Config {
3295                participants: participants.clone().try_into().unwrap(),
3296                scheme: schemes[idx].clone(),
3297                elector: elector.clone(),
3298            };
3299            let reporter =
3300                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3301            let (pending, recovered, resolver) =
3302                register_validator(&mut oracle, validator.clone()).await;
3303            reporters.push(reporter.clone());
3304            let application_cfg = mocks::application::Config {
3305                hasher: Sha256::default(),
3306                relay: relay.clone(),
3307                me: validator.clone(),
3308                propose_latency: (10.0, 5.0),
3309                verify_latency: (10.0, 5.0),
3310                certify_latency: (10.0, 5.0),
3311                should_certify: mocks::application::Certifier::Sometimes,
3312            };
3313            let (actor, application) = mocks::application::Application::new(
3314                context.with_label("application"),
3315                application_cfg,
3316            );
3317            actor.start();
3318            let blocker = oracle.control(validator.clone());
3319            let cfg = config::Config {
3320                scheme: schemes[idx].clone(),
3321                elector: elector.clone(),
3322                blocker,
3323                automaton: application.clone(),
3324                relay: application.clone(),
3325                reporter: reporter.clone(),
3326                strategy: Sequential,
3327                partition: validator.to_string(),
3328                mailbox_size: 1024,
3329                epoch: Epoch::new(333),
3330                leader_timeout: Duration::from_secs(1),
3331                notarization_timeout: Duration::from_secs(2),
3332                nullify_retry: Duration::from_secs(10),
3333                fetch_timeout: Duration::from_secs(1),
3334                activity_timeout,
3335                skip_timeout,
3336                fetch_concurrent: 4,
3337                replay_buffer: NZUsize!(1024 * 1024),
3338                write_buffer: NZUsize!(1024 * 1024),
3339                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3340            };
3341            let engine = Engine::new(context.with_label("engine"), cfg);
3342            engine.start(pending, recovered, resolver);
3343
3344            // Wait for all engines to hit required containers
3345            let mut finalizers = Vec::new();
3346            for reporter in reporters.iter_mut().skip(1) {
3347                let (mut latest, mut monitor) = reporter.subscribe().await;
3348                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3349                    while latest < View::new(required_containers.get() * 3) {
3350                        latest = monitor.recv().await.expect("event missing");
3351                    }
3352                }));
3353            }
3354            join_all(finalizers).await;
3355
3356            // Check equivocator blocking (we aren't guaranteed a fault will be produced
3357            // because it may not be possible to extract a conflicting vote from the certificate
3358            // we receive)
3359            let byz = &participants[0];
3360            let blocked = oracle.blocked().await.unwrap();
3361            for (a, b) in &blocked {
3362                assert_ne!(a, byz);
3363                assert_eq!(b, byz);
3364            }
3365            !blocked.is_empty()
3366        })
3367    }
3368
3369    #[test_group("slow")]
3370    #[test_traced]
3371    fn test_equivocator_bls12381_threshold_vrf_min_pk() {
3372        let detected = (0..5).any(|seed| {
3373            equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>)
3374        });
3375        assert!(
3376            detected,
3377            "expected at least one seed to detect equivocation"
3378        );
3379    }
3380
3381    #[test_group("slow")]
3382    #[test_traced]
3383    fn test_equivocator_bls12381_threshold_vrf_min_sig() {
3384        let detected = (0..5).any(|seed| {
3385            equivocator::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>)
3386        });
3387        assert!(
3388            detected,
3389            "expected at least one seed to detect equivocation"
3390        );
3391    }
3392
3393    #[test_group("slow")]
3394    #[test_traced]
3395    fn test_equivocator_bls12381_threshold_std_min_pk() {
3396        let detected = (0..5).any(|seed| {
3397            equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>)
3398        });
3399        assert!(
3400            detected,
3401            "expected at least one seed to detect equivocation"
3402        );
3403    }
3404
3405    #[test_group("slow")]
3406    #[test_traced]
3407    fn test_equivocator_bls12381_threshold_std_min_sig() {
3408        let detected = (0..5).any(|seed| {
3409            equivocator::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>)
3410        });
3411        assert!(
3412            detected,
3413            "expected at least one seed to detect equivocation"
3414        );
3415    }
3416
3417    #[test_group("slow")]
3418    #[test_traced]
3419    fn test_equivocator_bls12381_multisig_min_pk() {
3420        let detected = (0..5).any(|seed| {
3421            equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>)
3422        });
3423        assert!(
3424            detected,
3425            "expected at least one seed to detect equivocation"
3426        );
3427    }
3428
3429    #[test_group("slow")]
3430    #[test_traced]
3431    fn test_equivocator_bls12381_multisig_min_sig() {
3432        let detected = (0..5).any(|seed| {
3433            equivocator::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>)
3434        });
3435        assert!(
3436            detected,
3437            "expected at least one seed to detect equivocation"
3438        );
3439    }
3440
3441    #[test_group("slow")]
3442    #[test_traced]
3443    fn test_equivocator_ed25519() {
3444        let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, ed25519::fixture));
3445        assert!(
3446            detected,
3447            "expected at least one seed to detect equivocation"
3448        );
3449    }
3450
3451    #[test_group("slow")]
3452    #[test_traced]
3453    fn test_equivocator_secp256r1() {
3454        let detected = (0..5).any(|seed| equivocator::<_, _, RoundRobin>(seed, secp256r1::fixture));
3455        assert!(
3456            detected,
3457            "expected at least one seed to detect equivocation"
3458        );
3459    }
3460
3461    fn reconfigurer<S, F, L>(seed: u64, mut fixture: F)
3462    where
3463        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3464        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3465        L: Elector<S>,
3466    {
3467        // Create context
3468        let n = 4;
3469        let required_containers = View::new(50);
3470        let activity_timeout = ViewDelta::new(10);
3471        let skip_timeout = ViewDelta::new(5);
3472        let namespace = b"consensus".to_vec();
3473        let cfg = deterministic::Config::new()
3474            .with_seed(seed)
3475            .with_timeout(Some(Duration::from_secs(30)));
3476        let executor = deterministic::Runner::new(cfg);
3477        executor.start(|mut context| async move {
3478            // Create simulated network
3479            let (network, mut oracle) = Network::new(
3480                context.with_label("network"),
3481                Config {
3482                    max_size: 1024 * 1024,
3483                    disconnect_on_block: false,
3484                    tracked_peer_sets: None,
3485                },
3486            );
3487
3488            // Start network
3489            network.start();
3490
3491            // Register participants
3492            let Fixture {
3493                participants,
3494                schemes,
3495                ..
3496            } = fixture(&mut context, &namespace, n);
3497            let mut registrations = register_validators(&mut oracle, &participants).await;
3498
3499            // Link all validators
3500            let link = Link {
3501                latency: Duration::from_millis(10),
3502                jitter: Duration::from_millis(1),
3503                success_rate: 1.0,
3504            };
3505            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3506
3507            // Create engines
3508            let elector = L::default();
3509            let relay = Arc::new(mocks::relay::Relay::new());
3510            let mut reporters = Vec::new();
3511            for (idx_scheme, validator) in participants.iter().enumerate() {
3512                // Create scheme context
3513                let context = context.with_label(&format!("validator_{}", *validator));
3514
3515                // Start engine
3516                let reporter_config = mocks::reporter::Config {
3517                    participants: participants.clone().try_into().unwrap(),
3518                    scheme: schemes[idx_scheme].clone(),
3519                    elector: elector.clone(),
3520                };
3521                let reporter =
3522                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3523                let (pending, recovered, resolver) = registrations
3524                    .remove(validator)
3525                    .expect("validator should be registered");
3526                if idx_scheme == 0 {
3527                    let cfg = mocks::reconfigurer::Config {
3528                        scheme: schemes[idx_scheme].clone(),
3529                    };
3530                    let engine: mocks::reconfigurer::Reconfigurer<_, _, Sha256> =
3531                        mocks::reconfigurer::Reconfigurer::new(
3532                            context.with_label("byzantine_engine"),
3533                            cfg,
3534                        );
3535                    engine.start(pending);
3536                } else {
3537                    reporters.push(reporter.clone());
3538                    let application_cfg = mocks::application::Config {
3539                        hasher: Sha256::default(),
3540                        relay: relay.clone(),
3541                        me: validator.clone(),
3542                        propose_latency: (10.0, 5.0),
3543                        verify_latency: (10.0, 5.0),
3544                        certify_latency: (10.0, 5.0),
3545                        should_certify: mocks::application::Certifier::Sometimes,
3546                    };
3547                    let (actor, application) = mocks::application::Application::new(
3548                        context.with_label("application"),
3549                        application_cfg,
3550                    );
3551                    actor.start();
3552                    let blocker = oracle.control(validator.clone());
3553                    let cfg = config::Config {
3554                        scheme: schemes[idx_scheme].clone(),
3555                        elector: elector.clone(),
3556                        blocker,
3557                        automaton: application.clone(),
3558                        relay: application.clone(),
3559                        reporter: reporter.clone(),
3560                        strategy: Sequential,
3561                        partition: validator.to_string(),
3562                        mailbox_size: 1024,
3563                        epoch: Epoch::new(333),
3564                        leader_timeout: Duration::from_secs(1),
3565                        notarization_timeout: Duration::from_secs(2),
3566                        nullify_retry: Duration::from_secs(10),
3567                        fetch_timeout: Duration::from_secs(1),
3568                        activity_timeout,
3569                        skip_timeout,
3570                        fetch_concurrent: 4,
3571                        replay_buffer: NZUsize!(1024 * 1024),
3572                        write_buffer: NZUsize!(1024 * 1024),
3573                        page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3574                    };
3575                    let engine = Engine::new(context.with_label("engine"), cfg);
3576                    engine.start(pending, recovered, resolver);
3577                }
3578            }
3579
3580            // Wait for all engines to finish
3581            let mut finalizers = Vec::new();
3582            for reporter in reporters.iter_mut() {
3583                let (mut latest, mut monitor) = reporter.subscribe().await;
3584                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3585                    while latest < required_containers {
3586                        latest = monitor.recv().await.expect("event missing");
3587                    }
3588                }));
3589            }
3590            join_all(finalizers).await;
3591
3592            // Check reporters for correct activity
3593            let byz = &participants[0];
3594            for reporter in reporters.iter() {
3595                // Ensure no faults
3596                {
3597                    let faults = reporter.faults.lock().unwrap();
3598                    assert!(faults.is_empty());
3599                }
3600
3601                // Ensure no invalid signatures
3602                {
3603                    let invalid = reporter.invalid.lock().unwrap();
3604                    assert_eq!(*invalid, 0);
3605                }
3606            }
3607
3608            // Ensure reconfigurer is blocked (epoch mismatch)
3609            let blocked = oracle.blocked().await.unwrap();
3610            assert!(!blocked.is_empty());
3611            for (a, b) in blocked {
3612                assert_ne!(&a, byz);
3613                assert_eq!(&b, byz);
3614            }
3615        });
3616    }
3617
3618    #[test_group("slow")]
3619    #[test_traced]
3620    fn test_reconfigurer() {
3621        for seed in 0..5 {
3622            reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3623            reconfigurer::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3624            reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3625            reconfigurer::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3626            reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3627            reconfigurer::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3628            reconfigurer::<_, _, RoundRobin>(seed, ed25519::fixture);
3629            reconfigurer::<_, _, RoundRobin>(seed, secp256r1::fixture);
3630        }
3631    }
3632
3633    fn nuller<S, F, L>(seed: u64, mut fixture: F)
3634    where
3635        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3636        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3637        L: Elector<S>,
3638    {
3639        // Create context
3640        let n = 4;
3641        let required_containers = View::new(50);
3642        let activity_timeout = ViewDelta::new(10);
3643        let skip_timeout = ViewDelta::new(5);
3644        let namespace = b"consensus".to_vec();
3645        let cfg = deterministic::Config::new()
3646            .with_seed(seed)
3647            .with_timeout(Some(Duration::from_secs(30)));
3648        let executor = deterministic::Runner::new(cfg);
3649        executor.start(|mut context| async move {
3650            // Create simulated network
3651            let (network, mut oracle) = Network::new(
3652                context.with_label("network"),
3653                Config {
3654                    max_size: 1024 * 1024,
3655                    disconnect_on_block: false,
3656                    tracked_peer_sets: None,
3657                },
3658            );
3659
3660            // Start network
3661            network.start();
3662
3663            // Register participants
3664            let Fixture {
3665                participants,
3666                schemes,
3667                ..
3668            } = fixture(&mut context, &namespace, n);
3669            let mut registrations = register_validators(&mut oracle, &participants).await;
3670
3671            // Link all validators
3672            let link = Link {
3673                latency: Duration::from_millis(10),
3674                jitter: Duration::from_millis(1),
3675                success_rate: 1.0,
3676            };
3677            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3678
3679            // Create engines
3680            let elector = L::default();
3681            let relay = Arc::new(mocks::relay::Relay::new());
3682            let mut reporters = Vec::new();
3683            for (idx_scheme, validator) in participants.iter().enumerate() {
3684                // Create scheme context
3685                let context = context.with_label(&format!("validator_{}", *validator));
3686
3687                // Start engine
3688                let reporter_config = mocks::reporter::Config {
3689                    participants: participants.clone().try_into().unwrap(),
3690                    scheme: schemes[idx_scheme].clone(),
3691                    elector: elector.clone(),
3692                };
3693                let reporter =
3694                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3695                let (pending, recovered, resolver) = registrations
3696                    .remove(validator)
3697                    .expect("validator should be registered");
3698                if idx_scheme == 0 {
3699                    let cfg = mocks::nuller::Config {
3700                        scheme: schemes[idx_scheme].clone(),
3701                    };
3702                    let engine: mocks::nuller::Nuller<_, _, Sha256> =
3703                        mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
3704                    engine.start(pending);
3705                } else {
3706                    reporters.push(reporter.clone());
3707                    let application_cfg = mocks::application::Config {
3708                        hasher: Sha256::default(),
3709                        relay: relay.clone(),
3710                        me: validator.clone(),
3711                        propose_latency: (10.0, 5.0),
3712                        verify_latency: (10.0, 5.0),
3713                        certify_latency: (10.0, 5.0),
3714                        should_certify: mocks::application::Certifier::Sometimes,
3715                    };
3716                    let (actor, application) = mocks::application::Application::new(
3717                        context.with_label("application"),
3718                        application_cfg,
3719                    );
3720                    actor.start();
3721                    let blocker = oracle.control(validator.clone());
3722                    let cfg = config::Config {
3723                        scheme: schemes[idx_scheme].clone(),
3724                        elector: elector.clone(),
3725                        blocker,
3726                        automaton: application.clone(),
3727                        relay: application.clone(),
3728                        reporter: reporter.clone(),
3729                        strategy: Sequential,
3730                        partition: validator.clone().to_string(),
3731                        mailbox_size: 1024,
3732                        epoch: Epoch::new(333),
3733                        leader_timeout: Duration::from_secs(1),
3734                        notarization_timeout: Duration::from_secs(2),
3735                        nullify_retry: Duration::from_secs(10),
3736                        fetch_timeout: Duration::from_secs(1),
3737                        activity_timeout,
3738                        skip_timeout,
3739                        fetch_concurrent: 4,
3740                        replay_buffer: NZUsize!(1024 * 1024),
3741                        write_buffer: NZUsize!(1024 * 1024),
3742                        page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3743                    };
3744                    let engine = Engine::new(context.with_label("engine"), cfg);
3745                    engine.start(pending, recovered, resolver);
3746                }
3747            }
3748
3749            // Wait for all engines to finish
3750            let mut finalizers = Vec::new();
3751            for reporter in reporters.iter_mut() {
3752                let (mut latest, mut monitor) = reporter.subscribe().await;
3753                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3754                    while latest < required_containers {
3755                        latest = monitor.recv().await.expect("event missing");
3756                    }
3757                }));
3758            }
3759            join_all(finalizers).await;
3760
3761            // Check reporters for correct activity
3762            let byz = &participants[0];
3763            let mut count_nullify_and_finalize = 0;
3764            for reporter in reporters.iter() {
3765                // Ensure only faults for byz
3766                {
3767                    let faults = reporter.faults.lock().unwrap();
3768                    assert_eq!(faults.len(), 1);
3769                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
3770                    for (_, faults) in faulter.iter() {
3771                        for fault in faults.iter() {
3772                            match fault {
3773                                Activity::NullifyFinalize(_) => {
3774                                    count_nullify_and_finalize += 1;
3775                                }
3776                                _ => panic!("unexpected fault: {fault:?}"),
3777                            }
3778                        }
3779                    }
3780                }
3781
3782                // Ensure no invalid signatures
3783                {
3784                    let invalid = reporter.invalid.lock().unwrap();
3785                    assert_eq!(*invalid, 0);
3786                }
3787            }
3788            assert!(count_nullify_and_finalize > 0);
3789
3790            // Ensure nullifier is blocked
3791            let blocked = oracle.blocked().await.unwrap();
3792            assert!(!blocked.is_empty());
3793            for (a, b) in blocked {
3794                assert_ne!(&a, byz);
3795                assert_eq!(&b, byz);
3796            }
3797        });
3798    }
3799
3800    #[test_group("slow")]
3801    #[test_traced]
3802    fn test_nuller() {
3803        for seed in 0..5 {
3804            nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3805            nuller::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3806            nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3807            nuller::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3808            nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3809            nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3810            nuller::<_, _, RoundRobin>(seed, ed25519::fixture);
3811            nuller::<_, _, RoundRobin>(seed, secp256r1::fixture);
3812        }
3813    }
3814
3815    fn outdated<S, F, L>(seed: u64, mut fixture: F)
3816    where
3817        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3818        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3819        L: Elector<S>,
3820    {
3821        // Create context
3822        let n = 4;
3823        let required_containers = View::new(100);
3824        let activity_timeout = ViewDelta::new(10);
3825        let skip_timeout = ViewDelta::new(5);
3826        let namespace = b"consensus".to_vec();
3827        let cfg = deterministic::Config::new()
3828            .with_seed(seed)
3829            .with_timeout(Some(Duration::from_secs(30)));
3830        let executor = deterministic::Runner::new(cfg);
3831        executor.start(|mut context| async move {
3832            // Create simulated network
3833            let (network, mut oracle) = Network::new(
3834                context.with_label("network"),
3835                Config {
3836                    max_size: 1024 * 1024,
3837                    disconnect_on_block: false,
3838                    tracked_peer_sets: None,
3839                },
3840            );
3841
3842            // Start network
3843            network.start();
3844
3845            // Register participants
3846            let Fixture {
3847                participants,
3848                schemes,
3849                ..
3850            } = fixture(&mut context, &namespace, n);
3851            let mut registrations = register_validators(&mut oracle, &participants).await;
3852
3853            // Link all validators
3854            let link = Link {
3855                latency: Duration::from_millis(10),
3856                jitter: Duration::from_millis(1),
3857                success_rate: 1.0,
3858            };
3859            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3860
3861            // Create engines
3862            let elector = L::default();
3863            let relay = Arc::new(mocks::relay::Relay::new());
3864            let mut reporters = Vec::new();
3865            for (idx_scheme, validator) in participants.iter().enumerate() {
3866                // Create scheme context
3867                let context = context.with_label(&format!("validator_{}", *validator));
3868
3869                // Start engine
3870                let reporter_config = mocks::reporter::Config {
3871                    participants: participants.clone().try_into().unwrap(),
3872                    scheme: schemes[idx_scheme].clone(),
3873                    elector: elector.clone(),
3874                };
3875                let reporter =
3876                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3877                let (pending, recovered, resolver) = registrations
3878                    .remove(validator)
3879                    .expect("validator should be registered");
3880                if idx_scheme == 0 {
3881                    let cfg = mocks::outdated::Config {
3882                        scheme: schemes[idx_scheme].clone(),
3883                        view_delta: ViewDelta::new(activity_timeout.get().saturating_mul(4)),
3884                    };
3885                    let engine: mocks::outdated::Outdated<_, _, Sha256> =
3886                        mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
3887                    engine.start(pending);
3888                } else {
3889                    reporters.push(reporter.clone());
3890                    let application_cfg = mocks::application::Config {
3891                        hasher: Sha256::default(),
3892                        relay: relay.clone(),
3893                        me: validator.clone(),
3894                        propose_latency: (10.0, 5.0),
3895                        verify_latency: (10.0, 5.0),
3896                        certify_latency: (10.0, 5.0),
3897                        should_certify: mocks::application::Certifier::Sometimes,
3898                    };
3899                    let (actor, application) = mocks::application::Application::new(
3900                        context.with_label("application"),
3901                        application_cfg,
3902                    );
3903                    actor.start();
3904                    let blocker = oracle.control(validator.clone());
3905                    let cfg = config::Config {
3906                        scheme: schemes[idx_scheme].clone(),
3907                        elector: elector.clone(),
3908                        blocker,
3909                        automaton: application.clone(),
3910                        relay: application.clone(),
3911                        reporter: reporter.clone(),
3912                        strategy: Sequential,
3913                        partition: validator.clone().to_string(),
3914                        mailbox_size: 1024,
3915                        epoch: Epoch::new(333),
3916                        leader_timeout: Duration::from_secs(1),
3917                        notarization_timeout: Duration::from_secs(2),
3918                        nullify_retry: Duration::from_secs(10),
3919                        fetch_timeout: Duration::from_secs(1),
3920                        activity_timeout,
3921                        skip_timeout,
3922                        fetch_concurrent: 4,
3923                        replay_buffer: NZUsize!(1024 * 1024),
3924                        write_buffer: NZUsize!(1024 * 1024),
3925                        page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3926                    };
3927                    let engine = Engine::new(context.with_label("engine"), cfg);
3928                    engine.start(pending, recovered, resolver);
3929                }
3930            }
3931
3932            // Wait for all engines to finish
3933            let mut finalizers = Vec::new();
3934            for reporter in reporters.iter_mut() {
3935                let (mut latest, mut monitor) = reporter.subscribe().await;
3936                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3937                    while latest < required_containers {
3938                        latest = monitor.recv().await.expect("event missing");
3939                    }
3940                }));
3941            }
3942            join_all(finalizers).await;
3943
3944            // Check reporters for correct activity
3945            for reporter in reporters.iter() {
3946                // Ensure no faults
3947                {
3948                    let faults = reporter.faults.lock().unwrap();
3949                    assert!(faults.is_empty());
3950                }
3951
3952                // Ensure no invalid signatures
3953                {
3954                    let invalid = reporter.invalid.lock().unwrap();
3955                    assert_eq!(*invalid, 0);
3956                }
3957            }
3958
3959            // Ensure no blocked connections
3960            let blocked = oracle.blocked().await.unwrap();
3961            assert!(blocked.is_empty());
3962        });
3963    }
3964
3965    #[test_group("slow")]
3966    #[test_traced]
3967    fn test_outdated() {
3968        for seed in 0..5 {
3969            outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinPk, _>);
3970            outdated::<_, _, Random>(seed, bls12381_threshold_vrf::fixture::<MinSig, _>);
3971            outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinPk, _>);
3972            outdated::<_, _, RoundRobin>(seed, bls12381_threshold_std::fixture::<MinSig, _>);
3973            outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3974            outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3975            outdated::<_, _, RoundRobin>(seed, ed25519::fixture);
3976            outdated::<_, _, RoundRobin>(seed, secp256r1::fixture);
3977        }
3978    }
3979
3980    fn run_1k<S, F, L>(mut fixture: F)
3981    where
3982        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3983        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
3984        L: Elector<S>,
3985    {
3986        // Create context
3987        let n = 10;
3988        let required_containers = View::new(1_000);
3989        let activity_timeout = ViewDelta::new(10);
3990        let skip_timeout = ViewDelta::new(5);
3991        let namespace = b"consensus".to_vec();
3992        let cfg = deterministic::Config::new();
3993        let executor = deterministic::Runner::new(cfg);
3994        executor.start(|mut context| async move {
3995            // Create simulated network
3996            let (network, mut oracle) = Network::new(
3997                context.with_label("network"),
3998                Config {
3999                    max_size: 1024 * 1024,
4000                    disconnect_on_block: false,
4001                    tracked_peer_sets: None,
4002                },
4003            );
4004
4005            // Start network
4006            network.start();
4007
4008            // Register participants
4009            let Fixture {
4010                participants,
4011                schemes,
4012                ..
4013            } = fixture(&mut context, &namespace, n);
4014            let mut registrations = register_validators(&mut oracle, &participants).await;
4015
4016            // Link all validators
4017            let link = Link {
4018                latency: Duration::from_millis(80),
4019                jitter: Duration::from_millis(10),
4020                success_rate: 0.98,
4021            };
4022            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4023
4024            // Create engines
4025            let elector = L::default();
4026            let relay = Arc::new(mocks::relay::Relay::new());
4027            let mut reporters = Vec::new();
4028            let mut engine_handlers = Vec::new();
4029            for (idx, validator) in participants.iter().enumerate() {
4030                // Create scheme context
4031                let context = context.with_label(&format!("validator_{}", *validator));
4032
4033                // Configure engine
4034                let reporter_config = mocks::reporter::Config {
4035                    participants: participants.clone().try_into().unwrap(),
4036                    scheme: schemes[idx].clone(),
4037                    elector: elector.clone(),
4038                };
4039                let reporter =
4040                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4041                reporters.push(reporter.clone());
4042                let application_cfg = mocks::application::Config {
4043                    hasher: Sha256::default(),
4044                    relay: relay.clone(),
4045                    me: validator.clone(),
4046                    propose_latency: (100.0, 50.0),
4047                    verify_latency: (50.0, 40.0),
4048                    certify_latency: (50.0, 40.0),
4049                    should_certify: mocks::application::Certifier::Sometimes,
4050                };
4051                let (actor, application) = mocks::application::Application::new(
4052                    context.with_label("application"),
4053                    application_cfg,
4054                );
4055                actor.start();
4056                let blocker = oracle.control(validator.clone());
4057                let cfg = config::Config {
4058                    scheme: schemes[idx].clone(),
4059                    elector: elector.clone(),
4060                    blocker,
4061                    automaton: application.clone(),
4062                    relay: application.clone(),
4063                    reporter: reporter.clone(),
4064                    strategy: Sequential,
4065                    partition: validator.to_string(),
4066                    mailbox_size: 1024,
4067                    epoch: Epoch::new(333),
4068                    leader_timeout: Duration::from_secs(1),
4069                    notarization_timeout: Duration::from_secs(2),
4070                    nullify_retry: Duration::from_secs(10),
4071                    fetch_timeout: Duration::from_secs(1),
4072                    activity_timeout,
4073                    skip_timeout,
4074                    fetch_concurrent: 4,
4075                    replay_buffer: NZUsize!(1024 * 1024),
4076                    write_buffer: NZUsize!(1024 * 1024),
4077                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4078                };
4079                let engine = Engine::new(context.with_label("engine"), cfg);
4080
4081                // Start engine
4082                let (pending, recovered, resolver) = registrations
4083                    .remove(validator)
4084                    .expect("validator should be registered");
4085                engine_handlers.push(engine.start(pending, recovered, resolver));
4086            }
4087
4088            // Wait for all engines to finish
4089            let mut finalizers = Vec::new();
4090            for reporter in reporters.iter_mut() {
4091                let (mut latest, mut monitor) = reporter.subscribe().await;
4092                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4093                    while latest < required_containers {
4094                        latest = monitor.recv().await.expect("event missing");
4095                    }
4096                }));
4097            }
4098            join_all(finalizers).await;
4099
4100            // Check reporters for correct activity
4101            for reporter in reporters.iter() {
4102                // Ensure no faults
4103                {
4104                    let faults = reporter.faults.lock().unwrap();
4105                    assert!(faults.is_empty());
4106                }
4107
4108                // Ensure no invalid signatures
4109                {
4110                    let invalid = reporter.invalid.lock().unwrap();
4111                    assert_eq!(*invalid, 0);
4112                }
4113            }
4114
4115            // Ensure no blocked connections
4116            let blocked = oracle.blocked().await.unwrap();
4117            assert!(blocked.is_empty());
4118        })
4119    }
4120
4121    #[test_group("slow")]
4122    #[test_traced]
4123    fn test_1k_bls12381_threshold_vrf_min_pk() {
4124        run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
4125    }
4126
4127    #[test_group("slow")]
4128    #[test_traced]
4129    fn test_1k_bls12381_threshold_vrf_min_sig() {
4130        run_1k::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
4131    }
4132
4133    #[test_group("slow")]
4134    #[test_traced]
4135    fn test_1k_bls12381_threshold_std_min_pk() {
4136        run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
4137    }
4138
4139    #[test_group("slow")]
4140    #[test_traced]
4141    fn test_1k_bls12381_threshold_std_min_sig() {
4142        run_1k::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
4143    }
4144
4145    #[test_group("slow")]
4146    #[test_traced]
4147    fn test_1k_bls12381_multisig_min_pk() {
4148        run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4149    }
4150
4151    #[test_group("slow")]
4152    #[test_traced]
4153    fn test_1k_bls12381_multisig_min_sig() {
4154        run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
4155    }
4156
4157    #[test_group("slow")]
4158    #[test_traced]
4159    fn test_1k_ed25519() {
4160        run_1k::<_, _, RoundRobin>(ed25519::fixture);
4161    }
4162
4163    #[test_group("slow")]
4164    #[test_traced]
4165    fn test_1k_secp256r1() {
4166        run_1k::<_, _, RoundRobin>(secp256r1::fixture);
4167    }
4168
4169    fn engine_shutdown<S, F, L>(seed: u64, mut fixture: F, graceful: bool)
4170    where
4171        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4172        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4173        L: Elector<S>,
4174    {
4175        let n = 1;
4176        let namespace = b"consensus".to_vec();
4177        let cfg = deterministic::Config::default()
4178            .with_seed(seed)
4179            .with_timeout(Some(Duration::from_secs(10)));
4180        let executor = deterministic::Runner::new(cfg);
4181        executor.start(|mut context| async move {
4182            // Create simulated network
4183            let (network, mut oracle) = Network::new(
4184                context.with_label("network"),
4185                Config {
4186                    max_size: 1024 * 1024,
4187                    disconnect_on_block: true,
4188                    tracked_peer_sets: None,
4189                },
4190            );
4191
4192            // Start network
4193            network.start();
4194
4195            // Register a single participant
4196            let Fixture {
4197                participants,
4198                schemes,
4199                ..
4200            } = fixture(&mut context, &namespace, n);
4201            let mut registrations = register_validators(&mut oracle, &participants).await;
4202
4203            // Link the single validator to itself (no-ops for completeness)
4204            let link = Link {
4205                latency: Duration::from_millis(1),
4206                jitter: Duration::from_millis(0),
4207                success_rate: 1.0,
4208            };
4209            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4210
4211            // Create engine
4212            let elector = L::default();
4213            let reporter_config = mocks::reporter::Config {
4214                participants: participants.clone().try_into().unwrap(),
4215                scheme: schemes[0].clone(),
4216                elector: elector.clone(),
4217            };
4218            let reporter =
4219                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4220            let relay = Arc::new(mocks::relay::Relay::new());
4221            let application_cfg = mocks::application::Config {
4222                hasher: Sha256::default(),
4223                relay: relay.clone(),
4224                me: participants[0].clone(),
4225                propose_latency: (1.0, 0.0),
4226                verify_latency: (1.0, 0.0),
4227                certify_latency: (1.0, 0.0),
4228                should_certify: mocks::application::Certifier::Sometimes,
4229            };
4230            let (actor, application) = mocks::application::Application::new(
4231                context.with_label("application"),
4232                application_cfg,
4233            );
4234            actor.start();
4235            let blocker = oracle.control(participants[0].clone());
4236            let cfg = config::Config {
4237                scheme: schemes[0].clone(),
4238                elector: elector.clone(),
4239                blocker,
4240                automaton: application.clone(),
4241                relay: application.clone(),
4242                reporter: reporter.clone(),
4243                strategy: Sequential,
4244                partition: participants[0].clone().to_string(),
4245                mailbox_size: 64,
4246                epoch: Epoch::new(333),
4247                leader_timeout: Duration::from_millis(50),
4248                notarization_timeout: Duration::from_millis(100),
4249                nullify_retry: Duration::from_millis(250),
4250                fetch_timeout: Duration::from_millis(50),
4251                activity_timeout: ViewDelta::new(4),
4252                skip_timeout: ViewDelta::new(2),
4253                fetch_concurrent: 4,
4254                replay_buffer: NZUsize!(1024 * 16),
4255                write_buffer: NZUsize!(1024 * 16),
4256                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4257            };
4258            let engine = Engine::new(context.with_label("engine"), cfg);
4259
4260            // Start engine
4261            let (pending, recovered, resolver) = registrations
4262                .remove(&participants[0])
4263                .expect("validator should be registered");
4264            let handle = engine.start(pending, recovered, resolver);
4265
4266            // Allow tasks to start
4267            context.sleep(Duration::from_millis(1000)).await;
4268
4269            // Count running tasks under the engine prefix
4270            let running_before = count_running_tasks(&context, "engine");
4271            assert!(
4272                running_before > 0,
4273                "at least one engine task should be running"
4274            );
4275
4276            // Make sure the engine is still running after some time
4277            context.sleep(Duration::from_millis(1500)).await;
4278            assert!(
4279                count_running_tasks(&context, "engine") > 0,
4280                "engine tasks should still be running"
4281            );
4282
4283            // Shutdown engine and ensure children stop
4284            let running_after = if graceful {
4285                let metrics_context = context.clone();
4286                let result = context.stop(0, Some(Duration::from_secs(5))).await;
4287                assert!(
4288                    result.is_ok(),
4289                    "graceful shutdown should complete: {result:?}"
4290                );
4291                count_running_tasks(&metrics_context, "engine")
4292            } else {
4293                handle.abort();
4294                let _ = handle.await; // ensure parent tear-down runs
4295
4296                // Give the runtime a tick to process aborts
4297                context.sleep(Duration::from_millis(1000)).await;
4298                count_running_tasks(&context, "engine")
4299            };
4300            assert_eq!(
4301                running_after, 0,
4302                "all engine tasks should be stopped, but {running_after} still running"
4303            );
4304        });
4305    }
4306
4307    #[test_traced]
4308    fn test_children_shutdown_on_engine_abort() {
4309        for seed in 0..10 {
4310            engine_shutdown::<_, _, Random>(
4311                seed,
4312                bls12381_threshold_vrf::fixture::<MinPk, _>,
4313                false,
4314            );
4315            engine_shutdown::<_, _, Random>(
4316                seed,
4317                bls12381_threshold_vrf::fixture::<MinSig, _>,
4318                false,
4319            );
4320            engine_shutdown::<_, _, RoundRobin>(
4321                seed,
4322                bls12381_threshold_std::fixture::<MinPk, _>,
4323                false,
4324            );
4325            engine_shutdown::<_, _, RoundRobin>(
4326                seed,
4327                bls12381_threshold_std::fixture::<MinSig, _>,
4328                false,
4329            );
4330            engine_shutdown::<_, _, RoundRobin>(
4331                seed,
4332                bls12381_multisig::fixture::<MinPk, _>,
4333                false,
4334            );
4335            engine_shutdown::<_, _, RoundRobin>(
4336                seed,
4337                bls12381_multisig::fixture::<MinSig, _>,
4338                false,
4339            );
4340            engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, false);
4341            engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, false);
4342        }
4343    }
4344
4345    #[test_traced]
4346    fn test_graceful_shutdown() {
4347        for seed in 0..10 {
4348            engine_shutdown::<_, _, Random>(
4349                seed,
4350                bls12381_threshold_vrf::fixture::<MinPk, _>,
4351                true,
4352            );
4353            engine_shutdown::<_, _, Random>(
4354                seed,
4355                bls12381_threshold_vrf::fixture::<MinSig, _>,
4356                true,
4357            );
4358            engine_shutdown::<_, _, RoundRobin>(
4359                seed,
4360                bls12381_threshold_std::fixture::<MinPk, _>,
4361                true,
4362            );
4363            engine_shutdown::<_, _, RoundRobin>(
4364                seed,
4365                bls12381_threshold_std::fixture::<MinSig, _>,
4366                true,
4367            );
4368            engine_shutdown::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>, true);
4369            engine_shutdown::<_, _, RoundRobin>(
4370                seed,
4371                bls12381_multisig::fixture::<MinSig, _>,
4372                true,
4373            );
4374            engine_shutdown::<_, _, RoundRobin>(seed, ed25519::fixture, true);
4375            engine_shutdown::<_, _, RoundRobin>(seed, secp256r1::fixture, true);
4376        }
4377    }
4378
4379    fn attributable_reporter_filtering<S, F, L>(mut fixture: F)
4380    where
4381        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4382        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4383        L: Elector<S>,
4384    {
4385        let n = 3;
4386        let required_containers = View::new(10);
4387        let activity_timeout = ViewDelta::new(10);
4388        let skip_timeout = ViewDelta::new(5);
4389        let namespace = b"consensus".to_vec();
4390        let executor = deterministic::Runner::timed(Duration::from_secs(30));
4391        executor.start(|mut context| async move {
4392            // Create simulated network
4393            let (network, mut oracle) = Network::new(
4394                context.with_label("network"),
4395                Config {
4396                    max_size: 1024 * 1024,
4397                    disconnect_on_block: false,
4398                    tracked_peer_sets: None,
4399                },
4400            );
4401            network.start();
4402
4403            // Register participants
4404            let Fixture {
4405                participants,
4406                schemes,
4407                ..
4408            } = fixture(&mut context, &namespace, n);
4409            let mut registrations = register_validators(&mut oracle, &participants).await;
4410
4411            // Link all validators
4412            let link = Link {
4413                latency: Duration::from_millis(10),
4414                jitter: Duration::from_millis(1),
4415                success_rate: 1.0,
4416            };
4417            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4418
4419            // Create engines with `AttributableReporter` wrapper
4420            let elector = L::default();
4421            let relay = Arc::new(mocks::relay::Relay::new());
4422            let mut reporters = Vec::new();
4423            for (idx, validator) in participants.iter().enumerate() {
4424                let context = context.with_label(&format!("validator_{}", *validator));
4425
4426                let reporter_config = mocks::reporter::Config {
4427                    participants: participants.clone().try_into().unwrap(),
4428                    scheme: schemes[idx].clone(),
4429                    elector: elector.clone(),
4430                };
4431                let mock_reporter = mocks::reporter::Reporter::new(
4432                    context.with_label("mock_reporter"),
4433                    reporter_config,
4434                );
4435
4436                // Wrap with `AttributableReporter`
4437                let attributable_reporter = scheme::reporter::AttributableReporter::new(
4438                    context.with_label("rng"),
4439                    schemes[idx].clone(),
4440                    mock_reporter.clone(),
4441                    Sequential,
4442                    true, // Enable verification
4443                );
4444                reporters.push(mock_reporter.clone());
4445
4446                let application_cfg = mocks::application::Config {
4447                    hasher: Sha256::default(),
4448                    relay: relay.clone(),
4449                    me: validator.clone(),
4450                    propose_latency: (10.0, 5.0),
4451                    verify_latency: (10.0, 5.0),
4452                    certify_latency: (10.0, 5.0),
4453                    should_certify: mocks::application::Certifier::Sometimes,
4454                };
4455                let (actor, application) = mocks::application::Application::new(
4456                    context.with_label("application"),
4457                    application_cfg,
4458                );
4459                actor.start();
4460                let blocker = oracle.control(validator.clone());
4461                let cfg = config::Config {
4462                    scheme: schemes[idx].clone(),
4463                    elector: elector.clone(),
4464                    blocker,
4465                    automaton: application.clone(),
4466                    relay: application.clone(),
4467                    reporter: attributable_reporter,
4468                    strategy: Sequential,
4469                    partition: validator.to_string(),
4470                    mailbox_size: 1024,
4471                    epoch: Epoch::new(333),
4472                    leader_timeout: Duration::from_secs(1),
4473                    notarization_timeout: Duration::from_secs(2),
4474                    nullify_retry: Duration::from_secs(10),
4475                    fetch_timeout: Duration::from_secs(1),
4476                    activity_timeout,
4477                    skip_timeout,
4478                    fetch_concurrent: 4,
4479                    replay_buffer: NZUsize!(1024 * 1024),
4480                    write_buffer: NZUsize!(1024 * 1024),
4481                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4482                };
4483                let engine = Engine::new(context.with_label("engine"), cfg);
4484
4485                // Start engine
4486                let (pending, recovered, resolver) = registrations
4487                    .remove(validator)
4488                    .expect("validator should be registered");
4489                engine.start(pending, recovered, resolver);
4490            }
4491
4492            // Wait for all engines to finish
4493            let mut finalizers = Vec::new();
4494            for reporter in reporters.iter_mut() {
4495                let (mut latest, mut monitor) = reporter.subscribe().await;
4496                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4497                    while latest < required_containers {
4498                        latest = monitor.recv().await.expect("event missing");
4499                    }
4500                }));
4501            }
4502            join_all(finalizers).await;
4503
4504            // Verify filtering behavior based on scheme attributability
4505            for reporter in reporters.iter() {
4506                // Ensure no faults (normal operation)
4507                {
4508                    let faults = reporter.faults.lock().unwrap();
4509                    assert!(faults.is_empty(), "No faults should be reported");
4510                }
4511
4512                // Ensure no invalid signatures
4513                {
4514                    let invalid = reporter.invalid.lock().unwrap();
4515                    assert_eq!(*invalid, 0, "No invalid signatures");
4516                }
4517
4518                // Check that we have certificates reported
4519                {
4520                    let notarizations = reporter.notarizations.lock().unwrap();
4521                    let finalizations = reporter.finalizations.lock().unwrap();
4522                    assert!(
4523                        !notarizations.is_empty() || !finalizations.is_empty(),
4524                        "Certificates should be reported"
4525                    );
4526                }
4527
4528                // Check notarizes
4529                let notarizes = reporter.notarizes.lock().unwrap();
4530                let last_view = notarizes.keys().max().cloned().unwrap_or_default();
4531                for (view, payloads) in notarizes.iter() {
4532                    if *view == last_view {
4533                        continue; // Skip last view
4534                    }
4535
4536                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4537
4538                    // For attributable schemes, we should see peer activities
4539                    if S::is_attributable() {
4540                        assert!(signers > 1, "view {view}: {signers}");
4541                    } else {
4542                        // For non-attributable, we shouldn't see any peer activities
4543                        assert_eq!(signers, 0);
4544                    }
4545                }
4546
4547                // Check finalizes
4548                let finalizes = reporter.finalizes.lock().unwrap();
4549                for (_, payloads) in finalizes.iter() {
4550                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4551
4552                    // For attributable schemes, we should see peer activities
4553                    if S::is_attributable() {
4554                        assert!(signers > 1);
4555                    } else {
4556                        // For non-attributable, we shouldn't see any peer activities
4557                        assert_eq!(signers, 0);
4558                    }
4559                }
4560            }
4561
4562            // Ensure no blocked connections (normal operation)
4563            let blocked = oracle.blocked().await.unwrap();
4564            assert!(blocked.is_empty());
4565        });
4566    }
4567
4568    #[test_traced]
4569    fn test_attributable_reporter_filtering() {
4570        attributable_reporter_filtering::<_, _, Random>(
4571            bls12381_threshold_vrf::fixture::<MinPk, _>,
4572        );
4573        attributable_reporter_filtering::<_, _, Random>(
4574            bls12381_threshold_vrf::fixture::<MinSig, _>,
4575        );
4576        attributable_reporter_filtering::<_, _, RoundRobin>(
4577            bls12381_threshold_std::fixture::<MinPk, _>,
4578        );
4579        attributable_reporter_filtering::<_, _, RoundRobin>(
4580            bls12381_threshold_std::fixture::<MinSig, _>,
4581        );
4582        attributable_reporter_filtering::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4583        attributable_reporter_filtering::<_, _, RoundRobin>(
4584            bls12381_multisig::fixture::<MinSig, _>,
4585        );
4586        attributable_reporter_filtering::<_, _, RoundRobin>(ed25519::fixture);
4587        attributable_reporter_filtering::<_, _, RoundRobin>(secp256r1::fixture);
4588    }
4589
4590    fn split_views_no_lockup<S, F, L>(mut fixture: F)
4591    where
4592        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4593        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
4594        L: Elector<S>,
4595    {
4596        // Scenario:
4597        // - View F: Finalization of B_1 seen by all participants.
4598        // - View F+1:
4599        //   - Nullification seen by honest (4..=6,7) and all 3 byzantines
4600        //   - Notarization of B_2A seen by honest (1..=3)
4601        // - View F+2:
4602        //   - Nullification seen by honest (1..=3,7) and all 3 byzantines
4603        //   - Notarization of B_2B seen by honest (4..=6)
4604        // - View F+3: Nullification. Seen by all participants.
4605        // - Then ensure progress resumes beyond F+3 after reconnecting
4606
4607        // Define participant types
4608        enum ParticipantType {
4609            Group1,    // receives notarization for f+1, nullification for f+2
4610            Group2,    // receives nullification for f+1, notarization for f+2
4611            Ignorant,  // receives nullification for f+1 and f+2
4612            Byzantine, // nullify-only
4613        }
4614        let get_type = |idx: usize| -> ParticipantType {
4615            match idx {
4616                0..3 => ParticipantType::Group1,
4617                3..6 => ParticipantType::Group2,
4618                6 => ParticipantType::Ignorant,
4619                7..10 => ParticipantType::Byzantine,
4620                _ => unreachable!(),
4621            }
4622        };
4623
4624        // Create context
4625        let n = 10;
4626        let quorum = quorum(n) as usize;
4627        assert_eq!(quorum, 7);
4628        let activity_timeout = ViewDelta::new(10);
4629        let skip_timeout = ViewDelta::new(5);
4630        let namespace = b"consensus".to_vec();
4631        let executor = deterministic::Runner::timed(Duration::from_secs(300));
4632        executor.start(|mut context| async move {
4633            // Create simulated network
4634            let (network, mut oracle) = Network::new(
4635                context.with_label("network"),
4636                Config {
4637                    max_size: 1024 * 1024,
4638                    disconnect_on_block: false,
4639                    tracked_peer_sets: None,
4640                },
4641            );
4642            network.start();
4643
4644            // Register participants
4645            let Fixture {
4646                participants,
4647                schemes,
4648                ..
4649            } = fixture(&mut context, &namespace, n);
4650            let mut registrations = register_validators(&mut oracle, &participants).await;
4651
4652            // ========== Create engines ==========
4653
4654            // Do not link validators yet; we will inject certificates first, then link everyone.
4655
4656            // Create engines: 7 honest engines, 3 byzantine
4657            let elector = L::default();
4658            let relay = Arc::new(mocks::relay::Relay::new());
4659            let mut honest_reporters = Vec::new();
4660            for (idx, validator) in participants.iter().enumerate() {
4661                let (pending, recovered, resolver) = registrations
4662                    .remove(validator)
4663                    .expect("validator should be registered");
4664                let participant_type = get_type(idx);
4665                if matches!(participant_type, ParticipantType::Byzantine) {
4666                    // Byzantine engines
4667                    let cfg = mocks::nullify_only::Config {
4668                        scheme: schemes[idx].clone(),
4669                    };
4670                    let engine: mocks::nullify_only::NullifyOnly<_, _, Sha256> =
4671                        mocks::nullify_only::NullifyOnly::new(
4672                            context.with_label(&format!("byzantine_{}", *validator)),
4673                            cfg,
4674                        );
4675                    engine.start(pending);
4676                    // Recovered/resolver channels are unused for byzantine actors.
4677                    drop(recovered);
4678                    drop(resolver);
4679                } else {
4680                    // Honest engines
4681                    let reporter_config = mocks::reporter::Config {
4682                        participants: participants.clone().try_into().unwrap(),
4683                        scheme: schemes[idx].clone(),
4684                        elector: elector.clone(),
4685                    };
4686                    let reporter = mocks::reporter::Reporter::new(
4687                        context.with_label(&format!("reporter_{}", *validator)),
4688                        reporter_config,
4689                    );
4690                    honest_reporters.push(reporter.clone());
4691
4692                    let application_cfg = mocks::application::Config {
4693                        hasher: Sha256::default(),
4694                        relay: relay.clone(),
4695                        me: validator.clone(),
4696                        propose_latency: (10.0, 5.0),
4697                        verify_latency: (10.0, 5.0),
4698                        certify_latency: (10.0, 5.0),
4699                        should_certify: mocks::application::Certifier::Sometimes,
4700                    };
4701                    let (actor, application) = mocks::application::Application::new(
4702                        context.with_label(&format!("application_{}", *validator)),
4703                        application_cfg,
4704                    );
4705                    actor.start();
4706                    let blocker = oracle.control(validator.clone());
4707                    let cfg = config::Config {
4708                        scheme: schemes[idx].clone(),
4709                        elector: elector.clone(),
4710                        blocker,
4711                        automaton: application.clone(),
4712                        relay: application.clone(),
4713                        reporter: reporter.clone(),
4714                        strategy: Sequential,
4715                        partition: validator.to_string(),
4716                        mailbox_size: 1024,
4717                        epoch: Epoch::new(333),
4718                        leader_timeout: Duration::from_secs(10),
4719                        notarization_timeout: Duration::from_secs(10),
4720                        nullify_retry: Duration::from_secs(10),
4721                        fetch_timeout: Duration::from_secs(1),
4722                        activity_timeout,
4723                        skip_timeout,
4724                        fetch_concurrent: 4,
4725                        replay_buffer: NZUsize!(1024 * 1024),
4726                        write_buffer: NZUsize!(1024 * 1024),
4727                        page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4728                    };
4729                    let engine =
4730                        Engine::new(context.with_label(&format!("engine_{}", *validator)), cfg);
4731                    engine.start(pending, recovered, resolver);
4732                }
4733            }
4734
4735            // ========== Build the certificates manually ==========
4736
4737            // Helper: assemble finalization from explicit signer indices
4738            let build_finalization = |proposal: &Proposal<D>| -> TFinalization<_, D> {
4739                let votes: Vec<_> = (0..=quorum)
4740                    .map(|i| TFinalize::sign(&schemes[i], proposal.clone()).unwrap())
4741                    .collect();
4742                TFinalization::from_finalizes(&schemes[0], &votes, &Sequential)
4743                    .expect("finalization quorum")
4744            };
4745            // Helper: assemble notarization from explicit signer indices
4746            let build_notarization = |proposal: &Proposal<D>| -> TNotarization<_, D> {
4747                let votes: Vec<_> = (0..=quorum)
4748                    .map(|i| TNotarize::sign(&schemes[i], proposal.clone()).unwrap())
4749                    .collect();
4750                TNotarization::from_notarizes(&schemes[0], &votes, &Sequential)
4751                    .expect("notarization quorum")
4752            };
4753            let build_nullification = |round: Round| -> TNullification<_> {
4754                let votes: Vec<_> = (0..=quorum)
4755                    .map(|i| TNullify::sign::<D>(&schemes[i], round).unwrap())
4756                    .collect();
4757                TNullification::from_nullifies(&schemes[0], &votes, &Sequential)
4758                    .expect("nullification quorum")
4759            };
4760            // Choose F=1 and construct B_1, B_2A, B_2B
4761            let f_view = 1;
4762            let round_f = Round::new(Epoch::new(333), View::new(f_view));
4763            let payload_b0 = Sha256::hash(b"B_F");
4764            let proposal_b0 = Proposal::new(round_f, View::new(f_view - 1), payload_b0);
4765            let payload_b1a = Sha256::hash(b"B_G1");
4766            let proposal_b1a = Proposal::new(
4767                Round::new(Epoch::new(333), View::new(f_view + 1)),
4768                View::new(f_view),
4769                payload_b1a,
4770            );
4771            let payload_b1b = Sha256::hash(b"B_G2");
4772            let proposal_b1b = Proposal::new(
4773                Round::new(Epoch::new(333), View::new(f_view + 2)),
4774                View::new(f_view),
4775                payload_b1b,
4776            );
4777
4778            // Build notarization and finalization for the first block
4779            let b0_notarization = build_notarization(&proposal_b0);
4780            let b0_finalization = build_finalization(&proposal_b0);
4781            // Build notarizations for F+1 and F+2
4782            let b1a_notarization = build_notarization(&proposal_b1a);
4783            let b1b_notarization = build_notarization(&proposal_b1b);
4784            // Build nullifications for F+1 and F+2
4785            let null_a = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 1)));
4786            let null_b = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 2)));
4787
4788            // Create an 11th non-participant injector and obtain senders
4789            let injector_pk = PrivateKey::from_seed(1_000_000).public_key();
4790            let (mut injector_sender, _inj_certificate_receiver) = oracle
4791                .control(injector_pk.clone())
4792                .register(1, TEST_QUOTA)
4793                .await
4794                .unwrap();
4795
4796            // Create minimal one-way links from injector to all participants (not full mesh)
4797            let link = Link {
4798                latency: Duration::from_millis(10),
4799                jitter: Duration::from_millis(0),
4800                success_rate: 1.0,
4801            };
4802            for p in participants.iter() {
4803                oracle
4804                    .add_link(injector_pk.clone(), p.clone(), link.clone())
4805                    .await
4806                    .unwrap();
4807            }
4808
4809            // ========== Broadcast certificates over recovered network. ==========
4810
4811            // Broadcasts are in reverse order of views to make the tests easier by preventing the
4812            // proposer from making a proposal in F+1 or F+2, as it may panic when it proposes
4813            // something but generates a certificate for a different proposal.
4814
4815            // View F+2:
4816            let notarization_msg = Certificate::<_, D>::Notarization(b1b_notarization);
4817            let nullification_msg = Certificate::<_, D>::Nullification(null_b.clone());
4818            for (i, participant) in participants.iter().enumerate() {
4819                let recipient = Recipients::One(participant.clone());
4820                let msg = match get_type(i) {
4821                    ParticipantType::Group2 => notarization_msg.encode(),
4822                    _ => nullification_msg.encode(),
4823                };
4824                injector_sender.send(recipient, msg, true).await.unwrap();
4825            }
4826            // View F+1:
4827            let notarization_msg = Certificate::<_, D>::Notarization(b1a_notarization);
4828            let nullification_msg = Certificate::<_, D>::Nullification(null_a.clone());
4829            for (i, participant) in participants.iter().enumerate() {
4830                let recipient = Recipients::One(participant.clone());
4831                let msg = match get_type(i) {
4832                    ParticipantType::Group1 => notarization_msg.encode(),
4833                    _ => nullification_msg.encode(),
4834                };
4835                injector_sender.send(recipient, msg, true).await.unwrap();
4836            }
4837            // View F:
4838            let msg = Certificate::<_, D>::Notarization(b0_notarization).encode();
4839            injector_sender
4840                .send(Recipients::All, msg, true)
4841                .await
4842                .unwrap();
4843            let msg = Certificate::<_, D>::Finalization(b0_finalization).encode();
4844            injector_sender
4845                .send(Recipients::All, msg, true)
4846                .await
4847                .unwrap();
4848
4849            // Wait for a while to let the certificates propagate, but not so long that we
4850            // nullify view F+2.
4851            debug!("waiting for certificates to propagate");
4852            context.sleep(Duration::from_secs(5)).await;
4853            debug!("certificates propagated");
4854
4855            // ========== Assert the exact certificates are seen in each view ==========
4856
4857            // Assert the exact certificates in view F
4858            // All participants should have finalized B_0
4859            let view = View::new(f_view);
4860            for reporter in honest_reporters.iter() {
4861                let finalizations = reporter.finalizations.lock().unwrap();
4862                assert!(finalizations.contains_key(&view));
4863            }
4864
4865            // Assert the exact certificates in view F+1
4866            // Group 1 should have notarized B_1A only
4867            // All other participants should have nullified F+1
4868            let view = View::new(f_view + 1);
4869            for (i, reporter) in honest_reporters.iter().enumerate() {
4870                let finalizations = reporter.finalizations.lock().unwrap();
4871                assert!(!finalizations.contains_key(&view));
4872                let nullifications = reporter.nullifications.lock().unwrap();
4873                let notarizations = reporter.notarizations.lock().unwrap();
4874                match get_type(i) {
4875                    ParticipantType::Group1 => {
4876                        assert!(notarizations.contains_key(&view));
4877                        assert!(!nullifications.contains_key(&view));
4878                    }
4879                    _ => {
4880                        assert!(nullifications.contains_key(&view));
4881                        assert!(!notarizations.contains_key(&view));
4882                    }
4883                }
4884            }
4885
4886            // Assert the exact certificates in view F+2
4887            // Group 2 should have notarized B_1B only
4888            // All other participants should have nullified F+2
4889            let view = View::new(f_view + 2);
4890            for (i, reporter) in honest_reporters.iter().enumerate() {
4891                let finalizations = reporter.finalizations.lock().unwrap();
4892                assert!(!finalizations.contains_key(&view));
4893                let nullifications = reporter.nullifications.lock().unwrap();
4894                let notarizations = reporter.notarizations.lock().unwrap();
4895                match get_type(i) {
4896                    ParticipantType::Group2 => {
4897                        assert!(notarizations.contains_key(&view));
4898                        assert!(!nullifications.contains_key(&view));
4899                    }
4900                    _ => {
4901                        assert!(nullifications.contains_key(&view));
4902                        assert!(!notarizations.contains_key(&view));
4903                    }
4904                }
4905            }
4906
4907            // Assert no members have yet nullified view F+3
4908            let next_view = View::new(f_view + 3);
4909            for (i, reporter) in honest_reporters.iter().enumerate() {
4910                let nullifies = reporter.nullifies.lock().unwrap();
4911                assert!(!nullifies.contains_key(&next_view), "reporter {i}");
4912            }
4913
4914            // ========== Reconnect all participants ==========
4915
4916            // Reconnect all participants fully using the helper
4917            link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
4918
4919            // Wait until all honest reporters finalize strictly past F+2 (e.g., at least F+3)
4920            {
4921                let target = View::new(f_view + 3);
4922                let mut finalizers = Vec::new();
4923                for reporter in honest_reporters.iter_mut() {
4924                    let (mut latest, mut monitor) = reporter.subscribe().await;
4925                    finalizers.push(context.with_label("resume_finalizer").spawn(
4926                        move |_| async move {
4927                            while latest < target {
4928                                latest = monitor.recv().await.expect("event missing");
4929                            }
4930                        },
4931                    ));
4932                }
4933                join_all(finalizers).await;
4934            }
4935
4936            // Sanity checks: no faults/invalid signatures, and no peers blocked
4937            for reporter in honest_reporters.iter() {
4938                {
4939                    let faults = reporter.faults.lock().unwrap();
4940                    assert!(faults.is_empty());
4941                }
4942                {
4943                    let invalid = reporter.invalid.lock().unwrap();
4944                    assert_eq!(*invalid, 0);
4945                }
4946            }
4947            let blocked = oracle.blocked().await.unwrap();
4948            assert!(blocked.is_empty());
4949        });
4950    }
4951
4952    #[test_traced]
4953    fn test_split_views_no_lockup() {
4954        split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
4955        split_views_no_lockup::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
4956        split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
4957        split_views_no_lockup::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
4958        split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4959        split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
4960        split_views_no_lockup::<_, _, RoundRobin>(ed25519::fixture);
4961        split_views_no_lockup::<_, _, RoundRobin>(secp256r1::fixture);
4962    }
4963
4964    fn tle<V, L>()
4965    where
4966        V: Variant,
4967        L: Elector<bls12381_threshold_vrf::Scheme<PublicKey, V>>,
4968    {
4969        // Create context
4970        let n = 4;
4971        let namespace = b"consensus".to_vec();
4972        let activity_timeout = ViewDelta::new(100);
4973        let skip_timeout = ViewDelta::new(50);
4974        let executor = deterministic::Runner::timed(Duration::from_secs(30));
4975        executor.start(|mut context| async move {
4976            // Create simulated network
4977            let (network, mut oracle) = Network::new(
4978                context.with_label("network"),
4979                Config {
4980                    max_size: 1024 * 1024,
4981                    disconnect_on_block: false,
4982                    tracked_peer_sets: None,
4983                },
4984            );
4985
4986            // Start network
4987            network.start();
4988
4989            // Register participants
4990            let Fixture {
4991                participants,
4992                schemes,
4993                ..
4994            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, &namespace, n);
4995            let mut registrations = register_validators(&mut oracle, &participants).await;
4996
4997            // Link all validators
4998            let link = Link {
4999                latency: Duration::from_millis(10),
5000                jitter: Duration::from_millis(5),
5001                success_rate: 1.0,
5002            };
5003            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5004
5005            // Create engines and reporters
5006            let elector = L::default();
5007            let relay = Arc::new(mocks::relay::Relay::new());
5008            let mut reporters = Vec::new();
5009            let mut engine_handlers = Vec::new();
5010            let monitor_reporter = Arc::new(Mutex::new(None));
5011            for (idx, validator) in participants.iter().enumerate() {
5012                // Create scheme context
5013                let context = context.with_label(&format!("validator_{}", *validator));
5014
5015                // Store first reporter for monitoring
5016                let reporter_config = mocks::reporter::Config {
5017                    participants: participants.clone().try_into().unwrap(),
5018                    scheme: schemes[idx].clone(),
5019                    elector: elector.clone(),
5020                };
5021                let reporter =
5022                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5023                reporters.push(reporter.clone());
5024                if idx == 0 {
5025                    *monitor_reporter.lock().unwrap() = Some(reporter.clone());
5026                }
5027
5028                // Configure application
5029                let application_cfg = mocks::application::Config {
5030                    hasher: Sha256::default(),
5031                    relay: relay.clone(),
5032                    me: validator.clone(),
5033                    propose_latency: (10.0, 5.0),
5034                    verify_latency: (10.0, 5.0),
5035                    certify_latency: (10.0, 5.0),
5036                    should_certify: mocks::application::Certifier::Sometimes,
5037                };
5038                let (actor, application) = mocks::application::Application::new(
5039                    context.with_label("application"),
5040                    application_cfg,
5041                );
5042                actor.start();
5043                let blocker = oracle.control(validator.clone());
5044                let cfg = config::Config {
5045                    scheme: schemes[idx].clone(),
5046                    elector: elector.clone(),
5047                    blocker,
5048                    automaton: application.clone(),
5049                    relay: application.clone(),
5050                    reporter: reporter.clone(),
5051                    strategy: Sequential,
5052                    partition: validator.to_string(),
5053                    mailbox_size: 1024,
5054                    epoch: Epoch::new(333),
5055                    leader_timeout: Duration::from_millis(100),
5056                    notarization_timeout: Duration::from_millis(200),
5057                    nullify_retry: Duration::from_millis(500),
5058                    fetch_timeout: Duration::from_millis(100),
5059                    activity_timeout,
5060                    skip_timeout,
5061                    fetch_concurrent: 4,
5062                    replay_buffer: NZUsize!(1024 * 1024),
5063                    write_buffer: NZUsize!(1024 * 1024),
5064                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5065                };
5066                let engine = Engine::new(context.with_label("engine"), cfg);
5067
5068                // Start engine
5069                let (pending, recovered, resolver) = registrations
5070                    .remove(validator)
5071                    .expect("validator should be registered");
5072                engine_handlers.push(engine.start(pending, recovered, resolver));
5073            }
5074
5075            // Prepare TLE test data
5076            let target = Round::new(Epoch::new(333), View::new(10)); // Encrypt for round (epoch 333, view 10)
5077            let message = b"Secret message for future view10"; // 32 bytes
5078
5079            // Encrypt message
5080            let ciphertext = schemes[0].encrypt(&mut context, target, *message);
5081
5082            // Wait for consensus to reach the target view and then decrypt
5083            let reporter = monitor_reporter.lock().unwrap().clone().unwrap();
5084            loop {
5085                // Wait for notarization
5086                context.sleep(Duration::from_millis(100)).await;
5087                let notarizations = reporter.notarizations.lock().unwrap();
5088                let Some(notarization) = notarizations.get(&target.view()) else {
5089                    continue;
5090                };
5091
5092                // Decrypt the message using the seed
5093                let seed = notarization.seed();
5094                let decrypted = seed
5095                    .decrypt(&ciphertext)
5096                    .expect("Decryption should succeed with valid seed signature");
5097                assert_eq!(
5098                    message,
5099                    decrypted.as_ref(),
5100                    "Decrypted message should match original message"
5101                );
5102                break;
5103            }
5104        });
5105    }
5106
5107    #[test_traced]
5108    fn test_tle() {
5109        tle::<MinPk, Random>();
5110        tle::<MinSig, Random>();
5111    }
5112
5113    fn hailstorm<S, F, L>(
5114        seed: u64,
5115        shutdowns: usize,
5116        interval: ViewDelta,
5117        mut fixture: F,
5118    ) -> String
5119    where
5120        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5121        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5122        L: Elector<S>,
5123    {
5124        // Create context
5125        let n = 5;
5126        let activity_timeout = ViewDelta::new(10);
5127        let skip_timeout = ViewDelta::new(5);
5128        let namespace = b"consensus".to_vec();
5129        let cfg = deterministic::Config::new().with_seed(seed);
5130        let executor = deterministic::Runner::new(cfg);
5131        executor.start(|mut context| async move {
5132            // Create simulated network
5133            let (network, mut oracle) = Network::new(
5134                context.with_label("network"),
5135                Config {
5136                    max_size: 1024 * 1024,
5137                    disconnect_on_block: true,
5138                    tracked_peer_sets: None,
5139                },
5140            );
5141
5142            // Start network
5143            network.start();
5144
5145            // Register participants
5146            let Fixture {
5147                participants,
5148                schemes,
5149                ..
5150            } = fixture(&mut context, &namespace, n);
5151            let mut registrations = register_validators(&mut oracle, &participants).await;
5152
5153            // Link all validators
5154            let link = Link {
5155                latency: Duration::from_millis(10),
5156                jitter: Duration::from_millis(1),
5157                success_rate: 1.0,
5158            };
5159            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5160
5161            // Create engines
5162            let elector = L::default();
5163            let relay = Arc::new(mocks::relay::Relay::new());
5164            let mut reporters = BTreeMap::new();
5165            let mut engine_handlers = BTreeMap::new();
5166            for (idx, validator) in participants.iter().enumerate() {
5167                // Create scheme context
5168                let context = context.with_label(&format!("validator_{}", *validator));
5169
5170                // Configure engine
5171                let reporter_config = mocks::reporter::Config {
5172                    participants: participants.clone().try_into().unwrap(),
5173                    scheme: schemes[idx].clone(),
5174                    elector: elector.clone(),
5175                };
5176                let reporter =
5177                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5178                reporters.insert(idx, reporter.clone());
5179                let application_cfg = mocks::application::Config {
5180                    hasher: Sha256::default(),
5181                    relay: relay.clone(),
5182                    me: validator.clone(),
5183                    propose_latency: (10.0, 5.0),
5184                    verify_latency: (10.0, 5.0),
5185                    certify_latency: (10.0, 5.0),
5186                    should_certify: mocks::application::Certifier::Sometimes,
5187                };
5188                let (actor, application) = mocks::application::Application::new(
5189                    context.with_label("application"),
5190                    application_cfg,
5191                );
5192                actor.start();
5193                let blocker = oracle.control(validator.clone());
5194                let cfg = config::Config {
5195                    scheme: schemes[idx].clone(),
5196                    elector: elector.clone(),
5197                    blocker,
5198                    automaton: application.clone(),
5199                    relay: application.clone(),
5200                    reporter: reporter.clone(),
5201                    strategy: Sequential,
5202                    partition: validator.to_string(),
5203                    mailbox_size: 1024,
5204                    epoch: Epoch::new(333),
5205                    leader_timeout: Duration::from_secs(1),
5206                    notarization_timeout: Duration::from_secs(2),
5207                    nullify_retry: Duration::from_secs(10),
5208                    fetch_timeout: Duration::from_secs(1),
5209                    activity_timeout,
5210                    skip_timeout,
5211                    fetch_concurrent: 4,
5212                    replay_buffer: NZUsize!(1024 * 1024),
5213                    write_buffer: NZUsize!(1024 * 1024),
5214                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5215                };
5216                let engine = Engine::new(context.with_label("engine"), cfg);
5217
5218                // Start engine
5219                let (pending, recovered, resolver) = registrations
5220                    .remove(validator)
5221                    .expect("validator should be registered");
5222                engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5223            }
5224
5225            // Run shutdowns
5226            let mut target = View::zero();
5227            for i in 0..shutdowns {
5228                // Update target
5229                target = target.saturating_add(interval);
5230
5231                // Wait for all engines to finish
5232                let mut finalizers = Vec::new();
5233                for (_, reporter) in reporters.iter_mut() {
5234                    let (mut latest, mut monitor) = reporter.subscribe().await;
5235                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5236                        while latest < target {
5237                            latest = monitor.recv().await.expect("event missing");
5238                        }
5239                    }));
5240                }
5241                join_all(finalizers).await;
5242                target = target.saturating_add(interval);
5243
5244                // Select a random engine to shutdown
5245                let idx = context.gen_range(0..engine_handlers.len());
5246                let validator = &participants[idx];
5247                let handle = engine_handlers.remove(&idx).unwrap();
5248                handle.abort();
5249                let _ = handle.await;
5250                let selected_reporter = reporters.remove(&idx).unwrap();
5251                info!(idx, ?validator, "shutdown validator");
5252
5253                // Wait for all engines to finish
5254                let mut finalizers = Vec::new();
5255                for (_, reporter) in reporters.iter_mut() {
5256                    let (mut latest, mut monitor) = reporter.subscribe().await;
5257                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5258                        while latest < target {
5259                            latest = monitor.recv().await.expect("event missing");
5260                        }
5261                    }));
5262                }
5263                join_all(finalizers).await;
5264                target = target.saturating_add(interval);
5265
5266                // Recreate engine
5267                info!(idx, ?validator, "restarting validator");
5268                let context =
5269                    context.with_label(&format!("validator_{}_restarted_{}", *validator, i));
5270
5271                // Start engine
5272                let (pending, recovered, resolver) =
5273                    register_validator(&mut oracle, validator.clone()).await;
5274                let application_cfg = mocks::application::Config {
5275                    hasher: Sha256::default(),
5276                    relay: relay.clone(),
5277                    me: validator.clone(),
5278                    propose_latency: (10.0, 5.0),
5279                    verify_latency: (10.0, 5.0),
5280                    certify_latency: (10.0, 5.0),
5281                    should_certify: mocks::application::Certifier::Sometimes,
5282                };
5283                let (actor, application) = mocks::application::Application::new(
5284                    context.with_label("application"),
5285                    application_cfg,
5286                );
5287                actor.start();
5288                reporters.insert(idx, selected_reporter.clone());
5289                let blocker = oracle.control(validator.clone());
5290                let cfg = config::Config {
5291                    scheme: schemes[idx].clone(),
5292                    elector: elector.clone(),
5293                    blocker,
5294                    automaton: application.clone(),
5295                    relay: application.clone(),
5296                    reporter: selected_reporter,
5297                    strategy: Sequential,
5298                    partition: validator.to_string(),
5299                    mailbox_size: 1024,
5300                    epoch: Epoch::new(333),
5301                    leader_timeout: Duration::from_secs(1),
5302                    notarization_timeout: Duration::from_secs(2),
5303                    nullify_retry: Duration::from_secs(10),
5304                    fetch_timeout: Duration::from_secs(1),
5305                    activity_timeout,
5306                    skip_timeout,
5307                    fetch_concurrent: 4,
5308                    replay_buffer: NZUsize!(1024 * 1024),
5309                    write_buffer: NZUsize!(1024 * 1024),
5310                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5311                };
5312                let engine = Engine::new(context.with_label("engine"), cfg);
5313                engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5314
5315                // Wait for all engines to hit required containers
5316                let mut finalizers = Vec::new();
5317                for (_, reporter) in reporters.iter_mut() {
5318                    let (mut latest, mut monitor) = reporter.subscribe().await;
5319                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5320                        while latest < target {
5321                            latest = monitor.recv().await.expect("event missing");
5322                        }
5323                    }));
5324                }
5325                join_all(finalizers).await;
5326                info!(idx, ?validator, "validator recovered");
5327            }
5328
5329            // Check reporters for correct activity
5330            let latest_complete = target.saturating_sub(activity_timeout);
5331            for (_, reporter) in reporters.iter() {
5332                // Ensure no faults
5333                {
5334                    let faults = reporter.faults.lock().unwrap();
5335                    assert!(faults.is_empty());
5336                }
5337
5338                // Ensure no invalid signatures
5339                {
5340                    let invalid = reporter.invalid.lock().unwrap();
5341                    assert_eq!(*invalid, 0);
5342                }
5343
5344                // Ensure no forks
5345                let mut notarized = HashMap::new();
5346                let mut finalized = HashMap::new();
5347                {
5348                    let notarizes = reporter.notarizes.lock().unwrap();
5349                    for view in View::range(View::new(1), latest_complete) {
5350                        // Ensure only one payload proposed per view
5351                        let Some(payloads) = notarizes.get(&view) else {
5352                            continue;
5353                        };
5354                        if payloads.len() > 1 {
5355                            panic!("view: {view}");
5356                        }
5357                        let (digest, _) = payloads.iter().next().unwrap();
5358                        notarized.insert(view, *digest);
5359                    }
5360                }
5361                {
5362                    let notarizations = reporter.notarizations.lock().unwrap();
5363                    for view in View::range(View::new(1), latest_complete) {
5364                        // Ensure notarization matches digest from notarizes
5365                        let Some(notarization) = notarizations.get(&view) else {
5366                            continue;
5367                        };
5368                        let Some(digest) = notarized.get(&view) else {
5369                            continue;
5370                        };
5371                        assert_eq!(&notarization.proposal.payload, digest);
5372                    }
5373                }
5374                {
5375                    let finalizes = reporter.finalizes.lock().unwrap();
5376                    for view in View::range(View::new(1), latest_complete) {
5377                        // Ensure only one payload proposed per view
5378                        let Some(payloads) = finalizes.get(&view) else {
5379                            continue;
5380                        };
5381                        if payloads.len() > 1 {
5382                            panic!("view: {view}");
5383                        }
5384                        let (digest, _) = payloads.iter().next().unwrap();
5385                        finalized.insert(view, *digest);
5386
5387                        // Only check at views below timeout
5388                        if view > latest_complete {
5389                            continue;
5390                        }
5391
5392                        // Ensure no nullifies for any finalizers
5393                        let nullifies = reporter.nullifies.lock().unwrap();
5394                        let Some(nullifies) = nullifies.get(&view) else {
5395                            continue;
5396                        };
5397                        for (_, finalizers) in payloads.iter() {
5398                            for finalizer in finalizers.iter() {
5399                                if nullifies.contains(finalizer) {
5400                                    panic!("should not nullify and finalize at same view");
5401                                }
5402                            }
5403                        }
5404                    }
5405                }
5406                {
5407                    let finalizations = reporter.finalizations.lock().unwrap();
5408                    for view in View::range(View::new(1), latest_complete) {
5409                        // Ensure finalization matches digest from finalizes
5410                        let Some(finalization) = finalizations.get(&view) else {
5411                            continue;
5412                        };
5413                        let Some(digest) = finalized.get(&view) else {
5414                            continue;
5415                        };
5416                        assert_eq!(&finalization.proposal.payload, digest);
5417                    }
5418                }
5419            }
5420
5421            // Ensure no blocked connections
5422            let blocked = oracle.blocked().await.unwrap();
5423            assert!(blocked.is_empty());
5424
5425            // Return state for audit
5426            context.auditor().state()
5427        })
5428    }
5429
5430    #[test_group("slow")]
5431    #[test_traced]
5432    fn test_hailstorm_bls12381_threshold_vrf_min_pk() {
5433        assert_eq!(
5434            hailstorm::<_, _, Random>(
5435                0,
5436                10,
5437                ViewDelta::new(15),
5438                bls12381_threshold_vrf::fixture::<MinPk, _>
5439            ),
5440            hailstorm::<_, _, Random>(
5441                0,
5442                10,
5443                ViewDelta::new(15),
5444                bls12381_threshold_vrf::fixture::<MinPk, _>
5445            ),
5446        );
5447    }
5448
5449    #[test_group("slow")]
5450    #[test_traced]
5451    fn test_hailstorm_bls12381_threshold_vrf_min_sig() {
5452        assert_eq!(
5453            hailstorm::<_, _, Random>(
5454                0,
5455                10,
5456                ViewDelta::new(15),
5457                bls12381_threshold_vrf::fixture::<MinSig, _>
5458            ),
5459            hailstorm::<_, _, Random>(
5460                0,
5461                10,
5462                ViewDelta::new(15),
5463                bls12381_threshold_vrf::fixture::<MinSig, _>
5464            ),
5465        );
5466    }
5467
5468    #[test_group("slow")]
5469    #[test_traced]
5470    fn test_hailstorm_bls12381_threshold_std_min_pk() {
5471        assert_eq!(
5472            hailstorm::<_, _, RoundRobin>(
5473                0,
5474                10,
5475                ViewDelta::new(15),
5476                bls12381_threshold_std::fixture::<MinPk, _>
5477            ),
5478            hailstorm::<_, _, RoundRobin>(
5479                0,
5480                10,
5481                ViewDelta::new(15),
5482                bls12381_threshold_std::fixture::<MinPk, _>
5483            ),
5484        );
5485    }
5486
5487    #[test_group("slow")]
5488    #[test_traced]
5489    fn test_hailstorm_bls12381_threshold_std_min_sig() {
5490        assert_eq!(
5491            hailstorm::<_, _, RoundRobin>(
5492                0,
5493                10,
5494                ViewDelta::new(15),
5495                bls12381_threshold_std::fixture::<MinSig, _>
5496            ),
5497            hailstorm::<_, _, RoundRobin>(
5498                0,
5499                10,
5500                ViewDelta::new(15),
5501                bls12381_threshold_std::fixture::<MinSig, _>
5502            ),
5503        );
5504    }
5505
5506    #[test_group("slow")]
5507    #[test_traced]
5508    fn test_hailstorm_bls12381_multisig_min_pk() {
5509        assert_eq!(
5510            hailstorm::<_, _, RoundRobin>(
5511                0,
5512                10,
5513                ViewDelta::new(15),
5514                bls12381_multisig::fixture::<MinPk, _>
5515            ),
5516            hailstorm::<_, _, RoundRobin>(
5517                0,
5518                10,
5519                ViewDelta::new(15),
5520                bls12381_multisig::fixture::<MinPk, _>
5521            ),
5522        );
5523    }
5524
5525    #[test_group("slow")]
5526    #[test_traced]
5527    fn test_hailstorm_bls12381_multisig_min_sig() {
5528        assert_eq!(
5529            hailstorm::<_, _, RoundRobin>(
5530                0,
5531                10,
5532                ViewDelta::new(15),
5533                bls12381_multisig::fixture::<MinSig, _>
5534            ),
5535            hailstorm::<_, _, RoundRobin>(
5536                0,
5537                10,
5538                ViewDelta::new(15),
5539                bls12381_multisig::fixture::<MinSig, _>
5540            ),
5541        );
5542    }
5543
5544    #[test_group("slow")]
5545    #[test_traced]
5546    fn test_hailstorm_ed25519() {
5547        assert_eq!(
5548            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture),
5549            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture)
5550        );
5551    }
5552
5553    #[test_group("slow")]
5554    #[test_traced]
5555    fn test_hailstorm_secp256r1() {
5556        assert_eq!(
5557            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture),
5558            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), secp256r1::fixture)
5559        );
5560    }
5561
5562    /// Implementation of [Twins: BFT Systems Made Robust](https://arxiv.org/abs/2004.10617).
5563    fn twins<S, F, L>(seed: u64, n: u32, strategy: Strategy, link: Link, mut fixture: F)
5564    where
5565        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5566        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5567        L: Elector<S>,
5568    {
5569        let faults = N3f1::max_faults(n);
5570        let required_containers = View::new(100);
5571        let activity_timeout = ViewDelta::new(10);
5572        let skip_timeout = ViewDelta::new(5);
5573        let namespace = b"consensus".to_vec();
5574        let cfg = deterministic::Config::new()
5575            .with_seed(seed)
5576            .with_timeout(Some(Duration::from_secs(900)));
5577        let executor = deterministic::Runner::new(cfg);
5578        executor.start(|mut context| async move {
5579            let (network, mut oracle) = Network::new(
5580                context.with_label("network"),
5581                Config {
5582                    max_size: 1024 * 1024,
5583                    disconnect_on_block: false,
5584                    tracked_peer_sets: None,
5585                },
5586            );
5587            network.start();
5588
5589            let Fixture {
5590                participants,
5591                schemes,
5592                ..
5593            } = fixture(&mut context, &namespace, n);
5594            let participants: Arc<[_]> = participants.into();
5595            let mut registrations = register_validators(&mut oracle, &participants).await;
5596            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5597
5598            // We don't apply partitioning to the relay explicitly, however, a participant will only query the relay by digest
5599            // after receiving a vote (implicitly respecting the partitioning)
5600            let elector = L::default();
5601            let relay = Arc::new(mocks::relay::Relay::new());
5602            let mut reporters = Vec::new();
5603            let mut engine_handlers = Vec::new();
5604
5605            // Create twin engines (f Byzantine twins)
5606            for (idx, validator) in participants.iter().enumerate().take(faults as usize) {
5607                let (
5608                    (vote_sender, vote_receiver),
5609                    (certificate_sender, certificate_receiver),
5610                    (resolver_sender, resolver_receiver),
5611                ) = registrations
5612                    .remove(validator)
5613                    .expect("validator should be registered");
5614
5615                // Create forwarder closures for votes
5616                let make_vote_forwarder = || {
5617                    let participants = participants.clone();
5618                    move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
5619                        let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5620                        let (primary, secondary) =
5621                            strategy.partitions(msg.view(), participants.as_ref());
5622                        match origin {
5623                            SplitOrigin::Primary => Some(Recipients::Some(primary)),
5624                            SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5625                        }
5626                    }
5627                };
5628                // Create forwarder closures for certificates
5629                let make_certificate_forwarder = || {
5630                    let codec = schemes[idx].certificate_codec_config();
5631                    let participants = participants.clone();
5632                    move |origin: SplitOrigin, _: &Recipients<_>, message: &IoBuf| {
5633                        let msg: Certificate<S, D> =
5634                            Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5635                        let (primary, secondary) =
5636                            strategy.partitions(msg.view(), participants.as_ref());
5637                        match origin {
5638                            SplitOrigin::Primary => Some(Recipients::Some(primary)),
5639                            SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5640                        }
5641                    }
5642                };
5643                let make_drop_forwarder =
5644                    || move |_: SplitOrigin, _: &Recipients<_>, _: &IoBuf| None;
5645
5646                // Create router closures for votes
5647                let make_vote_router = || {
5648                    let participants = participants.clone();
5649                    move |(sender, message): &(_, IoBuf)| {
5650                        let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5651                        strategy.route(msg.view(), sender, participants.as_ref())
5652                    }
5653                };
5654                // Create router closures for certificates
5655                let make_certificate_router = || {
5656                    let codec = schemes[idx].certificate_codec_config();
5657                    let participants = participants.clone();
5658                    move |(sender, message): &(_, IoBuf)| {
5659                        let msg: Certificate<S, D> =
5660                            Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5661                        strategy.route(msg.view(), sender, participants.as_ref())
5662                    }
5663                };
5664                let make_drop_router = || move |(_, _): &(_, _)| SplitTarget::None;
5665
5666                // Apply view-based forwarder and router to pending and recovered channel
5667                let (vote_sender_primary, vote_sender_secondary) =
5668                    vote_sender.split_with(make_vote_forwarder());
5669                let (vote_receiver_primary, vote_receiver_secondary) = vote_receiver.split_with(
5670                    context.with_label(&format!("pending_split_{idx}")),
5671                    make_vote_router(),
5672                );
5673                let (certificate_sender_primary, certificate_sender_secondary) =
5674                    certificate_sender.split_with(make_certificate_forwarder());
5675                let (certificate_receiver_primary, certificate_receiver_secondary) =
5676                    certificate_receiver.split_with(
5677                        context.with_label(&format!("recovered_split_{idx}")),
5678                        make_certificate_router(),
5679                    );
5680
5681                // Prevent any resolver messages from being sent or received by twins (these messages aren't cleanly mapped to a view and allowing them to bypass partitions seems wrong)
5682                let (resolver_sender_primary, resolver_sender_secondary) =
5683                    resolver_sender.split_with(make_drop_forwarder());
5684                let (resolver_receiver_primary, resolver_receiver_secondary) = resolver_receiver
5685                    .split_with(
5686                        context.with_label(&format!("resolver_split_{idx}")),
5687                        make_drop_router(),
5688                    );
5689
5690                for (twin_label, pending, recovered, resolver) in [
5691                    (
5692                        "primary",
5693                        (vote_sender_primary, vote_receiver_primary),
5694                        (certificate_sender_primary, certificate_receiver_primary),
5695                        (resolver_sender_primary, resolver_receiver_primary),
5696                    ),
5697                    (
5698                        "secondary",
5699                        (vote_sender_secondary, vote_receiver_secondary),
5700                        (certificate_sender_secondary, certificate_receiver_secondary),
5701                        (resolver_sender_secondary, resolver_receiver_secondary),
5702                    ),
5703                ] {
5704                    let label = format!("twin_{idx}_{twin_label}");
5705                    let context = context.with_label(&label);
5706
5707                    let reporter_config = mocks::reporter::Config {
5708                        participants: participants.as_ref().try_into().unwrap(),
5709                        scheme: schemes[idx].clone(),
5710                        elector: elector.clone(),
5711                    };
5712                    let reporter = mocks::reporter::Reporter::new(
5713                        context.with_label("reporter"),
5714                        reporter_config,
5715                    );
5716                    reporters.push(reporter.clone());
5717
5718                    let application_cfg = mocks::application::Config {
5719                        hasher: Sha256::default(),
5720                        relay: relay.clone(),
5721                        me: validator.clone(),
5722                        propose_latency: (10.0, 5.0),
5723                        verify_latency: (10.0, 5.0),
5724                        certify_latency: (10.0, 5.0),
5725                        should_certify: mocks::application::Certifier::Sometimes,
5726                    };
5727                    let (actor, application) = mocks::application::Application::new(
5728                        context.with_label("application"),
5729                        application_cfg,
5730                    );
5731                    actor.start();
5732
5733                    let blocker = oracle.control(validator.clone());
5734                    let cfg = config::Config {
5735                        scheme: schemes[idx].clone(),
5736                        elector: elector.clone(),
5737                        blocker,
5738                        automaton: application.clone(),
5739                        relay: application.clone(),
5740                        reporter: reporter.clone(),
5741                        strategy: Sequential,
5742                        partition: label,
5743                        mailbox_size: 1024,
5744                        epoch: Epoch::new(333),
5745                        leader_timeout: Duration::from_secs(1),
5746                        notarization_timeout: Duration::from_secs(2),
5747                        nullify_retry: Duration::from_secs(10),
5748                        fetch_timeout: Duration::from_secs(1),
5749                        activity_timeout,
5750                        skip_timeout,
5751                        fetch_concurrent: 4,
5752                        replay_buffer: NZUsize!(1024 * 1024),
5753                        write_buffer: NZUsize!(1024 * 1024),
5754                        page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5755                    };
5756                    let engine = Engine::new(context.with_label("engine"), cfg);
5757                    engine_handlers.push(engine.start(pending, recovered, resolver));
5758                }
5759            }
5760
5761            // Create honest engines
5762            for (idx, validator) in participants.iter().enumerate().skip(faults as usize) {
5763                let label = format!("honest_{idx}");
5764                let context = context.with_label(&label);
5765
5766                let reporter_config = mocks::reporter::Config {
5767                    participants: participants.as_ref().try_into().unwrap(),
5768                    scheme: schemes[idx].clone(),
5769                    elector: elector.clone(),
5770                };
5771                let reporter =
5772                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5773                reporters.push(reporter.clone());
5774
5775                let application_cfg = mocks::application::Config {
5776                    hasher: Sha256::default(),
5777                    relay: relay.clone(),
5778                    me: validator.clone(),
5779                    propose_latency: (10.0, 5.0),
5780                    verify_latency: (10.0, 5.0),
5781                    certify_latency: (10.0, 5.0),
5782                    should_certify: mocks::application::Certifier::Sometimes,
5783                };
5784                let (actor, application) = mocks::application::Application::new(
5785                    context.with_label("application"),
5786                    application_cfg,
5787                );
5788                actor.start();
5789
5790                let blocker = oracle.control(validator.clone());
5791                let cfg = config::Config {
5792                    scheme: schemes[idx].clone(),
5793                    elector: elector.clone(),
5794                    blocker,
5795                    automaton: application.clone(),
5796                    relay: application.clone(),
5797                    reporter: reporter.clone(),
5798                    strategy: Sequential,
5799                    partition: label,
5800                    mailbox_size: 1024,
5801                    epoch: Epoch::new(333),
5802                    leader_timeout: Duration::from_secs(1),
5803                    notarization_timeout: Duration::from_secs(2),
5804                    nullify_retry: Duration::from_secs(10),
5805                    fetch_timeout: Duration::from_secs(1),
5806                    activity_timeout,
5807                    skip_timeout,
5808                    fetch_concurrent: 4,
5809                    replay_buffer: NZUsize!(1024 * 1024),
5810                    write_buffer: NZUsize!(1024 * 1024),
5811                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5812                };
5813                let engine = Engine::new(context.with_label("engine"), cfg);
5814
5815                let (pending, recovered, resolver) = registrations
5816                    .remove(validator)
5817                    .expect("validator should be registered");
5818                engine_handlers.push(engine.start(pending, recovered, resolver));
5819            }
5820
5821            // Wait for progress (liveness check)
5822            let mut finalizers = Vec::new();
5823            for reporter in reporters.iter_mut() {
5824                let (mut latest, mut monitor) = reporter.subscribe().await;
5825                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5826                    while latest < required_containers {
5827                        latest = monitor.recv().await.expect("event missing");
5828                    }
5829                }));
5830            }
5831            join_all(finalizers).await;
5832
5833            // Verify safety: no conflicting finalizations across honest reporters
5834            let honest_start = faults as usize * 2; // Each twin produces 2 reporters
5835            let mut finalized_at_view: BTreeMap<View, D> = BTreeMap::new();
5836            for reporter in reporters.iter().skip(honest_start) {
5837                let finalizations = reporter.finalizations.lock().unwrap();
5838                for (view, finalization) in finalizations.iter() {
5839                    let digest = finalization.proposal.payload;
5840                    if let Some(existing) = finalized_at_view.get(view) {
5841                        assert_eq!(
5842                            existing, &digest,
5843                            "safety violation: conflicting finalizations at view {view}"
5844                        );
5845                    } else {
5846                        finalized_at_view.insert(*view, digest);
5847                    }
5848                }
5849            }
5850
5851            // Verify no invalid signatures were observed
5852            for reporter in reporters.iter().skip(honest_start) {
5853                let invalid = reporter.invalid.lock().unwrap();
5854                assert_eq!(*invalid, 0, "invalid signatures detected");
5855            }
5856
5857            // Ensure faults are attributable to twins
5858            let twin_identities: Vec<_> = participants.iter().take(faults as usize).collect();
5859            for reporter in reporters.iter().skip(honest_start) {
5860                let faults = reporter.faults.lock().unwrap();
5861                for (faulter, _) in faults.iter() {
5862                    assert!(
5863                        twin_identities.contains(&faulter),
5864                        "fault from non-twin participant"
5865                    );
5866                }
5867            }
5868
5869            // Ensure blocked connections are attributable to twins
5870            let blocked = oracle.blocked().await.unwrap();
5871            for (_, faulter) in blocked.iter() {
5872                assert!(
5873                    twin_identities.contains(&faulter),
5874                    "blocked connection from non-twin participant"
5875                );
5876            }
5877        });
5878    }
5879
5880    fn test_twins<S, F, L>(mut fixture: F)
5881    where
5882        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5883        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
5884        L: Elector<S>,
5885    {
5886        for strategy in [
5887            Strategy::View,
5888            Strategy::Fixed(3),
5889            Strategy::Isolate(4),
5890            Strategy::Broadcast,
5891            Strategy::Shuffle,
5892        ] {
5893            for link in [
5894                Link {
5895                    latency: Duration::from_millis(10),
5896                    jitter: Duration::from_millis(1),
5897                    success_rate: 1.0,
5898                },
5899                Link {
5900                    latency: Duration::from_millis(200),
5901                    jitter: Duration::from_millis(150),
5902                    success_rate: 0.75,
5903                },
5904            ] {
5905                twins::<S, _, L>(0, 5, strategy, link, |namespace, context, n| {
5906                    fixture(namespace, context, n)
5907                });
5908            }
5909        }
5910    }
5911
5912    #[test_group("slow")]
5913    #[test_traced]
5914    fn test_twins_multisig_min_pk() {
5915        test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
5916    }
5917
5918    #[test_group("slow")]
5919    #[test_traced]
5920    fn test_twins_multisig_min_sig() {
5921        test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
5922    }
5923
5924    #[test_group("slow")]
5925    #[test_traced]
5926    fn test_twins_threshold_vrf_min_pk() {
5927        test_twins::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinPk, _>);
5928    }
5929
5930    #[test_group("slow")]
5931    #[test_traced]
5932    fn test_twins_threshold_vrf_min_sig() {
5933        test_twins::<_, _, Random>(bls12381_threshold_vrf::fixture::<MinSig, _>);
5934    }
5935
5936    #[test_group("slow")]
5937    #[test_traced]
5938    fn test_twins_threshold_std_min_pk() {
5939        test_twins::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinPk, _>);
5940    }
5941
5942    #[test_group("slow")]
5943    #[test_traced]
5944    fn test_twins_threshold_std_min_sig() {
5945        test_twins::<_, _, RoundRobin>(bls12381_threshold_std::fixture::<MinSig, _>);
5946    }
5947
5948    #[test_group("slow")]
5949    #[test_traced]
5950    fn test_twins_ed25519() {
5951        test_twins::<_, _, RoundRobin>(ed25519::fixture);
5952    }
5953
5954    #[test_group("slow")]
5955    #[test_traced]
5956    fn test_twins_secp256r1() {
5957        test_twins::<_, _, RoundRobin>(secp256r1::fixture);
5958    }
5959
5960    #[test_group("slow")]
5961    #[test_traced]
5962    fn test_twins_large_view() {
5963        twins::<_, _, Random>(
5964            0,
5965            10,
5966            Strategy::View,
5967            Link {
5968                latency: Duration::from_millis(200),
5969                jitter: Duration::from_millis(150),
5970                success_rate: 0.75,
5971            },
5972            bls12381_threshold_vrf::fixture::<MinPk, _>,
5973        );
5974    }
5975
5976    #[test_group("slow")]
5977    #[test_traced]
5978    fn test_twins_large_shuffle() {
5979        twins::<_, _, Random>(
5980            0,
5981            10,
5982            Strategy::Shuffle,
5983            Link {
5984                latency: Duration::from_millis(200),
5985                jitter: Duration::from_millis(150),
5986                success_rate: 0.75,
5987            },
5988            bls12381_threshold_vrf::fixture::<MinPk, _>,
5989        );
5990    }
5991}