commonware_consensus/simplex/
mod.rs

1//! Simple and fast BFT agreement inspired by Simplex Consensus.
2//!
3//! Inspired by [Simplex Consensus](https://eprint.iacr.org/2023/463), `simplex` provides simple and fast BFT
4//! agreement with network-speed view (i.e. block time) latency and optimal finalization latency in a
5//! partially synchronous setting.
6//!
7//! # Features
8//!
9//! * Wicked Fast Block Times (2 Network Hops)
10//! * Optimal Finalization Latency (3 Network Hops)
11//! * Externalized Uptime and Fault Proofs
12//! * Require Certification Before Finalization
13//! * Decoupled Block Broadcast and Sync
14//! * Lazy Message Verification
15//! * Application-Defined Block Format
16//! * Pluggable Hashing and Cryptography
17//! * Embedded VRF (via [scheme::bls12381_threshold])
18//!
19//! # Design
20//!
21//! ## Protocol Description
22//!
23//! ### Specification for View `v`
24//!
25//! Upon entering view `v`:
26//! * Determine leader `l` for view `v`
27//! * Set timer for leader proposal `t_l = 2Δ` and advance `t_a = 3Δ`
28//!     * If leader `l` has not been active in last `r` views, set `t_l` to 0.
29//! * If leader `l`, broadcast `notarize(c,v)`
30//!   * If can't propose container in view `v` because missing notarization/nullification for a
31//!     previous view `v_m`, request `v_m`
32//!
33//! Upon receiving first `notarize(c,v)` from `l`:
34//! * Cancel `t_l`
35//! * If the container's parent `c_parent` is notarized at `v_parent` and we have nullifications for all views
36//!   between `v` and `v_parent`, verify `c` and broadcast `notarize(c,v)`
37//!
38//! Upon receiving `2f+1` `notarize(c,v)`:
39//! * Cancel `t_a`
40//! * Mark `c` as notarized
41//! * Broadcast `notarization(c,v)` (even if we have not verified `c`)
42//! * If have not broadcast `nullify(v)`, broadcast `finalize(c,v)`
43//! * Enter `v+1`
44//!
45//! Upon receiving `2f+1` `nullify(v)`:
46//! * Broadcast `nullification(v)`
47//! * Enter `v+1`
48//!
49//! Upon receiving `2f+1` `finalize(c,v)`:
50//! * Mark `c` as finalized (and recursively finalize its parents)
51//! * Broadcast `finalization(c,v)` (even if we have not verified `c`)
52//!
53//! Upon `t_l` or `t_a` firing:
54//! * Broadcast `nullify(v)`
55//! * Every `t_r` after `nullify(v)` broadcast that we are still in view `v`:
56//!    * Rebroadcast `nullify(v)` and either `notarization(v-1)` or `nullification(v-1)`
57//!
//! _When `2f+1` votes of a given type (`notarize(c,v)`, `nullify(v)`, or `finalize(c,v)`) have been collected
59//! from unique participants, a certificate (`notarization(c,v)`, `nullification(v)`, or `finalization(c,v)`) can be assembled.
60//! These certificates serve as a standalone proof of consensus progress that downstream systems can ingest without executing
61//! the protocol._
62//!
63//! ### Joining Consensus
64//!
65//! As soon as `2f+1` notarizes, nullifies, or finalizes are observed for some view `v`, the `Voter` will
66//! enter `v+1`. This means that a new participant joining consensus will immediately jump ahead to the
67//! latest view and begin participating in consensus (assuming it can verify blocks).
68//!
69//! ### Deviations from Simplex Consensus
70//!
71//! * Fetch missing notarizations/nullifications as needed rather than assuming each proposal contains
72//!   a set of all notarizations/nullifications for all historical blocks.
73//! * Introduce distinct messages for `notarize` and `nullify` rather than referring to both as a `vote` for
74//!   either a "block" or a "dummy block", respectively.
75//! * Introduce a "leader timeout" to trigger early view transitions for unresponsive leaders.
76//! * Skip "leader timeout" and "notarization timeout" if a designated leader hasn't participated in
77//!   some number of views (again to trigger early view transition for an unresponsive leader).
78//! * Introduce message rebroadcast to continue making progress if messages from a given view are dropped (only way
79//!   to ensure messages are reliably delivered is with a heavyweight reliable broadcast protocol).
80//!
81//! ## Architecture
82//!
83//! All logic is split into four components: the `Batcher`, the `Voter`, the `Resolver`, and the `Application` (provided by the user).
84//! The `Batcher` is responsible for collecting messages from peers and lazily verifying them when a quorum is met. The `Voter`
85//! is responsible for directing participation in the current view. The `Resolver` is responsible for
86//! fetching artifacts from previous views required to verify proposed blocks in the latest view. Lastly, the `Application`
87//! is responsible for proposing new blocks and indicating whether some block is valid.
88//!
89//! To drive great performance, all interactions between `Batcher`, `Voter`, `Resolver`, and `Application` are
90//! non-blocking. This means that, for example, the `Voter` can continue processing messages while the
91//! `Application` verifies a proposed block or the `Resolver` fetches a notarization.
92//!
93//! ```txt
94//!                            +------------+          +++++++++++++++
95//!                            |            +--------->+             +
96//!                            |  Batcher   |          +    Peers    +
97//!                            |            |<---------+             +
98//!                            +-------+----+          +++++++++++++++
99//!                                |   ^
100//!                                |   |
101//!                                |   |
102//!                                |   |
103//!                                v   |
104//! +---------------+           +---------+            +++++++++++++++
105//! |               |<----------+         +----------->+             +
106//! |  Application  |           |  Voter  |            +    Peers    +
107//! |               +---------->|         |<-----------+             +
108//! +---------------+           +--+------+            +++++++++++++++
109//!                                |   ^
110//!                                |   |
111//!                                |   |
112//!                                |   |
113//!                                v   |
114//!                            +-------+----+          +++++++++++++++
115//!                            |            +--------->+             +
116//!                            |  Resolver  |          +    Peers    +
117//!                            |            |<---------+             +
118//!                            +------------+          +++++++++++++++
119//! ```
120//!
121//! ### Batched Verification
122//!
123//! Unlike other consensus constructions that verify all incoming messages received from peers, `simplex`
124//! lazily verifies messages (only when a quorum is met). If an invalid signature is detected, the `Batcher`
125//! will perform repeated bisections over collected messages to find the offending message (and block the
126//! peer(s) that sent it via [commonware_p2p::Blocker]).
127//!
128//! _If using a p2p implementation that is not authenticated, it is not safe to employ this optimization
129//! as any attacking peer could simply reconnect from a different address. We recommend [commonware_p2p::authenticated]._
130//!
131//! ### Fetching Missing Certificates
132//!
133//! Instead of trying to fetch all possible certificates above the floor, we only attempt to fetch
134//! nullifications for all views from the floor (last certified notarization or finalization) to the current view.
135//! This technique, however, is not sufficient to guarantee progress.
136//!
137//! Consider the case where `f` honest participants have seen a finalization for a given view `v` (and nullifications only
138//! from `v` to the current view `c`) but the remaining `f+1` honest participants have not (they have exclusively seen
139//! nullifications from some view `o < v` to `c`). Neither partition of participants will vote for the other's proposals.
140//!
141//! To ensure progress is eventually made, leaders with nullified proposals directly broadcast the best finalization
142//! certificate they are aware of to ensure all honest participants eventually consider the same proposal ancestry valid.
143//!
144//! _While a more aggressive recovery mechanism could be employed, like requiring all participants to broadcast their highest
145//! finalization certificate after nullification, it would impose significant overhead under normal network
146//! conditions (whereas the approach described incurs no overhead under normal network conditions). Recall, honest participants
147//! already broadcast observed certificates to all other participants in each view (and misaligned participants should only ever
148//! be observed following severe network degradation)._
149//!
150//! ## Pluggable Hashing and Cryptography
151//!
152//! Hashing is abstracted via the [commonware_cryptography::Hasher] trait and cryptography is abstracted via
153//! the [commonware_cryptography::certificate::Scheme] trait, allowing deployments to employ approaches that best match their
154//! requirements (or to provide their own without modifying any consensus logic). The following schemes
155//! are supported out-of-the-box:
156//!
157//! ### [scheme::ed25519]
158//!
159//! [commonware_cryptography::ed25519] signatures are ["High-speed high-security signatures"](https://eprint.iacr.org/2011/368)
160//! with 32 byte public keys and 64 byte signatures. While they are well-supported by commercial HSMs and offer efficient batch
161//! verification, the signatures are not aggregatable (and certificates grow linearly with the quorum size).
162//!
163//! ### [scheme::bls12381_multisig]
164//!
165//! [commonware_cryptography::bls12381] is a ["digital signature scheme with aggregation properties"](https://www.ietf.org/archive/id/draft-irtf-cfrg-bls-signature-05.txt).
166//! Unlike [commonware_cryptography::ed25519], signatures from multiple participants (say the signers in a certificate) can be aggregated
167//! into a single signature (reducing bandwidth usage per broadcast). That being said, [commonware_cryptography::bls12381] is much slower
168//! to verify than [commonware_cryptography::ed25519] and isn't supported by most HSMs (a standardization effort expired in 2022).
169//!
170//! ### [scheme::bls12381_threshold]
171//!
//! Last but not least, [scheme::bls12381_threshold] employs threshold cryptography (specifically BLS12-381 threshold signatures
173//! with a `2f+1` of `3f+1` quorum) to generate both a bias-resistant beacon (for leader election and post-facto execution randomness)
174//! and succinct consensus certificates (any certificate can be verified with just the static public key of the consensus instance) for each view
175//! with zero message overhead (natively integrated). While powerful, this scheme requires both instantiating the shared secret
176//! via [commonware_cryptography::bls12381::dkg] and performing a resharing procedure whenever participants are added or removed.
177//!
178//! #### Embedded VRF
179//!
180//! Every `notarize(c,v)` or `nullify(v)` message includes an `attestation(v)` (a partial signature over the view `v`). After `2f+1`
181//! `notarize(c,v)` or `nullify(v)` messages are collected from unique participants, `seed(v)` can be recovered. Because `attestation(v)` is
182//! only over the view `v`, the seed derived for a given view `v` is the same regardless of whether or not a block was notarized in said
183//! view `v`.
184//!
185//! Because the value of `seed(v)` cannot be known prior to message broadcast by any participant (including the leader) in view `v`
186//! and cannot be manipulated by any participant (deterministic for any `2f+1` signers at a given view `v`), it can be used both as a beacon
187//! for leader election (where `seed(v)` determines the leader for `v+1`) and a source of randomness in execution (where `seed(v)`
188//! is used as a seed in `v`).
189//!
190//! #### Succinct Certificates
191//!
192//! All broadcast consensus messages (`notarize(c,v)`, `nullify(v)`, `finalize(c,v)`) contain attestations (partial signatures) for a static
193//! public key (derived from a group polynomial that can be recomputed during reconfiguration using [dkg](commonware_cryptography::bls12381::dkg)).
194//! As soon as `2f+1` messages are collected, a threshold signature over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)`
195//! can be recovered, respectively. Because the public key is static, any of these certificates can be verified by an external
196//! process without following the consensus instance and/or tracking the current set of participants (as is typically required
197//! to operate a lite client).
198//!
199//! These threshold signatures over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)` (i.e. the consensus certificates)
200//! can be used to secure interoperability between different consensus instances and user interactions with an infrastructure provider
201//! (where any data served can be proven to derive from some finalized block of some consensus instance with a known static public key).
202//!
203//! ## Certification
204//!
205//! After a payload is notarized, the application can optionally delay or prevent finalization via the
206//! [`CertifiableAutomaton::certify`](crate::CertifiableAutomaton::certify) method. This is particularly useful for systems that employ
207//! erasure coding, where participants may want to wait until they have received enough shards to reconstruct
208//! and validate the full block before voting to finalize.
209//!
210//! If `certify` returns `false`, the participant will not broadcast a `finalize` vote for the payload.
211//! Because finalization requires `2f+1` `finalize` votes, a payload will only be finalized if a quorum
212//! of participants certify it. By default, `certify` returns `true` for all payloads, meaning finalization
213//! proceeds immediately after notarization.
214//!
215//! _The decision returned by `certify` must be deterministic and consistent across all honest participants to ensure
216//! liveness._
217//!
218//! ## Persistence
219//!
//! The `Voter` caches all data required to participate in consensus to avoid any disk reads
//! on the critical path. To enable recovery, the `Voter` writes valid messages it receives from
222//! consensus and messages it generates to a write-ahead log (WAL) implemented by [commonware_storage::journal::segmented::variable::Journal].
223//! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior
224//! on restart (especially in the case of unclean shutdown).
225
226pub mod elector;
227pub mod scheme;
228pub mod types;
229
230cfg_if::cfg_if! {
231    if #[cfg(not(target_arch = "wasm32"))] {
232        mod actors;
233        pub mod config;
234        pub use config::Config;
235        mod engine;
236        pub use engine::Engine;
237        mod metrics;
238    }
239}
240
241#[cfg(any(test, feature = "fuzz"))]
242pub mod mocks;
243
244use crate::types::{View, ViewDelta};
245
246/// The minimum view we are tracking both in-memory and on-disk.
247pub(crate) const fn min_active(activity_timeout: ViewDelta, last_finalized: View) -> View {
248    last_finalized.saturating_sub(activity_timeout)
249}
250
251/// Whether or not a view is interesting to us. This is a function
252/// of both `min_active` and whether or not the view is too far
253/// in the future (based on the view we are currently in).
254pub(crate) fn interesting(
255    activity_timeout: ViewDelta,
256    last_finalized: View,
257    current: View,
258    pending: View,
259    allow_future: bool,
260) -> bool {
261    if pending < min_active(activity_timeout, last_finalized) {
262        return false;
263    }
264    if !allow_future && pending > current.next() {
265        return false;
266    }
267    true
268}
269
270#[cfg(test)]
271mod tests {
272    use super::*;
273    use crate::{
274        simplex::{
275            elector::{Config as Elector, Random, RoundRobin},
276            mocks::twins::Strategy,
277            scheme::{
278                bls12381_multisig, bls12381_threshold, bls12381_threshold::Seedable, ed25519,
279                Scheme,
280            },
281            types::{
282                Certificate, Finalization as TFinalization, Finalize as TFinalize,
283                Notarization as TNotarization, Notarize as TNotarize,
284                Nullification as TNullification, Nullify as TNullify, Proposal, Vote,
285            },
286        },
287        types::{Epoch, Round},
288        Monitor, Viewable,
289    };
290    use bytes::Bytes;
291    use commonware_codec::{Decode, DecodeExt, Encode};
292    use commonware_cryptography::{
293        bls12381::primitives::variant::{MinPk, MinSig, Variant},
294        certificate::mocks::Fixture,
295        ed25519::{PrivateKey, PublicKey},
296        sha256::{Digest as Sha256Digest, Digest as D},
297        Hasher as _, Sha256, Signer as _,
298    };
299    use commonware_macros::{select, test_group, test_traced};
300    use commonware_p2p::{
301        simulated::{Config, Link, Network, Oracle, Receiver, Sender, SplitOrigin, SplitTarget},
302        Recipients, Sender as _,
303    };
304    use commonware_runtime::{
305        buffer::PoolRef, deterministic, Clock, Metrics, Quota, Runner, Spawner,
306    };
307    use commonware_utils::{max_faults, quorum, NZUsize};
308    use engine::Engine;
309    use futures::{future::join_all, StreamExt};
310    use rand::{rngs::StdRng, Rng as _, SeedableRng as _};
311    use std::{
312        collections::{BTreeMap, HashMap},
313        num::{NonZeroU32, NonZeroUsize},
314        sync::{Arc, Mutex},
315        time::Duration,
316    };
317    use tracing::{debug, info, warn};
318    use types::Activity;
319
    // Arguments for the `PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE)` buffer pool built for each engine.
    // NOTE(review): presumably a page size in bytes and a cache capacity in pages — confirm against `PoolRef`.
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
    // Quota supplied to every channel registration in these tests (`NonZeroU32::MAX` per second).
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
323
324    /// Register a validator with the oracle.
325    async fn register_validator(
326        oracle: &mut Oracle<PublicKey, deterministic::Context>,
327        validator: PublicKey,
328    ) -> (
329        (
330            Sender<PublicKey, deterministic::Context>,
331            Receiver<PublicKey>,
332        ),
333        (
334            Sender<PublicKey, deterministic::Context>,
335            Receiver<PublicKey>,
336        ),
337        (
338            Sender<PublicKey, deterministic::Context>,
339            Receiver<PublicKey>,
340        ),
341    ) {
342        let mut control = oracle.control(validator.clone());
343        let (vote_sender, vote_receiver) = control.register(0, TEST_QUOTA).await.unwrap();
344        let (certificate_sender, certificate_receiver) =
345            control.register(1, TEST_QUOTA).await.unwrap();
346        let (resolver_sender, resolver_receiver) = control.register(2, TEST_QUOTA).await.unwrap();
347        (
348            (vote_sender, vote_receiver),
349            (certificate_sender, certificate_receiver),
350            (resolver_sender, resolver_receiver),
351        )
352    }
353
354    /// Registers all validators using the oracle.
355    async fn register_validators(
356        oracle: &mut Oracle<PublicKey, deterministic::Context>,
357        validators: &[PublicKey],
358    ) -> HashMap<
359        PublicKey,
360        (
361            (
362                Sender<PublicKey, deterministic::Context>,
363                Receiver<PublicKey>,
364            ),
365            (
366                Sender<PublicKey, deterministic::Context>,
367                Receiver<PublicKey>,
368            ),
369            (
370                Sender<PublicKey, deterministic::Context>,
371                Receiver<PublicKey>,
372            ),
373        ),
374    > {
375        let mut registrations = HashMap::new();
376        for validator in validators.iter() {
377            let registration = register_validator(oracle, validator.clone()).await;
378            registrations.insert(validator.clone(), registration);
379        }
380        registrations
381    }
382
    /// Enum to describe the action to take when linking validators.
    enum Action {
        /// Add the given link between each selected pair of validators.
        Link(Link),
        /// Remove the existing link between each selected pair, then add the given one.
        Update(Link), // Unlink and then link
        /// Remove the link between each selected pair of validators.
        Unlink,
    }
389
390    /// Links (or unlinks) validators using the oracle.
391    ///
392    /// The `action` parameter determines the action (e.g. link, unlink) to take.
393    /// The `restrict_to` function can be used to restrict the linking to certain connections,
394    /// otherwise all validators will be linked to all other validators.
395    async fn link_validators(
396        oracle: &mut Oracle<PublicKey, deterministic::Context>,
397        validators: &[PublicKey],
398        action: Action,
399        restrict_to: Option<fn(usize, usize, usize) -> bool>,
400    ) {
401        for (i1, v1) in validators.iter().enumerate() {
402            for (i2, v2) in validators.iter().enumerate() {
403                // Ignore self
404                if v2 == v1 {
405                    continue;
406                }
407
408                // Restrict to certain connections
409                if let Some(f) = restrict_to {
410                    if !f(validators.len(), i1, i2) {
411                        continue;
412                    }
413                }
414
415                // Do any unlinking first
416                match action {
417                    Action::Update(_) | Action::Unlink => {
418                        oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
419                    }
420                    _ => {}
421                }
422
423                // Do any linking after
424                match action {
425                    Action::Link(ref link) | Action::Update(ref link) => {
426                        oracle
427                            .add_link(v1.clone(), v2.clone(), link.clone())
428                            .await
429                            .unwrap();
430                    }
431                    _ => {}
432                }
433            }
434        }
435    }
436
437    fn all_online<S, F, L>(mut fixture: F)
438    where
439        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
440        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
441        L: Elector<S>,
442    {
443        // Create context
444        let n = 5;
445        let quorum = quorum(n) as usize;
446        let required_containers = View::new(100);
447        let activity_timeout = ViewDelta::new(10);
448        let skip_timeout = ViewDelta::new(5);
449        let namespace = b"consensus".to_vec();
450        let executor = deterministic::Runner::timed(Duration::from_secs(300));
451        executor.start(|mut context| async move {
452            // Create simulated network
453            let (network, mut oracle) = Network::new(
454                context.with_label("network"),
455                Config {
456                    max_size: 1024 * 1024,
457                    disconnect_on_block: true,
458                    tracked_peer_sets: None,
459                },
460            );
461
462            // Start network
463            network.start();
464
465            // Register participants
466            let Fixture {
467                participants,
468                schemes,
469                ..
470            } = fixture(&mut context, n);
471            let mut registrations = register_validators(&mut oracle, &participants).await;
472
473            // Link all validators
474            let link = Link {
475                latency: Duration::from_millis(10),
476                jitter: Duration::from_millis(1),
477                success_rate: 1.0,
478            };
479            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
480
481            // Create engines
482            let elector = L::default();
483            let relay = Arc::new(mocks::relay::Relay::new());
484            let mut reporters = Vec::new();
485            let mut engine_handlers = Vec::new();
486            for (idx, validator) in participants.iter().enumerate() {
487                // Create scheme context
488                let context = context.with_label(&format!("validator_{}", *validator));
489
490                // Configure engine
491                let reporter_config = mocks::reporter::Config {
492                    namespace: namespace.clone(),
493                    participants: participants.clone().try_into().unwrap(),
494                    scheme: schemes[idx].clone(),
495                    elector: elector.clone(),
496                };
497                let reporter =
498                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
499                reporters.push(reporter.clone());
500                let application_cfg = mocks::application::Config {
501                    hasher: Sha256::default(),
502                    relay: relay.clone(),
503                    me: validator.clone(),
504                    propose_latency: (10.0, 5.0),
505                    verify_latency: (10.0, 5.0),
506                    certify_latency: (10.0, 5.0),
507                    should_certify: mocks::application::Certifier::Sometimes,
508                };
509                let (actor, application) = mocks::application::Application::new(
510                    context.with_label("application"),
511                    application_cfg,
512                );
513                actor.start();
514                let blocker = oracle.control(validator.clone());
515                let cfg = config::Config {
516                    scheme: schemes[idx].clone(),
517                    elector: elector.clone(),
518                    blocker,
519                    automaton: application.clone(),
520                    relay: application.clone(),
521                    reporter: reporter.clone(),
522                    partition: validator.to_string(),
523                    mailbox_size: 1024,
524                    epoch: Epoch::new(333),
525                    namespace: namespace.clone(),
526                    leader_timeout: Duration::from_secs(1),
527                    notarization_timeout: Duration::from_secs(2),
528                    nullify_retry: Duration::from_secs(10),
529                    fetch_timeout: Duration::from_secs(1),
530                    activity_timeout,
531                    skip_timeout,
532                    fetch_concurrent: 4,
533                    replay_buffer: NZUsize!(1024 * 1024),
534                    write_buffer: NZUsize!(1024 * 1024),
535                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
536                };
537                let engine = Engine::new(context.with_label("engine"), cfg);
538
539                // Start engine
540                let (pending, recovered, resolver) = registrations
541                    .remove(validator)
542                    .expect("validator should be registered");
543                engine_handlers.push(engine.start(pending, recovered, resolver));
544            }
545
546            // Wait for all engines to finish
547            let mut finalizers = Vec::new();
548            for reporter in reporters.iter_mut() {
549                let (mut latest, mut monitor) = reporter.subscribe().await;
550                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
551                    while latest < required_containers {
552                        latest = monitor.next().await.expect("event missing");
553                    }
554                }));
555            }
556            join_all(finalizers).await;
557
558            // Check reporters for correct activity
559            let latest_complete = required_containers.saturating_sub(activity_timeout);
560            for reporter in reporters.iter() {
561                // Ensure no faults
562                {
563                    let faults = reporter.faults.lock().unwrap();
564                    assert!(faults.is_empty());
565                }
566
567                // Ensure no invalid signatures
568                {
569                    let invalid = reporter.invalid.lock().unwrap();
570                    assert_eq!(*invalid, 0);
571                }
572
573                // Ensure certificates for all views
574                {
575                    let certified = reporter.certified.lock().unwrap();
576                    for view in View::range(View::new(1), latest_complete) {
577                        // Ensure certificate for every view
578                        if !certified.contains(&view) {
579                            panic!("view: {view}");
580                        }
581                    }
582                }
583
584                // Ensure no forks
585                let mut notarized = HashMap::new();
586                let mut finalized = HashMap::new();
587                {
588                    let notarizes = reporter.notarizes.lock().unwrap();
589                    for view in View::range(View::new(1), latest_complete) {
590                        // Ensure only one payload proposed per view
591                        let Some(payloads) = notarizes.get(&view) else {
592                            continue;
593                        };
594                        if payloads.len() > 1 {
595                            panic!("view: {view}");
596                        }
597                        let (digest, notarizers) = payloads.iter().next().unwrap();
598                        notarized.insert(view, *digest);
599
600                        if notarizers.len() < quorum {
601                            // We can't verify that everyone participated at every view because some nodes may
602                            // have started later.
603                            panic!("view: {view}");
604                        }
605                    }
606                }
607                {
608                    let notarizations = reporter.notarizations.lock().unwrap();
609                    for view in View::range(View::new(1), latest_complete) {
610                        // Ensure notarization matches digest from notarizes
611                        let Some(notarization) = notarizations.get(&view) else {
612                            continue;
613                        };
614                        let Some(digest) = notarized.get(&view) else {
615                            continue;
616                        };
617                        assert_eq!(&notarization.proposal.payload, digest);
618                    }
619                }
620                {
621                    let finalizes = reporter.finalizes.lock().unwrap();
622                    for view in View::range(View::new(1), latest_complete) {
623                        // Ensure only one payload proposed per view
624                        let Some(payloads) = finalizes.get(&view) else {
625                            continue;
626                        };
627                        if payloads.len() > 1 {
628                            panic!("view: {view}");
629                        }
630                        let (digest, finalizers) = payloads.iter().next().unwrap();
631                        finalized.insert(view, *digest);
632
633                        // Only check at views below timeout
634                        if view > latest_complete {
635                            continue;
636                        }
637
638                        // Ensure everyone participating
639                        if finalizers.len() < quorum {
640                            // We can't verify that everyone participated at every view because some nodes may
641                            // have started later.
642                            panic!("view: {view}");
643                        }
644
645                        // Ensure no nullifies for any finalizers
646                        let nullifies = reporter.nullifies.lock().unwrap();
647                        let Some(nullifies) = nullifies.get(&view) else {
648                            continue;
649                        };
650                        for (_, finalizers) in payloads.iter() {
651                            for finalizer in finalizers.iter() {
652                                if nullifies.contains(finalizer) {
653                                    panic!("should not nullify and finalize at same view");
654                                }
655                            }
656                        }
657                    }
658                }
659                {
660                    let finalizations = reporter.finalizations.lock().unwrap();
661                    for view in View::range(View::new(1), latest_complete) {
662                        // Ensure finalization matches digest from finalizes
663                        let Some(finalization) = finalizations.get(&view) else {
664                            continue;
665                        };
666                        let Some(digest) = finalized.get(&view) else {
667                            continue;
668                        };
669                        assert_eq!(&finalization.proposal.payload, digest);
670                    }
671                }
672            }
673
674            // Ensure no blocked connections
675            let blocked = oracle.blocked().await.unwrap();
676            assert!(blocked.is_empty());
677        });
678    }
679
680    #[test_traced]
681    fn test_all_online() {
682        all_online::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
683        all_online::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
684        all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
685        all_online::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
686        all_online::<_, _, RoundRobin>(ed25519::fixture);
687    }
688
689    fn observer<S, F, L>(mut fixture: F)
690    where
691        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
692        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
693        L: Elector<S>,
694    {
695        // Create context
696        let n_active = 5;
697        let required_containers = View::new(100);
698        let activity_timeout = ViewDelta::new(10);
699        let skip_timeout = ViewDelta::new(5);
700        let namespace = b"consensus".to_vec();
701        let executor = deterministic::Runner::timed(Duration::from_secs(300));
702        executor.start(|mut context| async move {
703            // Create simulated network
704            let (network, mut oracle) = Network::new(
705                context.with_label("network"),
706                Config {
707                    max_size: 1024 * 1024,
708                    disconnect_on_block: true,
709                    tracked_peer_sets: None,
710                },
711            );
712
713            // Start network
714            network.start();
715
716            // Register participants (active)
717            let Fixture {
718                participants,
719                schemes,
720                verifier,
721                ..
722            } = fixture(&mut context, n_active);
723
724            // Add observer (no share)
725            let private_key_observer = PrivateKey::from_seed(n_active as u64);
726            let public_key_observer = private_key_observer.public_key();
727
728            // Register all (including observer) with the network
729            let mut all_validators = participants.clone();
730            all_validators.push(public_key_observer.clone());
731            all_validators.sort();
732            let mut registrations = register_validators(&mut oracle, &all_validators).await;
733
734            // Link all peers (including observer)
735            let link = Link {
736                latency: Duration::from_millis(10),
737                jitter: Duration::from_millis(1),
738                success_rate: 1.0,
739            };
740            link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;
741
742            // Create engines
743            let elector = L::default();
744            let relay = Arc::new(mocks::relay::Relay::new());
745            let mut reporters = Vec::new();
746
747            for (idx, validator) in participants.iter().enumerate() {
748                let is_observer = *validator == public_key_observer;
749
750                // Create scheme context
751                let context = context.with_label(&format!("validator_{}", *validator));
752
753                // Configure engine
754                let signing = if is_observer {
755                    verifier.clone()
756                } else {
757                    schemes[idx].clone()
758                };
759                let reporter_config = mocks::reporter::Config {
760                    namespace: namespace.clone(),
761                    participants: participants.clone().try_into().unwrap(),
762                    scheme: signing.clone(),
763                    elector: elector.clone(),
764                };
765                let reporter =
766                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
767                reporters.push(reporter.clone());
768                let application_cfg = mocks::application::Config {
769                    hasher: Sha256::default(),
770                    relay: relay.clone(),
771                    me: validator.clone(),
772                    propose_latency: (10.0, 5.0),
773                    verify_latency: (10.0, 5.0),
774                    certify_latency: (10.0, 5.0),
775                    should_certify: mocks::application::Certifier::Sometimes,
776                };
777                let (actor, application) = mocks::application::Application::new(
778                    context.with_label("application"),
779                    application_cfg,
780                );
781                actor.start();
782                let blocker = oracle.control(validator.clone());
783                let cfg = config::Config {
784                    scheme: signing.clone(),
785                    elector: elector.clone(),
786                    blocker,
787                    automaton: application.clone(),
788                    relay: application.clone(),
789                    reporter: reporter.clone(),
790                    partition: validator.to_string(),
791                    mailbox_size: 1024,
792                    epoch: Epoch::new(333),
793                    namespace: namespace.clone(),
794                    leader_timeout: Duration::from_secs(1),
795                    notarization_timeout: Duration::from_secs(2),
796                    nullify_retry: Duration::from_secs(10),
797                    fetch_timeout: Duration::from_secs(1),
798                    activity_timeout,
799                    skip_timeout,
800                    fetch_concurrent: 4,
801                    replay_buffer: NZUsize!(1024 * 1024),
802                    write_buffer: NZUsize!(1024 * 1024),
803                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
804                };
805                let engine = Engine::new(context.with_label("engine"), cfg);
806
807                // Start engine
808                let (pending, recovered, resolver) = registrations
809                    .remove(validator)
810                    .expect("validator should be registered");
811                engine.start(pending, recovered, resolver);
812            }
813
814            // Wait for all  engines to finish
815            let mut finalizers = Vec::new();
816            for reporter in reporters.iter_mut() {
817                let (mut latest, mut monitor) = reporter.subscribe().await;
818                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
819                    while latest < required_containers {
820                        latest = monitor.next().await.expect("event missing");
821                    }
822                }));
823            }
824            join_all(finalizers).await;
825
826            // Sanity check
827            for reporter in reporters.iter() {
828                // Ensure no faults or invalid signatures
829                {
830                    let faults = reporter.faults.lock().unwrap();
831                    assert!(faults.is_empty());
832                }
833                {
834                    let invalid = reporter.invalid.lock().unwrap();
835                    assert_eq!(*invalid, 0);
836                }
837
838                // Ensure no blocked connections
839                let blocked = oracle.blocked().await.unwrap();
840                assert!(blocked.is_empty());
841            }
842        });
843    }
844
845    #[test_traced]
846    fn test_observer() {
847        observer::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
848        observer::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
849        observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
850        observer::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
851        observer::<_, _, RoundRobin>(ed25519::fixture);
852    }
853
854    fn unclean_shutdown<S, F, L>(mut fixture: F)
855    where
856        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
857        F: FnMut(&mut StdRng, u32) -> Fixture<S>,
858        L: Elector<S>,
859    {
860        // Create context
861        let n = 5;
862        let required_containers = View::new(100);
863        let activity_timeout = ViewDelta::new(10);
864        let skip_timeout = ViewDelta::new(5);
865        let namespace = b"consensus".to_vec();
866
867        // Random restarts every x seconds
868        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
869        let supervised = Arc::new(Mutex::new(Vec::new()));
870        let mut prev_checkpoint = None;
871
872        // Create validator keys
873        let mut rng = StdRng::seed_from_u64(0);
874        let Fixture {
875            participants,
876            schemes,
877            ..
878        } = fixture(&mut rng, n);
879
880        // Create block relay, shared across restarts.
881        let relay = Arc::new(mocks::relay::Relay::<Sha256Digest, S::PublicKey>::new());
882
883        loop {
884            let rng = rng.clone();
885            let participants = participants.clone();
886            let schemes = schemes.clone();
887            let namespace = namespace.clone();
888            let shutdowns = shutdowns.clone();
889            let supervised = supervised.clone();
890            let relay = relay.clone();
891            relay.deregister_all(); // Clear all recipients from previous restart.
892
893            let f = |mut context: deterministic::Context| async move {
894                // Create simulated network
895                let (network, mut oracle) = Network::new(
896                    context.with_label("network"),
897                    Config {
898                        max_size: 1024 * 1024,
899                        disconnect_on_block: true,
900                        tracked_peer_sets: None,
901                    },
902                );
903
904                // Start network
905                network.start();
906
907                // Register participants
908                let mut registrations = register_validators(&mut oracle, &participants).await;
909
910                // Link all validators
911                let link = Link {
912                    latency: Duration::from_millis(50),
913                    jitter: Duration::from_millis(50),
914                    success_rate: 1.0,
915                };
916                link_validators(&mut oracle, &participants, Action::Link(link), None).await;
917
918                // Create engines
919                let elector = L::default();
920                let relay = Arc::new(mocks::relay::Relay::new());
921                let mut reporters = HashMap::new();
922                let mut engine_handlers = Vec::new();
923                for (idx, validator) in participants.iter().enumerate() {
924                    // Create scheme context
925                    let context = context.with_label(&format!("validator_{}", *validator));
926
927                    // Configure engine
928                    let reporter_config = mocks::reporter::Config {
929                        namespace: namespace.clone(),
930                        participants: participants.clone().try_into().unwrap(),
931                        scheme: schemes[idx].clone(),
932                        elector: elector.clone(),
933                    };
934                    let reporter = mocks::reporter::Reporter::new(rng.clone(), reporter_config);
935                    reporters.insert(validator.clone(), reporter.clone());
936                    let application_cfg = mocks::application::Config {
937                        hasher: Sha256::default(),
938                        relay: relay.clone(),
939                        me: validator.clone(),
940                        propose_latency: (10.0, 5.0),
941                        verify_latency: (10.0, 5.0),
942                        certify_latency: (10.0, 5.0),
943                        should_certify: mocks::application::Certifier::Sometimes,
944                    };
945                    let (actor, application) = mocks::application::Application::new(
946                        context.with_label("application"),
947                        application_cfg,
948                    );
949                    actor.start();
950                    let blocker = oracle.control(validator.clone());
951                    let cfg = config::Config {
952                        scheme: schemes[idx].clone(),
953                        elector: elector.clone(),
954                        blocker,
955                        automaton: application.clone(),
956                        relay: application.clone(),
957                        reporter: reporter.clone(),
958                        partition: validator.to_string(),
959                        mailbox_size: 1024,
960                        epoch: Epoch::new(333),
961                        namespace: namespace.clone(),
962                        leader_timeout: Duration::from_secs(1),
963                        notarization_timeout: Duration::from_secs(2),
964                        nullify_retry: Duration::from_secs(10),
965                        fetch_timeout: Duration::from_secs(1),
966                        activity_timeout,
967                        skip_timeout,
968                        fetch_concurrent: 4,
969                        replay_buffer: NZUsize!(1024 * 1024),
970                        write_buffer: NZUsize!(1024 * 1024),
971                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
972                    };
973                    let engine = Engine::new(context.with_label("engine"), cfg);
974
975                    // Start engine
976                    let (pending, recovered, resolver) = registrations
977                        .remove(validator)
978                        .expect("validator should be registered");
979                    engine_handlers.push(engine.start(pending, recovered, resolver));
980                }
981
982                // Store all finalizer handles
983                let mut finalizers = Vec::new();
984                for (_, reporter) in reporters.iter_mut() {
985                    let (mut latest, mut monitor) = reporter.subscribe().await;
986                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
987                        while latest < required_containers {
988                            latest = monitor.next().await.expect("event missing");
989                        }
990                    }));
991                }
992
993                // Exit at random points for unclean shutdown of entire set
994                let wait =
995                    context.gen_range(Duration::from_millis(100)..Duration::from_millis(2_000));
996                let result = select! {
997                    _ = context.sleep(wait) => {
998                        // Collect reporters to check faults
999                        {
1000                            let mut shutdowns = shutdowns.lock().unwrap();
1001                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
1002                            *shutdowns += 1;
1003                        }
1004                        supervised.lock().unwrap().push(reporters);
1005                        false
1006                    },
1007                    _ = join_all(finalizers) => {
1008                        // Check reporters for faults activity
1009                        let supervised = supervised.lock().unwrap();
1010                        for reporters in supervised.iter() {
1011                            for (_, reporter) in reporters.iter() {
1012                                let faults = reporter.faults.lock().unwrap();
1013                                assert!(faults.is_empty());
1014                            }
1015                        }
1016                        true
1017                    }
1018                };
1019
1020                // Ensure no blocked connections
1021                let blocked = oracle.blocked().await.unwrap();
1022                assert!(blocked.is_empty());
1023
1024                result
1025            };
1026
1027            let (complete, checkpoint) = prev_checkpoint
1028                .map_or_else(
1029                    || deterministic::Runner::timed(Duration::from_secs(180)),
1030                    deterministic::Runner::from,
1031                )
1032                .start_and_recover(f);
1033
1034            // Check if we should exit
1035            if complete {
1036                break;
1037            }
1038
1039            prev_checkpoint = Some(checkpoint);
1040        }
1041    }
1042
1043    #[test_group("slow")]
1044    #[test_traced]
1045    fn test_unclean_shutdown() {
1046        unclean_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1047        unclean_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1048        unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1049        unclean_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1050        unclean_shutdown::<_, _, RoundRobin>(ed25519::fixture);
1051    }
1052
1053    fn backfill<S, F, L>(mut fixture: F)
1054    where
1055        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1056        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1057        L: Elector<S>,
1058    {
1059        // Create context
1060        let n = 4;
1061        let required_containers = View::new(100);
1062        let activity_timeout = ViewDelta::new(10);
1063        let skip_timeout = ViewDelta::new(5);
1064        let namespace = b"consensus".to_vec();
1065        let executor = deterministic::Runner::timed(Duration::from_secs(240));
1066        executor.start(|mut context| async move {
1067            // Create simulated network
1068            let (network, mut oracle) = Network::new(
1069                context.with_label("network"),
1070                Config {
1071                    max_size: 1024 * 1024,
1072                    disconnect_on_block: true,
1073                    tracked_peer_sets: None,
1074                },
1075            );
1076
1077            // Start network
1078            network.start();
1079
1080            // Register participants
1081            let Fixture {
1082                participants,
1083                schemes,
1084                ..
1085            } = fixture(&mut context, n);
1086            let mut registrations = register_validators(&mut oracle, &participants).await;
1087
1088            // Link all validators except first
1089            let link = Link {
1090                latency: Duration::from_millis(10),
1091                jitter: Duration::from_millis(1),
1092                success_rate: 1.0,
1093            };
1094            link_validators(
1095                &mut oracle,
1096                &participants,
1097                Action::Link(link),
1098                Some(|_, i, j| ![i, j].contains(&0usize)),
1099            )
1100            .await;
1101
1102            // Create engines
1103            let elector = L::default();
1104            let relay = Arc::new(mocks::relay::Relay::new());
1105            let mut reporters = Vec::new();
1106            let mut engine_handlers = Vec::new();
1107            for (idx_scheme, validator) in participants.iter().enumerate() {
1108                // Skip first peer
1109                if idx_scheme == 0 {
1110                    continue;
1111                }
1112
1113                // Create scheme context
1114                let context = context.with_label(&format!("validator_{}", *validator));
1115
1116                // Configure engine
1117                let reporter_config = mocks::reporter::Config {
1118                    namespace: namespace.clone(),
1119                    participants: participants.clone().try_into().unwrap(),
1120                    scheme: schemes[idx_scheme].clone(),
1121                    elector: elector.clone(),
1122                };
1123                let reporter =
1124                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1125                reporters.push(reporter.clone());
1126                let application_cfg = mocks::application::Config {
1127                    hasher: Sha256::default(),
1128                    relay: relay.clone(),
1129                    me: validator.clone(),
1130                    propose_latency: (10.0, 5.0),
1131                    verify_latency: (10.0, 5.0),
1132                    certify_latency: (10.0, 5.0),
1133                    should_certify: mocks::application::Certifier::Sometimes,
1134                };
1135                let (actor, application) = mocks::application::Application::new(
1136                    context.with_label("application"),
1137                    application_cfg,
1138                );
1139                actor.start();
1140                let blocker = oracle.control(validator.clone());
1141                let cfg = config::Config {
1142                    scheme: schemes[idx_scheme].clone(),
1143                    elector: elector.clone(),
1144                    blocker,
1145                    automaton: application.clone(),
1146                    relay: application.clone(),
1147                    reporter: reporter.clone(),
1148                    partition: validator.to_string(),
1149                    mailbox_size: 1024,
1150                    epoch: Epoch::new(333),
1151                    namespace: namespace.clone(),
1152                    leader_timeout: Duration::from_secs(1),
1153                    notarization_timeout: Duration::from_secs(2),
1154                    nullify_retry: Duration::from_secs(10),
1155                    fetch_timeout: Duration::from_secs(1),
1156                    activity_timeout,
1157                    skip_timeout,
1158                    fetch_concurrent: 4,
1159                    replay_buffer: NZUsize!(1024 * 1024),
1160                    write_buffer: NZUsize!(1024 * 1024),
1161                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1162                };
1163                let engine = Engine::new(context.with_label("engine"), cfg);
1164
1165                // Start engine
1166                let (pending, recovered, resolver) = registrations
1167                    .remove(validator)
1168                    .expect("validator should be registered");
1169                engine_handlers.push(engine.start(pending, recovered, resolver));
1170            }
1171
1172            // Wait for all engines to finish
1173            let mut finalizers = Vec::new();
1174            for reporter in reporters.iter_mut() {
1175                let (mut latest, mut monitor) = reporter.subscribe().await;
1176                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1177                    while latest < required_containers {
1178                        latest = monitor.next().await.expect("event missing");
1179                    }
1180                }));
1181            }
1182            join_all(finalizers).await;
1183
1184            // Degrade network connections for online peers
1185            let link = Link {
1186                latency: Duration::from_secs(3),
1187                jitter: Duration::from_millis(0),
1188                success_rate: 1.0,
1189            };
1190            link_validators(
1191                &mut oracle,
1192                &participants,
1193                Action::Update(link.clone()),
1194                Some(|_, i, j| ![i, j].contains(&0usize)),
1195            )
1196            .await;
1197
1198            // Wait for nullifications to accrue
1199            context.sleep(Duration::from_secs(60)).await;
1200
1201            // Unlink second peer from all (except first)
1202            link_validators(
1203                &mut oracle,
1204                &participants,
1205                Action::Unlink,
1206                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
1207            )
1208            .await;
1209
1210            // Configure engine for first peer
1211            let me = participants[0].clone();
1212            let context = context.with_label(&format!("validator_{me}"));
1213
1214            // Link first peer to all (except second)
1215            link_validators(
1216                &mut oracle,
1217                &participants,
1218                Action::Link(link),
1219                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
1220            )
1221            .await;
1222
1223            // Restore network connections for all online peers
1224            let link = Link {
1225                latency: Duration::from_millis(10),
1226                jitter: Duration::from_millis(3),
1227                success_rate: 1.0,
1228            };
1229            link_validators(
1230                &mut oracle,
1231                &participants,
1232                Action::Update(link),
1233                Some(|_, i, j| ![i, j].contains(&1usize)),
1234            )
1235            .await;
1236
1237            // Configure engine
1238            let reporter_config = mocks::reporter::Config {
1239                namespace: namespace.clone(),
1240                participants: participants.clone().try_into().unwrap(),
1241                scheme: schemes[0].clone(),
1242                elector: elector.clone(),
1243            };
1244            let mut reporter =
1245                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1246            reporters.push(reporter.clone());
1247            let application_cfg = mocks::application::Config {
1248                hasher: Sha256::default(),
1249                relay: relay.clone(),
1250                me: me.clone(),
1251                propose_latency: (10.0, 5.0),
1252                verify_latency: (10.0, 5.0),
1253                certify_latency: (10.0, 5.0),
1254                should_certify: mocks::application::Certifier::Sometimes,
1255            };
1256            let (actor, application) = mocks::application::Application::new(
1257                context.with_label("application"),
1258                application_cfg,
1259            );
1260            actor.start();
1261            let blocker = oracle.control(me.clone());
1262            let cfg = config::Config {
1263                scheme: schemes[0].clone(),
1264                elector: elector.clone(),
1265                blocker,
1266                automaton: application.clone(),
1267                relay: application.clone(),
1268                reporter: reporter.clone(),
1269                partition: me.to_string(),
1270                mailbox_size: 1024,
1271                epoch: Epoch::new(333),
1272                namespace: namespace.clone(),
1273                leader_timeout: Duration::from_secs(1),
1274                notarization_timeout: Duration::from_secs(2),
1275                nullify_retry: Duration::from_secs(10),
1276                fetch_timeout: Duration::from_secs(1),
1277                activity_timeout,
1278                skip_timeout,
1279                fetch_concurrent: 4,
1280                replay_buffer: NZUsize!(1024 * 1024),
1281                write_buffer: NZUsize!(1024 * 1024),
1282                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1283            };
1284            let engine = Engine::new(context.with_label("engine"), cfg);
1285
1286            // Start engine
1287            let (pending, recovered, resolver) = registrations
1288                .remove(&me)
1289                .expect("validator should be registered");
1290            engine_handlers.push(engine.start(pending, recovered, resolver));
1291
1292            // Wait for new engine to finalize required
1293            let (mut latest, mut monitor) = reporter.subscribe().await;
1294            while latest < required_containers {
1295                latest = monitor.next().await.expect("event missing");
1296            }
1297
1298            // Ensure no blocked connections
1299            let blocked = oracle.blocked().await.unwrap();
1300            assert!(blocked.is_empty());
1301        });
1302    }
1303
1304    #[test_traced]
1305    fn test_backfill() {
1306        backfill::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1307        backfill::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1308        backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1309        backfill::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1310        backfill::<_, _, RoundRobin>(ed25519::fixture);
1311    }
1312
1313    fn one_offline<S, F, L>(mut fixture: F)
1314    where
1315        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1316        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1317        L: Elector<S>,
1318    {
1319        // Create context
1320        let n = 5;
1321        let quorum = quorum(n) as usize;
1322        let required_containers = View::new(100);
1323        let activity_timeout = ViewDelta::new(10);
1324        let skip_timeout = ViewDelta::new(5);
1325        let max_exceptions = 10;
1326        let namespace = b"consensus".to_vec();
1327        let executor = deterministic::Runner::timed(Duration::from_secs(300));
1328        executor.start(|mut context| async move {
1329            // Create simulated network
1330            let (network, mut oracle) = Network::new(
1331                context.with_label("network"),
1332                Config {
1333                    max_size: 1024 * 1024,
1334                    disconnect_on_block: true,
1335                    tracked_peer_sets: None,
1336                },
1337            );
1338
1339            // Start network
1340            network.start();
1341
1342            // Register participants
1343            let Fixture {
1344                participants,
1345                schemes,
1346                ..
1347            } = fixture(&mut context, n);
1348            let mut registrations = register_validators(&mut oracle, &participants).await;
1349
1350            // Link all validators except first
1351            let link = Link {
1352                latency: Duration::from_millis(10),
1353                jitter: Duration::from_millis(1),
1354                success_rate: 1.0,
1355            };
1356            link_validators(
1357                &mut oracle,
1358                &participants,
1359                Action::Link(link),
1360                Some(|_, i, j| ![i, j].contains(&0usize)),
1361            )
1362            .await;
1363
1364            // Create engines
1365            let elector = L::default();
1366            let relay = Arc::new(mocks::relay::Relay::new());
1367            let mut reporters = Vec::new();
1368            let mut engine_handlers = Vec::new();
1369            for (idx_scheme, validator) in participants.iter().enumerate() {
1370                // Skip first peer
1371                if idx_scheme == 0 {
1372                    continue;
1373                }
1374
1375                // Create scheme context
1376                let context = context.with_label(&format!("validator_{}", *validator));
1377
1378                // Configure engine
1379                let reporter_config = mocks::reporter::Config {
1380                    namespace: namespace.clone(),
1381                    participants: participants.clone().try_into().unwrap(),
1382                    scheme: schemes[idx_scheme].clone(),
1383                    elector: elector.clone(),
1384                };
1385                let reporter =
1386                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1387                reporters.push(reporter.clone());
1388                let application_cfg = mocks::application::Config {
1389                    hasher: Sha256::default(),
1390                    relay: relay.clone(),
1391                    me: validator.clone(),
1392                    propose_latency: (10.0, 5.0),
1393                    verify_latency: (10.0, 5.0),
1394                    certify_latency: (10.0, 5.0),
1395                    should_certify: mocks::application::Certifier::Sometimes,
1396                };
1397                let (actor, application) = mocks::application::Application::new(
1398                    context.with_label("application"),
1399                    application_cfg,
1400                );
1401                actor.start();
1402                let blocker = oracle.control(validator.clone());
1403                let cfg = config::Config {
1404                    scheme: schemes[idx_scheme].clone(),
1405                    elector: elector.clone(),
1406                    blocker,
1407                    automaton: application.clone(),
1408                    relay: application.clone(),
1409                    reporter: reporter.clone(),
1410                    partition: validator.to_string(),
1411                    mailbox_size: 1024,
1412                    epoch: Epoch::new(333),
1413                    namespace: namespace.clone(),
1414                    leader_timeout: Duration::from_secs(1),
1415                    notarization_timeout: Duration::from_secs(2),
1416                    nullify_retry: Duration::from_secs(10),
1417                    fetch_timeout: Duration::from_secs(1),
1418                    activity_timeout,
1419                    skip_timeout,
1420                    fetch_concurrent: 4,
1421                    replay_buffer: NZUsize!(1024 * 1024),
1422                    write_buffer: NZUsize!(1024 * 1024),
1423                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1424                };
1425                let engine = Engine::new(context.with_label("engine"), cfg);
1426
1427                // Start engine
1428                let (pending, recovered, resolver) = registrations
1429                    .remove(validator)
1430                    .expect("validator should be registered");
1431                engine_handlers.push(engine.start(pending, recovered, resolver));
1432            }
1433
1434            // Wait for all engines to finish
1435            let mut finalizers = Vec::new();
1436            for reporter in reporters.iter_mut() {
1437                let (mut latest, mut monitor) = reporter.subscribe().await;
1438                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1439                    while latest < required_containers {
1440                        latest = monitor.next().await.expect("event missing");
1441                    }
1442                }));
1443            }
1444            join_all(finalizers).await;
1445
1446            // Check reporters for correct activity
1447            let exceptions = 0;
1448            let offline = &participants[0];
1449            for reporter in reporters.iter() {
1450                // Ensure no faults
1451                {
1452                    let faults = reporter.faults.lock().unwrap();
1453                    assert!(faults.is_empty());
1454                }
1455
1456                // Ensure no invalid signatures
1457                {
1458                    let invalid = reporter.invalid.lock().unwrap();
1459                    assert_eq!(*invalid, 0);
1460                }
1461
1462                // Ensure offline node is never active
1463                let mut exceptions = 0;
1464                {
1465                    let notarizes = reporter.notarizes.lock().unwrap();
1466                    for (view, payloads) in notarizes.iter() {
1467                        for (_, participants) in payloads.iter() {
1468                            if participants.contains(offline) {
1469                                panic!("view: {view}");
1470                            }
1471                        }
1472                    }
1473                }
1474                {
1475                    let nullifies = reporter.nullifies.lock().unwrap();
1476                    for (view, participants) in nullifies.iter() {
1477                        if participants.contains(offline) {
1478                            panic!("view: {view}");
1479                        }
1480                    }
1481                }
1482                {
1483                    let finalizes = reporter.finalizes.lock().unwrap();
1484                    for (view, payloads) in finalizes.iter() {
1485                        for (_, finalizers) in payloads.iter() {
1486                            if finalizers.contains(offline) {
1487                                panic!("view: {view}");
1488                            }
1489                        }
1490                    }
1491                }
1492
1493                // Identify offline views
1494                let mut offline_views = Vec::new();
1495                {
1496                    let leaders = reporter.leaders.lock().unwrap();
1497                    for (view, leader) in leaders.iter() {
1498                        if leader == offline {
1499                            offline_views.push(*view);
1500                        }
1501                    }
1502                }
1503                assert!(!offline_views.is_empty());
1504
1505                // Ensure nullifies/nullification collected for offline node
1506                {
1507                    let nullifies = reporter.nullifies.lock().unwrap();
1508                    for view in offline_views.iter() {
1509                        let nullifies = nullifies.get(view).map_or(0, |n| n.len());
1510                        if nullifies < quorum {
1511                            warn!("missing expected view nullifies: {}", view);
1512                            exceptions += 1;
1513                        }
1514                    }
1515                }
1516                {
1517                    let nullifications = reporter.nullifications.lock().unwrap();
1518                    for view in offline_views.iter() {
1519                        if !nullifications.contains_key(view) {
1520                            warn!("missing expected view nullifies: {}", view);
1521                            exceptions += 1;
1522                        }
1523                    }
1524                }
1525
1526                // Ensure exceptions within allowed
1527                assert!(exceptions <= max_exceptions);
1528            }
1529            assert!(exceptions <= max_exceptions);
1530
1531            // Ensure no blocked connections
1532            let blocked = oracle.blocked().await.unwrap();
1533            assert!(blocked.is_empty());
1534
1535            // Ensure we are skipping views
1536            let encoded = context.encode();
1537            let lines = encoded.lines();
1538            let mut skipped_views = 0;
1539            let mut nodes_skipping = 0;
1540            for line in lines {
1541                if line.contains("_skipped_views_total") {
1542                    let parts: Vec<&str> = line.split_whitespace().collect();
1543                    if let Some(number_str) = parts.last() {
1544                        if let Ok(number) = number_str.parse::<u64>() {
1545                            if number > 0 {
1546                                nodes_skipping += 1;
1547                            }
1548                            if number > skipped_views {
1549                                skipped_views = number;
1550                            }
1551                        }
1552                    }
1553                }
1554            }
1555            assert!(
1556                skipped_views > 0,
1557                "expected skipped views to be greater than 0"
1558            );
1559            assert_eq!(
1560                nodes_skipping,
1561                n - 1,
1562                "expected all online nodes to be skipping views"
1563            );
1564        });
1565    }
1566
1567    #[test_traced]
1568    fn test_one_offline() {
1569        one_offline::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1570        one_offline::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1571        one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1572        one_offline::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1573        one_offline::<_, _, RoundRobin>(ed25519::fixture);
1574    }
1575
1576    fn slow_validator<S, F, L>(mut fixture: F)
1577    where
1578        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1579        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1580        L: Elector<S>,
1581    {
1582        // Create context
1583        let n = 5;
1584        let required_containers = View::new(50);
1585        let activity_timeout = ViewDelta::new(10);
1586        let skip_timeout = ViewDelta::new(5);
1587        let namespace = b"consensus".to_vec();
1588        let executor = deterministic::Runner::timed(Duration::from_secs(300));
1589        executor.start(|mut context| async move {
1590            // Create simulated network
1591            let (network, mut oracle) = Network::new(
1592                context.with_label("network"),
1593                Config {
1594                    max_size: 1024 * 1024,
1595                    disconnect_on_block: true,
1596                    tracked_peer_sets: None,
1597                },
1598            );
1599
1600            // Start network
1601            network.start();
1602
1603            // Register participants
1604            let Fixture {
1605                participants,
1606                schemes,
1607                ..
1608            } = fixture(&mut context, n);
1609            let mut registrations = register_validators(&mut oracle, &participants).await;
1610
1611            // Link all validators
1612            let link = Link {
1613                latency: Duration::from_millis(10),
1614                jitter: Duration::from_millis(1),
1615                success_rate: 1.0,
1616            };
1617            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1618
1619            // Create engines
1620            let elector = L::default();
1621            let relay = Arc::new(mocks::relay::Relay::new());
1622            let mut reporters = Vec::new();
1623            let mut engine_handlers = Vec::new();
1624            for (idx_scheme, validator) in participants.iter().enumerate() {
1625                // Create scheme context
1626                let context = context.with_label(&format!("validator_{}", *validator));
1627
1628                // Configure engine
1629                let reporter_config = mocks::reporter::Config {
1630                    namespace: namespace.clone(),
1631                    participants: participants.clone().try_into().unwrap(),
1632                    scheme: schemes[idx_scheme].clone(),
1633                    elector: elector.clone(),
1634                };
1635                let reporter =
1636                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1637                reporters.push(reporter.clone());
1638                let application_cfg = if idx_scheme == 0 {
1639                    mocks::application::Config {
1640                        hasher: Sha256::default(),
1641                        relay: relay.clone(),
1642                        me: validator.clone(),
1643                        propose_latency: (10_000.0, 0.0),
1644                        verify_latency: (10_000.0, 5.0),
1645                        certify_latency: (10_000.0, 5.0),
1646                        should_certify: mocks::application::Certifier::Sometimes,
1647                    }
1648                } else {
1649                    mocks::application::Config {
1650                        hasher: Sha256::default(),
1651                        relay: relay.clone(),
1652                        me: validator.clone(),
1653                        propose_latency: (10.0, 5.0),
1654                        verify_latency: (10.0, 5.0),
1655                        certify_latency: (10.0, 5.0),
1656                        should_certify: mocks::application::Certifier::Sometimes,
1657                    }
1658                };
1659                let (actor, application) = mocks::application::Application::new(
1660                    context.with_label("application"),
1661                    application_cfg,
1662                );
1663                actor.start();
1664                let blocker = oracle.control(validator.clone());
1665                let cfg = config::Config {
1666                    scheme: schemes[idx_scheme].clone(),
1667                    elector: elector.clone(),
1668                    blocker,
1669                    automaton: application.clone(),
1670                    relay: application.clone(),
1671                    reporter: reporter.clone(),
1672                    partition: validator.to_string(),
1673                    mailbox_size: 1024,
1674                    epoch: Epoch::new(333),
1675                    namespace: namespace.clone(),
1676                    leader_timeout: Duration::from_secs(1),
1677                    notarization_timeout: Duration::from_secs(2),
1678                    nullify_retry: Duration::from_secs(10),
1679                    fetch_timeout: Duration::from_secs(1),
1680                    activity_timeout,
1681                    skip_timeout,
1682                    fetch_concurrent: 4,
1683                    replay_buffer: NZUsize!(1024 * 1024),
1684                    write_buffer: NZUsize!(1024 * 1024),
1685                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1686                };
1687                let engine = Engine::new(context.with_label("engine"), cfg);
1688
1689                // Start engine
1690                let (pending, recovered, resolver) = registrations
1691                    .remove(validator)
1692                    .expect("validator should be registered");
1693                engine_handlers.push(engine.start(pending, recovered, resolver));
1694            }
1695
1696            // Wait for all engines to finish
1697            let mut finalizers = Vec::new();
1698            for reporter in reporters.iter_mut() {
1699                let (mut latest, mut monitor) = reporter.subscribe().await;
1700                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1701                    while latest < required_containers {
1702                        latest = monitor.next().await.expect("event missing");
1703                    }
1704                }));
1705            }
1706            join_all(finalizers).await;
1707
1708            // Check reporters for correct activity
1709            let slow = &participants[0];
1710            for reporter in reporters.iter() {
1711                // Ensure no faults
1712                {
1713                    let faults = reporter.faults.lock().unwrap();
1714                    assert!(faults.is_empty());
1715                }
1716
1717                // Ensure no invalid signatures
1718                {
1719                    let invalid = reporter.invalid.lock().unwrap();
1720                    assert_eq!(*invalid, 0);
1721                }
1722
1723                // Ensure slow node still emits notarizes and finalizes (when receiving certificates)
1724                let mut observed = false;
1725                {
1726                    let notarizes = reporter.notarizes.lock().unwrap();
1727                    for (_, payloads) in notarizes.iter() {
1728                        for (_, participants) in payloads.iter() {
1729                            if participants.contains(slow) {
1730                                observed = true;
1731                                break;
1732                            }
1733                        }
1734                    }
1735                }
1736                {
1737                    let finalizes = reporter.finalizes.lock().unwrap();
1738                    for (_, payloads) in finalizes.iter() {
1739                        for (_, finalizers) in payloads.iter() {
1740                            if finalizers.contains(slow) {
1741                                observed = true;
1742                                break;
1743                            }
1744                        }
1745                    }
1746                }
1747                assert!(observed);
1748            }
1749
1750            // Ensure no blocked connections
1751            let blocked = oracle.blocked().await.unwrap();
1752            assert!(blocked.is_empty());
1753        });
1754    }
1755
1756    #[test_traced]
1757    fn test_slow_validator() {
1758        slow_validator::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1759        slow_validator::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1760        slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1761        slow_validator::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1762        slow_validator::<_, _, RoundRobin>(ed25519::fixture);
1763    }
1764
1765    fn all_recovery<S, F, L>(mut fixture: F)
1766    where
1767        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1768        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1769        L: Elector<S>,
1770    {
1771        // Create context
1772        let n = 5;
1773        let required_containers = View::new(100);
1774        let activity_timeout = ViewDelta::new(10);
1775        let skip_timeout = ViewDelta::new(2);
1776        let namespace = b"consensus".to_vec();
1777        let executor = deterministic::Runner::timed(Duration::from_secs(1800));
1778        executor.start(|mut context| async move {
1779            // Create simulated network
1780            let (network, mut oracle) = Network::new(
1781                context.with_label("network"),
1782                Config {
1783                    max_size: 1024 * 1024,
1784                    disconnect_on_block: false,
1785                    tracked_peer_sets: None,
1786                },
1787            );
1788
1789            // Start network
1790            network.start();
1791
1792            // Register participants
1793            let Fixture {
1794                participants,
1795                schemes,
1796                ..
1797            } = fixture(&mut context, n);
1798            let mut registrations = register_validators(&mut oracle, &participants).await;
1799
1800            // Link all validators
1801            let link = Link {
1802                latency: Duration::from_secs(3),
1803                jitter: Duration::from_millis(0),
1804                success_rate: 1.0,
1805            };
1806            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1807
1808            // Create engines
1809            let elector = L::default();
1810            let relay = Arc::new(mocks::relay::Relay::new());
1811            let mut reporters = Vec::new();
1812            let mut engine_handlers = Vec::new();
1813            for (idx, validator) in participants.iter().enumerate() {
1814                // Create scheme context
1815                let context = context.with_label(&format!("validator_{}", *validator));
1816
1817                // Configure engine
1818                let reporter_config = mocks::reporter::Config {
1819                    namespace: namespace.clone(),
1820                    participants: participants.clone().try_into().unwrap(),
1821                    scheme: schemes[idx].clone(),
1822                    elector: elector.clone(),
1823                };
1824                let reporter =
1825                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1826                reporters.push(reporter.clone());
1827                let application_cfg = mocks::application::Config {
1828                    hasher: Sha256::default(),
1829                    relay: relay.clone(),
1830                    me: validator.clone(),
1831                    propose_latency: (10.0, 5.0),
1832                    verify_latency: (10.0, 5.0),
1833                    certify_latency: (10.0, 5.0),
1834                    should_certify: mocks::application::Certifier::Sometimes,
1835                };
1836                let (actor, application) = mocks::application::Application::new(
1837                    context.with_label("application"),
1838                    application_cfg,
1839                );
1840                actor.start();
1841                let blocker = oracle.control(validator.clone());
1842                let cfg = config::Config {
1843                    scheme: schemes[idx].clone(),
1844                    elector: elector.clone(),
1845                    blocker,
1846                    automaton: application.clone(),
1847                    relay: application.clone(),
1848                    reporter: reporter.clone(),
1849                    partition: validator.to_string(),
1850                    mailbox_size: 1024,
1851                    epoch: Epoch::new(333),
1852                    namespace: namespace.clone(),
1853                    leader_timeout: Duration::from_secs(1),
1854                    notarization_timeout: Duration::from_secs(2),
1855                    nullify_retry: Duration::from_secs(10),
1856                    fetch_timeout: Duration::from_secs(1),
1857                    activity_timeout,
1858                    skip_timeout,
1859                    fetch_concurrent: 4,
1860                    replay_buffer: NZUsize!(1024 * 1024),
1861                    write_buffer: NZUsize!(1024 * 1024),
1862                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1863                };
1864                let engine = Engine::new(context.with_label("engine"), cfg);
1865
1866                // Start engine
1867                let (pending, recovered, resolver) = registrations
1868                    .remove(validator)
1869                    .expect("validator should be registered");
1870                engine_handlers.push(engine.start(pending, recovered, resolver));
1871            }
1872
1873            // Wait for a few virtual minutes (shouldn't finalize anything)
1874            let mut finalizers = Vec::new();
1875            for reporter in reporters.iter_mut() {
1876                let (_, mut monitor) = reporter.subscribe().await;
1877                finalizers.push(
1878                    context
1879                        .with_label("finalizer")
1880                        .spawn(move |context| async move {
1881                            select! {
1882                                _timeout = context.sleep(Duration::from_secs(60)) => {},
1883                                _done = monitor.next() => {
1884                                    panic!("engine should not notarize or finalize anything");
1885                                }
1886                            }
1887                        }),
1888                );
1889            }
1890            join_all(finalizers).await;
1891
1892            // Unlink all validators to get latest view
1893            link_validators(&mut oracle, &participants, Action::Unlink, None).await;
1894
1895            // Wait for a virtual minute (nothing should happen)
1896            context.sleep(Duration::from_secs(60)).await;
1897
1898            // Get latest view
1899            let mut latest = View::zero();
1900            for reporter in reporters.iter() {
1901                let nullifies = reporter.nullifies.lock().unwrap();
1902                let max = nullifies.keys().max().unwrap();
1903                if *max > latest {
1904                    latest = *max;
1905                }
1906            }
1907
1908            // Update links
1909            let link = Link {
1910                latency: Duration::from_millis(10),
1911                jitter: Duration::from_millis(1),
1912                success_rate: 1.0,
1913            };
1914            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1915
1916            // Wait for all engines to finish
1917            let mut finalizers = Vec::new();
1918            for reporter in reporters.iter_mut() {
1919                let (mut latest, mut monitor) = reporter.subscribe().await;
1920                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1921                    while latest < required_containers {
1922                        latest = monitor.next().await.expect("event missing");
1923                    }
1924                }));
1925            }
1926            join_all(finalizers).await;
1927
1928            // Check reporters for correct activity
1929            for reporter in reporters.iter() {
1930                // Ensure no faults
1931                {
1932                    let faults = reporter.faults.lock().unwrap();
1933                    assert!(faults.is_empty());
1934                }
1935
1936                // Ensure no invalid signatures
1937                {
1938                    let invalid = reporter.invalid.lock().unwrap();
1939                    assert_eq!(*invalid, 0);
1940                }
1941
1942                // Ensure quick recovery.
1943                //
1944                // If the skip timeout isn't implemented correctly, we may go many views before participants
1945                // start to notarize a validator's proposal.
1946                {
1947                    // Ensure nearly all views around latest are notarized.
1948                    // We don't check for finalization since some of the blocks may fail to be
1949                    // certified for the purposes of testing.
1950                    let mut found = 0;
1951                    let notarizations = reporter.notarizations.lock().unwrap();
1952                    for view in View::range(latest, latest.saturating_add(activity_timeout)) {
1953                        if notarizations.contains_key(&view) {
1954                            found += 1;
1955                        }
1956                    }
1957                    assert!(
1958                        found >= activity_timeout.get().saturating_sub(2),
1959                        "found: {found}"
1960                    );
1961                }
1962            }
1963
1964            // Ensure no blocked connections
1965            let blocked = oracle.blocked().await.unwrap();
1966            assert!(blocked.is_empty());
1967        });
1968    }
1969
1970    #[test_traced]
1971    fn test_all_recovery() {
1972        all_recovery::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
1973        all_recovery::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
1974        all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
1975        all_recovery::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
1976        all_recovery::<_, _, RoundRobin>(ed25519::fixture);
1977    }
1978
1979    fn partition<S, F, L>(mut fixture: F)
1980    where
1981        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
1982        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1983        L: Elector<S>,
1984    {
1985        // Create context
1986        let n = 10;
1987        let required_containers = View::new(50);
1988        let activity_timeout = ViewDelta::new(10);
1989        let skip_timeout = ViewDelta::new(5);
1990        let namespace = b"consensus".to_vec();
1991        let executor = deterministic::Runner::timed(Duration::from_secs(900));
1992        executor.start(|mut context| async move {
1993            // Create simulated network
1994            let (network, mut oracle) = Network::new(
1995                context.with_label("network"),
1996                Config {
1997                    max_size: 1024 * 1024,
1998                    disconnect_on_block: false,
1999                    tracked_peer_sets: None,
2000                },
2001            );
2002
2003            // Start network
2004            network.start();
2005
2006            // Register participants
2007            let Fixture {
2008                participants,
2009                schemes,
2010                ..
2011            } = fixture(&mut context, n);
2012            let mut registrations = register_validators(&mut oracle, &participants).await;
2013
2014            // Link all validators
2015            let link = Link {
2016                latency: Duration::from_millis(10),
2017                jitter: Duration::from_millis(1),
2018                success_rate: 1.0,
2019            };
2020            link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
2021
2022            // Create engines
2023            let elector = L::default();
2024            let relay = Arc::new(mocks::relay::Relay::new());
2025            let mut reporters = Vec::new();
2026            let mut engine_handlers = Vec::new();
2027            for (idx, validator) in participants.iter().enumerate() {
2028                // Create scheme context
2029                let context = context.with_label(&format!("validator_{}", *validator));
2030
2031                // Configure engine
2032                let reporter_config = mocks::reporter::Config {
2033                    namespace: namespace.clone(),
2034                    participants: participants.clone().try_into().unwrap(),
2035                    scheme: schemes[idx].clone(),
2036                    elector: elector.clone(),
2037                };
2038                let reporter =
2039                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2040                reporters.push(reporter.clone());
2041                let application_cfg = mocks::application::Config {
2042                    hasher: Sha256::default(),
2043                    relay: relay.clone(),
2044                    me: validator.clone(),
2045                    propose_latency: (10.0, 5.0),
2046                    verify_latency: (10.0, 5.0),
2047                    certify_latency: (10.0, 5.0),
2048                    should_certify: mocks::application::Certifier::Sometimes,
2049                };
2050                let (actor, application) = mocks::application::Application::new(
2051                    context.with_label("application"),
2052                    application_cfg,
2053                );
2054                actor.start();
2055                let blocker = oracle.control(validator.clone());
2056                let cfg = config::Config {
2057                    scheme: schemes[idx].clone(),
2058                    elector: elector.clone(),
2059                    blocker,
2060                    automaton: application.clone(),
2061                    relay: application.clone(),
2062                    reporter: reporter.clone(),
2063                    partition: validator.to_string(),
2064                    mailbox_size: 1024,
2065                    epoch: Epoch::new(333),
2066                    namespace: namespace.clone(),
2067                    leader_timeout: Duration::from_secs(1),
2068                    notarization_timeout: Duration::from_secs(2),
2069                    nullify_retry: Duration::from_secs(10),
2070                    fetch_timeout: Duration::from_secs(1),
2071                    activity_timeout,
2072                    skip_timeout,
2073                    fetch_concurrent: 4,
2074                    replay_buffer: NZUsize!(1024 * 1024),
2075                    write_buffer: NZUsize!(1024 * 1024),
2076                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2077                };
2078                let engine = Engine::new(context.with_label("engine"), cfg);
2079
2080                // Start engine
2081                let (pending, recovered, resolver) = registrations
2082                    .remove(validator)
2083                    .expect("validator should be registered");
2084                engine_handlers.push(engine.start(pending, recovered, resolver));
2085            }
2086
2087            // Wait for all engines to finish
2088            let mut finalizers = Vec::new();
2089            for reporter in reporters.iter_mut() {
2090                let (mut latest, mut monitor) = reporter.subscribe().await;
2091                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2092                    while latest < required_containers {
2093                        latest = monitor.next().await.expect("event missing");
2094                    }
2095                }));
2096            }
2097            join_all(finalizers).await;
2098
2099            // Cut all links between validator halves
2100            fn separated(n: usize, a: usize, b: usize) -> bool {
2101                let m = n / 2;
2102                (a < m && b >= m) || (a >= m && b < m)
2103            }
2104            link_validators(&mut oracle, &participants, Action::Unlink, Some(separated)).await;
2105
2106            // Wait for any in-progress notarizations/finalizations to finish
2107            context.sleep(Duration::from_secs(10)).await;
2108
2109            // Wait for a few virtual minutes (shouldn't finalize anything)
2110            let mut finalizers = Vec::new();
2111            for reporter in reporters.iter_mut() {
2112                let (_, mut monitor) = reporter.subscribe().await;
2113                finalizers.push(
2114                    context
2115                        .with_label("finalizer")
2116                        .spawn(move |context| async move {
2117                            select! {
2118                                _timeout = context.sleep(Duration::from_secs(60)) => {},
2119                                _done = monitor.next() => {
2120                                    panic!("engine should not notarize or finalize anything");
2121                                }
2122                            }
2123                        }),
2124                );
2125            }
2126            join_all(finalizers).await;
2127
2128            // Restore links
2129            link_validators(
2130                &mut oracle,
2131                &participants,
2132                Action::Link(link),
2133                Some(separated),
2134            )
2135            .await;
2136
2137            // Wait for all engines to finish
2138            let mut finalizers = Vec::new();
2139            for reporter in reporters.iter_mut() {
2140                let (mut latest, mut monitor) = reporter.subscribe().await;
2141                let required = latest.saturating_add(ViewDelta::new(required_containers.get()));
2142                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2143                    while latest < required {
2144                        latest = monitor.next().await.expect("event missing");
2145                    }
2146                }));
2147            }
2148            join_all(finalizers).await;
2149
2150            // Check reporters for correct activity
2151            for reporter in reporters.iter() {
2152                // Ensure no faults
2153                {
2154                    let faults = reporter.faults.lock().unwrap();
2155                    assert!(faults.is_empty());
2156                }
2157
2158                // Ensure no invalid signatures
2159                {
2160                    let invalid = reporter.invalid.lock().unwrap();
2161                    assert_eq!(*invalid, 0);
2162                }
2163            }
2164
2165            // Ensure no blocked connections
2166            let blocked = oracle.blocked().await.unwrap();
2167            assert!(blocked.is_empty());
2168        });
2169    }
2170
2171    #[test_group("slow")]
2172    #[test_traced]
2173    fn test_partition() {
2174        partition::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
2175        partition::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
2176        partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
2177        partition::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
2178        partition::<_, _, RoundRobin>(ed25519::fixture);
2179    }
2180
2181    fn slow_and_lossy_links<S, F, L>(seed: u64, mut fixture: F) -> String
2182    where
2183        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2184        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2185        L: Elector<S>,
2186    {
2187        // Create context
2188        let n = 5;
2189        let required_containers = View::new(50);
2190        let activity_timeout = ViewDelta::new(10);
2191        let skip_timeout = ViewDelta::new(5);
2192        let namespace = b"consensus".to_vec();
2193        let cfg = deterministic::Config::new()
2194            .with_seed(seed)
2195            .with_timeout(Some(Duration::from_secs(5_000)));
2196        let executor = deterministic::Runner::new(cfg);
2197        executor.start(|mut context| async move {
2198            // Create simulated network
2199            let (network, mut oracle) = Network::new(
2200                context.with_label("network"),
2201                Config {
2202                    max_size: 1024 * 1024,
2203                    disconnect_on_block: false,
2204                    tracked_peer_sets: None,
2205                },
2206            );
2207
2208            // Start network
2209            network.start();
2210
2211            // Register participants
2212            let Fixture {
2213                participants,
2214                schemes,
2215                ..
2216            } = fixture(&mut context, n);
2217            let mut registrations = register_validators(&mut oracle, &participants).await;
2218
2219            // Link all validators
2220            let degraded_link = Link {
2221                latency: Duration::from_millis(200),
2222                jitter: Duration::from_millis(150),
2223                success_rate: 0.5,
2224            };
2225            link_validators(
2226                &mut oracle,
2227                &participants,
2228                Action::Link(degraded_link),
2229                None,
2230            )
2231            .await;
2232
2233            // Create engines
2234            let elector = L::default();
2235            let relay = Arc::new(mocks::relay::Relay::new());
2236            let mut reporters = Vec::new();
2237            let mut engine_handlers = Vec::new();
2238            for (idx, validator) in participants.iter().enumerate() {
2239                // Create scheme context
2240                let context = context.with_label(&format!("validator_{}", *validator));
2241
2242                // Configure engine
2243                let reporter_config = mocks::reporter::Config {
2244                    namespace: namespace.clone(),
2245                    participants: participants.clone().try_into().unwrap(),
2246                    scheme: schemes[idx].clone(),
2247                    elector: elector.clone(),
2248                };
2249                let reporter =
2250                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2251                reporters.push(reporter.clone());
2252                let application_cfg = mocks::application::Config {
2253                    hasher: Sha256::default(),
2254                    relay: relay.clone(),
2255                    me: validator.clone(),
2256                    propose_latency: (10.0, 5.0),
2257                    verify_latency: (10.0, 5.0),
2258                    certify_latency: (10.0, 5.0),
2259                    should_certify: mocks::application::Certifier::Sometimes,
2260                };
2261                let (actor, application) = mocks::application::Application::new(
2262                    context.with_label("application"),
2263                    application_cfg,
2264                );
2265                actor.start();
2266                let blocker = oracle.control(validator.clone());
2267                let cfg = config::Config {
2268                    scheme: schemes[idx].clone(),
2269                    elector: elector.clone(),
2270                    blocker,
2271                    automaton: application.clone(),
2272                    relay: application.clone(),
2273                    reporter: reporter.clone(),
2274                    partition: validator.to_string(),
2275                    mailbox_size: 1024,
2276                    epoch: Epoch::new(333),
2277                    namespace: namespace.clone(),
2278                    leader_timeout: Duration::from_secs(1),
2279                    notarization_timeout: Duration::from_secs(2),
2280                    nullify_retry: Duration::from_secs(10),
2281                    fetch_timeout: Duration::from_secs(1),
2282                    activity_timeout,
2283                    skip_timeout,
2284                    fetch_concurrent: 4,
2285                    replay_buffer: NZUsize!(1024 * 1024),
2286                    write_buffer: NZUsize!(1024 * 1024),
2287                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2288                };
2289                let engine = Engine::new(context.with_label("engine"), cfg);
2290
2291                // Start engine
2292                let (pending, recovered, resolver) = registrations
2293                    .remove(validator)
2294                    .expect("validator should be registered");
2295                engine_handlers.push(engine.start(pending, recovered, resolver));
2296            }
2297
2298            // Wait for all engines to finish
2299            let mut finalizers = Vec::new();
2300            for reporter in reporters.iter_mut() {
2301                let (mut latest, mut monitor) = reporter.subscribe().await;
2302                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2303                    while latest < required_containers {
2304                        latest = monitor.next().await.expect("event missing");
2305                    }
2306                }));
2307            }
2308            join_all(finalizers).await;
2309
2310            // Check reporters for correct activity
2311            for reporter in reporters.iter() {
2312                // Ensure no faults
2313                {
2314                    let faults = reporter.faults.lock().unwrap();
2315                    assert!(faults.is_empty());
2316                }
2317
2318                // Ensure no invalid signatures
2319                {
2320                    let invalid = reporter.invalid.lock().unwrap();
2321                    assert_eq!(*invalid, 0);
2322                }
2323            }
2324
2325            // Ensure no blocked connections
2326            let blocked = oracle.blocked().await.unwrap();
2327            assert!(blocked.is_empty());
2328
2329            context.auditor().state()
2330        })
2331    }
2332
2333    #[test_traced]
2334    fn test_slow_and_lossy_links() {
2335        slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold::fixture::<MinPk, _>);
2336        slow_and_lossy_links::<_, _, Random>(0, bls12381_threshold::fixture::<MinSig, _>);
2337        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinPk, _>);
2338        slow_and_lossy_links::<_, _, RoundRobin>(0, bls12381_multisig::fixture::<MinSig, _>);
2339        slow_and_lossy_links::<_, _, RoundRobin>(0, ed25519::fixture);
2340    }
2341
2342    #[test_group("slow")]
2343    #[test_traced]
2344    fn test_determinism() {
2345        // We use slow and lossy links as the deterministic test
2346        // because it is the most complex test.
2347        for seed in 1..6 {
2348            let ts_pk_state_1 =
2349                slow_and_lossy_links::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2350            let ts_pk_state_2 =
2351                slow_and_lossy_links::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2352            assert_eq!(ts_pk_state_1, ts_pk_state_2);
2353
2354            let ts_sig_state_1 = slow_and_lossy_links::<_, _, Random>(
2355                seed,
2356                bls12381_threshold::fixture::<MinSig, _>,
2357            );
2358            let ts_sig_state_2 = slow_and_lossy_links::<_, _, Random>(
2359                seed,
2360                bls12381_threshold::fixture::<MinSig, _>,
2361            );
2362            assert_eq!(ts_sig_state_1, ts_sig_state_2);
2363
2364            let ms_pk_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2365                seed,
2366                bls12381_multisig::fixture::<MinPk, _>,
2367            );
2368            let ms_pk_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2369                seed,
2370                bls12381_multisig::fixture::<MinPk, _>,
2371            );
2372            assert_eq!(ms_pk_state_1, ms_pk_state_2);
2373
2374            let ms_sig_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(
2375                seed,
2376                bls12381_multisig::fixture::<MinSig, _>,
2377            );
2378            let ms_sig_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(
2379                seed,
2380                bls12381_multisig::fixture::<MinSig, _>,
2381            );
2382            assert_eq!(ms_sig_state_1, ms_sig_state_2);
2383
2384            let ed_state_1 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2385            let ed_state_2 = slow_and_lossy_links::<_, _, RoundRobin>(seed, ed25519::fixture);
2386            assert_eq!(ed_state_1, ed_state_2);
2387
2388            let states = [
2389                ("threshold-minpk", ts_pk_state_1),
2390                ("threshold-minsig", ts_sig_state_1),
2391                ("multisig-minpk", ms_pk_state_1),
2392                ("multisig-minsig", ms_sig_state_1),
2393                ("ed25519", ed_state_1),
2394            ];
2395
2396            // Sanity check that different types can't be identical
2397            for pair in states.windows(2) {
2398                assert_ne!(
2399                    pair[0].1, pair[1].1,
2400                    "state {} equals state {}",
2401                    pair[0].0, pair[1].0
2402                );
2403            }
2404        }
2405    }
2406
2407    fn conflicter<S, F, L>(seed: u64, mut fixture: F)
2408    where
2409        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2410        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2411        L: Elector<S>,
2412    {
2413        // Create context
2414        let n = 4;
2415        let required_containers = View::new(50);
2416        let activity_timeout = ViewDelta::new(10);
2417        let skip_timeout = ViewDelta::new(5);
2418        let namespace = b"consensus".to_vec();
2419        let cfg = deterministic::Config::new()
2420            .with_seed(seed)
2421            .with_timeout(Some(Duration::from_secs(30)));
2422        let executor = deterministic::Runner::new(cfg);
2423        executor.start(|mut context| async move {
2424            // Create simulated network
2425            let (network, mut oracle) = Network::new(
2426                context.with_label("network"),
2427                Config {
2428                    max_size: 1024 * 1024,
2429                    disconnect_on_block: false,
2430                    tracked_peer_sets: None,
2431                },
2432            );
2433
2434            // Start network
2435            network.start();
2436
2437            // Register participants
2438            let Fixture {
2439                participants,
2440                schemes,
2441                ..
2442            } = fixture(&mut context, n);
2443            let mut registrations = register_validators(&mut oracle, &participants).await;
2444
2445            // Link all validators
2446            let link = Link {
2447                latency: Duration::from_millis(10),
2448                jitter: Duration::from_millis(1),
2449                success_rate: 1.0,
2450            };
2451            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2452
2453            // Create engines
2454            let elector = L::default();
2455            let relay = Arc::new(mocks::relay::Relay::new());
2456            let mut reporters = Vec::new();
2457            for (idx_scheme, validator) in participants.iter().enumerate() {
2458                // Create scheme context
2459                let context = context.with_label(&format!("validator_{}", *validator));
2460
2461                // Start engine
2462                let reporter_config = mocks::reporter::Config {
2463                    namespace: namespace.clone(),
2464                    participants: participants.clone().try_into().unwrap(),
2465                    scheme: schemes[idx_scheme].clone(),
2466                    elector: elector.clone(),
2467                };
2468                let reporter =
2469                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2470                let (pending, recovered, resolver) = registrations
2471                    .remove(validator)
2472                    .expect("validator should be registered");
2473                if idx_scheme == 0 {
2474                    let cfg = mocks::conflicter::Config {
2475                        namespace: namespace.clone(),
2476                        scheme: schemes[idx_scheme].clone(),
2477                    };
2478
2479                    let engine: mocks::conflicter::Conflicter<_, _, Sha256> =
2480                        mocks::conflicter::Conflicter::new(
2481                            context.with_label("byzantine_engine"),
2482                            cfg,
2483                        );
2484                    engine.start(pending);
2485                } else {
2486                    reporters.push(reporter.clone());
2487                    let application_cfg = mocks::application::Config {
2488                        hasher: Sha256::default(),
2489                        relay: relay.clone(),
2490                        me: validator.clone(),
2491                        propose_latency: (10.0, 5.0),
2492                        verify_latency: (10.0, 5.0),
2493                        certify_latency: (10.0, 5.0),
2494                        should_certify: mocks::application::Certifier::Sometimes,
2495                    };
2496                    let (actor, application) = mocks::application::Application::new(
2497                        context.with_label("application"),
2498                        application_cfg,
2499                    );
2500                    actor.start();
2501                    let blocker = oracle.control(validator.clone());
2502                    let cfg = config::Config {
2503                        scheme: schemes[idx_scheme].clone(),
2504                        elector: elector.clone(),
2505                        blocker,
2506                        automaton: application.clone(),
2507                        relay: application.clone(),
2508                        reporter: reporter.clone(),
2509                        partition: validator.to_string(),
2510                        mailbox_size: 1024,
2511                        epoch: Epoch::new(333),
2512                        namespace: namespace.clone(),
2513                        leader_timeout: Duration::from_secs(1),
2514                        notarization_timeout: Duration::from_secs(2),
2515                        nullify_retry: Duration::from_secs(10),
2516                        fetch_timeout: Duration::from_secs(1),
2517                        activity_timeout,
2518                        skip_timeout,
2519                        fetch_concurrent: 4,
2520                        replay_buffer: NZUsize!(1024 * 1024),
2521                        write_buffer: NZUsize!(1024 * 1024),
2522                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2523                    };
2524                    let engine = Engine::new(context.with_label("engine"), cfg);
2525                    engine.start(pending, recovered, resolver);
2526                }
2527            }
2528
2529            // Wait for all engines to finish
2530            let mut finalizers = Vec::new();
2531            for reporter in reporters.iter_mut() {
2532                let (mut latest, mut monitor) = reporter.subscribe().await;
2533                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2534                    while latest < required_containers {
2535                        latest = monitor.next().await.expect("event missing");
2536                    }
2537                }));
2538            }
2539            join_all(finalizers).await;
2540
2541            // Check reporters for correct activity
2542            let byz = &participants[0];
2543            let mut count_conflicting = 0;
2544            for reporter in reporters.iter() {
2545                // Ensure only faults for byz
2546                {
2547                    let faults = reporter.faults.lock().unwrap();
2548                    assert_eq!(faults.len(), 1);
2549                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
2550                    for (_, faults) in faulter.iter() {
2551                        for fault in faults.iter() {
2552                            match fault {
2553                                Activity::ConflictingNotarize(_) => {
2554                                    count_conflicting += 1;
2555                                }
2556                                Activity::ConflictingFinalize(_) => {
2557                                    count_conflicting += 1;
2558                                }
2559                                _ => panic!("unexpected fault: {fault:?}"),
2560                            }
2561                        }
2562                    }
2563                }
2564
2565                // Ensure no invalid signatures
2566                {
2567                    let invalid = reporter.invalid.lock().unwrap();
2568                    assert_eq!(*invalid, 0);
2569                }
2570            }
2571            assert!(count_conflicting > 0);
2572
2573            // Ensure conflicter is blocked
2574            let blocked = oracle.blocked().await.unwrap();
2575            assert!(!blocked.is_empty());
2576            for (a, b) in blocked {
2577                assert_ne!(&a, byz);
2578                assert_eq!(&b, byz);
2579            }
2580        });
2581    }
2582
2583    #[test_group("slow")]
2584    #[test_traced]
2585    fn test_conflicter() {
2586        for seed in 0..5 {
2587            conflicter::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
2588            conflicter::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
2589            conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
2590            conflicter::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
2591            conflicter::<_, _, RoundRobin>(seed, ed25519::fixture);
2592        }
2593    }
2594
2595    fn invalid<S, F, L>(seed: u64, mut fixture: F)
2596    where
2597        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2598        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2599        L: Elector<S>,
2600    {
2601        // Create context
2602        let n = 4;
2603        let required_containers = View::new(50);
2604        let activity_timeout = ViewDelta::new(10);
2605        let skip_timeout = ViewDelta::new(5);
2606        let namespace = b"consensus".to_vec();
2607        let cfg = deterministic::Config::new()
2608            .with_seed(seed)
2609            .with_timeout(Some(Duration::from_secs(30)));
2610        let executor = deterministic::Runner::new(cfg);
2611        executor.start(|mut context| async move {
2612            // Create simulated network
2613            let (network, mut oracle) = Network::new(
2614                context.with_label("network"),
2615                Config {
2616                    max_size: 1024 * 1024,
2617                    disconnect_on_block: false,
2618                    tracked_peer_sets: None,
2619                },
2620            );
2621
2622            // Start network
2623            network.start();
2624
2625            // Register participants
2626            let Fixture {
2627                participants,
2628                schemes,
2629                ..
2630            } = fixture(&mut context, n);
2631            let mut registrations = register_validators(&mut oracle, &participants).await;
2632
2633            // Link all validators
2634            let link = Link {
2635                latency: Duration::from_millis(10),
2636                jitter: Duration::from_millis(1),
2637                success_rate: 1.0,
2638            };
2639            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2640
2641            // Create engines
2642            let elector = L::default();
2643            let relay = Arc::new(mocks::relay::Relay::new());
2644            let mut reporters = Vec::new();
2645            for (idx_scheme, validator) in participants.iter().enumerate() {
2646                // Create scheme context
2647                let context = context.with_label(&format!("validator_{}", *validator));
2648
2649                // Byzantine node (idx 0) uses empty namespace to produce invalid signatures
2650                let engine_namespace = if idx_scheme == 0 {
2651                    vec![]
2652                } else {
2653                    namespace.clone()
2654                };
2655
2656                let reporter_config = mocks::reporter::Config {
2657                    namespace: namespace.clone(), // Reporter always uses correct namespace
2658                    participants: participants.clone().try_into().unwrap(),
2659                    scheme: schemes[idx_scheme].clone(),
2660                    elector: elector.clone(),
2661                };
2662                let reporter =
2663                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2664                reporters.push(reporter.clone());
2665
2666                let application_cfg = mocks::application::Config {
2667                    hasher: Sha256::default(),
2668                    relay: relay.clone(),
2669                    me: validator.clone(),
2670                    propose_latency: (10.0, 5.0),
2671                    verify_latency: (10.0, 5.0),
2672                    certify_latency: (10.0, 5.0),
2673                    should_certify: mocks::application::Certifier::Sometimes,
2674                };
2675                let (actor, application) = mocks::application::Application::new(
2676                    context.with_label("application"),
2677                    application_cfg,
2678                );
2679                actor.start();
2680                let blocker = oracle.control(validator.clone());
2681                let cfg = config::Config {
2682                    scheme: schemes[idx_scheme].clone(),
2683                    elector: elector.clone(),
2684                    blocker,
2685                    automaton: application.clone(),
2686                    relay: application.clone(),
2687                    reporter: reporter.clone(),
2688                    partition: validator.clone().to_string(),
2689                    mailbox_size: 1024,
2690                    epoch: Epoch::new(333),
2691                    namespace: engine_namespace,
2692                    leader_timeout: Duration::from_secs(1),
2693                    notarization_timeout: Duration::from_secs(2),
2694                    nullify_retry: Duration::from_secs(10),
2695                    fetch_timeout: Duration::from_secs(1),
2696                    activity_timeout,
2697                    skip_timeout,
2698                    fetch_concurrent: 4,
2699                    replay_buffer: NZUsize!(1024 * 1024),
2700                    write_buffer: NZUsize!(1024 * 1024),
2701                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2702                };
2703                let engine = Engine::new(context.with_label("engine"), cfg);
2704                let (pending, recovered, resolver) = registrations
2705                    .remove(validator)
2706                    .expect("validator should be registered");
2707                engine.start(pending, recovered, resolver);
2708            }
2709
2710            // Wait for honest engines to finish (skip byzantine node at index 0)
2711            let mut finalizers = Vec::new();
2712            for reporter in reporters.iter_mut().skip(1) {
2713                let (mut latest, mut monitor) = reporter.subscribe().await;
2714                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2715                    while latest < required_containers {
2716                        latest = monitor.next().await.expect("event missing");
2717                    }
2718                }));
2719            }
2720            join_all(finalizers).await;
2721
2722            // Check honest reporters (reporters[1..]) for correct activity
2723            let mut invalid_count = 0;
2724            for reporter in reporters.iter().skip(1) {
2725                // Ensure no faults
2726                {
2727                    let faults = reporter.faults.lock().unwrap();
2728                    assert!(faults.is_empty());
2729                }
2730
2731                // Count invalid signatures
2732                {
2733                    let invalid = reporter.invalid.lock().unwrap();
2734                    if *invalid > 0 {
2735                        invalid_count += 1;
2736                    }
2737                }
2738            }
2739
2740            // All honest nodes should see invalid signatures from the byzantine node
2741            assert_eq!(invalid_count, n - 1);
2742
2743            // Ensure byzantine node is blocked by honest nodes
2744            let blocked = oracle.blocked().await.unwrap();
2745            assert!(!blocked.is_empty());
2746            for (a, b) in blocked {
2747                if a != participants[0] {
2748                    assert_eq!(b, participants[0]);
2749                }
2750            }
2751        });
2752    }
2753
2754    #[test_group("slow")]
2755    #[test_traced]
2756    fn test_invalid() {
            // Drive the `invalid` harness for seeds 0 through 4; every seed exercises each
            // scheme/elector pairing below, in the listed order.
2757        (0..5).for_each(|run_seed| {
2758            invalid::<_, _, Random>(run_seed, bls12381_threshold::fixture::<MinPk, _>);
2759            invalid::<_, _, Random>(run_seed, bls12381_threshold::fixture::<MinSig, _>);
2760            invalid::<_, _, RoundRobin>(run_seed, bls12381_multisig::fixture::<MinPk, _>);
2761            invalid::<_, _, RoundRobin>(run_seed, bls12381_multisig::fixture::<MinSig, _>);
2762            invalid::<_, _, RoundRobin>(run_seed, ed25519::fixture);
2763        });
2764    }
2765
2766    fn impersonator<S, F, L>(seed: u64, mut fixture: F)
2767    where
2768        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2769        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2770        L: Elector<S>,
2771    {
            // Runs a seeded, deterministic simulation of four fully linked validators in which
            // the participant at index 0 is driven by `mocks::impersonator::Impersonator` and
            // the remaining three run the real `Engine`.
            //
            // * `seed`    - seed handed to the deterministic runtime.
            // * `fixture` - builds the participants and one signing scheme per participant.
            // * `L`       - leader elector, instantiated through `L::default()`.
            //
            // Panics (failing the calling test) if an honest reporter records a fault or an
            // invalid signature, if nothing was blocked, or if any recorded block does not
            // target the byzantine participant.

2772        // Create context
2773        let n = 4;
2774        let required_containers = View::new(50);
2775        let activity_timeout = ViewDelta::new(10);
2776        let skip_timeout = ViewDelta::new(5);
2777        let namespace = b"consensus".to_vec();
2778        let cfg = deterministic::Config::new()
2779            .with_seed(seed)
2780            .with_timeout(Some(Duration::from_secs(30)));
2781        let executor = deterministic::Runner::new(cfg);
2782        executor.start(|mut context| async move {
2783            // Create simulated network
2784            let (network, mut oracle) = Network::new(
2785                context.with_label("network"),
2786                Config {
2787                    max_size: 1024 * 1024,
2788                    disconnect_on_block: false,
2789                    tracked_peer_sets: None,
2790                },
2791            );
2792
2793            // Start network
2794            network.start();
2795
2796            // Register participants
2797            let Fixture {
2798                participants,
2799                schemes,
2800                ..
2801            } = fixture(&mut context, n);
2802            let mut registrations = register_validators(&mut oracle, &participants).await;
2803
2804            // Link all validators
2805            let link = Link {
2806                latency: Duration::from_millis(10),
2807                jitter: Duration::from_millis(1),
2808                success_rate: 1.0,
2809            };
2810            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2811
2812            // Create engines
2813            let elector = L::default();
2814            let relay = Arc::new(mocks::relay::Relay::new());
2815            let mut reporters = Vec::new();
2816            for (idx_scheme, validator) in participants.iter().enumerate() {
2817                // Create scheme context
2818                let context = context.with_label(&format!("validator_{}", *validator));
2819
2820                // Start engine
2821                let reporter_config = mocks::reporter::Config {
2822                    namespace: namespace.clone(),
2823                    participants: participants.clone().try_into().unwrap(),
2824                    scheme: schemes[idx_scheme].clone(),
2825                    elector: elector.clone(),
2826                };
2827                let reporter =
2828                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
2829                let (pending, recovered, resolver) = registrations
2830                    .remove(validator)
2831                    .expect("validator should be registered");
2832                if idx_scheme == 0 {
                        // Index 0 is the byzantine participant: it is started with only the
                        // `pending` channel and its reporter is never added to `reporters`.
2833                    let cfg = mocks::impersonator::Config {
2834                        scheme: schemes[idx_scheme].clone(),
2835                        namespace: namespace.clone(),
2836                    };
2837
2838                    let engine: mocks::impersonator::Impersonator<_, _, Sha256> =
2839                        mocks::impersonator::Impersonator::new(
2840                            context.with_label("byzantine_engine"),
2841                            cfg,
2842                        );
2843                    engine.start(pending);
2844                } else {
                        // Honest participant: track its reporter and run the real engine.
2845                    reporters.push(reporter.clone());
2846                    let application_cfg = mocks::application::Config {
2847                        hasher: Sha256::default(),
2848                        relay: relay.clone(),
2849                        me: validator.clone(),
2850                        propose_latency: (10.0, 5.0),
2851                        verify_latency: (10.0, 5.0),
2852                        certify_latency: (10.0, 5.0),
2853                        should_certify: mocks::application::Certifier::Sometimes,
2854                    };
2855                    let (actor, application) = mocks::application::Application::new(
2856                        context.with_label("application"),
2857                        application_cfg,
2858                    );
2859                    actor.start();
2860                    let blocker = oracle.control(validator.clone());
2861                    let cfg = config::Config {
2862                        scheme: schemes[idx_scheme].clone(),
2863                        elector: elector.clone(),
2864                        blocker,
2865                        automaton: application.clone(),
2866                        relay: application.clone(),
2867                        reporter: reporter.clone(),
                            // `validator` is already a reference; format it directly rather
                            // than cloning the key first.
2868                        partition: validator.to_string(),
2869                        mailbox_size: 1024,
2870                        epoch: Epoch::new(333),
2871                        namespace: namespace.clone(),
2872                        leader_timeout: Duration::from_secs(1),
2873                        notarization_timeout: Duration::from_secs(2),
2874                        nullify_retry: Duration::from_secs(10),
2875                        fetch_timeout: Duration::from_secs(1),
2876                        activity_timeout,
2877                        skip_timeout,
2878                        fetch_concurrent: 4,
2879                        replay_buffer: NZUsize!(1024 * 1024),
2880                        write_buffer: NZUsize!(1024 * 1024),
2881                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
2882                    };
2883                    let engine = Engine::new(context.with_label("engine"), cfg);
2884                    engine.start(pending, recovered, resolver);
2885                }
2886            }
2887
2888            // Wait for all honest engines to reach `required_containers`
2889            let mut finalizers = Vec::new();
2890            for reporter in reporters.iter_mut() {
2891                let (mut latest, mut monitor) = reporter.subscribe().await;
2892                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2893                    while latest < required_containers {
2894                        latest = monitor.next().await.expect("event missing");
2895                    }
2896                }));
2897            }
2898            join_all(finalizers).await;
2899
2900            // Check reporters for correct activity
2901            let byz = &participants[0];
2902            for reporter in reporters.iter() {
2903                // Ensure no faults
2904                {
2905                    let faults = reporter.faults.lock().unwrap();
2906                    assert!(faults.is_empty());
2907                }
2908
2909                // Ensure no invalid signatures
2910                {
2911                    let invalid = reporter.invalid.lock().unwrap();
2912                    assert_eq!(*invalid, 0);
2913                }
2914            }
2915
2916            // Ensure impersonator is blocked: at least one block must exist, and every
                // recorded pair must name the byzantine key second and a different key first
2917            let blocked = oracle.blocked().await.unwrap();
2918            assert!(!blocked.is_empty());
2919            for (a, b) in blocked {
2920                assert_ne!(&a, byz);
2921                assert_eq!(&b, byz);
2922            }
2923        });
2924    }
2925
2926    #[test_group("slow")]
2927    #[test_traced]
2928    fn test_impersonator() {
            // Drive the `impersonator` harness for seeds 0 through 4; every seed exercises each
            // scheme/elector pairing below, in the listed order.
2929        (0..5).for_each(|run_seed| {
2930            impersonator::<_, _, Random>(run_seed, bls12381_threshold::fixture::<MinPk, _>);
2931            impersonator::<_, _, Random>(run_seed, bls12381_threshold::fixture::<MinSig, _>);
2932            impersonator::<_, _, RoundRobin>(run_seed, bls12381_multisig::fixture::<MinPk, _>);
2933            impersonator::<_, _, RoundRobin>(run_seed, bls12381_multisig::fixture::<MinSig, _>);
2934            impersonator::<_, _, RoundRobin>(run_seed, ed25519::fixture);
2935        });
2936    }
2937
2938    fn equivocator<S, F, L>(seed: u64, mut fixture: F) -> bool
2939    where
2940        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
2941        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2942        L: Elector<S>,
2943    {
            // Runs a seeded, deterministic simulation of seven fully linked validators in which
            // the participant at index 0 is driven by `mocks::equivocator::Equivocator` and the
            // other six run the real `Engine`. The run has three phases:
            //   1. every honest reporter reaches `required_containers`;
            //   2. one randomly chosen honest engine is aborted and the remaining honest
            //      reporters reach twice that value;
            //   3. the aborted validator is started again with a fresh reporter and all tracked
            //      reporters reach three times that value.
            //
            // * `seed`    - seed handed to the deterministic runtime.
            // * `fixture` - builds the participants and one signing scheme per participant.
            // * `L`       - leader elector, instantiated through `L::default()`.
            //
            // Returns `true` when at least one block was recorded by the network oracle.
            // Panics if any recorded block does not target the byzantine participant.

2944        // Create context
2945        let n = 7;
2946        let required_containers = View::new(50);
2947        let activity_timeout = ViewDelta::new(10);
2948        let skip_timeout = ViewDelta::new(5);
2949        let namespace = b"consensus".to_vec();
2950        let cfg = deterministic::Config::new()
2951            .with_seed(seed)
2952            .with_timeout(Some(Duration::from_secs(30)));
2953        let executor = deterministic::Runner::new(cfg);
2954        executor.start(|mut context| async move {
2955            // Create simulated network
2956            let (network, mut oracle) = Network::new(
2957                context.with_label("network"),
2958                Config {
2959                    max_size: 1024 * 1024,
2960                    disconnect_on_block: false,
2961                    tracked_peer_sets: None,
2962                },
2963            );
2964
2965            // Start network
2966            network.start();
2967
2968            // Register participants
2969            let Fixture {
2970                participants,
2971                schemes,
2972                ..
2973            } = fixture(&mut context, n);
2974            let mut registrations = register_validators(&mut oracle, &participants).await;
2975
2976            // Link all validators
2977            let link = Link {
2978                latency: Duration::from_millis(10),
2979                jitter: Duration::from_millis(1),
2980                success_rate: 1.0,
2981            };
2982            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
2983
2984            // Create engines
                // NOTE: `engines` and `reporters` each receive exactly one entry per participant,
                // in participant order, so an index into `participants` is also valid for both
                // vectors until the removal further below.
2985            let elector = L::default();
2986            let mut engines = Vec::new();
2987            let relay = Arc::new(mocks::relay::Relay::new());
2988            let mut reporters = Vec::new();
2989            for (idx_scheme, validator) in participants.iter().enumerate() {
2990                // Create scheme context
2991                let context = context.with_label(&format!("validator_{}", *validator));
2992
2993                // Start engine
2994                let reporter_config = mocks::reporter::Config {
2995                    namespace: namespace.clone(),
2996                    participants: participants.clone().try_into().unwrap(),
2997                    scheme: schemes[idx_scheme].clone(),
2998                    elector: elector.clone(),
2999                };
3000                let reporter =
3001                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                    // Pushed for every participant (including the byzantine one at index 0) to
                    // keep `reporters` index-aligned with `participants`.
3002                reporters.push(reporter.clone());
3003                let (pending, recovered, resolver) = registrations
3004                    .remove(validator)
3005                    .expect("validator should be registered");
3006                if idx_scheme == 0 {
                        // Byzantine participant: its config takes no reporter, and it is started
                        // without the resolver channel.
3007                    let cfg = mocks::equivocator::Config {
3008                        namespace: namespace.clone(),
3009                        scheme: schemes[idx_scheme].clone(),
3010                        epoch: Epoch::new(333),
3011                        relay: relay.clone(),
3012                        hasher: Sha256::default(),
3013                        elector: elector.clone(),
3014                    };
3015
3016                    let engine = mocks::equivocator::Equivocator::new(
3017                        context.with_label("byzantine_engine"),
3018                        cfg,
3019                    );
3020                    engines.push(engine.start(pending, recovered));
3021                } else {
3022                    let application_cfg = mocks::application::Config {
3023                        hasher: Sha256::default(),
3024                        relay: relay.clone(),
3025                        me: validator.clone(),
3026                        propose_latency: (10.0, 5.0),
3027                        verify_latency: (10.0, 5.0),
3028                        certify_latency: (10.0, 5.0),
3029                        should_certify: mocks::application::Certifier::Sometimes,
3030                    };
3031                    let (actor, application) = mocks::application::Application::new(
3032                        context.with_label("application"),
3033                        application_cfg,
3034                    );
3035                    actor.start();
3036                    let blocker = oracle.control(validator.clone());
3037                    let cfg = config::Config {
3038                        scheme: schemes[idx_scheme].clone(),
3039                        elector: elector.clone(),
3040                        blocker,
3041                        automaton: application.clone(),
3042                        relay: application.clone(),
3043                        reporter: reporter.clone(),
3044                        partition: validator.to_string(),
3045                        mailbox_size: 1024,
3046                        epoch: Epoch::new(333),
3047                        namespace: namespace.clone(),
3048                        leader_timeout: Duration::from_secs(1),
3049                        notarization_timeout: Duration::from_secs(2),
3050                        nullify_retry: Duration::from_secs(10),
3051                        fetch_timeout: Duration::from_secs(1),
3052                        activity_timeout,
3053                        skip_timeout,
3054                        fetch_concurrent: 4,
3055                        replay_buffer: NZUsize!(1024 * 1024),
3056                        write_buffer: NZUsize!(1024 * 1024),
3057                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3058                    };
3059                    let engine = Engine::new(context.with_label("engine"), cfg);
3060                    engines.push(engine.start(pending, recovered, resolver));
3061                }
3062            }
3063
3064            // Phase 1: wait for all honest engines to hit required containers
                // (`skip(1)` drops the byzantine participant's reporter at index 0)
3065            let mut finalizers = Vec::new();
3066            for reporter in reporters.iter_mut().skip(1) {
3067                let (mut latest, mut monitor) = reporter.subscribe().await;
3068                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3069                    while latest < required_containers {
3070                        latest = monitor.next().await.expect("event missing");
3071                    }
3072                }));
3073            }
3074            join_all(finalizers).await;
3075
3076            // Abort a validator
                // `idx` is drawn from the seeded runtime, so the choice is reproducible per seed.
                // The same index is removed from both `engines` and `reporters`; index 0 is
                // never chosen, so the byzantine reporter stays at the front of `reporters`.
3077            let idx = context.gen_range(1..engines.len()); // skip byzantine validator
3078            let validator = &participants[idx];
3079            let handle = engines.remove(idx);
3080            handle.abort();
3081            let _ = handle.await;
3082            reporters.remove(idx);
3083            info!(idx, ?validator, "aborted validator");
3084
3085            // Phase 2: wait for the remaining honest engines to hit twice the required containers
3086            let mut finalizers = Vec::new();
3087            for reporter in reporters.iter_mut().skip(1) {
3088                let (mut latest, mut monitor) = reporter.subscribe().await;
3089                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3090                    while latest < View::new(required_containers.get() * 2) {
3091                        latest = monitor.next().await.expect("event missing");
3092                    }
3093                }));
3094            }
3095            join_all(finalizers).await;
3096
3097            // Recreate engine
                // This binding shadows the outer `context`, so everything created below
                // (including the phase 3 finalizers) is labelled under the restarted validator.
3098            info!(idx, ?validator, "restarting validator");
3099            let context = context.with_label(&format!("validator_{}_restarted", *validator));
3100
3101            // Start engine
3102            let reporter_config = mocks::reporter::Config {
3103                namespace: namespace.clone(),
3104                participants: participants.clone().try_into().unwrap(),
3105                scheme: schemes[idx].clone(),
3106                elector: elector.clone(),
3107            };
3108            let reporter =
3109                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3110            let (pending, recovered, resolver) =
3111                register_validator(&mut oracle, validator.clone()).await;
                // The fresh reporter is appended, so it is covered by the `skip(1)` wait below.
3112            reporters.push(reporter.clone());
3113            let application_cfg = mocks::application::Config {
3114                hasher: Sha256::default(),
3115                relay: relay.clone(),
3116                me: validator.clone(),
3117                propose_latency: (10.0, 5.0),
3118                verify_latency: (10.0, 5.0),
3119                certify_latency: (10.0, 5.0),
3120                should_certify: mocks::application::Certifier::Sometimes,
3121            };
3122            let (actor, application) = mocks::application::Application::new(
3123                context.with_label("application"),
3124                application_cfg,
3125            );
3126            actor.start();
3127            let blocker = oracle.control(validator.clone());
3128            let cfg = config::Config {
3129                scheme: schemes[idx].clone(),
3130                elector: elector.clone(),
3131                blocker,
3132                automaton: application.clone(),
3133                relay: application.clone(),
3134                reporter: reporter.clone(),
                    // Same partition name the aborted engine was given.
                    // NOTE(review): presumably this lets the restarted engine pick up the state
                    // the aborted one persisted - confirm against the engine's storage layer.
3135                partition: validator.to_string(),
3136                mailbox_size: 1024,
3137                epoch: Epoch::new(333),
3138                namespace: namespace.clone(),
3139                leader_timeout: Duration::from_secs(1),
3140                notarization_timeout: Duration::from_secs(2),
3141                nullify_retry: Duration::from_secs(10),
3142                fetch_timeout: Duration::from_secs(1),
3143                activity_timeout,
3144                skip_timeout,
3145                fetch_concurrent: 4,
3146                replay_buffer: NZUsize!(1024 * 1024),
3147                write_buffer: NZUsize!(1024 * 1024),
3148                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3149            };
3150            let engine = Engine::new(context.with_label("engine"), cfg);
3151            engine.start(pending, recovered, resolver);
3152
3153            // Phase 3: wait for all honest engines (including the restarted one) to hit three
                // times the required containers
3154            let mut finalizers = Vec::new();
3155            for reporter in reporters.iter_mut().skip(1) {
3156                let (mut latest, mut monitor) = reporter.subscribe().await;
3157                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3158                    while latest < View::new(required_containers.get() * 3) {
3159                        latest = monitor.next().await.expect("event missing");
3160                    }
3161                }));
3162            }
3163            join_all(finalizers).await;
3164
3165            // Check equivocator blocking (we aren't guaranteed a fault will be produced
3166            // because it may not be possible to extract a conflicting vote from the certificate
3167            // we receive)
                // Every recorded pair must name the byzantine key second and a different key
                // first; an empty list is tolerated here and reported through the return value.
3168            let byz = &participants[0];
3169            let blocked = oracle.blocked().await.unwrap();
3170            for (a, b) in &blocked {
3171                assert_ne!(a, byz);
3172                assert_eq!(b, byz);
3173            }
3174            !blocked.is_empty()
3175        })
3176    }
3177
3178    #[test_group("slow")]
3179    #[test_traced]
3180    fn test_equivocator_bls12381_threshold_min_pk() {
            // Try seeds 0 through 4 in order and stop at the first run in which the harness
            // reports that the equivocator was blocked.
3181        let first_hit = (0..5)
3182            .find(|&run_seed| equivocator::<_, _, Random>(run_seed, bls12381_threshold::fixture::<MinPk, _>));
3183        assert!(
3184            first_hit.is_some(),
3185            "expected at least one seed to detect equivocation"
3186        );
3187    }
3188
3189    #[test_group("slow")]
3190    #[test_traced]
3191    fn test_equivocator_bls12381_threshold_min_sig() {
            // Try seeds 0 through 4 in order and stop at the first run in which the harness
            // reports that the equivocator was blocked.
3192        let first_hit = (0..5).find(|&run_seed| {
3193            equivocator::<_, _, Random>(run_seed, bls12381_threshold::fixture::<MinSig, _>)
3194        });
3195        assert!(
3196            first_hit.is_some(),
3197            "expected at least one seed to detect equivocation"
3198        );
3199    }
3200
3201    #[test_group("slow")]
3202    #[test_traced]
3203    fn test_equivocator_bls12381_multisig_min_pk() {
            // Try seeds 0 through 4 in order and stop at the first run in which the harness
            // reports that the equivocator was blocked.
3204        let first_hit = (0..5).find(|&run_seed| {
3205            equivocator::<_, _, RoundRobin>(run_seed, bls12381_multisig::fixture::<MinPk, _>)
3206        });
3207        assert!(
3208            first_hit.is_some(),
3209            "expected at least one seed to detect equivocation"
3210        );
3211    }
3212
3213    #[test_group("slow")]
3214    #[test_traced]
3215    fn test_equivocator_bls12381_multisig_min_sig() {
            // Try seeds 0 through 4 in order and stop at the first run in which the harness
            // reports that the equivocator was blocked.
3216        let first_hit = (0..5).find(|&run_seed| {
3217            equivocator::<_, _, RoundRobin>(run_seed, bls12381_multisig::fixture::<MinSig, _>)
3218        });
3219        assert!(
3220            first_hit.is_some(),
3221            "expected at least one seed to detect equivocation"
3222        );
3223    }
3224
3225    #[test_group("slow")]
3226    #[test_traced]
3227    fn test_equivocator_ed25519() {
            // Try seeds 0 through 4 in order and stop at the first run in which the harness
            // reports that the equivocator was blocked.
3228        let first_hit = (0..5).find(|&run_seed| equivocator::<_, _, RoundRobin>(run_seed, ed25519::fixture));
3229        assert!(
3230            first_hit.is_some(),
3231            "expected at least one seed to detect equivocation"
3232        );
3233    }
3234
3235    fn reconfigurer<S, F, L>(seed: u64, mut fixture: F)
3236    where
3237        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3238        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
3239        L: Elector<S>,
3240    {
            // Runs a seeded, deterministic simulation of four fully linked validators in which
            // the participant at index 0 is driven by `mocks::reconfigurer::Reconfigurer` and
            // the remaining three run the real `Engine` (configured with epoch 333).
            //
            // * `seed`    - seed handed to the deterministic runtime.
            // * `fixture` - builds the participants and one signing scheme per participant.
            // * `L`       - leader elector, instantiated through `L::default()`.
            //
            // Panics (failing the calling test) if an honest reporter records a fault or an
            // invalid signature, if nothing was blocked, or if any recorded block does not
            // target the byzantine participant.

3241        // Create context
3242        let n = 4;
3243        let required_containers = View::new(50);
3244        let activity_timeout = ViewDelta::new(10);
3245        let skip_timeout = ViewDelta::new(5);
3246        let namespace = b"consensus".to_vec();
3247        let cfg = deterministic::Config::new()
3248            .with_seed(seed)
3249            .with_timeout(Some(Duration::from_secs(30)));
3250        let executor = deterministic::Runner::new(cfg);
3251        executor.start(|mut context| async move {
3252            // Create simulated network
3253            let (network, mut oracle) = Network::new(
3254                context.with_label("network"),
3255                Config {
3256                    max_size: 1024 * 1024,
3257                    disconnect_on_block: false,
3258                    tracked_peer_sets: None,
3259                },
3260            );
3261
3262            // Start network
3263            network.start();
3264
3265            // Register participants
3266            let Fixture {
3267                participants,
3268                schemes,
3269                ..
3270            } = fixture(&mut context, n);
3271            let mut registrations = register_validators(&mut oracle, &participants).await;
3272
3273            // Link all validators
3274            let link = Link {
3275                latency: Duration::from_millis(10),
3276                jitter: Duration::from_millis(1),
3277                success_rate: 1.0,
3278            };
3279            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3280
3281            // Create engines
3282            let elector = L::default();
3283            let relay = Arc::new(mocks::relay::Relay::new());
3284            let mut reporters = Vec::new();
3285            for (idx_scheme, validator) in participants.iter().enumerate() {
3286                // Create scheme context
3287                let context = context.with_label(&format!("validator_{}", *validator));
3288
3289                // Start engine
3290                let reporter_config = mocks::reporter::Config {
3291                    namespace: namespace.clone(),
3292                    participants: participants.clone().try_into().unwrap(),
3293                    scheme: schemes[idx_scheme].clone(),
3294                    elector: elector.clone(),
3295                };
3296                let reporter =
3297                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3298                let (pending, recovered, resolver) = registrations
3299                    .remove(validator)
3300                    .expect("validator should be registered");
3301                if idx_scheme == 0 {
                        // Index 0 is the byzantine participant: it is started with only the
                        // `pending` channel and its reporter is never added to `reporters`.
3302                    let cfg = mocks::reconfigurer::Config {
3303                        scheme: schemes[idx_scheme].clone(),
3304                        namespace: namespace.clone(),
3305                    };
3306                    let engine: mocks::reconfigurer::Reconfigurer<_, _, Sha256> =
3307                        mocks::reconfigurer::Reconfigurer::new(
3308                            context.with_label("byzantine_engine"),
3309                            cfg,
3310                        );
3311                    engine.start(pending);
3312                } else {
                        // Honest participant: track its reporter and run the real engine.
3313                    reporters.push(reporter.clone());
3314                    let application_cfg = mocks::application::Config {
3315                        hasher: Sha256::default(),
3316                        relay: relay.clone(),
3317                        me: validator.clone(),
3318                        propose_latency: (10.0, 5.0),
3319                        verify_latency: (10.0, 5.0),
3320                        certify_latency: (10.0, 5.0),
3321                        should_certify: mocks::application::Certifier::Sometimes,
3322                    };
3323                    let (actor, application) = mocks::application::Application::new(
3324                        context.with_label("application"),
3325                        application_cfg,
3326                    );
3327                    actor.start();
3328                    let blocker = oracle.control(validator.clone());
3329                    let cfg = config::Config {
3330                        scheme: schemes[idx_scheme].clone(),
3331                        elector: elector.clone(),
3332                        blocker,
3333                        automaton: application.clone(),
3334                        relay: application.clone(),
3335                        reporter: reporter.clone(),
3336                        partition: validator.to_string(),
3337                        mailbox_size: 1024,
3338                        epoch: Epoch::new(333),
3339                        namespace: namespace.clone(),
3340                        leader_timeout: Duration::from_secs(1),
3341                        notarization_timeout: Duration::from_secs(2),
3342                        nullify_retry: Duration::from_secs(10),
3343                        fetch_timeout: Duration::from_secs(1),
3344                        activity_timeout,
3345                        skip_timeout,
3346                        fetch_concurrent: 4,
3347                        replay_buffer: NZUsize!(1024 * 1024),
3348                        write_buffer: NZUsize!(1024 * 1024),
3349                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3350                    };
3351                    let engine = Engine::new(context.with_label("engine"), cfg);
3352                    engine.start(pending, recovered, resolver);
3353                }
3354            }
3355
3356            // Wait for all honest engines to reach `required_containers`
                // (`reporters` holds only honest reporters here, so nothing is skipped)
3357            let mut finalizers = Vec::new();
3358            for reporter in reporters.iter_mut() {
3359                let (mut latest, mut monitor) = reporter.subscribe().await;
3360                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3361                    while latest < required_containers {
3362                        latest = monitor.next().await.expect("event missing");
3363                    }
3364                }));
3365            }
3366            join_all(finalizers).await;
3367
3368            // Check reporters for correct activity
3369            let byz = &participants[0];
3370            for reporter in reporters.iter() {
3371                // Ensure no faults
3372                {
3373                    let faults = reporter.faults.lock().unwrap();
3374                    assert!(faults.is_empty());
3375                }
3376
3377                // Ensure no invalid signatures
3378                {
3379                    let invalid = reporter.invalid.lock().unwrap();
3380                    assert_eq!(*invalid, 0);
3381                }
3382            }
3383
3384            // Ensure reconfigurer is blocked (epoch mismatch): at least one block must exist,
                // and every recorded pair must name the byzantine key second and a different key
                // first
3385            let blocked = oracle.blocked().await.unwrap();
3386            assert!(!blocked.is_empty());
3387            for (a, b) in blocked {
3388                assert_ne!(&a, byz);
3389                assert_eq!(&b, byz);
3390            }
3391        });
3392    }
3393
3394    #[test_group("slow")]
3395    #[test_traced]
3396    fn test_reconfigurer() {
            // Drive the `reconfigurer` harness for seeds 0 through 4; every seed exercises each
            // scheme/elector pairing below, in the listed order.
3397        (0..5).for_each(|run_seed| {
3398            reconfigurer::<_, _, Random>(run_seed, bls12381_threshold::fixture::<MinPk, _>);
3399            reconfigurer::<_, _, Random>(run_seed, bls12381_threshold::fixture::<MinSig, _>);
3400            reconfigurer::<_, _, RoundRobin>(run_seed, bls12381_multisig::fixture::<MinPk, _>);
3401            reconfigurer::<_, _, RoundRobin>(run_seed, bls12381_multisig::fixture::<MinSig, _>);
3402            reconfigurer::<_, _, RoundRobin>(run_seed, ed25519::fixture);
3403        });
3404    }
3405
3406    fn nuller<S, F, L>(seed: u64, mut fixture: F)
3407    where
3408        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3409        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
3410        L: Elector<S>,
3411    {
3412        // Create context
3413        let n = 4;
3414        let required_containers = View::new(50);
3415        let activity_timeout = ViewDelta::new(10);
3416        let skip_timeout = ViewDelta::new(5);
3417        let namespace = b"consensus".to_vec();
3418        let cfg = deterministic::Config::new()
3419            .with_seed(seed)
3420            .with_timeout(Some(Duration::from_secs(30)));
3421        let executor = deterministic::Runner::new(cfg);
3422        executor.start(|mut context| async move {
3423            // Create simulated network
3424            let (network, mut oracle) = Network::new(
3425                context.with_label("network"),
3426                Config {
3427                    max_size: 1024 * 1024,
3428                    disconnect_on_block: false,
3429                    tracked_peer_sets: None,
3430                },
3431            );
3432
3433            // Start network
3434            network.start();
3435
3436            // Register participants
3437            let Fixture {
3438                participants,
3439                schemes,
3440                ..
3441            } = fixture(&mut context, n);
3442            let mut registrations = register_validators(&mut oracle, &participants).await;
3443
3444            // Link all validators
3445            let link = Link {
3446                latency: Duration::from_millis(10),
3447                jitter: Duration::from_millis(1),
3448                success_rate: 1.0,
3449            };
3450            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3451
3452            // Create engines
3453            let elector = L::default();
3454            let relay = Arc::new(mocks::relay::Relay::new());
3455            let mut reporters = Vec::new();
3456            for (idx_scheme, validator) in participants.iter().enumerate() {
3457                // Create scheme context
3458                let context = context.with_label(&format!("validator_{}", *validator));
3459
3460                // Start engine
3461                let reporter_config = mocks::reporter::Config {
3462                    namespace: namespace.clone(),
3463                    participants: participants.clone().try_into().unwrap(),
3464                    scheme: schemes[idx_scheme].clone(),
3465                    elector: elector.clone(),
3466                };
3467                let reporter =
3468                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3469                let (pending, recovered, resolver) = registrations
3470                    .remove(validator)
3471                    .expect("validator should be registered");
3472                if idx_scheme == 0 {
3473                    let cfg = mocks::nuller::Config {
3474                        namespace: namespace.clone(),
3475                        scheme: schemes[idx_scheme].clone(),
3476                    };
3477                    let engine: mocks::nuller::Nuller<_, _, Sha256> =
3478                        mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
3479                    engine.start(pending);
3480                } else {
3481                    reporters.push(reporter.clone());
3482                    let application_cfg = mocks::application::Config {
3483                        hasher: Sha256::default(),
3484                        relay: relay.clone(),
3485                        me: validator.clone(),
3486                        propose_latency: (10.0, 5.0),
3487                        verify_latency: (10.0, 5.0),
3488                        certify_latency: (10.0, 5.0),
3489                        should_certify: mocks::application::Certifier::Sometimes,
3490                    };
3491                    let (actor, application) = mocks::application::Application::new(
3492                        context.with_label("application"),
3493                        application_cfg,
3494                    );
3495                    actor.start();
3496                    let blocker = oracle.control(validator.clone());
3497                    let cfg = config::Config {
3498                        scheme: schemes[idx_scheme].clone(),
3499                        elector: elector.clone(),
3500                        blocker,
3501                        automaton: application.clone(),
3502                        relay: application.clone(),
3503                        reporter: reporter.clone(),
3504                        partition: validator.clone().to_string(),
3505                        mailbox_size: 1024,
3506                        epoch: Epoch::new(333),
3507                        namespace: namespace.clone(),
3508                        leader_timeout: Duration::from_secs(1),
3509                        notarization_timeout: Duration::from_secs(2),
3510                        nullify_retry: Duration::from_secs(10),
3511                        fetch_timeout: Duration::from_secs(1),
3512                        activity_timeout,
3513                        skip_timeout,
3514                        fetch_concurrent: 4,
3515                        replay_buffer: NZUsize!(1024 * 1024),
3516                        write_buffer: NZUsize!(1024 * 1024),
3517                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3518                    };
3519                    let engine = Engine::new(context.with_label("engine"), cfg);
3520                    engine.start(pending, recovered, resolver);
3521                }
3522            }
3523
3524            // Wait for all engines to finish
3525            let mut finalizers = Vec::new();
3526            for reporter in reporters.iter_mut() {
3527                let (mut latest, mut monitor) = reporter.subscribe().await;
3528                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3529                    while latest < required_containers {
3530                        latest = monitor.next().await.expect("event missing");
3531                    }
3532                }));
3533            }
3534            join_all(finalizers).await;
3535
3536            // Check reporters for correct activity
3537            let byz = &participants[0];
3538            let mut count_nullify_and_finalize = 0;
3539            for reporter in reporters.iter() {
3540                // Ensure only faults for byz
3541                {
3542                    let faults = reporter.faults.lock().unwrap();
3543                    assert_eq!(faults.len(), 1);
3544                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
3545                    for (_, faults) in faulter.iter() {
3546                        for fault in faults.iter() {
3547                            match fault {
3548                                Activity::NullifyFinalize(_) => {
3549                                    count_nullify_and_finalize += 1;
3550                                }
3551                                _ => panic!("unexpected fault: {fault:?}"),
3552                            }
3553                        }
3554                    }
3555                }
3556
3557                // Ensure no invalid signatures
3558                {
3559                    let invalid = reporter.invalid.lock().unwrap();
3560                    assert_eq!(*invalid, 0);
3561                }
3562            }
3563            assert!(count_nullify_and_finalize > 0);
3564
3565            // Ensure nullifier is blocked
3566            let blocked = oracle.blocked().await.unwrap();
3567            assert!(!blocked.is_empty());
3568            for (a, b) in blocked {
3569                assert_ne!(&a, byz);
3570                assert_eq!(&b, byz);
3571            }
3572        });
3573    }
3574
3575    #[test_group("slow")]
3576    #[test_traced]
3577    fn test_nuller() {
3578        for seed in 0..5 {
3579            nuller::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
3580            nuller::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
3581            nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3582            nuller::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3583            nuller::<_, _, RoundRobin>(seed, ed25519::fixture);
3584        }
3585    }
3586
3587    fn outdated<S, F, L>(seed: u64, mut fixture: F)
3588    where
3589        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3590        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
3591        L: Elector<S>,
3592    {
3593        // Create context
3594        let n = 4;
3595        let required_containers = View::new(100);
3596        let activity_timeout = ViewDelta::new(10);
3597        let skip_timeout = ViewDelta::new(5);
3598        let namespace = b"consensus".to_vec();
3599        let cfg = deterministic::Config::new()
3600            .with_seed(seed)
3601            .with_timeout(Some(Duration::from_secs(30)));
3602        let executor = deterministic::Runner::new(cfg);
3603        executor.start(|mut context| async move {
3604            // Create simulated network
3605            let (network, mut oracle) = Network::new(
3606                context.with_label("network"),
3607                Config {
3608                    max_size: 1024 * 1024,
3609                    disconnect_on_block: false,
3610                    tracked_peer_sets: None,
3611                },
3612            );
3613
3614            // Start network
3615            network.start();
3616
3617            // Register participants
3618            let Fixture {
3619                participants,
3620                schemes,
3621                ..
3622            } = fixture(&mut context, n);
3623            let mut registrations = register_validators(&mut oracle, &participants).await;
3624
3625            // Link all validators
3626            let link = Link {
3627                latency: Duration::from_millis(10),
3628                jitter: Duration::from_millis(1),
3629                success_rate: 1.0,
3630            };
3631            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3632
3633            // Create engines
3634            let elector = L::default();
3635            let relay = Arc::new(mocks::relay::Relay::new());
3636            let mut reporters = Vec::new();
3637            for (idx_scheme, validator) in participants.iter().enumerate() {
3638                // Create scheme context
3639                let context = context.with_label(&format!("validator_{}", *validator));
3640
3641                // Start engine
3642                let reporter_config = mocks::reporter::Config {
3643                    namespace: namespace.clone(),
3644                    participants: participants.clone().try_into().unwrap(),
3645                    scheme: schemes[idx_scheme].clone(),
3646                    elector: elector.clone(),
3647                };
3648                let reporter =
3649                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3650                let (pending, recovered, resolver) = registrations
3651                    .remove(validator)
3652                    .expect("validator should be registered");
3653                if idx_scheme == 0 {
3654                    let cfg = mocks::outdated::Config {
3655                        scheme: schemes[idx_scheme].clone(),
3656                        namespace: namespace.clone(),
3657                        view_delta: ViewDelta::new(activity_timeout.get().saturating_mul(4)),
3658                    };
3659                    let engine: mocks::outdated::Outdated<_, _, Sha256> =
3660                        mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
3661                    engine.start(pending);
3662                } else {
3663                    reporters.push(reporter.clone());
3664                    let application_cfg = mocks::application::Config {
3665                        hasher: Sha256::default(),
3666                        relay: relay.clone(),
3667                        me: validator.clone(),
3668                        propose_latency: (10.0, 5.0),
3669                        verify_latency: (10.0, 5.0),
3670                        certify_latency: (10.0, 5.0),
3671                        should_certify: mocks::application::Certifier::Sometimes,
3672                    };
3673                    let (actor, application) = mocks::application::Application::new(
3674                        context.with_label("application"),
3675                        application_cfg,
3676                    );
3677                    actor.start();
3678                    let blocker = oracle.control(validator.clone());
3679                    let cfg = config::Config {
3680                        scheme: schemes[idx_scheme].clone(),
3681                        elector: elector.clone(),
3682                        blocker,
3683                        automaton: application.clone(),
3684                        relay: application.clone(),
3685                        reporter: reporter.clone(),
3686                        partition: validator.clone().to_string(),
3687                        mailbox_size: 1024,
3688                        epoch: Epoch::new(333),
3689                        namespace: namespace.clone(),
3690                        leader_timeout: Duration::from_secs(1),
3691                        notarization_timeout: Duration::from_secs(2),
3692                        nullify_retry: Duration::from_secs(10),
3693                        fetch_timeout: Duration::from_secs(1),
3694                        activity_timeout,
3695                        skip_timeout,
3696                        fetch_concurrent: 4,
3697                        replay_buffer: NZUsize!(1024 * 1024),
3698                        write_buffer: NZUsize!(1024 * 1024),
3699                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3700                    };
3701                    let engine = Engine::new(context.with_label("engine"), cfg);
3702                    engine.start(pending, recovered, resolver);
3703                }
3704            }
3705
3706            // Wait for all engines to finish
3707            let mut finalizers = Vec::new();
3708            for reporter in reporters.iter_mut() {
3709                let (mut latest, mut monitor) = reporter.subscribe().await;
3710                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3711                    while latest < required_containers {
3712                        latest = monitor.next().await.expect("event missing");
3713                    }
3714                }));
3715            }
3716            join_all(finalizers).await;
3717
3718            // Check reporters for correct activity
3719            for reporter in reporters.iter() {
3720                // Ensure no faults
3721                {
3722                    let faults = reporter.faults.lock().unwrap();
3723                    assert!(faults.is_empty());
3724                }
3725
3726                // Ensure no invalid signatures
3727                {
3728                    let invalid = reporter.invalid.lock().unwrap();
3729                    assert_eq!(*invalid, 0);
3730                }
3731            }
3732
3733            // Ensure no blocked connections
3734            let blocked = oracle.blocked().await.unwrap();
3735            assert!(blocked.is_empty());
3736        });
3737    }
3738
3739    #[test_group("slow")]
3740    #[test_traced]
3741    fn test_outdated() {
3742        for seed in 0..5 {
3743            outdated::<_, _, Random>(seed, bls12381_threshold::fixture::<MinPk, _>);
3744            outdated::<_, _, Random>(seed, bls12381_threshold::fixture::<MinSig, _>);
3745            outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinPk, _>);
3746            outdated::<_, _, RoundRobin>(seed, bls12381_multisig::fixture::<MinSig, _>);
3747            outdated::<_, _, RoundRobin>(seed, ed25519::fixture);
3748        }
3749    }
3750
3751    fn run_1k<S, F, L>(mut fixture: F)
3752    where
3753        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3754        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
3755        L: Elector<S>,
3756    {
3757        // Create context
3758        let n = 10;
3759        let required_containers = View::new(1_000);
3760        let activity_timeout = ViewDelta::new(10);
3761        let skip_timeout = ViewDelta::new(5);
3762        let namespace = b"consensus".to_vec();
3763        let cfg = deterministic::Config::new();
3764        let executor = deterministic::Runner::new(cfg);
3765        executor.start(|mut context| async move {
3766            // Create simulated network
3767            let (network, mut oracle) = Network::new(
3768                context.with_label("network"),
3769                Config {
3770                    max_size: 1024 * 1024,
3771                    disconnect_on_block: false,
3772                    tracked_peer_sets: None,
3773                },
3774            );
3775
3776            // Start network
3777            network.start();
3778
3779            // Register participants
3780            let Fixture {
3781                participants,
3782                schemes,
3783                ..
3784            } = fixture(&mut context, n);
3785            let mut registrations = register_validators(&mut oracle, &participants).await;
3786
3787            // Link all validators
3788            let link = Link {
3789                latency: Duration::from_millis(80),
3790                jitter: Duration::from_millis(10),
3791                success_rate: 0.98,
3792            };
3793            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3794
3795            // Create engines
3796            let elector = L::default();
3797            let relay = Arc::new(mocks::relay::Relay::new());
3798            let mut reporters = Vec::new();
3799            let mut engine_handlers = Vec::new();
3800            for (idx, validator) in participants.iter().enumerate() {
3801                // Create scheme context
3802                let context = context.with_label(&format!("validator_{}", *validator));
3803
3804                // Configure engine
3805                let reporter_config = mocks::reporter::Config {
3806                    namespace: namespace.clone(),
3807                    participants: participants.clone().try_into().unwrap(),
3808                    scheme: schemes[idx].clone(),
3809                    elector: elector.clone(),
3810                };
3811                let reporter =
3812                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3813                reporters.push(reporter.clone());
3814                let application_cfg = mocks::application::Config {
3815                    hasher: Sha256::default(),
3816                    relay: relay.clone(),
3817                    me: validator.clone(),
3818                    propose_latency: (100.0, 50.0),
3819                    verify_latency: (50.0, 40.0),
3820                    certify_latency: (50.0, 40.0),
3821                    should_certify: mocks::application::Certifier::Sometimes,
3822                };
3823                let (actor, application) = mocks::application::Application::new(
3824                    context.with_label("application"),
3825                    application_cfg,
3826                );
3827                actor.start();
3828                let blocker = oracle.control(validator.clone());
3829                let cfg = config::Config {
3830                    scheme: schemes[idx].clone(),
3831                    elector: elector.clone(),
3832                    blocker,
3833                    automaton: application.clone(),
3834                    relay: application.clone(),
3835                    reporter: reporter.clone(),
3836                    partition: validator.to_string(),
3837                    mailbox_size: 1024,
3838                    epoch: Epoch::new(333),
3839                    namespace: namespace.clone(),
3840                    leader_timeout: Duration::from_secs(1),
3841                    notarization_timeout: Duration::from_secs(2),
3842                    nullify_retry: Duration::from_secs(10),
3843                    fetch_timeout: Duration::from_secs(1),
3844                    activity_timeout,
3845                    skip_timeout,
3846                    fetch_concurrent: 4,
3847                    replay_buffer: NZUsize!(1024 * 1024),
3848                    write_buffer: NZUsize!(1024 * 1024),
3849                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
3850                };
3851                let engine = Engine::new(context.with_label("engine"), cfg);
3852
3853                // Start engine
3854                let (pending, recovered, resolver) = registrations
3855                    .remove(validator)
3856                    .expect("validator should be registered");
3857                engine_handlers.push(engine.start(pending, recovered, resolver));
3858            }
3859
3860            // Wait for all engines to finish
3861            let mut finalizers = Vec::new();
3862            for reporter in reporters.iter_mut() {
3863                let (mut latest, mut monitor) = reporter.subscribe().await;
3864                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
3865                    while latest < required_containers {
3866                        latest = monitor.next().await.expect("event missing");
3867                    }
3868                }));
3869            }
3870            join_all(finalizers).await;
3871
3872            // Check reporters for correct activity
3873            for reporter in reporters.iter() {
3874                // Ensure no faults
3875                {
3876                    let faults = reporter.faults.lock().unwrap();
3877                    assert!(faults.is_empty());
3878                }
3879
3880                // Ensure no invalid signatures
3881                {
3882                    let invalid = reporter.invalid.lock().unwrap();
3883                    assert_eq!(*invalid, 0);
3884                }
3885            }
3886
3887            // Ensure no blocked connections
3888            let blocked = oracle.blocked().await.unwrap();
3889            assert!(blocked.is_empty());
3890        })
3891    }
3892
3893    #[test_group("slow")]
3894    #[test_traced]
3895    fn test_1k_bls12381_threshold_min_pk() {
3896        run_1k::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
3897    }
3898
3899    #[test_group("slow")]
3900    #[test_traced]
3901    fn test_1k_bls12381_threshold_min_sig() {
3902        run_1k::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
3903    }
3904
3905    #[test_group("slow")]
3906    #[test_traced]
3907    fn test_1k_bls12381_multisig_min_pk() {
3908        run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
3909    }
3910
3911    #[test_group("slow")]
3912    #[test_traced]
3913    fn test_1k_bls12381_multisig_min_sig() {
3914        run_1k::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
3915    }
3916
3917    #[test_group("slow")]
3918    #[test_traced]
3919    fn test_1k_ed25519() {
3920        run_1k::<_, _, RoundRobin>(ed25519::fixture);
3921    }
3922
3923    fn engine_shutdown<S, F, L>(mut fixture: F, graceful: bool)
3924    where
3925        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
3926        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
3927        L: Elector<S>,
3928    {
3929        // Create context
3930        let n = 1;
3931        let namespace = b"consensus".to_vec();
3932        let executor = deterministic::Runner::timed(Duration::from_secs(10));
3933        executor.start(|mut context| async move {
3934            // Create simulated network
3935            let (network, mut oracle) = Network::new(
3936                context.with_label("network"),
3937                Config {
3938                    max_size: 1024 * 1024,
3939                    disconnect_on_block: true,
3940                    tracked_peer_sets: None,
3941                },
3942            );
3943
3944            // Start network
3945            network.start();
3946
3947            // Register a single participant
3948            let Fixture {
3949                participants,
3950                schemes,
3951                ..
3952            } = fixture(&mut context, n);
3953            let mut registrations = register_validators(&mut oracle, &participants).await;
3954
3955            // Link the single validator to itself (no-ops for completeness)
3956            let link = Link {
3957                latency: Duration::from_millis(1),
3958                jitter: Duration::from_millis(0),
3959                success_rate: 1.0,
3960            };
3961            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3962
3963            // Create engine
3964            let elector = L::default();
3965            let reporter_config = mocks::reporter::Config {
3966                namespace: namespace.clone(),
3967                participants: participants.clone().try_into().unwrap(),
3968                scheme: schemes[0].clone(),
3969                elector: elector.clone(),
3970            };
3971            let reporter =
3972                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
3973            let relay = Arc::new(mocks::relay::Relay::new());
3974            let application_cfg = mocks::application::Config {
3975                hasher: Sha256::default(),
3976                relay: relay.clone(),
3977                me: participants[0].clone(),
3978                propose_latency: (1.0, 0.0),
3979                verify_latency: (1.0, 0.0),
3980                certify_latency: (1.0, 0.0),
3981                should_certify: mocks::application::Certifier::Sometimes,
3982            };
3983            let (actor, application) = mocks::application::Application::new(
3984                context.with_label("application"),
3985                application_cfg,
3986            );
3987            actor.start();
3988            let blocker = oracle.control(participants[0].clone());
3989            let cfg = config::Config {
3990                scheme: schemes[0].clone(),
3991                elector: elector.clone(),
3992                blocker,
3993                automaton: application.clone(),
3994                relay: application.clone(),
3995                reporter: reporter.clone(),
3996                partition: participants[0].clone().to_string(),
3997                mailbox_size: 64,
3998                epoch: Epoch::new(333),
3999                namespace: namespace.clone(),
4000                leader_timeout: Duration::from_millis(50),
4001                notarization_timeout: Duration::from_millis(100),
4002                nullify_retry: Duration::from_millis(250),
4003                fetch_timeout: Duration::from_millis(50),
4004                activity_timeout: ViewDelta::new(4),
4005                skip_timeout: ViewDelta::new(2),
4006                fetch_concurrent: 4,
4007                replay_buffer: NZUsize!(1024 * 16),
4008                write_buffer: NZUsize!(1024 * 16),
4009                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4010            };
4011            let engine = Engine::new(context.with_label("engine"), cfg);
4012
4013            // Start engine
4014            let (pending, recovered, resolver) = registrations
4015                .remove(&participants[0])
4016                .expect("validator should be registered");
4017            let handle = engine.start(pending, recovered, resolver);
4018
4019            // Allow tasks to start
4020            context.sleep(Duration::from_millis(1000)).await;
4021
4022            // Verify that engine and child actors are running
4023            let metrics_before = context.encode();
4024            let is_running = |name: &str| -> bool {
4025                metrics_before.lines().any(|line| {
4026                    line.starts_with("runtime_tasks_running{")
4027                        && line.contains(&format!("name=\"{name}\""))
4028                        && line.contains("kind=\"Task\"")
4029                        && line.trim_end().ends_with(" 1")
4030                })
4031            };
4032            assert!(is_running("engine"));
4033            assert!(is_running("engine_batcher"));
4034            assert!(is_running("engine_voter"));
4035            assert!(is_running("engine_resolver"));
4036
4037            // Make sure the engine is still running
4038            context.sleep(Duration::from_millis(1000)).await;
4039            assert!(is_running("engine"));
4040
4041            // Shutdown engine and ensure children stop
4042            let metrics_after = if graceful {
4043                let metrics_context = context.clone();
4044                let result = context.stop(0, Some(Duration::from_secs(5))).await;
4045                assert!(
4046                    result.is_ok(),
4047                    "graceful shutdown should complete: {result:?}"
4048                );
4049                metrics_context.encode()
4050            } else {
4051                handle.abort();
4052                let _ = handle.await; // ensure parent tear-down runs
4053
4054                // Give the runtime a tick to process aborts
4055                context.sleep(Duration::from_millis(1000)).await;
4056                context.encode()
4057            };
4058            let is_stopped = |name: &str| -> bool {
4059                // Either the gauge is 0, or the entry is absent (both imply not running)
4060                metrics_after.lines().any(|line| {
4061                    line.starts_with("runtime_tasks_running{")
4062                        && line.contains(&format!("name=\"{name}\""))
4063                        && line.contains("kind=\"Task\"")
4064                        && line.trim_end().ends_with(" 0")
4065                })
4066            };
4067            assert!(is_stopped("engine"));
4068            assert!(is_stopped("engine_batcher"));
4069            assert!(is_stopped("engine_voter"));
4070            assert!(is_stopped("engine_resolver"));
4071        });
4072    }
4073
4074    #[test_traced]
4075    fn test_children_shutdown_on_engine_abort() {
4076        engine_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>, false);
4077        engine_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>, false);
4078        engine_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>, false);
4079        engine_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>, false);
4080        engine_shutdown::<_, _, RoundRobin>(ed25519::fixture, false);
4081    }
4082
4083    #[test_traced]
4084    fn test_graceful_shutdown() {
4085        engine_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>, true);
4086        engine_shutdown::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>, true);
4087        engine_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>, true);
4088        engine_shutdown::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>, true);
4089        engine_shutdown::<_, _, RoundRobin>(ed25519::fixture, true);
4090    }
4091
4092    fn attributable_reporter_filtering<S, F, L>(mut fixture: F)
4093    where
4094        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4095        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
4096        L: Elector<S>,
4097    {
            // Purpose: run 3 fully linked validators whose engines report through an
            // `AttributableReporter` (verification enabled) wrapped around a mock reporter, wait
            // until every mock reporter's subscription reaches view `required_containers`, and
            // then check what reached the mock reporters:
            //   - no faults, no invalid signatures, and at least one certificate;
            //   - per-view `notarizes`/`finalizes` signer counts are > 1 when
            //     `schemes[0].is_attributable()` and exactly 0 otherwise.
            //
            // `fixture` builds the participants and per-validator schemes; `L` is the leader
            // elector, constructed via `L::default()`.
4098        let n = 3;
4099        let required_containers = View::new(10);
4100        let activity_timeout = ViewDelta::new(10);
4101        let skip_timeout = ViewDelta::new(5);
4102        let namespace = b"consensus".to_vec();
4103        let executor = deterministic::Runner::timed(Duration::from_secs(30));
4104        executor.start(|mut context| async move {
4105            // Create simulated network
4106            let (network, mut oracle) = Network::new(
4107                context.with_label("network"),
4108                Config {
4109                    max_size: 1024 * 1024,
4110                    disconnect_on_block: false,
4111                    tracked_peer_sets: None,
4112                },
4113            );
4114            network.start();
4115
4116            // Register participants
4117            let Fixture {
4118                participants,
4119                schemes,
4120                ..
4121            } = fixture(&mut context, n);
4122            let mut registrations = register_validators(&mut oracle, &participants).await;
4123
4124            // Link all validators (10ms latency, 1ms jitter, no message loss)
4125            let link = Link {
4126                latency: Duration::from_millis(10),
4127                jitter: Duration::from_millis(1),
4128                success_rate: 1.0,
4129            };
4130            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4131
4132            // Create engines with `AttributableReporter` wrapper
4133            let elector = L::default();
4134            let relay = Arc::new(mocks::relay::Relay::new());
4135            let mut reporters = Vec::new();
4136            for (idx, validator) in participants.iter().enumerate() {
4137                let context = context.with_label(&format!("validator_{}", *validator));
4138
4139                let reporter_config = mocks::reporter::Config {
4140                    namespace: namespace.clone(),
4141                    participants: participants.clone().try_into().unwrap(),
4142                    scheme: schemes[idx].clone(),
4143                    elector: elector.clone(),
4144                };
4145                let mock_reporter = mocks::reporter::Reporter::new(
4146                    context.with_label("mock_reporter"),
4147                    reporter_config,
4148                );
4149
4150                // Wrap with `AttributableReporter`
4151                let attributable_reporter = scheme::reporter::AttributableReporter::new(
4152                    context.with_label("rng"),
4153                    schemes[idx].clone(),
4154                    namespace.clone(),
4155                    mock_reporter.clone(),
4156                    true, // Enable verification
4157                );
                    // Keep a handle to the inner mock reporter: the assertions below inspect what
                    // it recorded, while the engine only ever sees the wrapper.
4158                reporters.push(mock_reporter.clone());
4159
4160                let application_cfg = mocks::application::Config {
4161                    hasher: Sha256::default(),
4162                    relay: relay.clone(),
4163                    me: validator.clone(),
4164                    propose_latency: (10.0, 5.0),
4165                    verify_latency: (10.0, 5.0),
4166                    certify_latency: (10.0, 5.0),
4167                    should_certify: mocks::application::Certifier::Sometimes,
4168                };
4169                let (actor, application) = mocks::application::Application::new(
4170                    context.with_label("application"),
4171                    application_cfg,
4172                );
4173                actor.start();
4174                let blocker = oracle.control(validator.clone());
4175                let cfg = config::Config {
4176                    scheme: schemes[idx].clone(),
4177                    elector: elector.clone(),
4178                    blocker,
4179                    automaton: application.clone(),
4180                    relay: application.clone(),
4181                    reporter: attributable_reporter,
4182                    partition: validator.to_string(),
4183                    mailbox_size: 1024,
4184                    epoch: Epoch::new(333),
4185                    namespace: namespace.clone(),
4186                    leader_timeout: Duration::from_secs(1),
4187                    notarization_timeout: Duration::from_secs(2),
4188                    nullify_retry: Duration::from_secs(10),
4189                    fetch_timeout: Duration::from_secs(1),
4190                    activity_timeout,
4191                    skip_timeout,
4192                    fetch_concurrent: 4,
4193                    replay_buffer: NZUsize!(1024 * 1024),
4194                    write_buffer: NZUsize!(1024 * 1024),
4195                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4196                };
4197                let engine = Engine::new(context.with_label("engine"), cfg);
4198
4199                // Start engine
4200                let (pending, recovered, resolver) = registrations
4201                    .remove(validator)
4202                    .expect("validator should be registered");
4203                engine.start(pending, recovered, resolver);
4204            }
4205
4206            // Wait until every reporter's subscription reaches `required_containers`
                // NOTE(review): `subscribe()` presumably yields the latest finalized view —
                // confirm against `mocks::reporter::Reporter`.
4207            let mut finalizers = Vec::new();
4208            for reporter in reporters.iter_mut() {
4209                let (mut latest, mut monitor) = reporter.subscribe().await;
4210                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4211                    while latest < required_containers {
4212                        latest = monitor.next().await.expect("event missing");
4213                    }
4214                }));
4215            }
4216            join_all(finalizers).await;
4217
4218            // Verify filtering behavior based on scheme attributability
4219            for reporter in reporters.iter() {
4220                // Ensure no faults (normal operation)
4221                {
4222                    let faults = reporter.faults.lock().unwrap();
4223                    assert!(faults.is_empty(), "No faults should be reported");
4224                }
4225
4226                // Ensure no invalid signatures
4227                {
4228                    let invalid = reporter.invalid.lock().unwrap();
4229                    assert_eq!(*invalid, 0, "No invalid signatures");
4230                }
4231
4232                // Check that we have certificates reported
4233                {
4234                    let notarizations = reporter.notarizations.lock().unwrap();
4235                    let finalizations = reporter.finalizations.lock().unwrap();
4236                    assert!(
4237                        !notarizations.is_empty() || !finalizations.is_empty(),
4238                        "Certificates should be reported"
4239                    );
4240                }
4241
4242                // Check notarizes
4243                let notarizes = reporter.notarizes.lock().unwrap();
4244                let last_view = notarizes.keys().max().cloned().unwrap_or_default();
4245                for (view, payloads) in notarizes.iter() {
                        // NOTE(review): the highest recorded view is excluded, presumably because
                        // its votes may still have been arriving when the run stopped — confirm.
4246                    if *view == last_view {
4247                        continue; // Skip last view
4248                    }
4249
                        // Total signers recorded for this view, summed across all payloads
4250                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4251
4252                    // For attributable schemes, we should see peer activities
                        // (`schemes[0]` is used as the representative for the whole fixture)
4253                    if schemes[0].is_attributable() {
4254                        assert!(signers > 1, "view {view}: {signers}");
4255                    } else {
4256                        // For non-attributable, we shouldn't see any peer activities
4257                        assert_eq!(signers, 0);
4258                    }
4259                }
4260
4261                // Check finalizes (unlike notarizes, no view is skipped here)
4262                let finalizes = reporter.finalizes.lock().unwrap();
4263                for (_, payloads) in finalizes.iter() {
4264                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();
4265
4266                    // For attributable schemes, we should see peer activities
4267                    if schemes[0].is_attributable() {
4268                        assert!(signers > 1);
4269                    } else {
4270                        // For non-attributable, we shouldn't see any peer activities
4271                        assert_eq!(signers, 0);
4272                    }
4273                }
4274            }
4275
4276            // Ensure no blocked connections (normal operation)
4277            let blocked = oracle.blocked().await.unwrap();
4278            assert!(blocked.is_empty());
4279        });
4280    }
4281
4282    #[test_traced]
        // Runs `attributable_reporter_filtering` once per signing-scheme fixture: BLS12-381
        // threshold (with the `Random` elector) and BLS12-381 multisig and ed25519 (with the
        // `RoundRobin` elector), covering both `MinPk` and `MinSig`. The helper itself branches
        // on `is_attributable()`, so each scheme exercises whichever expectation applies to it.
4283    fn test_attributable_reporter_filtering() {
4284        attributable_reporter_filtering::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
4285        attributable_reporter_filtering::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
4286        attributable_reporter_filtering::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4287        attributable_reporter_filtering::<_, _, RoundRobin>(
4288            bls12381_multisig::fixture::<MinSig, _>,
4289        );
4290        attributable_reporter_filtering::<_, _, RoundRobin>(ed25519::fixture);
4291    }
4292
4293    fn split_views_no_lockup<S, F, L>(mut fixture: F)
4294    where
4295        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4296        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
4297        L: Elector<S>,
4298    {
            // Purpose: hand-build certificates that give different honest groups conflicting
            // views of F+1 and F+2 (notarization vs. nullification), inject them while the
            // validators are not linked to each other, and then check that once everyone is
            // linked the honest engines still finalize at or beyond F+3.
            //
            // `fixture` builds the participants and per-validator schemes; `L` is the leader
            // elector, constructed via `L::default()`.
            //
4299        // Scenario (participants numbered from 1 here; the code below uses 0-based indices):
4300        // - View F: Finalization of B_0 seen by all participants.
4301        // - View F+1:
4302        //   - Nullification seen by honest (4..=6,7) and all 3 byzantines
4303        //   - Notarization of B_1A seen by honest (1..=3)
4304        // - View F+2:
4305        //   - Nullification seen by honest (1..=3,7) and all 3 byzantines
4306        //   - Notarization of B_1B seen by honest (4..=6)
4307        // - View F+3: Nullification. Seen by all participants.
4308        // - Then ensure progress resumes beyond F+3 after reconnecting
4309
4310        // Define participant types
4311        enum ParticipantType {
4312            Group1,    // receives notarization for F+1, nullification for F+2
4313            Group2,    // receives nullification for F+1, notarization for F+2
4314            Ignorant,  // receives nullification for F+1 and F+2
4315            Byzantine, // nullify-only
4316        }
            // Maps a 0-based participant index to its role: 0..3 Group1, 3..6 Group2,
            // 6 Ignorant, 7..10 Byzantine.
4317        let get_type = |idx: usize| -> ParticipantType {
4318            match idx {
4319                0..3 => ParticipantType::Group1,
4320                3..6 => ParticipantType::Group2,
4321                6 => ParticipantType::Ignorant,
4322                7..10 => ParticipantType::Byzantine,
4323                _ => unreachable!(),
4324            }
4325        };
4326
4327        // Create context
4328        let n = 10;
4329        let quorum = quorum(n) as usize;
4330        assert_eq!(quorum, 7);
4331        let activity_timeout = ViewDelta::new(10);
4332        let skip_timeout = ViewDelta::new(5);
4333        let namespace = b"consensus".to_vec();
4334        let executor = deterministic::Runner::timed(Duration::from_secs(300));
4335        executor.start(|mut context| async move {
4336            // Create simulated network
4337            let (network, mut oracle) = Network::new(
4338                context.with_label("network"),
4339                Config {
4340                    max_size: 1024 * 1024,
4341                    disconnect_on_block: false,
4342                    tracked_peer_sets: None,
4343                },
4344            );
4345            network.start();
4346
4347            // Register participants
4348            let Fixture {
4349                participants,
4350                schemes,
4351                ..
4352            } = fixture(&mut context, n);
4353            let mut registrations = register_validators(&mut oracle, &participants).await;
4354
4355            // ========== Create engines ==========
4356
4357            // Do not link validators yet; we will inject certificates first, then link everyone.
4358
4359            // Create engines: 7 honest engines, 3 byzantine
4360            let elector = L::default();
4361            let relay = Arc::new(mocks::relay::Relay::new());
4362            let mut honest_reporters = Vec::new();
4363            for (idx, validator) in participants.iter().enumerate() {
4364                let (pending, recovered, resolver) = registrations
4365                    .remove(validator)
4366                    .expect("validator should be registered");
4367                let participant_type = get_type(idx);
4368                if matches!(participant_type, ParticipantType::Byzantine) {
4369                    // Byzantine engines
4370                    let cfg = mocks::nullify_only::Config {
4371                        scheme: schemes[idx].clone(),
4372                        namespace: namespace.clone(),
4373                    };
4374                    let engine: mocks::nullify_only::NullifyOnly<_, _, Sha256> =
4375                        mocks::nullify_only::NullifyOnly::new(
4376                            context.with_label(&format!("byzantine_{}", *validator)),
4377                            cfg,
4378                        );
4379                    engine.start(pending);
4380                    // Recovered/resolver channels are unused for byzantine actors.
4381                    drop(recovered);
4382                    drop(resolver);
4383                } else {
4384                    // Honest engines
4385                    let reporter_config = mocks::reporter::Config {
4386                        namespace: namespace.clone(),
4387                        participants: participants.clone().try_into().unwrap(),
4388                        scheme: schemes[idx].clone(),
4389                        elector: elector.clone(),
4390                    };
4391                    let reporter = mocks::reporter::Reporter::new(
4392                        context.with_label(&format!("reporter_{}", *validator)),
4393                        reporter_config,
4394                    );
4395                    honest_reporters.push(reporter.clone());
4396
4397                    let application_cfg = mocks::application::Config {
4398                        hasher: Sha256::default(),
4399                        relay: relay.clone(),
4400                        me: validator.clone(),
4401                        propose_latency: (10.0, 5.0),
4402                        verify_latency: (10.0, 5.0),
4403                        certify_latency: (10.0, 5.0),
4404                        should_certify: mocks::application::Certifier::Sometimes,
4405                    };
4406                    let (actor, application) = mocks::application::Application::new(
4407                        context.with_label(&format!("application_{}", *validator)),
4408                        application_cfg,
4409                    );
4410                    actor.start();
4411                    let blocker = oracle.control(validator.clone());
4412                    let cfg = config::Config {
4413                        scheme: schemes[idx].clone(),
4414                        elector: elector.clone(),
4415                        blocker,
4416                        automaton: application.clone(),
4417                        relay: application.clone(),
4418                        reporter: reporter.clone(),
4419                        partition: validator.to_string(),
4420                        mailbox_size: 1024,
4421                        epoch: Epoch::new(333),
4422                        namespace: namespace.clone(),
4423                        leader_timeout: Duration::from_secs(10),
4424                        notarization_timeout: Duration::from_secs(10),
4425                        nullify_retry: Duration::from_secs(10),
4426                        fetch_timeout: Duration::from_secs(1),
4427                        activity_timeout,
4428                        skip_timeout,
4429                        fetch_concurrent: 4,
4430                        replay_buffer: NZUsize!(1024 * 1024),
4431                        write_buffer: NZUsize!(1024 * 1024),
4432                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4433                    };
4434                    let engine =
4435                        Engine::new(context.with_label(&format!("engine_{}", *validator)), cfg);
4436                    engine.start(pending, recovered, resolver);
4437                }
4438            }
4439
4440            // ========== Build the certificates manually ==========
4441
4442            // Helper: assemble finalization from votes signed by `schemes[0..=quorum]`
                // (quorum + 1 signers; the inclusive range also covers index 7)
4443            let build_finalization = |proposal: &Proposal<D>| -> TFinalization<_, D> {
4444                let votes: Vec<_> = (0..=quorum)
4445                    .map(|i| TFinalize::sign(&schemes[i], &namespace, proposal.clone()).unwrap())
4446                    .collect();
4447                TFinalization::from_finalizes(&schemes[0], &votes).expect("finalization quorum")
4448            };
4449            // Helper: assemble notarization from votes signed by `schemes[0..=quorum]`
4450            let build_notarization = |proposal: &Proposal<D>| -> TNotarization<_, D> {
4451                let votes: Vec<_> = (0..=quorum)
4452                    .map(|i| TNotarize::sign(&schemes[i], &namespace, proposal.clone()).unwrap())
4453                    .collect();
4454                TNotarization::from_notarizes(&schemes[0], &votes).expect("notarization quorum")
4455            };
                // Helper: assemble nullification for `round` from votes signed by
                // `schemes[0..=quorum]`
4456            let build_nullification = |round: Round| -> TNullification<_> {
4457                let votes: Vec<_> = (0..=quorum)
4458                    .map(|i| TNullify::sign::<D>(&schemes[i], &namespace, round).unwrap())
4459                    .collect();
4460                TNullification::from_nullifies(&schemes[0], &votes).expect("nullification quorum")
4461            };
4462            // Choose F=1 and construct B_0 (view F), B_1A (view F+1) and B_1B (view F+2);
                // both B_1A and B_1B name view F as their parent
4463            let f_view = 1;
4464            let round_f = Round::new(Epoch::new(333), View::new(f_view));
4465            let payload_b0 = Sha256::hash(b"B_F");
4466            let proposal_b0 = Proposal::new(round_f, View::new(f_view - 1), payload_b0);
4467            let payload_b1a = Sha256::hash(b"B_G1");
4468            let proposal_b1a = Proposal::new(
4469                Round::new(Epoch::new(333), View::new(f_view + 1)),
4470                View::new(f_view),
4471                payload_b1a,
4472            );
4473            let payload_b1b = Sha256::hash(b"B_G2");
4474            let proposal_b1b = Proposal::new(
4475                Round::new(Epoch::new(333), View::new(f_view + 2)),
4476                View::new(f_view),
4477                payload_b1b,
4478            );
4479
4480            // Build notarization and finalization for the first block
4481            let b0_notarization = build_notarization(&proposal_b0);
4482            let b0_finalization = build_finalization(&proposal_b0);
4483            // Build notarizations for F+1 and F+2
4484            let b1a_notarization = build_notarization(&proposal_b1a);
4485            let b1b_notarization = build_notarization(&proposal_b1b);
4486            // Build nullifications for F+1 and F+2
4487            let null_a = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 1)));
4488            let null_b = build_nullification(Round::new(Epoch::new(333), View::new(f_view + 2)));
4489
4490            // Create an 11th non-participant injector and obtain senders
4491            let injector_pk = PrivateKey::from_seed(1_000_000).public_key();
4492            let (mut injector_sender, _inj_certificate_receiver) = oracle
4493                .control(injector_pk.clone())
4494                .register(1, TEST_QUOTA)
4495                .await
4496                .unwrap();
4497
4498            // Create minimal one-way links from injector to all participants (not full mesh)
4499            let link = Link {
4500                latency: Duration::from_millis(10),
4501                jitter: Duration::from_millis(0),
4502                success_rate: 1.0,
4503            };
4504            for p in participants.iter() {
4505                oracle
4506                    .add_link(injector_pk.clone(), p.clone(), link.clone())
4507                    .await
4508                    .unwrap();
4509            }
4510
4511            // ========== Broadcast certificates over recovered network. ==========
4512
4513            // Broadcasts are in reverse order of views to make the tests easier by preventing the
4514            // proposer from making a proposal in F+1 or F+2, as it may panic when it proposes
4515            // something but generates a certificate for a different proposal.
4516
4517            // View F+2: Group2 gets the B_1B notarization, everyone else the nullification
4518            let notarization_msg = Certificate::<_, D>::Notarization(b1b_notarization);
4519            let nullification_msg = Certificate::<_, D>::Nullification(null_b.clone());
4520            for (i, participant) in participants.iter().enumerate() {
4521                let recipient = Recipients::One(participant.clone());
4522                let msg = match get_type(i) {
4523                    ParticipantType::Group2 => notarization_msg.encode().into(),
4524                    _ => nullification_msg.encode().into(),
4525                };
4526                injector_sender.send(recipient, msg, true).await.unwrap();
4527            }
4528            // View F+1: Group1 gets the B_1A notarization, everyone else the nullification
4529            let notarization_msg = Certificate::<_, D>::Notarization(b1a_notarization);
4530            let nullification_msg = Certificate::<_, D>::Nullification(null_a.clone());
4531            for (i, participant) in participants.iter().enumerate() {
4532                let recipient = Recipients::One(participant.clone());
4533                let msg = match get_type(i) {
4534                    ParticipantType::Group1 => notarization_msg.encode().into(),
4535                    _ => nullification_msg.encode().into(),
4536                };
4537                injector_sender.send(recipient, msg, true).await.unwrap();
4538            }
4539            // View F: notarization and finalization of B_0 go to all recipients
4540            let msg = Certificate::<_, D>::Notarization(b0_notarization)
4541                .encode()
4542                .into();
4543            injector_sender
4544                .send(Recipients::All, msg, true)
4545                .await
4546                .unwrap();
4547            let msg = Certificate::<_, D>::Finalization(b0_finalization)
4548                .encode()
4549                .into();
4550            injector_sender
4551                .send(Recipients::All, msg, true)
4552                .await
4553                .unwrap();
4554
4555            // Wait for a while to let the certificates propagate, but not so long that anyone
4556            // times out and nullifies view F+3 (5s sleep vs. the 10s timeouts configured above).
4557            debug!("waiting for certificates to propagate");
4558            context.sleep(Duration::from_secs(5)).await;
4559            debug!("certificates propagated");
4560
4561            // ========== Assert the exact certificates are seen in each view ==========
4562
                // Note: honest reporters were pushed in participant order and the byzantine
                // participants occupy the highest indices (7..10), so an index into
                // `honest_reporters` is also the participant index expected by `get_type`.
                //
4563            // Assert the exact certificates in view F
4564            // All participants should have finalized B_0
4565            let view = View::new(f_view);
4566            for reporter in honest_reporters.iter() {
4567                let finalizations = reporter.finalizations.lock().unwrap();
4568                assert!(finalizations.contains_key(&view));
4569            }
4570
4571            // Assert the exact certificates in view F+1
4572            // Group 1 should have notarized B_1A only
4573            // All other participants should have nullified F+1
4574            let view = View::new(f_view + 1);
4575            for (i, reporter) in honest_reporters.iter().enumerate() {
4576                let finalizations = reporter.finalizations.lock().unwrap();
4577                assert!(!finalizations.contains_key(&view));
4578                let nullifications = reporter.nullifications.lock().unwrap();
4579                let notarizations = reporter.notarizations.lock().unwrap();
4580                match get_type(i) {
4581                    ParticipantType::Group1 => {
4582                        assert!(notarizations.contains_key(&view));
4583                        assert!(!nullifications.contains_key(&view));
4584                    }
4585                    _ => {
4586                        assert!(nullifications.contains_key(&view));
4587                        assert!(!notarizations.contains_key(&view));
4588                    }
4589                }
4590            }
4591
4592            // Assert the exact certificates in view F+2
4593            // Group 2 should have notarized B_1B only
4594            // All other participants should have nullified F+2
4595            let view = View::new(f_view + 2);
4596            for (i, reporter) in honest_reporters.iter().enumerate() {
4597                let finalizations = reporter.finalizations.lock().unwrap();
4598                assert!(!finalizations.contains_key(&view));
4599                let nullifications = reporter.nullifications.lock().unwrap();
4600                let notarizations = reporter.notarizations.lock().unwrap();
4601                match get_type(i) {
4602                    ParticipantType::Group2 => {
4603                        assert!(notarizations.contains_key(&view));
4604                        assert!(!nullifications.contains_key(&view));
4605                    }
4606                    _ => {
4607                        assert!(nullifications.contains_key(&view));
4608                        assert!(!notarizations.contains_key(&view));
4609                    }
4610                }
4611            }
4612
4613            // Assert no members have yet nullified view F+3
4614            let next_view = View::new(f_view + 3);
4615            for (i, reporter) in honest_reporters.iter().enumerate() {
4616                let nullifies = reporter.nullifies.lock().unwrap();
4617                assert!(!nullifies.contains_key(&next_view), "reporter {i}");
4618            }
4619
4620            // ========== Reconnect all participants ==========
4621
4622            // Reconnect all participants fully using the helper
4623            link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
4624
4625            // Wait until all honest reporters finalize strictly past F+2 (e.g., at least F+3)
4626            {
4627                let target = View::new(f_view + 3);
4628                let mut finalizers = Vec::new();
4629                for reporter in honest_reporters.iter_mut() {
4630                    let (mut latest, mut monitor) = reporter.subscribe().await;
4631                    finalizers.push(context.with_label("resume_finalizer").spawn(
4632                        move |_| async move {
4633                            while latest < target {
4634                                latest = monitor.next().await.expect("event missing");
4635                            }
4636                        },
4637                    ));
4638                }
4639                join_all(finalizers).await;
4640            }
4641
4642            // Sanity checks: no faults/invalid signatures, and no peers blocked
4643            for reporter in honest_reporters.iter() {
4644                {
4645                    let faults = reporter.faults.lock().unwrap();
4646                    assert!(faults.is_empty());
4647                }
4648                {
4649                    let invalid = reporter.invalid.lock().unwrap();
4650                    assert_eq!(*invalid, 0);
4651                }
4652            }
4653            let blocked = oracle.blocked().await.unwrap();
4654            assert!(blocked.is_empty());
4655        });
4656    }
4657
4658    #[test_traced]
        // Runs the `split_views_no_lockup` scenario once per signing-scheme fixture: BLS12-381
        // threshold (with the `Random` elector) and BLS12-381 multisig and ed25519 (with the
        // `RoundRobin` elector), covering both `MinPk` and `MinSig`.
4659    fn test_split_views_no_lockup() {
4660        split_views_no_lockup::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
4661        split_views_no_lockup::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
4662        split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
4663        split_views_no_lockup::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
4664        split_views_no_lockup::<_, _, RoundRobin>(ed25519::fixture);
4665    }
4666
4667    fn tle<V, L>()
4668    where
4669        V: Variant,
4670        L: Elector<bls12381_threshold::Scheme<PublicKey, V>>,
4671    {
4672        // Create context
4673        let n = 4;
4674        let namespace = b"consensus".to_vec();
4675        let activity_timeout = ViewDelta::new(100);
4676        let skip_timeout = ViewDelta::new(50);
4677        let executor = deterministic::Runner::timed(Duration::from_secs(30));
4678        executor.start(|mut context| async move {
4679            // Create simulated network
4680            let (network, mut oracle) = Network::new(
4681                context.with_label("network"),
4682                Config {
4683                    max_size: 1024 * 1024,
4684                    disconnect_on_block: false,
4685                    tracked_peer_sets: None,
4686                },
4687            );
4688
4689            // Start network
4690            network.start();
4691
4692            // Register participants
4693            let Fixture {
4694                participants,
4695                schemes,
4696                ..
4697            } = bls12381_threshold::fixture::<V, _>(&mut context, n);
4698            let mut registrations = register_validators(&mut oracle, &participants).await;
4699
4700            // Link all validators
4701            let link = Link {
4702                latency: Duration::from_millis(10),
4703                jitter: Duration::from_millis(5),
4704                success_rate: 1.0,
4705            };
4706            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4707
4708            // Create engines and reporters
4709            let elector = L::default();
4710            let relay = Arc::new(mocks::relay::Relay::new());
4711            let mut reporters = Vec::new();
4712            let mut engine_handlers = Vec::new();
4713            let monitor_reporter = Arc::new(Mutex::new(None));
4714            for (idx, validator) in participants.iter().enumerate() {
4715                // Create scheme context
4716                let context = context.with_label(&format!("validator_{}", *validator));
4717
4718                // Store first reporter for monitoring
4719                let reporter_config = mocks::reporter::Config {
4720                    namespace: namespace.clone(),
4721                    participants: participants.clone().try_into().unwrap(),
4722                    scheme: schemes[idx].clone(),
4723                    elector: elector.clone(),
4724                };
4725                let reporter =
4726                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4727                reporters.push(reporter.clone());
4728                if idx == 0 {
4729                    *monitor_reporter.lock().unwrap() = Some(reporter.clone());
4730                }
4731
4732                // Configure application
4733                let application_cfg = mocks::application::Config {
4734                    hasher: Sha256::default(),
4735                    relay: relay.clone(),
4736                    me: validator.clone(),
4737                    propose_latency: (10.0, 5.0),
4738                    verify_latency: (10.0, 5.0),
4739                    certify_latency: (10.0, 5.0),
4740                    should_certify: mocks::application::Certifier::Sometimes,
4741                };
4742                let (actor, application) = mocks::application::Application::new(
4743                    context.with_label("application"),
4744                    application_cfg,
4745                );
4746                actor.start();
4747                let blocker = oracle.control(validator.clone());
4748                let cfg = config::Config {
4749                    scheme: schemes[idx].clone(),
4750                    elector: elector.clone(),
4751                    blocker,
4752                    automaton: application.clone(),
4753                    relay: application.clone(),
4754                    reporter: reporter.clone(),
4755                    partition: validator.to_string(),
4756                    mailbox_size: 1024,
4757                    epoch: Epoch::new(333),
4758                    namespace: namespace.clone(),
4759                    leader_timeout: Duration::from_millis(100),
4760                    notarization_timeout: Duration::from_millis(200),
4761                    nullify_retry: Duration::from_millis(500),
4762                    fetch_timeout: Duration::from_millis(100),
4763                    activity_timeout,
4764                    skip_timeout,
4765                    fetch_concurrent: 4,
4766                    replay_buffer: NZUsize!(1024 * 1024),
4767                    write_buffer: NZUsize!(1024 * 1024),
4768                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4769                };
4770                let engine = Engine::new(context.with_label("engine"), cfg);
4771
4772                // Start engine
4773                let (pending, recovered, resolver) = registrations
4774                    .remove(validator)
4775                    .expect("validator should be registered");
4776                engine_handlers.push(engine.start(pending, recovered, resolver));
4777            }
4778
4779            // Prepare TLE test data
4780            let target = Round::new(Epoch::new(333), View::new(10)); // Encrypt for round (epoch 333, view 10)
4781            let message = b"Secret message for future view10"; // 32 bytes
4782
4783            // Encrypt message
4784            let ciphertext = schemes[0].encrypt(&mut context, &namespace, target, *message);
4785
4786            // Wait for consensus to reach the target view and then decrypt
4787            let reporter = monitor_reporter.lock().unwrap().clone().unwrap();
4788            loop {
4789                // Wait for notarization
4790                context.sleep(Duration::from_millis(100)).await;
4791                let notarizations = reporter.notarizations.lock().unwrap();
4792                let Some(notarization) = notarizations.get(&target.view()) else {
4793                    continue;
4794                };
4795
4796                // Decrypt the message using the seed
4797                let seed = notarization.seed();
4798                let decrypted = seed
4799                    .decrypt(&ciphertext)
4800                    .expect("Decryption should succeed with valid seed signature");
4801                assert_eq!(
4802                    message,
4803                    decrypted.as_ref(),
4804                    "Decrypted message should match original message"
4805                );
4806                break;
4807            }
4808        });
4809    }
4810
4811    #[test_traced]
4812    fn test_tle() {
4813        tle::<MinPk, Random>();
4814        tle::<MinSig, Random>();
4815    }
4816
4817    fn hailstorm<S, F, L>(
4818        seed: u64,
4819        shutdowns: usize,
4820        interval: ViewDelta,
4821        mut fixture: F,
4822    ) -> String
4823    where
4824        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
4825        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
4826        L: Elector<S>,
4827    {
4828        // Create context
4829        let n = 5;
4830        let activity_timeout = ViewDelta::new(10);
4831        let skip_timeout = ViewDelta::new(5);
4832        let namespace = b"consensus".to_vec();
4833        let cfg = deterministic::Config::new().with_seed(seed);
4834        let executor = deterministic::Runner::new(cfg);
4835        executor.start(|mut context| async move {
4836            // Create simulated network
4837            let (network, mut oracle) = Network::new(
4838                context.with_label("network"),
4839                Config {
4840                    max_size: 1024 * 1024,
4841                    disconnect_on_block: true,
4842                    tracked_peer_sets: None,
4843                },
4844            );
4845
4846            // Start network
4847            network.start();
4848
4849            // Register participants
4850            let Fixture {
4851                participants,
4852                schemes,
4853                ..
4854            } = fixture(&mut context, n);
4855            let mut registrations = register_validators(&mut oracle, &participants).await;
4856
4857            // Link all validators
4858            let link = Link {
4859                latency: Duration::from_millis(10),
4860                jitter: Duration::from_millis(1),
4861                success_rate: 1.0,
4862            };
4863            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
4864
4865            // Create engines
4866            let elector = L::default();
4867            let relay = Arc::new(mocks::relay::Relay::new());
4868            let mut reporters = BTreeMap::new();
4869            let mut engine_handlers = BTreeMap::new();
4870            for (idx, validator) in participants.iter().enumerate() {
4871                // Create scheme context
4872                let context = context.with_label(&format!("validator_{}", *validator));
4873
4874                // Configure engine
4875                let reporter_config = mocks::reporter::Config {
4876                    namespace: namespace.clone(),
4877                    participants: participants.clone().try_into().unwrap(),
4878                    scheme: schemes[idx].clone(),
4879                    elector: elector.clone(),
4880                };
4881                let reporter =
4882                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
4883                reporters.insert(idx, reporter.clone());
4884                let application_cfg = mocks::application::Config {
4885                    hasher: Sha256::default(),
4886                    relay: relay.clone(),
4887                    me: validator.clone(),
4888                    propose_latency: (10.0, 5.0),
4889                    verify_latency: (10.0, 5.0),
4890                    certify_latency: (10.0, 5.0),
4891                    should_certify: mocks::application::Certifier::Sometimes,
4892                };
4893                let (actor, application) = mocks::application::Application::new(
4894                    context.with_label("application"),
4895                    application_cfg,
4896                );
4897                actor.start();
4898                let blocker = oracle.control(validator.clone());
4899                let cfg = config::Config {
4900                    scheme: schemes[idx].clone(),
4901                    elector: elector.clone(),
4902                    blocker,
4903                    automaton: application.clone(),
4904                    relay: application.clone(),
4905                    reporter: reporter.clone(),
4906                    partition: validator.to_string(),
4907                    mailbox_size: 1024,
4908                    epoch: Epoch::new(333),
4909                    namespace: namespace.clone(),
4910                    leader_timeout: Duration::from_secs(1),
4911                    notarization_timeout: Duration::from_secs(2),
4912                    nullify_retry: Duration::from_secs(10),
4913                    fetch_timeout: Duration::from_secs(1),
4914                    activity_timeout,
4915                    skip_timeout,
4916                    fetch_concurrent: 4,
4917                    replay_buffer: NZUsize!(1024 * 1024),
4918                    write_buffer: NZUsize!(1024 * 1024),
4919                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
4920                };
4921                let engine = Engine::new(context.with_label("engine"), cfg);
4922
4923                // Start engine
4924                let (pending, recovered, resolver) = registrations
4925                    .remove(validator)
4926                    .expect("validator should be registered");
4927                engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
4928            }
4929
4930            // Run shutdowns
4931            let mut target = View::zero();
4932            for i in 0..shutdowns {
4933                // Update target
4934                target = target.saturating_add(interval);
4935
4936                // Wait for all engines to finish
4937                let mut finalizers = Vec::new();
4938                for (_, reporter) in reporters.iter_mut() {
4939                    let (mut latest, mut monitor) = reporter.subscribe().await;
4940                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4941                        while latest < target {
4942                            latest = monitor.next().await.expect("event missing");
4943                        }
4944                    }));
4945                }
4946                join_all(finalizers).await;
4947                target = target.saturating_add(interval);
4948
4949                // Select a random engine to shutdown
4950                let idx = context.gen_range(0..engine_handlers.len());
4951                let validator = &participants[idx];
4952                let handle = engine_handlers.remove(&idx).unwrap();
4953                handle.abort();
4954                let _ = handle.await;
4955                let selected_reporter = reporters.remove(&idx).unwrap();
4956                info!(idx, ?validator, "shutdown validator");
4957
4958                // Wait for all engines to finish
4959                let mut finalizers = Vec::new();
4960                for (_, reporter) in reporters.iter_mut() {
4961                    let (mut latest, mut monitor) = reporter.subscribe().await;
4962                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
4963                        while latest < target {
4964                            latest = monitor.next().await.expect("event missing");
4965                        }
4966                    }));
4967                }
4968                join_all(finalizers).await;
4969                target = target.saturating_add(interval);
4970
4971                // Recreate engine
4972                info!(idx, ?validator, "restarting validator");
4973                let context =
4974                    context.with_label(&format!("validator_{}_restarted_{}", *validator, i));
4975
4976                // Start engine
4977                let (pending, recovered, resolver) =
4978                    register_validator(&mut oracle, validator.clone()).await;
4979                let application_cfg = mocks::application::Config {
4980                    hasher: Sha256::default(),
4981                    relay: relay.clone(),
4982                    me: validator.clone(),
4983                    propose_latency: (10.0, 5.0),
4984                    verify_latency: (10.0, 5.0),
4985                    certify_latency: (10.0, 5.0),
4986                    should_certify: mocks::application::Certifier::Sometimes,
4987                };
4988                let (actor, application) = mocks::application::Application::new(
4989                    context.with_label("application"),
4990                    application_cfg,
4991                );
4992                actor.start();
4993                reporters.insert(idx, selected_reporter.clone());
4994                let blocker = oracle.control(validator.clone());
4995                let cfg = config::Config {
4996                    scheme: schemes[idx].clone(),
4997                    elector: elector.clone(),
4998                    blocker,
4999                    automaton: application.clone(),
5000                    relay: application.clone(),
5001                    reporter: selected_reporter,
5002                    partition: validator.to_string(),
5003                    mailbox_size: 1024,
5004                    epoch: Epoch::new(333),
5005                    namespace: namespace.clone(),
5006                    leader_timeout: Duration::from_secs(1),
5007                    notarization_timeout: Duration::from_secs(2),
5008                    nullify_retry: Duration::from_secs(10),
5009                    fetch_timeout: Duration::from_secs(1),
5010                    activity_timeout,
5011                    skip_timeout,
5012                    fetch_concurrent: 4,
5013                    replay_buffer: NZUsize!(1024 * 1024),
5014                    write_buffer: NZUsize!(1024 * 1024),
5015                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5016                };
5017                let engine = Engine::new(context.with_label("engine"), cfg);
5018                engine_handlers.insert(idx, engine.start(pending, recovered, resolver));
5019
5020                // Wait for all engines to hit required containers
5021                let mut finalizers = Vec::new();
5022                for (_, reporter) in reporters.iter_mut() {
5023                    let (mut latest, mut monitor) = reporter.subscribe().await;
5024                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5025                        while latest < target {
5026                            latest = monitor.next().await.expect("event missing");
5027                        }
5028                    }));
5029                }
5030                join_all(finalizers).await;
5031                info!(idx, ?validator, "validator recovered");
5032            }
5033
5034            // Check reporters for correct activity
5035            let latest_complete = target.saturating_sub(activity_timeout);
5036            for (_, reporter) in reporters.iter() {
5037                // Ensure no faults
5038                {
5039                    let faults = reporter.faults.lock().unwrap();
5040                    assert!(faults.is_empty());
5041                }
5042
5043                // Ensure no invalid signatures
5044                {
5045                    let invalid = reporter.invalid.lock().unwrap();
5046                    assert_eq!(*invalid, 0);
5047                }
5048
5049                // Ensure no forks
5050                let mut notarized = HashMap::new();
5051                let mut finalized = HashMap::new();
5052                {
5053                    let notarizes = reporter.notarizes.lock().unwrap();
5054                    for view in View::range(View::new(1), latest_complete) {
5055                        // Ensure only one payload proposed per view
5056                        let Some(payloads) = notarizes.get(&view) else {
5057                            continue;
5058                        };
5059                        if payloads.len() > 1 {
5060                            panic!("view: {view}");
5061                        }
5062                        let (digest, _) = payloads.iter().next().unwrap();
5063                        notarized.insert(view, *digest);
5064                    }
5065                }
5066                {
5067                    let notarizations = reporter.notarizations.lock().unwrap();
5068                    for view in View::range(View::new(1), latest_complete) {
5069                        // Ensure notarization matches digest from notarizes
5070                        let Some(notarization) = notarizations.get(&view) else {
5071                            continue;
5072                        };
5073                        let Some(digest) = notarized.get(&view) else {
5074                            continue;
5075                        };
5076                        assert_eq!(&notarization.proposal.payload, digest);
5077                    }
5078                }
5079                {
5080                    let finalizes = reporter.finalizes.lock().unwrap();
5081                    for view in View::range(View::new(1), latest_complete) {
5082                        // Ensure only one payload proposed per view
5083                        let Some(payloads) = finalizes.get(&view) else {
5084                            continue;
5085                        };
5086                        if payloads.len() > 1 {
5087                            panic!("view: {view}");
5088                        }
5089                        let (digest, _) = payloads.iter().next().unwrap();
5090                        finalized.insert(view, *digest);
5091
5092                        // Only check at views below timeout
5093                        if view > latest_complete {
5094                            continue;
5095                        }
5096
5097                        // Ensure no nullifies for any finalizers
5098                        let nullifies = reporter.nullifies.lock().unwrap();
5099                        let Some(nullifies) = nullifies.get(&view) else {
5100                            continue;
5101                        };
5102                        for (_, finalizers) in payloads.iter() {
5103                            for finalizer in finalizers.iter() {
5104                                if nullifies.contains(finalizer) {
5105                                    panic!("should not nullify and finalize at same view");
5106                                }
5107                            }
5108                        }
5109                    }
5110                }
5111                {
5112                    let finalizations = reporter.finalizations.lock().unwrap();
5113                    for view in View::range(View::new(1), latest_complete) {
5114                        // Ensure finalization matches digest from finalizes
5115                        let Some(finalization) = finalizations.get(&view) else {
5116                            continue;
5117                        };
5118                        let Some(digest) = finalized.get(&view) else {
5119                            continue;
5120                        };
5121                        assert_eq!(&finalization.proposal.payload, digest);
5122                    }
5123                }
5124            }
5125
5126            // Ensure no blocked connections
5127            let blocked = oracle.blocked().await.unwrap();
5128            assert!(blocked.is_empty());
5129
5130            // Return state for audit
5131            context.auditor().state()
5132        })
5133    }
5134
5135    #[test_group("slow")]
5136    #[test_traced]
5137    fn test_hailstorm_bls12381_threshold_min_pk() {
5138        assert_eq!(
5139            hailstorm::<_, _, Random>(
5140                0,
5141                10,
5142                ViewDelta::new(15),
5143                bls12381_threshold::fixture::<MinPk, _>
5144            ),
5145            hailstorm::<_, _, Random>(
5146                0,
5147                10,
5148                ViewDelta::new(15),
5149                bls12381_threshold::fixture::<MinPk, _>
5150            ),
5151        );
5152    }
5153
5154    #[test_group("slow")]
5155    #[test_traced]
5156    fn test_hailstorm_bls12381_threshold_min_sig() {
5157        assert_eq!(
5158            hailstorm::<_, _, Random>(
5159                0,
5160                10,
5161                ViewDelta::new(15),
5162                bls12381_threshold::fixture::<MinSig, _>
5163            ),
5164            hailstorm::<_, _, Random>(
5165                0,
5166                10,
5167                ViewDelta::new(15),
5168                bls12381_threshold::fixture::<MinSig, _>
5169            ),
5170        );
5171    }
5172
5173    #[test_group("slow")]
5174    #[test_traced]
5175    fn test_hailstorm_bls12381_multisig_min_pk() {
5176        assert_eq!(
5177            hailstorm::<_, _, RoundRobin>(
5178                0,
5179                10,
5180                ViewDelta::new(15),
5181                bls12381_multisig::fixture::<MinPk, _>
5182            ),
5183            hailstorm::<_, _, RoundRobin>(
5184                0,
5185                10,
5186                ViewDelta::new(15),
5187                bls12381_multisig::fixture::<MinPk, _>
5188            ),
5189        );
5190    }
5191
5192    #[test_group("slow")]
5193    #[test_traced]
5194    fn test_hailstorm_bls12381_multisig_min_sig() {
5195        assert_eq!(
5196            hailstorm::<_, _, RoundRobin>(
5197                0,
5198                10,
5199                ViewDelta::new(15),
5200                bls12381_multisig::fixture::<MinSig, _>
5201            ),
5202            hailstorm::<_, _, RoundRobin>(
5203                0,
5204                10,
5205                ViewDelta::new(15),
5206                bls12381_multisig::fixture::<MinSig, _>
5207            ),
5208        );
5209    }
5210
5211    #[test_group("slow")]
5212    #[test_traced]
5213    fn test_hailstorm_ed25519() {
5214        assert_eq!(
5215            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture),
5216            hailstorm::<_, _, RoundRobin>(0, 10, ViewDelta::new(15), ed25519::fixture)
5217        );
5218    }
5219
5220    /// Implementation of [Twins: BFT Systems Made Robust](https://arxiv.org/abs/2004.10617).
5221    fn twins<S, F, L>(seed: u64, n: u32, strategy: Strategy, link: Link, mut fixture: F)
5222    where
5223        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5224        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
5225        L: Elector<S>,
5226    {
5227        let faults = max_faults(n);
5228        let required_containers = View::new(100);
5229        let activity_timeout = ViewDelta::new(10);
5230        let skip_timeout = ViewDelta::new(5);
5231        let namespace = b"consensus".to_vec();
5232        let cfg = deterministic::Config::new()
5233            .with_seed(seed)
5234            .with_timeout(Some(Duration::from_secs(900)));
5235        let executor = deterministic::Runner::new(cfg);
5236        executor.start(|mut context| async move {
5237            let (network, mut oracle) = Network::new(
5238                context.with_label("network"),
5239                Config {
5240                    max_size: 1024 * 1024,
5241                    disconnect_on_block: false,
5242                    tracked_peer_sets: None,
5243                },
5244            );
5245            network.start();
5246
5247            let Fixture {
5248                participants,
5249                schemes,
5250                ..
5251            } = fixture(&mut context, n);
5252            let participants: Arc<[_]> = participants.into();
5253            let mut registrations = register_validators(&mut oracle, &participants).await;
5254            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
5255
5256            // We don't apply partitioning to the relay explicitly, however, a participant will only query the relay by digest
5257            // after receiving a vote (implicitly respecting the partitioning)
5258            let elector = L::default();
5259            let relay = Arc::new(mocks::relay::Relay::new());
5260            let mut reporters = Vec::new();
5261            let mut engine_handlers = Vec::new();
5262
5263            // Create twin engines (f Byzantine twins)
5264            for (idx, validator) in participants.iter().enumerate().take(faults as usize) {
5265                let (
5266                    (vote_sender, vote_receiver),
5267                    (certificate_sender, certificate_receiver),
5268                    (resolver_sender, resolver_receiver),
5269                ) = registrations
5270                    .remove(validator)
5271                    .expect("validator should be registered");
5272
5273                // Create forwarder closures for votes
5274                let make_vote_forwarder = || {
5275                    let participants = participants.clone();
5276                    move |origin: SplitOrigin, _: &Recipients<_>, message: &Bytes| {
5277                        let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5278                        let (primary, secondary) =
5279                            strategy.partitions(msg.view(), participants.as_ref());
5280                        match origin {
5281                            SplitOrigin::Primary => Some(Recipients::Some(primary)),
5282                            SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5283                        }
5284                    }
5285                };
5286                // Create forwarder closures for certificates
5287                let make_certificate_forwarder = || {
5288                    let codec = schemes[idx].certificate_codec_config();
5289                    let participants = participants.clone();
5290                    move |origin: SplitOrigin, _: &Recipients<_>, message: &Bytes| {
5291                        let msg: Certificate<S, D> =
5292                            Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5293                        let (primary, secondary) =
5294                            strategy.partitions(msg.view(), participants.as_ref());
5295                        match origin {
5296                            SplitOrigin::Primary => Some(Recipients::Some(primary)),
5297                            SplitOrigin::Secondary => Some(Recipients::Some(secondary)),
5298                        }
5299                    }
5300                };
5301                let make_drop_forwarder =
5302                    || move |_: SplitOrigin, _: &Recipients<_>, _: &Bytes| None;
5303
5304                // Create router closures for votes
5305                let make_vote_router = || {
5306                    let participants = participants.clone();
5307                    move |(sender, message): &(_, Bytes)| {
5308                        let msg: Vote<S, D> = Vote::decode(message.clone()).unwrap();
5309                        strategy.route(msg.view(), sender, participants.as_ref())
5310                    }
5311                };
5312                // Create router closures for certificates
5313                let make_certificate_router = || {
5314                    let codec = schemes[idx].certificate_codec_config();
5315                    let participants = participants.clone();
5316                    move |(sender, message): &(_, Bytes)| {
5317                        let msg: Certificate<S, D> =
5318                            Certificate::decode_cfg(&mut message.as_ref(), &codec).unwrap();
5319                        strategy.route(msg.view(), sender, participants.as_ref())
5320                    }
5321                };
5322                let make_drop_router = || move |(_, _): &(_, _)| SplitTarget::None;
5323
5324                // Apply view-based forwarder and router to pending and recovered channel
5325                let (vote_sender_primary, vote_sender_secondary) =
5326                    vote_sender.split_with(make_vote_forwarder());
5327                let (vote_receiver_primary, vote_receiver_secondary) = vote_receiver.split_with(
5328                    context.with_label(&format!("pending_split_{idx}")),
5329                    make_vote_router(),
5330                );
5331                let (certificate_sender_primary, certificate_sender_secondary) =
5332                    certificate_sender.split_with(make_certificate_forwarder());
5333                let (certificate_receiver_primary, certificate_receiver_secondary) =
5334                    certificate_receiver.split_with(
5335                        context.with_label(&format!("recovered_split_{idx}")),
5336                        make_certificate_router(),
5337                    );
5338
5339                // Prevent any resolver messages from being sent or received by twins (these messages aren't cleanly mapped to a view and allowing them to bypass partitions seems wrong)
5340                let (resolver_sender_primary, resolver_sender_secondary) =
5341                    resolver_sender.split_with(make_drop_forwarder());
5342                let (resolver_receiver_primary, resolver_receiver_secondary) = resolver_receiver
5343                    .split_with(
5344                        context.with_label(&format!("resolver_split_{idx}")),
5345                        make_drop_router(),
5346                    );
5347
5348                for (twin_label, pending, recovered, resolver) in [
5349                    (
5350                        "primary",
5351                        (vote_sender_primary, vote_receiver_primary),
5352                        (certificate_sender_primary, certificate_receiver_primary),
5353                        (resolver_sender_primary, resolver_receiver_primary),
5354                    ),
5355                    (
5356                        "secondary",
5357                        (vote_sender_secondary, vote_receiver_secondary),
5358                        (certificate_sender_secondary, certificate_receiver_secondary),
5359                        (resolver_sender_secondary, resolver_receiver_secondary),
5360                    ),
5361                ] {
5362                    let label = format!("twin_{idx}_{twin_label}");
5363                    let context = context.with_label(&label);
5364
5365                    let reporter_config = mocks::reporter::Config {
5366                        namespace: namespace.clone(),
5367                        participants: participants.as_ref().try_into().unwrap(),
5368                        scheme: schemes[idx].clone(),
5369                        elector: elector.clone(),
5370                    };
5371                    let reporter = mocks::reporter::Reporter::new(
5372                        context.with_label("reporter"),
5373                        reporter_config,
5374                    );
5375                    reporters.push(reporter.clone());
5376
5377                    let application_cfg = mocks::application::Config {
5378                        hasher: Sha256::default(),
5379                        relay: relay.clone(),
5380                        me: validator.clone(),
5381                        propose_latency: (10.0, 5.0),
5382                        verify_latency: (10.0, 5.0),
5383                        certify_latency: (10.0, 5.0),
5384                        should_certify: mocks::application::Certifier::Sometimes,
5385                    };
5386                    let (actor, application) = mocks::application::Application::new(
5387                        context.with_label("application"),
5388                        application_cfg,
5389                    );
5390                    actor.start();
5391
5392                    let blocker = oracle.control(validator.clone());
5393                    let cfg = config::Config {
5394                        scheme: schemes[idx].clone(),
5395                        elector: elector.clone(),
5396                        blocker,
5397                        automaton: application.clone(),
5398                        relay: application.clone(),
5399                        reporter: reporter.clone(),
5400                        partition: label,
5401                        mailbox_size: 1024,
5402                        epoch: Epoch::new(333),
5403                        namespace: namespace.clone(),
5404                        leader_timeout: Duration::from_secs(1),
5405                        notarization_timeout: Duration::from_secs(2),
5406                        nullify_retry: Duration::from_secs(10),
5407                        fetch_timeout: Duration::from_secs(1),
5408                        activity_timeout,
5409                        skip_timeout,
5410                        fetch_concurrent: 4,
5411                        replay_buffer: NZUsize!(1024 * 1024),
5412                        write_buffer: NZUsize!(1024 * 1024),
5413                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5414                    };
5415                    let engine = Engine::new(context.with_label("engine"), cfg);
5416                    engine_handlers.push(engine.start(pending, recovered, resolver));
5417                }
5418            }
5419
5420            // Create honest engines
5421            for (idx, validator) in participants.iter().enumerate().skip(faults as usize) {
5422                let label = format!("honest_{idx}");
5423                let context = context.with_label(&label);
5424
5425                let reporter_config = mocks::reporter::Config {
5426                    namespace: namespace.clone(),
5427                    participants: participants.as_ref().try_into().unwrap(),
5428                    scheme: schemes[idx].clone(),
5429                    elector: elector.clone(),
5430                };
5431                let reporter =
5432                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
5433                reporters.push(reporter.clone());
5434
5435                let application_cfg = mocks::application::Config {
5436                    hasher: Sha256::default(),
5437                    relay: relay.clone(),
5438                    me: validator.clone(),
5439                    propose_latency: (10.0, 5.0),
5440                    verify_latency: (10.0, 5.0),
5441                    certify_latency: (10.0, 5.0),
5442                    should_certify: mocks::application::Certifier::Sometimes,
5443                };
5444                let (actor, application) = mocks::application::Application::new(
5445                    context.with_label("application"),
5446                    application_cfg,
5447                );
5448                actor.start();
5449
5450                let blocker = oracle.control(validator.clone());
5451                let cfg = config::Config {
5452                    scheme: schemes[idx].clone(),
5453                    elector: elector.clone(),
5454                    blocker,
5455                    automaton: application.clone(),
5456                    relay: application.clone(),
5457                    reporter: reporter.clone(),
5458                    partition: label,
5459                    mailbox_size: 1024,
5460                    epoch: Epoch::new(333),
5461                    namespace: namespace.clone(),
5462                    leader_timeout: Duration::from_secs(1),
5463                    notarization_timeout: Duration::from_secs(2),
5464                    nullify_retry: Duration::from_secs(10),
5465                    fetch_timeout: Duration::from_secs(1),
5466                    activity_timeout,
5467                    skip_timeout,
5468                    fetch_concurrent: 4,
5469                    replay_buffer: NZUsize!(1024 * 1024),
5470                    write_buffer: NZUsize!(1024 * 1024),
5471                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
5472                };
5473                let engine = Engine::new(context.with_label("engine"), cfg);
5474
5475                let (pending, recovered, resolver) = registrations
5476                    .remove(validator)
5477                    .expect("validator should be registered");
5478                engine_handlers.push(engine.start(pending, recovered, resolver));
5479            }
5480
5481            // Wait for progress (liveness check)
5482            let mut finalizers = Vec::new();
5483            for reporter in reporters.iter_mut() {
5484                let (mut latest, mut monitor) = reporter.subscribe().await;
5485                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
5486                    while latest < required_containers {
5487                        latest = monitor.next().await.expect("event missing");
5488                    }
5489                }));
5490            }
5491            join_all(finalizers).await;
5492
5493            // Verify safety: no conflicting finalizations across honest reporters
5494            let honest_start = faults as usize * 2; // Each twin produces 2 reporters
5495            let mut finalized_at_view: BTreeMap<View, D> = BTreeMap::new();
5496            for reporter in reporters.iter().skip(honest_start) {
5497                let finalizations = reporter.finalizations.lock().unwrap();
5498                for (view, finalization) in finalizations.iter() {
5499                    let digest = finalization.proposal.payload;
5500                    if let Some(existing) = finalized_at_view.get(view) {
5501                        assert_eq!(
5502                            existing, &digest,
5503                            "safety violation: conflicting finalizations at view {view}"
5504                        );
5505                    } else {
5506                        finalized_at_view.insert(*view, digest);
5507                    }
5508                }
5509            }
5510
5511            // Verify no invalid signatures were observed
5512            for reporter in reporters.iter().skip(honest_start) {
5513                let invalid = reporter.invalid.lock().unwrap();
5514                assert_eq!(*invalid, 0, "invalid signatures detected");
5515            }
5516
5517            // Ensure faults are attributable to twins
5518            let twin_identities: Vec<_> = participants.iter().take(faults as usize).collect();
5519            for reporter in reporters.iter().skip(honest_start) {
5520                let faults = reporter.faults.lock().unwrap();
5521                for (faulter, _) in faults.iter() {
5522                    assert!(
5523                        twin_identities.contains(&faulter),
5524                        "fault from non-twin participant"
5525                    );
5526                }
5527            }
5528
5529            // Ensure blocked connections are attributable to twins
5530            let blocked = oracle.blocked().await.unwrap();
5531            for (_, faulter) in blocked.iter() {
5532                assert!(
5533                    twin_identities.contains(&faulter),
5534                    "blocked connection from non-twin participant"
5535                );
5536            }
5537        });
5538    }
5539
5540    fn test_twins<S, F, L>(mut fixture: F)
5541    where
5542        S: Scheme<Sha256Digest, PublicKey = PublicKey>,
5543        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
5544        L: Elector<S>,
5545    {
5546        for strategy in [
5547            Strategy::View,
5548            Strategy::Fixed(3),
5549            Strategy::Isolate(4),
5550            Strategy::Broadcast,
5551            Strategy::Shuffle,
5552        ] {
5553            for link in [
5554                Link {
5555                    latency: Duration::from_millis(10),
5556                    jitter: Duration::from_millis(1),
5557                    success_rate: 1.0,
5558                },
5559                Link {
5560                    latency: Duration::from_millis(200),
5561                    jitter: Duration::from_millis(150),
5562                    success_rate: 0.75,
5563                },
5564            ] {
5565                twins::<S, _, L>(0, 5, strategy, link, |context, n| fixture(context, n));
5566            }
5567        }
5568    }
5569
5570    #[test_group("slow")]
5571    #[test_traced]
5572    fn test_twins_multisig_min_pk() {
5573        test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinPk, _>);
5574    }
5575
5576    #[test_group("slow")]
5577    #[test_traced]
5578    fn test_twins_multisig_min_sig() {
5579        test_twins::<_, _, RoundRobin>(bls12381_multisig::fixture::<MinSig, _>);
5580    }
5581
5582    #[test_group("slow")]
5583    #[test_traced]
5584    fn test_twins_threshold_min_pk() {
5585        test_twins::<_, _, Random>(bls12381_threshold::fixture::<MinPk, _>);
5586    }
5587
5588    #[test_group("slow")]
5589    #[test_traced]
5590    fn test_twins_threshold_min_sig() {
5591        test_twins::<_, _, Random>(bls12381_threshold::fixture::<MinSig, _>);
5592    }
5593
5594    #[test_group("slow")]
5595    #[test_traced]
5596    fn test_twins_ed25519() {
5597        test_twins::<_, _, RoundRobin>(ed25519::fixture);
5598    }
5599
5600    #[test_group("slow")]
5601    #[test_traced]
5602    fn test_twins_large_view() {
5603        twins::<_, _, Random>(
5604            0,
5605            10,
5606            Strategy::View,
5607            Link {
5608                latency: Duration::from_millis(200),
5609                jitter: Duration::from_millis(150),
5610                success_rate: 0.75,
5611            },
5612            bls12381_threshold::fixture::<MinPk, _>,
5613        );
5614    }
5615
5616    #[test_group("slow")]
5617    #[test_traced]
5618    fn test_twins_large_shuffle() {
5619        twins::<_, _, Random>(
5620            0,
5621            10,
5622            Strategy::Shuffle,
5623            Link {
5624                latency: Duration::from_millis(200),
5625                jitter: Duration::from_millis(150),
5626                success_rate: 0.75,
5627            },
5628            bls12381_threshold::fixture::<MinPk, _>,
5629        );
5630    }
5631}