commonware_consensus/simplex/
mod.rs

//! Simple and fast BFT agreement inspired by Simplex Consensus.
//!
//! Inspired by [Simplex Consensus](https://eprint.iacr.org/2023/463), `simplex` provides simple and fast BFT
//! agreement with network-speed view (i.e. block time) latency and optimal finalization latency in a
//! partially synchronous setting.
//!
//! # Features
//!
//! * Wicked Fast Block Times (2 Network Hops)
//! * Optimal Finalization Latency (3 Network Hops)
//! * Externalized Uptime and Fault Proofs
//! * Decoupled Block Broadcast and Sync
//! * Lazy Message Verification
//! * Application-Defined Block Format
//! * Pluggable Hashing and Cryptography
//! * Embedded VRF (via [signing_scheme::bls12381_threshold])
//!
//! # Design
//!
//! ## Protocol Description
//!
//! ### Specification for View `v`
//!
//! Upon entering view `v`:
//! * Determine leader `l` for view `v`
//! * Set timer for leader proposal `t_l = 2Δ` and advance `t_a = 3Δ`
//!     * If leader `l` has not been active in last `r` views, set `t_l` to 0.
//! * If leader `l`, broadcast `notarize(c,v)`
//!   * If can't propose container in view `v` because missing notarization/nullification for a
//!     previous view `v_m`, request `v_m`
//!
//! Upon receiving first `notarize(c,v)` from `l`:
//! * Cancel `t_l`
//! * If the container's parent `c_parent` is notarized at `v_parent` and we have nullifications for all views
//!   between `v` and `v_parent`, verify `c` and broadcast `notarize(c,v)`
//!
//! Upon receiving `2f+1` `notarize(c,v)`:
//! * Cancel `t_a`
//! * Mark `c` as notarized
//! * Broadcast `notarization(c,v)` (even if we have not verified `c`)
//! * If have not broadcast `nullify(v)`, broadcast `finalize(c,v)`
//! * Enter `v+1`
//!
//! Upon receiving `2f+1` `nullify(v)`:
//! * Broadcast `nullification(v)`
//!     * If observe `>= f+1` `notarize(c,v)` for some `c`, request `notarization(c_parent, v_parent)` and any missing
//!       `nullification(*)` between `v_parent` and `v`. If `c_parent` is older than the last finalized container,
//!       broadcast the last finalization instead.
//! * Enter `v+1`
//!
//! Upon receiving `2f+1` `finalize(c,v)`:
//! * Mark `c` as finalized (and recursively finalize its parents)
//! * Broadcast `finalization(c,v)` (even if we have not verified `c`)
//!
//! Upon `t_l` or `t_a` firing:
//! * Broadcast `nullify(v)`
//! * Every `t_r` after `nullify(v)` broadcast that we are still in view `v`:
//!    * Rebroadcast `nullify(v)` and either `notarization(v-1)` or `nullification(v-1)`
//!
//! _When `2f+1` votes of a given type (`notarize(c,v)`, `nullify(v)`, or `finalize(c,v)`) have been collected
//! from unique participants, a certificate (`notarization(c,v)`, `nullification(v)`, or `finalization(c,v)`) can be assembled.
//! These certificates serve as a standalone proof of consensus progress that downstream systems can ingest without executing
//! the protocol._
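//!
//! The `2f+1` threshold used throughout the specification can be illustrated with a minimal sketch. It assumes
//! participants are identified by index and that `f` is the largest value satisfying `n >= 3f+1`; the names
//! `max_faults`, `quorum_threshold`, and `Tally` are hypothetical and are not part of this module.
//!
//! ```rust
//! use std::collections::HashSet;
//!
//! /// Hypothetical helper: the largest `f` such that `n >= 3f + 1`.
//! fn max_faults(n: u32) -> u32 {
//!     assert!(n > 0, "n must be positive");
//!     (n - 1) / 3
//! }
//!
//! /// Hypothetical helper: votes required before a certificate can be assembled.
//! /// For `n = 3f + 1` this is exactly `2f + 1`.
//! fn quorum_threshold(n: u32) -> u32 {
//!     n - max_faults(n)
//! }
//!
//! /// Hypothetical tally of votes of one type for one view.
//! struct Tally {
//!     threshold: u32,
//!     voters: HashSet<u32>,
//! }
//!
//! impl Tally {
//!     fn new(n: u32) -> Self {
//!         Self {
//!             threshold: quorum_threshold(n),
//!             voters: HashSet::new(),
//!         }
//!     }
//!
//!     /// Records a vote from `participant`. Returns `true` only for the vote
//!     /// that first reaches the threshold; duplicate votes are ignored.
//!     fn record(&mut self, participant: u32) -> bool {
//!         let inserted = self.voters.insert(participant);
//!         inserted && self.voters.len() as u32 == self.threshold
//!     }
//! }
//! ```
//!
//! With `n = 4` (so `f = 1`), `record` first returns `true` on the third distinct participant. Counting only
//! unique participants is what prevents a single peer from reaching the threshold by repeating its vote.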
//!
//! ### Joining Consensus
//!
//! As soon as `2f+1` notarizes, nullifies, or finalizes are observed for some view `v`, the `Voter` will
//! enter `v+1`. This means that a new participant joining consensus will immediately jump ahead to the
//! latest view and begin participating in consensus (assuming it can verify blocks).
//!
//! ### Deviations from Simplex Consensus
//!
//! * Fetch missing notarizations/nullifications as needed rather than assuming each proposal contains
//!   a set of all notarizations/nullifications for all historical blocks.
//! * Introduce distinct messages for `notarize` and `nullify` rather than referring to both as a `vote` for
//!   either a "block" or a "dummy block", respectively.
//! * Introduce a "leader timeout" to trigger early view transitions for unresponsive leaders.
//! * Skip "leader timeout" and "notarization timeout" if a designated leader hasn't participated in
//!   some number of views (again to trigger early view transition for an unresponsive leader).
//! * Introduce message rebroadcast to continue making progress if messages from a given view are dropped (only way
//!   to ensure messages are reliably delivered is with a heavyweight reliable broadcast protocol).
//!
//! ## Architecture
//!
//! All logic is split into four components: the `Batcher`, the `Voter`, the `Resolver`, and the `Application` (provided by the user).
//! The `Batcher` is responsible for collecting messages from peers and lazily verifying them when a quorum is met. The `Voter`
//! is responsible for directing participation in the current view. The `Resolver` is responsible for
//! fetching artifacts from previous views required to verify proposed blocks in the latest view. Lastly, the `Application`
//! is responsible for proposing new blocks and indicating whether some block is valid.
//!
//! To drive great performance, all interactions between `Batcher`, `Voter`, `Resolver`, and `Application` are
//! non-blocking. This means that, for example, the `Voter` can continue processing messages while the
//! `Application` verifies a proposed block or the `Resolver` fetches a notarization.
//!
//! ```txt
//!                            +------------+          +++++++++++++++
//!                            |            +--------->+             +
//!                            |  Batcher   |          +    Peers    +
//!                            |            |<---------+             +
//!                            +-------+----+          +++++++++++++++
//!                                |   ^
//!                                |   |
//!                                |   |
//!                                |   |
//!                                v   |
//! +---------------+           +---------+            +++++++++++++++
//! |               |<----------+         +----------->+             +
//! |  Application  |           |  Voter  |            +    Peers    +
//! |               +---------->|         |<-----------+             +
//! +---------------+           +--+------+            +++++++++++++++
//!                                |   ^
//!                                |   |
//!                                |   |
//!                                |   |
//!                                v   |
//!                            +-------+----+          +++++++++++++++
//!                            |            +--------->+             +
//!                            |  Resolver  |          +    Peers    +
//!                            |            |<---------+             +
//!                            +------------+          +++++++++++++++
//! ```
//!
//! ### Batched Verification
//!
//! Unlike other consensus constructions that verify all incoming messages received from peers, `simplex`
//! lazily verifies messages (only when a quorum is met). If an invalid signature is detected, the `Batcher`
//! will perform repeated bisections over collected messages to find the offending message (and block the
//! peer(s) that sent it via [commonware_p2p::Blocker]).
//!
//! _If using a p2p implementation that is not authenticated, it is not safe to employ this optimization
//! as any attacking peer could simply reconnect from a different address. We recommend [commonware_p2p::authenticated]._
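//!
//! The repeated bisection described above can be sketched as follows. This is an illustration only, not the
//! `Batcher`'s actual code: `find_invalid` and `bisect` are hypothetical names, and `verify_batch` is an assumed
//! caller-supplied closure that returns `true` only if every item in the slice it is given is valid.
//!
//! ```rust
//! /// Returns the indices of all invalid items in `items`.
//! fn find_invalid<T>(items: &[T], verify_batch: &impl Fn(&[T]) -> bool) -> Vec<usize> {
//!     let mut invalid = Vec::new();
//!     bisect(items, 0, verify_batch, &mut invalid);
//!     invalid
//! }
//!
//! /// Verifies `items` as one batch and, on failure, recurses into each half
//! /// until the failing items are isolated. `offset` is the index of
//! /// `items[0]` in the original batch.
//! fn bisect<T>(
//!     items: &[T],
//!     offset: usize,
//!     verify_batch: &impl Fn(&[T]) -> bool,
//!     invalid: &mut Vec<usize>,
//! ) {
//!     // A batch that verifies as a whole needs no further work.
//!     if items.is_empty() || verify_batch(items) {
//!         return;
//!     }
//!     // A failing batch of one item is the offending item.
//!     if items.len() == 1 {
//!         invalid.push(offset);
//!         return;
//!     }
//!     let mid = items.len() / 2;
//!     bisect(&items[..mid], offset, verify_batch, invalid);
//!     bisect(&items[mid..], offset + mid, verify_batch, invalid);
//! }
//! ```
//!
//! When every message is valid, this costs a single batch verification. Each invalid message adds roughly a
//! logarithmic number of extra batch checks, which is why blocking the sender matters: it bounds how often a
//! peer can force the slow path.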
//!
//! ## Pluggable Hashing and Cryptography
//!
//! Hashing is abstracted via the [commonware_cryptography::Hasher] trait and cryptography is abstracted via
//! the [Scheme] trait, allowing deployments to employ approaches that best match their requirements (or to
//! provide their own without modifying any consensus logic). The following [Scheme]s are supported out-of-the-box:
//!
//! ### [signing_scheme::ed25519]
//!
//! [commonware_cryptography::ed25519] signatures are ["High-speed high-security signatures"](https://eprint.iacr.org/2011/368)
//! with 32 byte public keys and 64 byte signatures. While they are well-supported by commercial HSMs and offer efficient batch
//! verification, the signatures are not aggregatable (and certificates grow linearly with the quorum size).
//!
//! ### [signing_scheme::bls12381_multisig]
//!
//! [commonware_cryptography::bls12381] is a ["digital signature scheme with aggregation properties"](https://www.ietf.org/archive/id/draft-irtf-cfrg-bls-signature-05.txt).
//! Unlike [commonware_cryptography::ed25519], signatures from multiple participants (say the signers in a certificate) can be aggregated
//! into a single signature (reducing bandwidth usage per broadcast). That being said, [commonware_cryptography::bls12381] is much slower
//! to verify than [commonware_cryptography::ed25519] and isn't supported by most HSMs (a standardization effort expired in 2022).
//!
//! ### [signing_scheme::bls12381_threshold]
//!
//! Last but not least, [signing_scheme::bls12381_threshold] employs threshold cryptography (specifically BLS12-381 threshold signatures
//! with a `2f+1` of `3f+1` quorum) to generate both a bias-resistant beacon (for leader election and post-facto execution randomness)
//! and succinct consensus certificates (any certificate can be verified with just the static public key of the consensus instance) for each view
//! with zero message overhead (natively integrated). While powerful, this scheme requires both instantiating the shared secret
//! via [commonware_cryptography::bls12381::dkg] and performing a resharing procedure whenever participants are added or removed.
//!
//! #### Embedded VRF
//!
//! Every `notarize(c,v)` or `nullify(v)` message includes a `part(v)` message (a partial signature over the view `v`). After `2f+1`
//! `notarize(c,v)` or `nullify(v)` messages are collected from unique participants, `seed(v)` can be recovered. Because `part(v)` is
//! only over the view `v`, the seed derived for a given view `v` is the same regardless of whether or not a block was notarized in said
//! view `v`.
//!
//! Because the value of `seed(v)` cannot be known prior to message broadcast by any participant (including the leader) in view `v`
//! and cannot be manipulated by any participant (deterministic for any `2f+1` signers at a given view `v`), it can be used both as a beacon
//! for leader election (where `seed(v)` determines the leader for `v+1`) and a source of randomness in execution (where `seed(v)`
//! is used as a seed in `v`).
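//!
//! As a rough sketch of how a recovered seed could drive leader election, the encoded seed can be reduced modulo
//! the number of participants. The helpers below are hypothetical (`modulo_bytes`, `leader_index`); reading the
//! bytes as a big-endian integer is an assumption made for illustration and is not confirmed to match
//! `commonware_utils::modulo`, and the round-robin fallback is simplified to use only the view number.
//!
//! ```rust
//! /// Hypothetical helper: interprets `bytes` as a big-endian integer and
//! /// reduces it modulo `n` without materializing the full integer.
//! fn modulo_bytes(bytes: &[u8], n: u64) -> u64 {
//!     assert!(n > 0, "modulus must be positive");
//!     let mut acc: u128 = 0;
//!     for b in bytes {
//!         // `acc < n <= 2^64`, so shifting by 8 bits cannot overflow a u128.
//!         acc = ((acc << 8) | *b as u128) % n as u128;
//!     }
//!     acc as u64
//! }
//!
//! /// Hypothetical helper: picks a leader index from the seed when one is
//! /// available, otherwise falls back to round-robin over the view number.
//! fn leader_index(seed: Option<&[u8]>, view: u64, participants: usize) -> usize {
//!     assert!(participants > 0, "no participants to select leader from");
//!     match seed {
//!         Some(bytes) => modulo_bytes(bytes, participants as u64) as usize,
//!         None => (view % participants as u64) as usize,
//!     }
//! }
//! ```
//!
//! Because every honest participant recovers the same `seed(v)`, each computes the same index without any
//! additional messages.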
//!
//! #### Succinct Certificates
//!
//! All broadcast consensus messages (`notarize(c,v)`, `nullify(v)`, `finalize(c,v)`) contain partial signatures for a static
//! public key (derived from a group polynomial that can be recomputed during reconfiguration using [dkg](commonware_cryptography::bls12381::dkg)).
//! As soon as `2f+1` messages are collected, a threshold signature over `notarization(c,v)`, `nullification(v)`, or `finalization(c,v)`
//! can be recovered, respectively. Because the public key is static, any of these certificates can be verified by an external
//! process without following the consensus instance and/or tracking the current set of participants (as is typically required
//! to operate a lite client).
//!
//! These threshold signatures over `notarization(c,v)`, `nullification(v)`, and `finalization(c,v)` (i.e. the consensus certificates)
//! can be used to secure interoperability between different consensus instances and user interactions with an infrastructure provider
//! (where any data served can be proven to derive from some finalized block of some consensus instance with a known static public key).
//!
//! ## Persistence
//!
//! The `Voter` caches all data required to participate in consensus to avoid any disk reads
//! on the critical path. To enable recovery, the `Voter` writes valid messages it receives from
//! consensus and messages it generates to a write-ahead log (WAL) implemented by [commonware_storage::journal::segmented::variable::Journal].
//! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior
//! on restart (especially in the case of unclean shutdown).
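//!
//! The ordering constraint above (append, sync, then send) can be sketched with in-memory stand-ins. The `Wal`
//! and `Outbound` traits, `MemWal`, `MemNet`, and `persist_then_send` are all hypothetical and do not reflect
//! the `Journal` API; real implementations would also surface I/O errors rather than being infallible.
//!
//! ```rust
//! /// Hypothetical write-ahead log interface.
//! trait Wal {
//!     fn append(&mut self, msg: &[u8]);
//!     fn sync(&mut self);
//! }
//!
//! /// Hypothetical outbound network interface.
//! trait Outbound {
//!     fn send(&mut self, msg: &[u8]);
//! }
//!
//! /// In-memory log: entries are only "durable" once `sync` is called.
//! #[derive(Default)]
//! struct MemWal {
//!     pending: Vec<Vec<u8>>,
//!     durable: Vec<Vec<u8>>,
//! }
//!
//! impl Wal for MemWal {
//!     fn append(&mut self, msg: &[u8]) {
//!         self.pending.push(msg.to_vec());
//!     }
//!     fn sync(&mut self) {
//!         // Moves every pending entry into `durable`, leaving `pending` empty.
//!         self.durable.append(&mut self.pending);
//!     }
//! }
//!
//! /// In-memory network that records what was sent.
//! #[derive(Default)]
//! struct MemNet {
//!     sent: Vec<Vec<u8>>,
//! }
//!
//! impl Outbound for MemNet {
//!     fn send(&mut self, msg: &[u8]) {
//!         self.sent.push(msg.to_vec());
//!     }
//! }
//!
//! /// Makes `msg` durable before it leaves the process, so a restart can
//! /// replay the log and never contradict a vote that peers already saw.
//! fn persist_then_send(wal: &mut impl Wal, net: &mut impl Outbound, msg: &[u8]) {
//!     wal.append(msg);
//!     wal.sync();
//!     net.send(msg);
//! }
//! ```
//!
//! If the send happened before the sync, a crash between the two steps could leave a vote on the network that
//! the restarted participant has no record of, and it might then cast a conflicting vote in the same view.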

pub mod signing_scheme;
pub mod types;

cfg_if::cfg_if! {
    if #[cfg(not(target_arch = "wasm32"))] {
        mod actors;
        mod config;
        pub use config::Config;
        mod engine;
        pub use engine::Engine;
        mod metrics;
    }
}

#[cfg(test)]
pub mod mocks;

use crate::types::{Round, View};
use commonware_codec::Encode;
use signing_scheme::Scheme;

/// The minimum view we are tracking both in-memory and on-disk.
pub(crate) fn min_active(activity_timeout: View, last_finalized: View) -> View {
    last_finalized.saturating_sub(activity_timeout)
}

/// Whether or not a view is interesting to us. This is a function
/// of both `min_active` and whether or not the view is too far
/// in the future (based on the view we are currently in).
pub(crate) fn interesting(
    activity_timeout: View,
    last_finalized: View,
    current: View,
    pending: View,
    allow_future: bool,
) -> bool {
    if pending < min_active(activity_timeout, last_finalized) {
        return false;
    }
    if !allow_future && pending > current + 1 {
        return false;
    }
    true
}

/// Selects the leader for a given round using the scheme-provided randomness seed when available.
///
/// If the active [`Scheme`] exposes a seed (e.g. BLS threshold certificates), the seed is
/// encoded and reduced modulo the number of participants. Otherwise we fall back to
/// simple round-robin using the sum of the epoch and view numbers.
///
/// # Panics
///
/// Panics if `participants` is empty.
pub fn select_leader<S, P: Clone>(
    participants: &[P],
    round: Round,
    seed: Option<S::Seed>,
) -> (P, u32)
where
    S: Scheme,
{
    assert!(
        !participants.is_empty(),
        "no participants to select leader from"
    );
    let idx = if let Some(seed) = seed {
        commonware_utils::modulo(seed.encode().as_ref(), participants.len() as u64) as usize
    } else {
        (round.epoch().wrapping_add(round.view())) as usize % participants.len()
    };
    let leader = participants[idx].clone();

    (leader, idx as u32)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        simplex::{
            mocks::fixtures::{bls12381_multisig, bls12381_threshold, ed25519, Fixture},
            signing_scheme::seed_namespace,
        },
        types::Round,
        Monitor,
    };
    use commonware_codec::Encode;
    use commonware_cryptography::{
        bls12381::{
            primitives::variant::{MinPk, MinSig, Variant},
            tle::{decrypt, encrypt, Block},
        },
        ed25519, PrivateKeyExt as _, PublicKey, Sha256, Signer as _,
    };
    use commonware_macros::{select, test_traced};
    use commonware_p2p::simulated::{Config, Link, Network, Oracle, Receiver, Sender};
    use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner, Spawner};
    use commonware_utils::{quorum, NZUsize, NZU32};
    use engine::Engine;
    use futures::{future::join_all, StreamExt};
    use governor::Quota;
    use rand::{rngs::StdRng, Rng as _, SeedableRng as _};
    use std::{
        collections::HashMap,
        num::NonZeroUsize,
        sync::{Arc, Mutex},
        time::Duration,
    };
    use tracing::{debug, warn};
    use types::Activity;

    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    /// Registers all validators using the oracle.
    async fn register_validators<P: PublicKey>(
        oracle: &mut Oracle<P>,
        validators: &[P],
    ) -> HashMap<
        P,
        (
            (Sender<P>, Receiver<P>),
            (Sender<P>, Receiver<P>),
            (Sender<P>, Receiver<P>),
        ),
    > {
        let mut registrations = HashMap::new();
        for validator in validators.iter() {
            let mut control = oracle.control(validator.clone());
            let (pending_sender, pending_receiver) = control.register(0).await.unwrap();
            let (recovered_sender, recovered_receiver) = control.register(1).await.unwrap();
            let (resolver_sender, resolver_receiver) = control.register(2).await.unwrap();
            registrations.insert(
                validator.clone(),
                (
                    (pending_sender, pending_receiver),
                    (recovered_sender, recovered_receiver),
                    (resolver_sender, resolver_receiver),
                ),
            );
        }
        registrations
    }

    /// Enum to describe the action to take when linking validators.
    enum Action {
        Link(Link),
        Update(Link), // Unlink and then link
        Unlink,
    }

    /// Links (or unlinks) validators using the oracle.
    ///
    /// The `action` parameter determines the action (e.g. link, unlink) to take.
    /// The `restrict_to` function can be used to restrict the linking to certain connections,
    /// otherwise all validators will be linked to all other validators.
    async fn link_validators<P: PublicKey>(
        oracle: &mut Oracle<P>,
        validators: &[P],
        action: Action,
        restrict_to: Option<fn(usize, usize, usize) -> bool>,
    ) {
        for (i1, v1) in validators.iter().enumerate() {
            for (i2, v2) in validators.iter().enumerate() {
                // Ignore self
                if v2 == v1 {
                    continue;
                }

                // Restrict to certain connections
                if let Some(f) = restrict_to {
                    if !f(validators.len(), i1, i2) {
                        continue;
                    }
                }

                // Do any unlinking first
                match action {
                    Action::Update(_) | Action::Unlink => {
                        oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
                    }
                    _ => {}
                }

                // Do any linking after
                match action {
                    Action::Link(ref link) | Action::Update(ref link) => {
                        oracle
                            .add_link(v1.clone(), v2.clone(), link.clone())
                            .await
                            .unwrap();
                    }
                    _ => {}
                }
            }
        }
    }

    fn all_online<S, F>(mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
    {
        // Create context
        let n = 5;
        let quorum = quorum(n);
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: None,
                },
            );

            // Start network
            network.start();

            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx, validator) in participants.iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", *validator));

                // Configure engine
                let reporter_config = mocks::reporter::Config {
                    namespace: namespace.clone(),
                    participants: participants.clone().into(),
                    scheme: schemes[idx].clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                reporters.push(reporter.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx].clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    partition: validator.to_string(),
                    mailbox_size: 1024,
                    epoch: 333,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait for all engines to finish
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check reporters for correct activity
            let latest_complete = required_containers - activity_timeout;
            for reporter in reporters.iter() {
                // Ensure no faults
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no invalid signatures
                {
                    let invalid = reporter.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }

                // Ensure seeds for all views
                {
                    let seeds = reporter.seeds.lock().unwrap();
                    for view in 1..latest_complete {
                        // Ensure seed for every view
                        if !seeds.contains_key(&view) {
                            panic!("view: {view}");
                        }
                    }
                }

                // Ensure no forks
                let mut notarized = HashMap::new();
                let mut finalized = HashMap::new();
                {
                    let notarizes = reporter.notarizes.lock().unwrap();
                    for view in 1..latest_complete {
                        // Ensure only one payload proposed per view
                        let Some(payloads) = notarizes.get(&view) else {
                            continue;
                        };
                        if payloads.len() > 1 {
                            panic!("view: {view}");
                        }
                        let (digest, notarizers) = payloads.iter().next().unwrap();
                        notarized.insert(view, *digest);

                        if notarizers.len() < quorum as usize {
                            // We can't verify that everyone participated at every view because some nodes may
                            // have started later.
                            panic!("view: {view}");
                        }
                    }
                }
                {
                    let notarizations = reporter.notarizations.lock().unwrap();
                    for view in 1..latest_complete {
                        // Ensure notarization matches digest from notarizes
                        let Some(notarization) = notarizations.get(&view) else {
                            continue;
                        };
                        let Some(digest) = notarized.get(&view) else {
                            continue;
                        };
                        assert_eq!(&notarization.proposal.payload, digest);
                    }
                }
                {
                    let finalizes = reporter.finalizes.lock().unwrap();
                    for view in 1..latest_complete {
                        // Ensure only one payload proposed per view
                        let Some(payloads) = finalizes.get(&view) else {
                            continue;
                        };
                        if payloads.len() > 1 {
                            panic!("view: {view}");
                        }
                        let (digest, finalizers) = payloads.iter().next().unwrap();
                        finalized.insert(view, *digest);

                        // Only check at views below timeout
                        if view > latest_complete {
                            continue;
                        }

                        // Ensure everyone participating
                        if finalizers.len() < quorum as usize {
                            // We can't verify that everyone participated at every view because some nodes may
                            // have started later.
                            panic!("view: {view}");
                        }

                        // Ensure no nullifies for any finalizers
                        let nullifies = reporter.nullifies.lock().unwrap();
                        let Some(nullifies) = nullifies.get(&view) else {
                            continue;
                        };
                        for (_, finalizers) in payloads.iter() {
                            for finalizer in finalizers.iter() {
                                if nullifies.contains(finalizer) {
                                    panic!("should not nullify and finalize at same view");
                                }
                            }
                        }
                    }
                }
                {
                    let finalizations = reporter.finalizations.lock().unwrap();
                    for view in 1..latest_complete {
                        // Ensure finalization matches digest from finalizes
                        let Some(finalization) = finalizations.get(&view) else {
                            continue;
                        };
                        let Some(digest) = finalized.get(&view) else {
                            continue;
                        };
                        assert_eq!(&finalization.proposal.payload, digest);
                    }
                }
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }

    #[test_traced]
    fn test_all_online() {
        all_online(bls12381_threshold::<MinPk, _>);
        all_online(bls12381_threshold::<MinSig, _>);
        all_online(bls12381_multisig::<MinPk, _>);
        all_online(bls12381_multisig::<MinSig, _>);
        all_online(ed25519);
    }

    fn observer<S, F>(mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
    {
        // Create context
        let n_active = 5;
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: None,
                },
            );

            // Start network
            network.start();

            // Register participants (active)
            let Fixture {
                participants,
                schemes,
                verifier,
                ..
            } = fixture(&mut context, n_active);

            // Add observer (no share)
            let private_key_observer = ed25519::PrivateKey::from_seed(n_active as u64);
            let public_key_observer = private_key_observer.public_key();

            // Register all (including observer) with the network
            let mut all_validators = participants.clone();
            all_validators.push(public_key_observer.clone());
            all_validators.sort();
            let mut registrations = register_validators(&mut oracle, &all_validators).await;

            // Link all peers (including observer)
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &all_validators, Action::Link(link), None).await;

692            // Create engines
693            let relay = Arc::new(mocks::relay::Relay::new());
694            let mut reporters = Vec::new();
695
696            for (idx, validator) in participants.iter().enumerate() {
697                let is_observer = *validator == public_key_observer;
698
699                // Create scheme context
700                let context = context.with_label(&format!("validator-{}", *validator));
701
702                // Configure engine
703                let signing = if is_observer {
704                    verifier.clone()
705                } else {
706                    schemes[idx].clone()
707                };
708                let reporter_config = mocks::reporter::Config {
709                    namespace: namespace.clone(),
710                    participants: participants.clone().into(),
711                    scheme: signing.clone(),
712                };
713                let reporter =
714                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
715                reporters.push(reporter.clone());
716                let application_cfg = mocks::application::Config {
717                    hasher: Sha256::default(),
718                    relay: relay.clone(),
719                    me: validator.clone(),
720                    propose_latency: (10.0, 5.0),
721                    verify_latency: (10.0, 5.0),
722                };
723                let (actor, application) = mocks::application::Application::new(
724                    context.with_label("application"),
725                    application_cfg,
726                );
727                actor.start();
728                let blocker = oracle.control(validator.clone());
729                let cfg = config::Config {
730                    scheme: signing.clone(),
731                    blocker,
732                    automaton: application.clone(),
733                    relay: application.clone(),
734                    reporter: reporter.clone(),
735                    partition: validator.to_string(),
736                    mailbox_size: 1024,
737                    epoch: 333,
738                    namespace: namespace.clone(),
739                    leader_timeout: Duration::from_secs(1),
740                    notarization_timeout: Duration::from_secs(2),
741                    nullify_retry: Duration::from_secs(10),
742                    fetch_timeout: Duration::from_secs(1),
743                    activity_timeout,
744                    skip_timeout,
745                    max_fetch_count: 1,
746                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
747                    fetch_concurrent: 1,
748                    replay_buffer: NZUsize!(1024 * 1024),
749                    write_buffer: NZUsize!(1024 * 1024),
750                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
751                };
752                let engine = Engine::new(context.with_label("engine"), cfg);
753
754                // Start engine
755                let (pending, recovered, resolver) = registrations
756                    .remove(validator)
757                    .expect("validator should be registered");
758                engine.start(pending, recovered, resolver);
759            }

            // Wait for all engines to finish
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Sanity check
            for reporter in reporters.iter() {
                // Ensure no faults or invalid signatures
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }
                {
                    let invalid = reporter.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }

                // Ensure no blocked connections
                let blocked = oracle.blocked().await.unwrap();
                assert!(blocked.is_empty());
            }
        });
    }

    #[test_traced]
    fn test_observer() {
        observer(bls12381_threshold::<MinPk, _>);
        observer(bls12381_threshold::<MinSig, _>);
        observer(bls12381_multisig::<MinPk, _>);
        observer(bls12381_multisig::<MinSig, _>);
        observer(ed25519);
    }
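
    // Illustrative sketch (not part of the original suite): the
    // `unclean_shutdown` test below reruns its scenario from the previous
    // checkpoint until one run reports completion. The helper mirrors only that
    // control flow. `sketch_run_until_complete`, the generic checkpoint type `C`,
    // and the `run` closure are hypothetical stand-ins for `start_and_recover`
    // and the runtime's checkpoint.
    #[allow(dead_code)]
    fn sketch_run_until_complete<C>(
        mut run: impl FnMut(Option<C>) -> (bool, C),
    ) -> (usize, C) {
        let mut prev_checkpoint: Option<C> = None;
        let mut attempts = 0;
        loop {
            attempts += 1;
            // Resume from the previous checkpoint if there is one.
            let (complete, checkpoint) = run(prev_checkpoint.take());
            if complete {
                return (attempts, checkpoint);
            }
            prev_checkpoint = Some(checkpoint);
        }
    }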

    fn unclean_shutdown<S, F>(mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut StdRng, u32) -> Fixture<S>,
    {
        // Create context
        let n = 5;
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();

        // Random restarts every x seconds
        let shutdowns: Arc<Mutex<u64>> = Arc::new(Mutex::new(0));
        let supervised = Arc::new(Mutex::new(Vec::new()));
        let mut prev_checkpoint = None;

        // Create validator keys
        let mut rng = StdRng::seed_from_u64(0);
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut rng, n);

        loop {
            let rng = rng.clone();
            let participants = participants.clone();
            let schemes = schemes.clone();
            let namespace = namespace.clone();
            let shutdowns = shutdowns.clone();
            let supervised = supervised.clone();

            let f = |mut context: deterministic::Context| async move {
                // Create simulated network
                let (network, mut oracle) = Network::new(
                    context.with_label("network"),
                    Config {
                        max_size: 1024 * 1024,
                        disconnect_on_block: true,
                        tracked_peer_sets: None,
                    },
                );

                // Start network
                network.start();

                // Register participants
                let mut registrations = register_validators(&mut oracle, &participants).await;

                // Link all validators
                let link = Link {
                    latency: Duration::from_millis(50),
                    jitter: Duration::from_millis(50),
                    success_rate: 1.0,
                };
                link_validators(&mut oracle, &participants, Action::Link(link), None).await;

                // Create engines
                let relay = Arc::new(mocks::relay::Relay::new());
                let mut reporters = HashMap::new();
                let mut engine_handlers = Vec::new();
                for (idx, validator) in participants.iter().enumerate() {
                    // Create scheme context
                    let context = context.with_label(&format!("validator-{}", *validator));

                    // Configure engine
                    let reporter_config = mocks::reporter::Config {
                        namespace: namespace.clone(),
                        participants: participants.clone().into(),
                        scheme: schemes[idx].clone(),
                    };
                    let reporter = mocks::reporter::Reporter::new(rng.clone(), reporter_config);
                    reporters.insert(validator.clone(), reporter.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(validator.clone());
                    let cfg = config::Config {
                        scheme: schemes[idx].clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        partition: validator.to_string(),
                        mailbox_size: 1024,
                        epoch: 333,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        max_fetch_count: 1,
                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                        fetch_concurrent: 1,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    };
                    let engine = Engine::new(context.with_label("engine"), cfg);

                    // Start engine
                    let (pending, recovered, resolver) = registrations
                        .remove(validator)
                        .expect("validator should be registered");
                    engine_handlers.push(engine.start(pending, recovered, resolver));
                }

                // Store all finalizer handles
                let mut finalizers = Vec::new();
                for (_, reporter) in reporters.iter_mut() {
                    let (mut latest, mut monitor) = reporter.subscribe().await;
                    finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                        while latest < required_containers {
                            latest = monitor.next().await.expect("event missing");
                        }
                    }));
                }

                // Exit at random points for unclean shutdown of entire set
                let wait =
                    context.gen_range(Duration::from_millis(10)..Duration::from_millis(2_000));
                let result = select! {
                    _ = context.sleep(wait) => {
                        // Collect reporters to check faults
                        {
                            let mut shutdowns = shutdowns.lock().unwrap();
                            debug!(shutdowns = *shutdowns, elapsed = ?wait, "restarting");
                            *shutdowns += 1;
                        }
                        supervised.lock().unwrap().push(reporters);
                        false
                    },
                    _ = join_all(finalizers) => {
                        // Check reporters from earlier runs for fault activity
                        let supervised = supervised.lock().unwrap();
                        for reporters in supervised.iter() {
                            for (_, reporter) in reporters.iter() {
                                let faults = reporter.faults.lock().unwrap();
                                assert!(faults.is_empty());
                            }
                        }
                        true
                    }
                };

                // Ensure no blocked connections
                let blocked = oracle.blocked().await.unwrap();
                assert!(blocked.is_empty());

                result
            };

            let (complete, checkpoint) = if let Some(prev_checkpoint) = prev_checkpoint {
                deterministic::Runner::from(prev_checkpoint)
            } else {
                deterministic::Runner::timed(Duration::from_secs(60))
            }
            .start_and_recover(f);

            // Check if we should exit
            if complete {
                break;
            }

            prev_checkpoint = Some(checkpoint);
        }
    }

    #[test_traced]
    fn test_unclean_shutdown() {
        unclean_shutdown(bls12381_threshold::<MinPk, _>);
        unclean_shutdown(bls12381_threshold::<MinSig, _>);
        unclean_shutdown(bls12381_multisig::<MinPk, _>);
        unclean_shutdown(bls12381_multisig::<MinSig, _>);
        unclean_shutdown(ed25519);
    }
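
    // Illustrative sketch (not part of the original suite): the `backfill` test
    // below passes predicates such as `|_, i, j| ![i, j].contains(&0usize)` to
    // `link_validators` to choose which peer pairs an action applies to. The
    // helper enumerates the ordered pairs a predicate would select, assuming the
    // predicate is evaluated as `filter(n, i, j)` for every `i != j`.
    // `sketch_selected_pairs` is a hypothetical name, and the argument order and
    // iteration order are assumptions.
    #[allow(dead_code)]
    fn sketch_selected_pairs(
        n: usize,
        filter: impl Fn(usize, usize, usize) -> bool,
    ) -> Vec<(usize, usize)> {
        let mut pairs = Vec::new();
        for i in 0..n {
            for j in 0..n {
                // Skip self-links and any pair rejected by the predicate.
                if i != j && filter(n, i, j) {
                    pairs.push((i, j));
                }
            }
        }
        pairs
    }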

    fn backfill<S, F>(mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
    {
        // Create context
        let n = 4;
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(720));
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: None,
                },
            );

            // Start network
            network.start();

            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators except first
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &participants,
                Action::Link(link),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx_scheme, validator) in participants.iter().enumerate() {
                // Skip first peer
                if idx_scheme == 0 {
                    continue;
                }

                // Create scheme context
                let context = context.with_label(&format!("validator-{}", *validator));

                // Configure engine
                let reporter_config = mocks::reporter::Config {
                    namespace: namespace.clone(),
                    participants: participants.clone().into(),
                    scheme: schemes[idx_scheme].clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                reporters.push(reporter.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx_scheme].clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    partition: validator.to_string(),
                    mailbox_size: 1024,
                    epoch: 333,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1, // force many fetches
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait for all engines to finish
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Degrade network connections for online peers
            let link = Link {
                latency: Duration::from_secs(3),
                jitter: Duration::from_millis(0),
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &participants,
                Action::Update(link.clone()),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Wait for nullifications to accrue
            context.sleep(Duration::from_secs(120)).await;

            // Unlink second peer from all (except first)
            link_validators(
                &mut oracle,
                &participants,
                Action::Unlink,
                Some(|_, i, j| [i, j].contains(&1usize) && ![i, j].contains(&0usize)),
            )
            .await;

            // Configure engine for first peer
            let me = participants[0].clone();
            let context = context.with_label(&format!("validator-{me}"));

            // Link first peer to all (except second)
            link_validators(
                &mut oracle,
                &participants,
                Action::Link(link),
                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
            )
            .await;

            // Restore network connections for all online peers
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(3),
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &participants,
                Action::Update(link),
                Some(|_, i, j| ![i, j].contains(&1usize)),
            )
            .await;

            // Configure engine
            let reporter_config = mocks::reporter::Config {
                namespace: namespace.clone(),
                participants: participants.clone().into(),
                scheme: schemes[0].clone(),
            };
            let mut reporter =
                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
            reporters.push(reporter.clone());
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                me: me.clone(),
                propose_latency: (10.0, 5.0),
                verify_latency: (10.0, 5.0),
            };
            let (actor, application) = mocks::application::Application::new(
                context.with_label("application"),
                application_cfg,
            );
            actor.start();
            let blocker = oracle.control(me.clone());
            let cfg = config::Config {
                scheme: schemes[0].clone(),
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: reporter.clone(),
                partition: me.to_string(),
                mailbox_size: 1024,
                epoch: 333,
                namespace: namespace.clone(),
                leader_timeout: Duration::from_secs(1),
                notarization_timeout: Duration::from_secs(2),
                nullify_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout,
                skip_timeout,
                max_fetch_count: 1,
                fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                fetch_concurrent: 1,
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            };
            let engine = Engine::new(context.with_label("engine"), cfg);

            // Start engine
            let (pending, recovered, resolver) = registrations
                .remove(&me)
                .expect("validator should be registered");
            engine_handlers.push(engine.start(pending, recovered, resolver));

            // Wait for new engine to finalize required
            let (mut latest, mut monitor) = reporter.subscribe().await;
            while latest < required_containers {
                latest = monitor.next().await.expect("event missing");
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }

    #[test_traced]
    fn test_backfill() {
        backfill(bls12381_threshold::<MinPk, _>);
        backfill(bls12381_threshold::<MinSig, _>);
        backfill(bls12381_multisig::<MinPk, _>);
        backfill(bls12381_multisig::<MinSig, _>);
        backfill(ed25519);
    }
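
    // Illustrative sketch (not part of the original suite): the `one_offline`
    // test below compares nullify counts against `quorum(n)`. Assuming the
    // standard BFT bound of `f = (n - 1) / 3` tolerated faults and a quorum of
    // `n - f` (which equals `2f + 1` when `n = 3f + 1`), the arithmetic looks like
    // this. `sketch_max_faults` and `sketch_quorum` are hypothetical names, not
    // the library's functions.
    #[allow(dead_code)]
    fn sketch_max_faults(n: u32) -> u32 {
        assert!(n > 0, "n must be positive");
        (n - 1) / 3
    }

    #[allow(dead_code)]
    fn sketch_quorum(n: u32) -> u32 {
        // With one of five validators offline, four votes are still required.
        n - sketch_max_faults(n)
    }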

    fn one_offline<S, F>(mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
    {
        // Create context
        let n = 5;
        let quorum = quorum(n);
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let max_exceptions = 10;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: None,
                },
            );

            // Start network
            network.start();

            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators except first
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &participants,
                Action::Link(link),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx_scheme, validator) in participants.iter().enumerate() {
                // Skip first peer
                if idx_scheme == 0 {
                    continue;
                }

                // Create scheme context
                let context = context.with_label(&format!("validator-{}", *validator));

                // Configure engine
                let reporter_config = mocks::reporter::Config {
                    namespace: namespace.clone(),
                    participants: participants.clone().into(),
                    scheme: schemes[idx_scheme].clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                reporters.push(reporter.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx_scheme].clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    partition: validator.to_string(),
                    mailbox_size: 1024,
                    epoch: 333,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait for all engines to finish
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;
1372
            // Check reporters for correct activity
            let offline = &participants[0];
            for reporter in reporters.iter() {
                // Ensure no faults
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no invalid signatures
                {
                    let invalid = reporter.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }

                // Ensure offline node is never active
                {
                    let notarizes = reporter.notarizes.lock().unwrap();
                    for (view, payloads) in notarizes.iter() {
                        for (_, participants) in payloads.iter() {
                            if participants.contains(offline) {
                                panic!("offline validator sent notarize in view: {view}");
                            }
                        }
                    }
                }
                {
                    let nullifies = reporter.nullifies.lock().unwrap();
                    for (view, participants) in nullifies.iter() {
                        if participants.contains(offline) {
                            panic!("offline validator sent nullify in view: {view}");
                        }
                    }
                }
                {
                    let finalizes = reporter.finalizes.lock().unwrap();
                    for (view, payloads) in finalizes.iter() {
                        for (_, finalizers) in payloads.iter() {
                            if finalizers.contains(offline) {
                                panic!("offline validator sent finalize in view: {view}");
                            }
                        }
                    }
                }

                // Identify views led by the offline node
                let mut offline_views = Vec::new();
                {
                    let leaders = reporter.leaders.lock().unwrap();
                    for (view, leader) in leaders.iter() {
                        if leader == offline {
                            offline_views.push(*view);
                        }
                    }
                }
                assert!(!offline_views.is_empty());

                // Ensure nullifies/nullification collected for each offline view,
                // counting every gap as an exception for this reporter
                let mut exceptions = 0;
                {
                    let nullifies = reporter.nullifies.lock().unwrap();
                    for view in offline_views.iter() {
                        let nullifies = nullifies.get(view).map_or(0, |n| n.len());
                        if nullifies < quorum as usize {
                            warn!("missing expected view nullifies: {}", view);
                            exceptions += 1;
                        }
                    }
                }
                {
                    let nullifications = reporter.nullifications.lock().unwrap();
                    for view in offline_views.iter() {
                        if !nullifications.contains_key(view) {
                            warn!("missing expected view nullification: {}", view);
                            exceptions += 1;
                        }
                    }
                }

                // Ensure exceptions within allowed
                assert!(exceptions <= max_exceptions);
            }
1457
1458            // Ensure no blocked connections
1459            let blocked = oracle.blocked().await.unwrap();
1460            assert!(blocked.is_empty());
1461
1462            // Ensure we are skipping views
1463            let encoded = context.encode();
1464            let lines = encoded.lines();
1465            let mut skipped_views = 0;
1466            let mut nodes_skipping = 0;
1467            for line in lines {
1468                if line.contains("_engine_voter_skipped_views_total") {
1469                    let parts: Vec<&str> = line.split_whitespace().collect();
1470                    if let Some(number_str) = parts.last() {
1471                        if let Ok(number) = number_str.parse::<u64>() {
1472                            if number > 0 {
1473                                nodes_skipping += 1;
1474                            }
1475                            if number > skipped_views {
1476                                skipped_views = number;
1477                            }
1478                        }
1479                    }
1480                }
1481            }
1482            assert!(
1483                skipped_views > 0,
1484                "expected skipped views to be greater than 0"
1485            );
1486            assert_eq!(
1487                nodes_skipping,
1488                n - 1,
1489                "expected all online nodes to be skipping views"
1490            );
1491        });
1492    }
1493
1494    #[test_traced]
1495    fn test_one_offline() {
1496        one_offline(bls12381_threshold::<MinPk, _>);
1497        one_offline(bls12381_threshold::<MinSig, _>);
1498        one_offline(bls12381_multisig::<MinPk, _>);
1499        one_offline(bls12381_multisig::<MinSig, _>);
1500        one_offline(ed25519);
1501    }
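
    // A minimal sketch of the metric scan performed at the end of `one_offline`,
    // pulled into a standalone helper. `skipped_view_stats` is a hypothetical name
    // (not part of the original suite); it assumes the encoded metrics are
    // newline-separated samples whose value is the last token, as the loop above does.
    //
    // Returns (largest skipped-view count seen, number of samples with a count > 0).
    fn skipped_view_stats(encoded: &str) -> (u64, usize) {
        let mut max_skipped = 0u64;
        let mut nodes_skipping = 0usize;
        for line in encoded.lines() {
            if !line.contains("_engine_voter_skipped_views_total") {
                continue;
            }
            // The sample value is the last whitespace-separated token on the line.
            let Some(value) = line.split_whitespace().last() else {
                continue;
            };
            // Lines whose last token is not an integer are ignored.
            let Ok(value) = value.parse::<u64>() else {
                continue;
            };
            if value > 0 {
                nodes_skipping += 1;
            }
            max_skipped = max_skipped.max(value);
        }
        (max_skipped, nodes_skipping)
    }

    #[test]
    fn test_skipped_view_stats_sketch() {
        // Made-up metric names for illustration only.
        let encoded = "validator_a_engine_voter_skipped_views_total 3\n\
                       validator_b_engine_voter_skipped_views_total 0\n\
                       validator_c_engine_voter_skipped_views_total 7\n\
                       validator_c_engine_voter_other_total 9\n";
        assert_eq!(skipped_view_stats(encoded), (7, 2));
    }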
1502
1503    fn slow_validator<S, F>(mut fixture: F)
1504    where
1505        S: Scheme<PublicKey = ed25519::PublicKey>,
1506        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1507    {
1508        // Create context
1509        let n = 5;
1510        let required_containers = 50;
1511        let activity_timeout = 10;
1512        let skip_timeout = 5;
1513        let namespace = b"consensus".to_vec();
1514        let executor = deterministic::Runner::timed(Duration::from_secs(30));
1515        executor.start(|mut context| async move {
1516            // Create simulated network
1517            let (network, mut oracle) = Network::new(
1518                context.with_label("network"),
1519                Config {
1520                    max_size: 1024 * 1024,
1521                    disconnect_on_block: true,
1522                    tracked_peer_sets: None,
1523                },
1524            );
1525
1526            // Start network
1527            network.start();
1528
1529            // Register participants
1530            let Fixture {
1531                participants,
1532                schemes,
1533                ..
1534            } = fixture(&mut context, n);
1535            let mut registrations = register_validators(&mut oracle, &participants).await;
1536
1537            // Link all validators
1538            let link = Link {
1539                latency: Duration::from_millis(10),
1540                jitter: Duration::from_millis(1),
1541                success_rate: 1.0,
1542            };
1543            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1544
1545            // Create engines
1546            let relay = Arc::new(mocks::relay::Relay::new());
1547            let mut reporters = Vec::new();
1548            let mut engine_handlers = Vec::new();
1549            for (idx_scheme, validator) in participants.iter().enumerate() {
1550                // Create scheme context
1551                let context = context.with_label(&format!("validator-{}", *validator));
1552
1553                // Configure engine
1554                let reporter_config = mocks::reporter::Config {
1555                    namespace: namespace.clone(),
1556                    participants: participants.clone().into(),
1557                    scheme: schemes[idx_scheme].clone(),
1558                };
1559                let reporter =
1560                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1561                reporters.push(reporter.clone());
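                // Validator 0 is the slow node: its propose/verify latency setting (10_000.0)
                // dwarfs the 10.0 used by its peers
                let application_cfg = if idx_scheme == 0 {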
1563                    mocks::application::Config {
1564                        hasher: Sha256::default(),
1565                        relay: relay.clone(),
1566                        me: validator.clone(),
1567                        propose_latency: (10_000.0, 0.0),
1568                        verify_latency: (10_000.0, 5.0),
1569                    }
1570                } else {
1571                    mocks::application::Config {
1572                        hasher: Sha256::default(),
1573                        relay: relay.clone(),
1574                        me: validator.clone(),
1575                        propose_latency: (10.0, 5.0),
1576                        verify_latency: (10.0, 5.0),
1577                    }
1578                };
1579                let (actor, application) = mocks::application::Application::new(
1580                    context.with_label("application"),
1581                    application_cfg,
1582                );
1583                actor.start();
1584                let blocker = oracle.control(validator.clone());
1585                let cfg = config::Config {
1586                    scheme: schemes[idx_scheme].clone(),
1587                    blocker,
1588                    automaton: application.clone(),
1589                    relay: application.clone(),
1590                    reporter: reporter.clone(),
1591                    partition: validator.to_string(),
1592                    mailbox_size: 1024,
1593                    epoch: 333,
1594                    namespace: namespace.clone(),
1595                    leader_timeout: Duration::from_secs(1),
1596                    notarization_timeout: Duration::from_secs(2),
1597                    nullify_retry: Duration::from_secs(10),
1598                    fetch_timeout: Duration::from_secs(1),
1599                    activity_timeout,
1600                    skip_timeout,
1601                    max_fetch_count: 1,
1602                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1603                    fetch_concurrent: 1,
1604                    replay_buffer: NZUsize!(1024 * 1024),
1605                    write_buffer: NZUsize!(1024 * 1024),
1606                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1607                };
1608                let engine = Engine::new(context.with_label("engine"), cfg);
1609
1610                // Start engine
1611                let (pending, recovered, resolver) = registrations
1612                    .remove(validator)
1613                    .expect("validator should be registered");
1614                engine_handlers.push(engine.start(pending, recovered, resolver));
1615            }
1616
1617            // Wait for all engines to finish
1618            let mut finalizers = Vec::new();
1619            for reporter in reporters.iter_mut() {
1620                let (mut latest, mut monitor) = reporter.subscribe().await;
1621                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1622                    while latest < required_containers {
1623                        latest = monitor.next().await.expect("event missing");
1624                    }
1625                }));
1626            }
1627            join_all(finalizers).await;
1628
1629            // Check reporters for correct activity
1630            let slow = &participants[0];
1631            for reporter in reporters.iter() {
1632                // Ensure no faults
1633                {
1634                    let faults = reporter.faults.lock().unwrap();
1635                    assert!(faults.is_empty());
1636                }
1637
1638                // Ensure no invalid signatures
1639                {
1640                    let invalid = reporter.invalid.lock().unwrap();
1641                    assert_eq!(*invalid, 0);
1642                }
1643
1644                // Ensure slow node never emits a notarize or finalize (will never finish verification in a timely manner)
1645                {
1646                    let notarizes = reporter.notarizes.lock().unwrap();
1647                    for (view, payloads) in notarizes.iter() {
1648                        for (_, participants) in payloads.iter() {
1649                            if participants.contains(slow) {
1650                                panic!("view: {view}");
1651                            }
1652                        }
1653                    }
1654                }
1655                {
1656                    let finalizes = reporter.finalizes.lock().unwrap();
1657                    for (view, payloads) in finalizes.iter() {
1658                        for (_, finalizers) in payloads.iter() {
1659                            if finalizers.contains(slow) {
1660                                panic!("view: {view}");
1661                            }
1662                        }
1663                    }
1664                }
1665            }
1666
1667            // Ensure no blocked connections
1668            let blocked = oracle.blocked().await.unwrap();
1669            assert!(blocked.is_empty());
1670        });
1671    }
1672
1673    #[test_traced]
1674    fn test_slow_validator() {
1675        slow_validator(bls12381_threshold::<MinPk, _>);
1676        slow_validator(bls12381_threshold::<MinSig, _>);
1677        slow_validator(bls12381_multisig::<MinPk, _>);
1678        slow_validator(bls12381_multisig::<MinSig, _>);
1679        slow_validator(ed25519);
1680    }
1681
1682    fn all_recovery<S, F>(mut fixture: F)
1683    where
1684        S: Scheme<PublicKey = ed25519::PublicKey>,
1685        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1686    {
1687        // Create context
1688        let n = 5;
1689        let required_containers = 100;
1690        let activity_timeout = 10;
1691        let skip_timeout = 2;
1692        let namespace = b"consensus".to_vec();
1693        let executor = deterministic::Runner::timed(Duration::from_secs(180));
1694        executor.start(|mut context| async move {
1695            // Create simulated network
1696            let (network, mut oracle) = Network::new(
1697                context.with_label("network"),
1698                Config {
1699                    max_size: 1024 * 1024,
1700                    disconnect_on_block: false,
1701                    tracked_peer_sets: None,
1702                },
1703            );
1704
1705            // Start network
1706            network.start();
1707
1708            // Register participants
1709            let Fixture {
1710                participants,
1711                schemes,
1712                ..
1713            } = fixture(&mut context, n);
1714            let mut registrations = register_validators(&mut oracle, &participants).await;
1715
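            // Link all validators with a 3s latency, which exceeds both the leader (1s) and
            // notarization (2s) timeouts configured below, so no view is expected to complete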
1717            let link = Link {
1718                latency: Duration::from_secs(3),
1719                jitter: Duration::from_millis(0),
1720                success_rate: 1.0,
1721            };
1722            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1723
1724            // Create engines
1725            let relay = Arc::new(mocks::relay::Relay::new());
1726            let mut reporters = Vec::new();
1727            let mut engine_handlers = Vec::new();
1728            for (idx, validator) in participants.iter().enumerate() {
1729                // Create scheme context
1730                let context = context.with_label(&format!("validator-{}", *validator));
1731
1732                // Configure engine
1733                let reporter_config = mocks::reporter::Config {
1734                    namespace: namespace.clone(),
1735                    participants: participants.clone().into(),
1736                    scheme: schemes[idx].clone(),
1737                };
1738                let reporter =
1739                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1740                reporters.push(reporter.clone());
1741                let application_cfg = mocks::application::Config {
1742                    hasher: Sha256::default(),
1743                    relay: relay.clone(),
1744                    me: validator.clone(),
1745                    propose_latency: (10.0, 5.0),
1746                    verify_latency: (10.0, 5.0),
1747                };
1748                let (actor, application) = mocks::application::Application::new(
1749                    context.with_label("application"),
1750                    application_cfg,
1751                );
1752                actor.start();
1753                let blocker = oracle.control(validator.clone());
1754                let cfg = config::Config {
1755                    scheme: schemes[idx].clone(),
1756                    blocker,
1757                    automaton: application.clone(),
1758                    relay: application.clone(),
1759                    reporter: reporter.clone(),
1760                    partition: validator.to_string(),
1761                    mailbox_size: 1024,
1762                    epoch: 333,
1763                    namespace: namespace.clone(),
1764                    leader_timeout: Duration::from_secs(1),
1765                    notarization_timeout: Duration::from_secs(2),
1766                    nullify_retry: Duration::from_secs(10),
1767                    fetch_timeout: Duration::from_secs(1),
1768                    activity_timeout,
1769                    skip_timeout,
1770                    max_fetch_count: 1,
1771                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1772                    fetch_concurrent: 1,
1773                    replay_buffer: NZUsize!(1024 * 1024),
1774                    write_buffer: NZUsize!(1024 * 1024),
1775                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1776                };
1777                let engine = Engine::new(context.with_label("engine"), cfg);
1778
1779                // Start engine
1780                let (pending, recovered, resolver) = registrations
1781                    .remove(validator)
1782                    .expect("validator should be registered");
1783                engine_handlers.push(engine.start(pending, recovered, resolver));
1784            }
1785
            // Wait for a virtual minute (nothing should finalize)
1787            let mut finalizers = Vec::new();
1788            for reporter in reporters.iter_mut() {
1789                let (_, mut monitor) = reporter.subscribe().await;
1790                finalizers.push(
1791                    context
1792                        .with_label("finalizer")
1793                        .spawn(move |context| async move {
1794                            select! {
1795                                _timeout = context.sleep(Duration::from_secs(60)) => {},
1796                                _done = monitor.next() => {
1797                                    panic!("engine should not notarize or finalize anything");
1798                                }
1799                            }
1800                        }),
1801                );
1802            }
1803            join_all(finalizers).await;
1804
1805            // Unlink all validators to get latest view
1806            link_validators(&mut oracle, &participants, Action::Unlink, None).await;
1807
1808            // Wait for a virtual minute (nothing should happen)
1809            context.sleep(Duration::from_secs(60)).await;
1810
1811            // Get latest view
1812            let mut latest = 0;
1813            for reporter in reporters.iter() {
1814                let nullifies = reporter.nullifies.lock().unwrap();
1815                let max = nullifies.keys().max().unwrap();
1816                if *max > latest {
1817                    latest = *max;
1818                }
1819            }
1820
1821            // Update links
1822            let link = Link {
1823                latency: Duration::from_millis(10),
1824                jitter: Duration::from_millis(1),
1825                success_rate: 1.0,
1826            };
1827            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
1828
            // Wait for every reporter to reach `required_containers` over the low-latency links
1830            let mut finalizers = Vec::new();
1831            for reporter in reporters.iter_mut() {
1832                let (mut latest, mut monitor) = reporter.subscribe().await;
1833                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1834                    while latest < required_containers {
1835                        latest = monitor.next().await.expect("event missing");
1836                    }
1837                }));
1838            }
1839            join_all(finalizers).await;
1840
1841            // Check reporters for correct activity
1842            for reporter in reporters.iter() {
1843                // Ensure no faults
1844                {
1845                    let faults = reporter.faults.lock().unwrap();
1846                    assert!(faults.is_empty());
1847                }
1848
1849                // Ensure no invalid signatures
1850                {
1851                    let invalid = reporter.invalid.lock().unwrap();
1852                    assert_eq!(*invalid, 0);
1853                }
1854
1855                // Ensure quick recovery.
1856                //
1857                // If the skip timeout isn't implemented correctly, we may go many views before participants
1858                // start to consider a validator's proposal.
1859                {
1860                    // Ensure nearly all views around latest finalize
1861                    let mut found = 0;
1862                    let finalizations = reporter.finalizations.lock().unwrap();
1863                    for i in latest..latest + activity_timeout {
1864                        if finalizations.contains_key(&i) {
1865                            found += 1;
1866                        }
1867                    }
1868                    assert!(found >= activity_timeout - 2, "found: {found}");
1869                }
1870            }
1871
1872            // Ensure no blocked connections
1873            let blocked = oracle.blocked().await.unwrap();
1874            assert!(blocked.is_empty());
1875        });
1876    }
1877
1878    #[test_traced]
1879    fn test_all_recovery() {
1880        all_recovery(bls12381_threshold::<MinPk, _>);
1881        all_recovery(bls12381_threshold::<MinSig, _>);
1882        all_recovery(bls12381_multisig::<MinPk, _>);
1883        all_recovery(bls12381_multisig::<MinSig, _>);
1884        all_recovery(ed25519);
1885    }
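
    // A minimal sketch of the "quick recovery" check in `all_recovery`: count how many
    // views in `[start, start + window)` have an entry. `count_in_window` is a
    // hypothetical helper (not part of the original suite), and the `BTreeMap<u64, ()>`
    // is only a stand-in for the reporter's finalization map.
    fn count_in_window(
        finalizations: &std::collections::BTreeMap<u64, ()>,
        start: u64,
        window: u64,
    ) -> u64 {
        finalizations.range(start..start + window).count() as u64
    }

    #[test]
    fn test_count_in_window_sketch() {
        // Views 20..30 are inspected; 21 and 25 are missing, so 8 of 10 are found,
        // which sits exactly on the `activity_timeout - 2` bound used above.
        let finalizations: std::collections::BTreeMap<u64, ()> = (20..30)
            .filter(|v| *v != 21 && *v != 25)
            .map(|v| (v, ()))
            .collect();
        assert_eq!(count_in_window(&finalizations, 20, 10), 8);
    }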
1886
1887    fn partition<S, F>(mut fixture: F)
1888    where
1889        S: Scheme<PublicKey = ed25519::PublicKey>,
1890        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
1891    {
1892        // Create context
1893        let n = 10;
1894        let required_containers = 50;
1895        let activity_timeout = 10;
1896        let skip_timeout = 5;
1897        let namespace = b"consensus".to_vec();
1898        let executor = deterministic::Runner::timed(Duration::from_secs(900));
1899        executor.start(|mut context| async move {
1900            // Create simulated network
1901            let (network, mut oracle) = Network::new(
1902                context.with_label("network"),
1903                Config {
1904                    max_size: 1024 * 1024,
1905                    disconnect_on_block: false,
1906                    tracked_peer_sets: None,
1907                },
1908            );
1909
1910            // Start network
1911            network.start();
1912
1913            // Register participants
1914            let Fixture {
1915                participants,
1916                schemes,
1917                ..
1918            } = fixture(&mut context, n);
1919            let mut registrations = register_validators(&mut oracle, &participants).await;
1920
1921            // Link all validators
1922            let link = Link {
1923                latency: Duration::from_millis(10),
1924                jitter: Duration::from_millis(1),
1925                success_rate: 1.0,
1926            };
1927            link_validators(&mut oracle, &participants, Action::Link(link.clone()), None).await;
1928
1929            // Create engines
1930            let relay = Arc::new(mocks::relay::Relay::new());
1931            let mut reporters = Vec::new();
1932            let mut engine_handlers = Vec::new();
1933            for (idx, validator) in participants.iter().enumerate() {
1934                // Create scheme context
1935                let context = context.with_label(&format!("validator-{}", *validator));
1936
1937                // Configure engine
1938                let reporter_config = mocks::reporter::Config {
1939                    namespace: namespace.clone(),
1940                    participants: participants.clone().into(),
1941                    scheme: schemes[idx].clone(),
1942                };
1943                let reporter =
1944                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
1945                reporters.push(reporter.clone());
1946                let application_cfg = mocks::application::Config {
1947                    hasher: Sha256::default(),
1948                    relay: relay.clone(),
1949                    me: validator.clone(),
1950                    propose_latency: (10.0, 5.0),
1951                    verify_latency: (10.0, 5.0),
1952                };
1953                let (actor, application) = mocks::application::Application::new(
1954                    context.with_label("application"),
1955                    application_cfg,
1956                );
1957                actor.start();
1958                let blocker = oracle.control(validator.clone());
1959                let cfg = config::Config {
1960                    scheme: schemes[idx].clone(),
1961                    blocker,
1962                    automaton: application.clone(),
1963                    relay: application.clone(),
1964                    reporter: reporter.clone(),
1965                    partition: validator.to_string(),
1966                    mailbox_size: 1024,
1967                    epoch: 333,
1968                    namespace: namespace.clone(),
1969                    leader_timeout: Duration::from_secs(1),
1970                    notarization_timeout: Duration::from_secs(2),
1971                    nullify_retry: Duration::from_secs(10),
1972                    fetch_timeout: Duration::from_secs(1),
1973                    activity_timeout,
1974                    skip_timeout,
1975                    max_fetch_count: 1,
1976                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
1977                    fetch_concurrent: 1,
1978                    replay_buffer: NZUsize!(1024 * 1024),
1979                    write_buffer: NZUsize!(1024 * 1024),
1980                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1981                };
1982                let engine = Engine::new(context.with_label("engine"), cfg);
1983
1984                // Start engine
1985                let (pending, recovered, resolver) = registrations
1986                    .remove(validator)
1987                    .expect("validator should be registered");
1988                engine_handlers.push(engine.start(pending, recovered, resolver));
1989            }
1990
1991            // Wait for all engines to finish
1992            let mut finalizers = Vec::new();
1993            for reporter in reporters.iter_mut() {
1994                let (mut latest, mut monitor) = reporter.subscribe().await;
1995                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
1996                    while latest < required_containers {
1997                        latest = monitor.next().await.expect("event missing");
1998                    }
1999                }));
2000            }
2001            join_all(finalizers).await;
2002
2003            // Cut all links between validator halves
2004            fn separated(n: usize, a: usize, b: usize) -> bool {
2005                let m = n / 2;
2006                (a < m && b >= m) || (a >= m && b < m)
2007            }
2008            link_validators(&mut oracle, &participants, Action::Unlink, Some(separated)).await;
2009
2010            // Wait for any in-progress notarizations/finalizations to finish
2011            context.sleep(Duration::from_secs(10)).await;
2012
            // Wait for a virtual minute (nothing should finalize)
2014            let mut finalizers = Vec::new();
2015            for reporter in reporters.iter_mut() {
2016                let (_, mut monitor) = reporter.subscribe().await;
2017                finalizers.push(
2018                    context
2019                        .with_label("finalizer")
2020                        .spawn(move |context| async move {
2021                            select! {
2022                                _timeout = context.sleep(Duration::from_secs(60)) => {},
2023                                _done = monitor.next() => {
2024                                    panic!("engine should not notarize or finalize anything");
2025                                }
2026                            }
2027                        }),
2028                );
2029            }
2030            join_all(finalizers).await;
2031
2032            // Restore links
2033            link_validators(
2034                &mut oracle,
2035                &participants,
2036                Action::Link(link),
2037                Some(separated),
2038            )
2039            .await;
2040
            // Wait for every reporter to advance by another `required_containers` once links are restored
2042            let mut finalizers = Vec::new();
2043            for reporter in reporters.iter_mut() {
2044                let (mut latest, mut monitor) = reporter.subscribe().await;
2045                let required = latest + required_containers;
2046                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
2047                    while latest < required {
2048                        latest = monitor.next().await.expect("event missing");
2049                    }
2050                }));
2051            }
2052            join_all(finalizers).await;
2053
2054            // Check reporters for correct activity
2055            for reporter in reporters.iter() {
2056                // Ensure no faults
2057                {
2058                    let faults = reporter.faults.lock().unwrap();
2059                    assert!(faults.is_empty());
2060                }
2061
2062                // Ensure no invalid signatures
2063                {
2064                    let invalid = reporter.invalid.lock().unwrap();
2065                    assert_eq!(*invalid, 0);
2066                }
2067            }
2068
2069            // Ensure no blocked connections
2070            let blocked = oracle.blocked().await.unwrap();
2071            assert!(blocked.is_empty());
2072        });
2073    }
2074
2075    #[test_traced]
2076    #[ignore]
2077    fn test_partition() {
2078        partition(bls12381_threshold::<MinPk, _>);
2079        partition(bls12381_threshold::<MinSig, _>);
2080        partition(bls12381_multisig::<MinPk, _>);
2081        partition(bls12381_multisig::<MinSig, _>);
2082        partition(ed25519);
2083    }
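
    // A minimal sketch of why the split in `partition` is expected to stall progress.
    // `crosses_partition` mirrors the `separated` predicate above; `sketch_quorum` is a
    // hypothetical stand-in that assumes the usual BFT bound (f = (n - 1) / 3 tolerated
    // faults, quorum = n - f = 2f + 1 when n = 3f + 1) rather than calling the crate's
    // own helper.
    fn crosses_partition(n: usize, a: usize, b: usize) -> bool {
        let m = n / 2;
        // True exactly when `a` and `b` fall on opposite sides of the midpoint.
        (a < m) != (b < m)
    }

    fn sketch_quorum(n: usize) -> usize {
        n - (n - 1) / 3
    }

    #[test]
    fn test_partition_sketch() {
        let n = 10;
        // Count unordered pairs of validators whose link is cut.
        let cut = (0..n)
            .flat_map(|a| (a + 1..n).map(move |b| (a, b)))
            .filter(|(a, b)| crosses_partition(n, *a, *b))
            .count();
        assert_eq!(cut, 25);
        // Each half holds 5 validators, fewer than the 7 votes a quorum needs,
        // so neither side can notarize, nullify, or finalize on its own.
        assert_eq!(sketch_quorum(n), 7);
        assert!(n / 2 < sketch_quorum(n));
    }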
2084
2085    fn slow_and_lossy_links<S, F>(seed: u64, mut fixture: F) -> String
2086    where
2087        S: Scheme<PublicKey = ed25519::PublicKey>,
2088        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
2089    {
        // Create context
        let n = 5;
        let required_containers = 50;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(5_000)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: false,
                    tracked_peer_sets: None,
                },
            );

            // Start network
            network.start();

            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let degraded_link = Link {
                latency: Duration::from_millis(200),
                jitter: Duration::from_millis(150),
                success_rate: 0.5,
            };
            link_validators(
                &mut oracle,
                &participants,
                Action::Link(degraded_link),
                None,
            )
            .await;

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx, validator) in participants.iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", *validator));

                // Configure engine
                let reporter_config = mocks::reporter::Config {
                    namespace: namespace.clone(),
                    participants: participants.clone().into(),
                    scheme: schemes[idx].clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                reporters.push(reporter.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx].clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    partition: validator.to_string(),
                    mailbox_size: 1024,
                    epoch: 333,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait for all engines to finish
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check reporters for correct activity
            for reporter in reporters.iter() {
                // Ensure no faults
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no invalid signatures
                {
                    let invalid = reporter.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());

            context.auditor().state()
        })
    }

    #[test_traced]
    fn test_slow_and_lossy_links() {
        slow_and_lossy_links(0, bls12381_threshold::<MinPk, _>);
        slow_and_lossy_links(0, bls12381_threshold::<MinSig, _>);
        slow_and_lossy_links(0, bls12381_multisig::<MinPk, _>);
        slow_and_lossy_links(0, bls12381_multisig::<MinSig, _>);
        slow_and_lossy_links(0, ed25519);
    }

    #[test_traced]
    #[ignore]
    fn test_determinism() {
        // We use slow and lossy links as the deterministic test
        // because it is the most complex test.
        for seed in 1..6 {
            let ts_pk_state_1 = slow_and_lossy_links(seed, bls12381_threshold::<MinPk, _>);
            let ts_pk_state_2 = slow_and_lossy_links(seed, bls12381_threshold::<MinPk, _>);
            assert_eq!(ts_pk_state_1, ts_pk_state_2);

            let ts_sig_state_1 = slow_and_lossy_links(seed, bls12381_threshold::<MinSig, _>);
            let ts_sig_state_2 = slow_and_lossy_links(seed, bls12381_threshold::<MinSig, _>);
            assert_eq!(ts_sig_state_1, ts_sig_state_2);

            let ms_pk_state_1 = slow_and_lossy_links(seed, bls12381_multisig::<MinPk, _>);
            let ms_pk_state_2 = slow_and_lossy_links(seed, bls12381_multisig::<MinPk, _>);
            assert_eq!(ms_pk_state_1, ms_pk_state_2);

            let ms_sig_state_1 = slow_and_lossy_links(seed, bls12381_multisig::<MinSig, _>);
            let ms_sig_state_2 = slow_and_lossy_links(seed, bls12381_multisig::<MinSig, _>);
            assert_eq!(ms_sig_state_1, ms_sig_state_2);

            let ed_state_1 = slow_and_lossy_links(seed, ed25519);
            let ed_state_2 = slow_and_lossy_links(seed, ed25519);
            assert_eq!(ed_state_1, ed_state_2);

            let states = [
                ("threshold-minpk", ts_pk_state_1),
                ("threshold-minsig", ts_sig_state_1),
                ("multisig-minpk", ms_pk_state_1),
                ("multisig-minsig", ms_sig_state_1),
                ("ed25519", ed_state_1),
            ];

            // Sanity check that adjacent scheme types never produce identical state
            for pair in states.windows(2) {
                assert_ne!(
                    pair[0].1, pair[1].1,
                    "state {} equals state {}",
                    pair[0].0, pair[1].0
                );
            }
        }
    }

    fn conflicter<S, F>(seed: u64, mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
    {
        // Create context
        let n = 4;
        let required_containers = 50;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: false,
                    tracked_peer_sets: None,
                },
            );

            // Start network
            network.start();

            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            for (idx_scheme, validator) in participants.iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", *validator));

                // Start engine
                let reporter_config = mocks::reporter::Config {
                    namespace: namespace.clone(),
                    participants: participants.clone().into(),
                    scheme: schemes[idx_scheme].clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                if idx_scheme == 0 {
                    let cfg = mocks::conflicter::Config {
                        namespace: namespace.clone(),
                        scheme: schemes[idx_scheme].clone(),
                    };

                    let engine: mocks::conflicter::Conflicter<_, _, Sha256> =
                        mocks::conflicter::Conflicter::new(
                            context.with_label("byzantine_engine"),
                            cfg,
                        );
                    engine.start(pending);
                } else {
                    reporters.push(reporter.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(validator.clone());
                    let cfg = config::Config {
                        scheme: schemes[idx_scheme].clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        partition: validator.to_string(),
                        mailbox_size: 1024,
                        epoch: 333,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        max_fetch_count: 1,
                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                        fetch_concurrent: 1,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    };
                    let engine = Engine::new(context.with_label("engine"), cfg);
                    engine.start(pending, recovered, resolver);
                }
            }

            // Wait for all engines to finish
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check reporters for correct activity
            let byz = &participants[0];
            let mut count_conflicting = 0;
            for reporter in reporters.iter() {
                // Ensure only faults for byz
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert_eq!(faults.len(), 1);
                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
                    for (_, faults) in faulter.iter() {
                        for fault in faults.iter() {
                            match fault {
                                Activity::ConflictingNotarize(_) => {
                                    count_conflicting += 1;
                                }
                                Activity::ConflictingFinalize(_) => {
                                    count_conflicting += 1;
                                }
                                _ => panic!("unexpected fault: {fault:?}"),
                            }
                        }
                    }
                }

                // Ensure no invalid signatures
                {
                    let invalid = reporter.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }
            }
            assert!(count_conflicting > 0);

            // Ensure conflicter is blocked
            let blocked = oracle.blocked().await.unwrap();
            assert!(!blocked.is_empty());
            for (a, b) in blocked {
                assert_ne!(&a, byz);
                assert_eq!(&b, byz);
            }
        });
    }

    #[test_traced]
    #[ignore]
    fn test_conflicter() {
        for seed in 0..5 {
            conflicter(seed, bls12381_threshold::<MinPk, _>);
            conflicter(seed, bls12381_threshold::<MinSig, _>);
            conflicter(seed, bls12381_multisig::<MinPk, _>);
            conflicter(seed, bls12381_multisig::<MinSig, _>);
            conflicter(seed, ed25519);
        }
    }

    fn invalid<S, F>(seed: u64, mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
    {
        // Create context
        let n = 4;
        let required_containers = 50;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: false,
                    tracked_peer_sets: None,
                },
            );

            // Start network
            network.start();

            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            for (idx_scheme, validator) in participants.iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", *validator));

                // Byzantine node (idx 0) uses empty namespace to produce invalid signatures
                let engine_namespace = if idx_scheme == 0 {
                    vec![]
                } else {
                    namespace.clone()
                };

                let reporter_config = mocks::reporter::Config {
                    namespace: namespace.clone(), // Reporter always uses correct namespace
                    participants: participants.clone().into(),
                    scheme: schemes[idx_scheme].clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                reporters.push(reporter.clone());

                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx_scheme].clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    partition: validator.to_string(),
                    mailbox_size: 1024,
                    epoch: 333,
                    namespace: engine_namespace,
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine.start(pending, recovered, resolver);
            }

            // Wait for honest engines to finish (skip byzantine node at index 0)
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut().skip(1) {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check honest reporters (reporters[1..]) for correct activity
            let mut invalid_count = 0;
            for reporter in reporters.iter().skip(1) {
                // Ensure no faults
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Count invalid signatures
                {
                    let invalid = reporter.invalid.lock().unwrap();
                    if *invalid > 0 {
                        invalid_count += 1;
                    }
                }
            }

            // All honest nodes should see invalid signatures from the byzantine node
            assert_eq!(invalid_count, n - 1);

            // Ensure byzantine node is blocked by honest nodes
            let blocked = oracle.blocked().await.unwrap();
            assert!(!blocked.is_empty());
            for (a, b) in blocked {
                if a != participants[0] {
                    assert_eq!(b, participants[0]);
                }
            }
        });
    }

    #[test_traced]
    #[ignore]
    fn test_invalid() {
        for seed in 0..5 {
            invalid(seed, bls12381_threshold::<MinPk, _>);
            invalid(seed, bls12381_threshold::<MinSig, _>);
            invalid(seed, bls12381_multisig::<MinPk, _>);
            invalid(seed, bls12381_multisig::<MinSig, _>);
            invalid(seed, ed25519);
        }
    }

    fn impersonator<S, F>(seed: u64, mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
    {
        // Create context
        let n = 4;
        let required_containers = 50;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: false,
                    tracked_peer_sets: None,
                },
            );

            // Start network
            network.start();

            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            for (idx_scheme, validator) in participants.iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", *validator));

                // Start engine
                let reporter_config = mocks::reporter::Config {
                    namespace: namespace.clone(),
                    participants: participants.clone().into(),
                    scheme: schemes[idx_scheme].clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                if idx_scheme == 0 {
                    let cfg = mocks::impersonator::Config {
                        scheme: schemes[idx_scheme].clone(),
                        namespace: namespace.clone(),
                    };

                    let engine: mocks::impersonator::Impersonator<_, _, Sha256> =
                        mocks::impersonator::Impersonator::new(
                            context.with_label("byzantine_engine"),
                            cfg,
                        );
                    engine.start(pending);
                } else {
                    reporters.push(reporter.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(validator.clone());
                    let cfg = config::Config {
                        scheme: schemes[idx_scheme].clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        partition: validator.to_string(),
                        mailbox_size: 1024,
                        epoch: 333,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        max_fetch_count: 1,
                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                        fetch_concurrent: 1,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    };
                    let engine = Engine::new(context.with_label("engine"), cfg);
                    engine.start(pending, recovered, resolver);
                }
            }

            // Wait for all engines to finish
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check reporters for correct activity
            let byz = &participants[0];
            for reporter in reporters.iter() {
                // Ensure no faults
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no invalid signatures
                {
                    let invalid = reporter.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }
            }

            // Ensure impersonator is blocked
            let blocked = oracle.blocked().await.unwrap();
            assert!(!blocked.is_empty());
            for (a, b) in blocked {
                assert_ne!(&a, byz);
                assert_eq!(&b, byz);
            }
        });
    }

    #[test_traced]
    #[ignore]
    fn test_impersonator() {
        for seed in 0..5 {
            impersonator(seed, bls12381_threshold::<MinPk, _>);
            impersonator(seed, bls12381_threshold::<MinSig, _>);
            impersonator(seed, bls12381_multisig::<MinPk, _>);
            impersonator(seed, bls12381_multisig::<MinSig, _>);
            impersonator(seed, ed25519);
        }
    }

    fn reconfigurer<S, F>(seed: u64, mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
    {
        // Create context
        let n = 4;
        let required_containers = 50;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: false,
                    tracked_peer_sets: None,
                },
            );

            // Start network
            network.start();

            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            for (idx_scheme, validator) in participants.iter().enumerate() {
                // Create validator context
                let context = context.with_label(&format!("validator-{}", *validator));

                // Start engine
                let reporter_config = mocks::reporter::Config {
                    namespace: namespace.clone(),
                    participants: participants.clone().into(),
                    scheme: schemes[idx_scheme].clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                if idx_scheme == 0 {
                    let cfg = mocks::reconfigurer::Config {
                        scheme: schemes[idx_scheme].clone(),
                        namespace: namespace.clone(),
                    };
                    let engine: mocks::reconfigurer::Reconfigurer<_, _, Sha256> =
                        mocks::reconfigurer::Reconfigurer::new(
                            context.with_label("byzantine_engine"),
                            cfg,
                        );
                    engine.start(pending);
                } else {
                    reporters.push(reporter.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(validator.clone());
                    let cfg = config::Config {
                        scheme: schemes[idx_scheme].clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        partition: validator.to_string(),
                        mailbox_size: 1024,
                        epoch: 333,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        max_fetch_count: 1,
                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                        fetch_concurrent: 1,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    };
                    let engine = Engine::new(context.with_label("engine"), cfg);
                    engine.start(pending, recovered, resolver);
                }
            }

            // Wait for every reporter to reach `required_containers`
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check reporters for correct activity
            let byz = &participants[0];
            for reporter in reporters.iter() {
                // Ensure no faults
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no invalid signatures
                {
                    let invalid = reporter.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }
            }

            // Ensure reconfigurer is blocked (epoch mismatch)
            let blocked = oracle.blocked().await.unwrap();
            assert!(!blocked.is_empty());
            for (a, b) in blocked {
                assert_ne!(&a, byz);
                assert_eq!(&b, byz);
            }
        });
    }

    #[test_traced]
    #[ignore]
    fn test_reconfigurer() {
        for seed in 0..5 {
            reconfigurer(seed, bls12381_threshold::<MinPk, _>);
            reconfigurer(seed, bls12381_threshold::<MinSig, _>);
            reconfigurer(seed, bls12381_multisig::<MinPk, _>);
            reconfigurer(seed, bls12381_multisig::<MinSig, _>);
            reconfigurer(seed, ed25519);
        }
    }

    fn nuller<S, F>(seed: u64, mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
    {
        // Create context
        let n = 4;
        let required_containers = 50;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: false,
                    tracked_peer_sets: None,
                },
            );

            // Start network
            network.start();

            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            for (idx_scheme, validator) in participants.iter().enumerate() {
                // Create validator context
                let context = context.with_label(&format!("validator-{}", *validator));

                // Start engine
                let reporter_config = mocks::reporter::Config {
                    namespace: namespace.clone(),
                    participants: participants.clone().into(),
                    scheme: schemes[idx_scheme].clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                if idx_scheme == 0 {
                    let cfg = mocks::nuller::Config {
                        namespace: namespace.clone(),
                        scheme: schemes[idx_scheme].clone(),
                    };
                    let engine: mocks::nuller::Nuller<_, _, Sha256> =
                        mocks::nuller::Nuller::new(context.with_label("byzantine_engine"), cfg);
                    engine.start(pending);
                } else {
                    reporters.push(reporter.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(validator.clone());
                    let cfg = config::Config {
                        scheme: schemes[idx_scheme].clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        partition: validator.to_string(),
                        mailbox_size: 1024,
                        epoch: 333,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        max_fetch_count: 1,
                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                        fetch_concurrent: 1,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    };
                    let engine = Engine::new(context.with_label("engine"), cfg);
                    engine.start(pending, recovered, resolver);
                }
            }

            // Wait for every reporter to reach `required_containers`
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check reporters for correct activity
            let byz = &participants[0];
            let mut count_nullify_and_finalize = 0;
            for reporter in reporters.iter() {
                // Ensure the only faults recorded are attributed to byz
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert_eq!(faults.len(), 1);
                    let faulter = faults.get(byz).expect("byzantine party is not faulter");
                    for (_, faults) in faulter.iter() {
                        for fault in faults.iter() {
                            match fault {
                                Activity::NullifyFinalize(_) => {
                                    count_nullify_and_finalize += 1;
                                }
                                _ => panic!("unexpected fault: {fault:?}"),
                            }
                        }
                    }
                }

                // Ensure no invalid signatures
                {
                    let invalid = reporter.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }
            }
            assert!(count_nullify_and_finalize > 0);

            // Ensure nuller is blocked
            let blocked = oracle.blocked().await.unwrap();
            assert!(!blocked.is_empty());
            for (a, b) in blocked {
                assert_ne!(&a, byz);
                assert_eq!(&b, byz);
            }
        });
    }

    #[test_traced]
    #[ignore]
    fn test_nuller() {
        for seed in 0..5 {
            nuller(seed, bls12381_threshold::<MinPk, _>);
            nuller(seed, bls12381_threshold::<MinSig, _>);
            nuller(seed, bls12381_multisig::<MinPk, _>);
            nuller(seed, bls12381_multisig::<MinSig, _>);
            nuller(seed, ed25519);
        }
    }

    fn outdated<S, F>(seed: u64, mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
    {
        // Create context
        let n = 4;
        let required_containers = 100;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30)));
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: false,
                    tracked_peer_sets: None,
                },
            );

            // Start network
            network.start();

            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            for (idx_scheme, validator) in participants.iter().enumerate() {
                // Create validator context
                let context = context.with_label(&format!("validator-{}", *validator));

                // Start engine
                let reporter_config = mocks::reporter::Config {
                    namespace: namespace.clone(),
                    participants: participants.clone().into(),
                    scheme: schemes[idx_scheme].clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                if idx_scheme == 0 {
                    let cfg = mocks::outdated::Config {
                        scheme: schemes[idx_scheme].clone(),
                        namespace: namespace.clone(),
                        view_delta: activity_timeout * 4,
                    };
                    let engine: mocks::outdated::Outdated<_, _, Sha256> =
                        mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg);
                    engine.start(pending);
                } else {
                    reporters.push(reporter.clone());
                    let application_cfg = mocks::application::Config {
                        hasher: Sha256::default(),
                        relay: relay.clone(),
                        me: validator.clone(),
                        propose_latency: (10.0, 5.0),
                        verify_latency: (10.0, 5.0),
                    };
                    let (actor, application) = mocks::application::Application::new(
                        context.with_label("application"),
                        application_cfg,
                    );
                    actor.start();
                    let blocker = oracle.control(validator.clone());
                    let cfg = config::Config {
                        scheme: schemes[idx_scheme].clone(),
                        blocker,
                        automaton: application.clone(),
                        relay: application.clone(),
                        reporter: reporter.clone(),
                        partition: validator.to_string(),
                        mailbox_size: 1024,
                        epoch: 333,
                        namespace: namespace.clone(),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout,
                        skip_timeout,
                        max_fetch_count: 1,
                        fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                        fetch_concurrent: 1,
                        replay_buffer: NZUsize!(1024 * 1024),
                        write_buffer: NZUsize!(1024 * 1024),
                        buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                    };
                    let engine = Engine::new(context.with_label("engine"), cfg);
                    engine.start(pending, recovered, resolver);
                }
            }

            // Wait for every reporter to reach `required_containers`
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check reporters for correct activity
            for reporter in reporters.iter() {
                // Ensure no faults
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no invalid signatures
                {
                    let invalid = reporter.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }

    #[test_traced]
    #[ignore]
    fn test_outdated() {
        for seed in 0..5 {
            outdated(seed, bls12381_threshold::<MinPk, _>);
            outdated(seed, bls12381_threshold::<MinSig, _>);
            outdated(seed, bls12381_multisig::<MinPk, _>);
            outdated(seed, bls12381_multisig::<MinSig, _>);
            outdated(seed, ed25519);
        }
    }

    fn run_1k<S, F>(mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
    {
        // Create context
        let n = 10;
        let required_containers = 1_000;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let cfg = deterministic::Config::new();
        let executor = deterministic::Runner::new(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: false,
                    tracked_peer_sets: None,
                },
            );

            // Start network
            network.start();

            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(80),
                jitter: Duration::from_millis(10),
                success_rate: 0.98,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            let mut engine_handlers = Vec::new();
            for (idx, validator) in participants.iter().enumerate() {
                // Create validator context
                let context = context.with_label(&format!("validator-{}", *validator));

                // Configure engine
                let reporter_config = mocks::reporter::Config {
                    namespace: namespace.clone(),
                    participants: participants.clone().into(),
                    scheme: schemes[idx].clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                reporters.push(reporter.clone());
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (100.0, 50.0),
                    verify_latency: (50.0, 40.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx].clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    partition: validator.to_string(),
                    mailbox_size: 1024,
                    epoch: 333,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Wait for every reporter to reach `required_containers`
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Check reporters for correct activity
            for reporter in reporters.iter() {
                // Ensure no faults
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert!(faults.is_empty());
                }

                // Ensure no invalid signatures
                {
                    let invalid = reporter.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0);
                }
            }

            // Ensure no blocked connections
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }

    #[test_traced]
    #[ignore]
    fn test_1k_bls12381_threshold_min_pk() {
        run_1k(bls12381_threshold::<MinPk, _>);
    }

    #[test_traced]
    #[ignore]
    fn test_1k_bls12381_threshold_min_sig() {
        run_1k(bls12381_threshold::<MinSig, _>);
    }

    #[test_traced]
    #[ignore]
    fn test_1k_bls12381_multisig_min_pk() {
        run_1k(bls12381_multisig::<MinPk, _>);
    }

    #[test_traced]
    #[ignore]
    fn test_1k_bls12381_multisig_min_sig() {
        run_1k(bls12381_multisig::<MinSig, _>);
    }

    #[test_traced]
    #[ignore]
    fn test_1k_ed25519() {
        run_1k(ed25519);
    }

    fn children_shutdown_on_engine_abort<S, F>(mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
    {
3483        // Create context
3484        let n = 1;
3485        let namespace = b"consensus".to_vec();
3486        let executor = deterministic::Runner::timed(Duration::from_secs(10));
3487        executor.start(|mut context| async move {
3488            // Create simulated network
3489            let (network, mut oracle) = Network::new(
3490                context.with_label("network"),
3491                Config {
3492                    max_size: 1024 * 1024,
3493                    disconnect_on_block: true,
3494                    tracked_peer_sets: None,
3495                },
3496            );
3497
3498            // Start network
3499            network.start();
3500
3501            // Register a single participant
3502            let Fixture {
3503                participants,
3504                schemes,
3505                ..
3506            } = fixture(&mut context, n);
3507            let mut registrations = register_validators(&mut oracle, &participants).await;
3508
3509            // Link the single validator to itself (no-ops for completeness)
3510            let link = Link {
3511                latency: Duration::from_millis(1),
3512                jitter: Duration::from_millis(0),
3513                success_rate: 1.0,
3514            };
3515            link_validators(&mut oracle, &participants, Action::Link(link), None).await;
3516
3517            // Create engine
            let reporter_config = mocks::reporter::Config {
                namespace: namespace.clone(),
                participants: participants.clone().into(),
                scheme: schemes[0].clone(),
            };
            let reporter =
                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
            let relay = Arc::new(mocks::relay::Relay::new());
            let application_cfg = mocks::application::Config {
                hasher: Sha256::default(),
                relay: relay.clone(),
                me: participants[0].clone(),
                propose_latency: (1.0, 0.0),
                verify_latency: (1.0, 0.0),
            };
            let (actor, application) = mocks::application::Application::new(
                context.with_label("application"),
                application_cfg,
            );
            actor.start();
            let blocker = oracle.control(participants[0].clone());
            let cfg = config::Config {
                scheme: schemes[0].clone(),
                blocker,
                automaton: application.clone(),
                relay: application.clone(),
                reporter: reporter.clone(),
                partition: participants[0].clone().to_string(),
                mailbox_size: 64,
                epoch: 333,
                namespace: namespace.clone(),
                leader_timeout: Duration::from_millis(50),
                notarization_timeout: Duration::from_millis(100),
                nullify_retry: Duration::from_millis(250),
                fetch_timeout: Duration::from_millis(50),
                activity_timeout: 4,
                skip_timeout: 2,
                max_fetch_count: 1,
                fetch_rate_per_peer: Quota::per_second(NZU32!(10)),
                fetch_concurrent: 1,
                replay_buffer: NZUsize!(1024 * 16),
                write_buffer: NZUsize!(1024 * 16),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            };
            let engine = Engine::new(context.with_label("engine"), cfg);

            // Start engine
            let (pending, recovered, resolver) = registrations
                .remove(&participants[0])
                .expect("validator should be registered");
            let handle = engine.start(pending, recovered, resolver);

            // Allow tasks to start
            context.sleep(Duration::from_millis(1000)).await;

            // Verify that engine and child actors are running
            let metrics_before = context.encode();
            let is_running = |name: &str| -> bool {
                metrics_before.lines().any(|line| {
                    line.starts_with("runtime_tasks_running{")
                        && line.contains(&format!("name=\"{name}\""))
                        && line.contains("kind=\"Task\"")
                        && line.trim_end().ends_with(" 1")
                })
            };
            assert!(is_running("engine"));
            assert!(is_running("engine_batcher"));
            assert!(is_running("engine_voter"));
            assert!(is_running("engine_resolver"));

            // Make sure the engine is still running. Re-encode the metrics here because
            // `is_running` only reads the snapshot taken before this sleep.
            context.sleep(Duration::from_millis(1000)).await;
            let metrics_mid = context.encode();
            assert!(metrics_mid.lines().any(|line| {
                line.starts_with("runtime_tasks_running{")
                    && line.contains("name=\"engine\"")
                    && line.contains("kind=\"Task\"")
                    && line.trim_end().ends_with(" 1")
            }));

            // Abort engine and ensure children stop
            handle.abort();
            let _ = handle.await; // ensure parent tear-down runs

            // Give the runtime a tick to process aborts
            context.sleep(Duration::from_millis(1000)).await;

            let metrics_after = context.encode();
            let is_stopped = |name: &str| -> bool {
                // The gauge must be present and read 0 (an absent entry does not count as stopped)
                metrics_after.lines().any(|line| {
                    line.starts_with("runtime_tasks_running{")
                        && line.contains(&format!("name=\"{name}\""))
                        && line.contains("kind=\"Task\"")
                        && line.trim_end().ends_with(" 0")
                })
            };
            assert!(is_stopped("engine"));
            assert!(is_stopped("engine_batcher"));
            assert!(is_stopped("engine_voter"));
            assert!(is_stopped("engine_resolver"));
        });
    }
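
    // A minimal sketch (not part of the original suite) of the gauge check that the
    // `is_running`/`is_stopped` closures above perform, pulled out as a pure function
    // over the encoded metrics text so it can be exercised without a runtime. The name
    // `task_gauge_is` is hypothetical, and the line format assumed here is only what
    // those closures already match on.
    #[allow(dead_code)]
    fn task_gauge_is(metrics: &str, name: &str, value: u64) -> bool {
        // Match the label with its closing quote so "engine" does not match "engine_voter".
        let label = format!("name=\"{name}\"");
        // Require the separating space so a value of 0 does not match a line ending in "10".
        let suffix = format!(" {value}");
        metrics.lines().any(|line| {
            line.starts_with("runtime_tasks_running{")
                && line.contains(&label)
                && line.contains("kind=\"Task\"")
                && line.trim_end().ends_with(&suffix)
        })
    }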

    #[test_traced]
    fn test_children_shutdown_on_engine_abort() {
        children_shutdown_on_engine_abort(bls12381_threshold::<MinPk, _>);
        children_shutdown_on_engine_abort(bls12381_threshold::<MinSig, _>);
        children_shutdown_on_engine_abort(bls12381_multisig::<MinPk, _>);
        children_shutdown_on_engine_abort(bls12381_multisig::<MinSig, _>);
        children_shutdown_on_engine_abort(ed25519);
    }

    fn attributable_reporter_filtering<S, F>(mut fixture: F)
    where
        S: Scheme<PublicKey = ed25519::PublicKey>,
        F: FnMut(&mut deterministic::Context, u32) -> Fixture<S>,
    {
        let n = 3;
        let required_containers = 10;
        let activity_timeout = 10;
        let skip_timeout = 5;
        let namespace = b"consensus".to_vec();
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: false,
                    tracked_peer_sets: None,
                },
            );
            network.start();

            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = fixture(&mut context, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(1),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines with `AttributableReporter` wrapper
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            for (idx, validator) in participants.iter().enumerate() {
                let context = context.with_label(&format!("validator-{}", *validator));

                let reporter_config = mocks::reporter::Config {
                    namespace: namespace.clone(),
                    participants: participants.clone().into(),
                    scheme: schemes[idx].clone(),
                };
                let mock_reporter = mocks::reporter::Reporter::new(
                    context.with_label("mock_reporter"),
                    reporter_config,
                );

                // Wrap with `AttributableReporter`
                let attributable_reporter = signing_scheme::reporter::AttributableReporter::new(
                    context.with_label("rng"),
                    schemes[idx].clone(),
                    namespace.clone(),
                    mock_reporter.clone(),
                    true, // Enable verification
                );
                reporters.push(mock_reporter.clone());

                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx].clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: attributable_reporter,
                    partition: validator.to_string(),
                    mailbox_size: 1024,
                    epoch: 333,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(1)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine.start(pending, recovered, resolver);
            }

            // Wait for all engines to finish
            let mut finalizers = Vec::new();
            for reporter in reporters.iter_mut() {
                let (mut latest, mut monitor) = reporter.subscribe().await;
                finalizers.push(context.with_label("finalizer").spawn(move |_| async move {
                    while latest < required_containers {
                        latest = monitor.next().await.expect("event missing");
                    }
                }));
            }
            join_all(finalizers).await;

            // Verify filtering behavior based on scheme attributability
            for reporter in reporters.iter() {
                // Ensure no faults (normal operation)
                {
                    let faults = reporter.faults.lock().unwrap();
                    assert!(faults.is_empty(), "No faults should be reported");
                }

                // Ensure no invalid signatures
                {
                    let invalid = reporter.invalid.lock().unwrap();
                    assert_eq!(*invalid, 0, "No invalid signatures");
                }

                // Check that we have certificates reported
                {
                    let notarizations = reporter.notarizations.lock().unwrap();
                    let finalizations = reporter.finalizations.lock().unwrap();
                    assert!(
                        !notarizations.is_empty() || !finalizations.is_empty(),
                        "Certificates should be reported"
                    );
                }

                // Check notarizes
                let notarizes = reporter.notarizes.lock().unwrap();
                let last_view = notarizes.keys().max().cloned().unwrap_or_default();
                for (view, payloads) in notarizes.iter() {
                    if *view == last_view {
                        continue; // Skip last view
                    }

                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();

                    // For attributable schemes, we should see peer activities
                    if schemes[0].is_attributable() {
                        assert!(signers > 1, "view {view}: {signers}");
                    } else {
                        // For non-attributable, we shouldn't see any peer activities
                        assert_eq!(signers, 0);
                    }
                }

                // Check finalizes
                let finalizes = reporter.finalizes.lock().unwrap();
                for (_, payloads) in finalizes.iter() {
                    let signers: usize = payloads.values().map(|signers| signers.len()).sum();

                    // For attributable schemes, we should see peer activities
                    if schemes[0].is_attributable() {
                        assert!(signers > 1);
                    } else {
                        // For non-attributable, we shouldn't see any peer activities
                        assert_eq!(signers, 0);
                    }
                }
            }

            // Ensure no blocked connections (normal operation)
            let blocked = oracle.blocked().await.unwrap();
            assert!(blocked.is_empty());
        });
    }
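
    // A small sketch of the signer tally used in the checks above, written as a generic
    // helper so it can be exercised without a running engine. The name `total_signers`
    // is hypothetical, and the map-of-sets shape is an assumption inferred from how
    // `payloads.values()` is summed above, not the mock reporter's confirmed type.
    #[allow(dead_code)]
    fn total_signers<P, K>(
        payloads: &std::collections::HashMap<P, std::collections::HashSet<K>>,
    ) -> usize {
        // Sum the number of distinct signers recorded for each payload in a view.
        payloads.values().map(|signers| signers.len()).sum()
    }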

    #[test_traced]
    fn test_attributable_reporter_filtering() {
        attributable_reporter_filtering(bls12381_threshold::<MinPk, _>);
        attributable_reporter_filtering(bls12381_threshold::<MinSig, _>);
        attributable_reporter_filtering(bls12381_multisig::<MinPk, _>);
        attributable_reporter_filtering(bls12381_multisig::<MinSig, _>);
        attributable_reporter_filtering(ed25519);
    }

    fn tle<V: Variant>() {
        // Create context
        let n = 4;
        let namespace = b"consensus".to_vec();
        let activity_timeout = 100;
        let skip_timeout = 50;
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: false,
                    tracked_peer_sets: None,
                },
            );

            // Start network
            network.start();

            // Register participants
            let Fixture {
                participants,
                schemes,
                ..
            } = bls12381_threshold::<V, _>(&mut context, n);
            let mut registrations = register_validators(&mut oracle, &participants).await;

            // Link all validators
            let link = Link {
                latency: Duration::from_millis(10),
                jitter: Duration::from_millis(5),
                success_rate: 1.0,
            };
            link_validators(&mut oracle, &participants, Action::Link(link), None).await;

            // Create engines and reporters
            let relay = Arc::new(mocks::relay::Relay::new());
            let mut reporters = Vec::new();
            let mut engine_handlers = Vec::new();
            let monitor_reporter = Arc::new(Mutex::new(None));
            for (idx, validator) in participants.iter().enumerate() {
                // Create scheme context
                let context = context.with_label(&format!("validator-{}", *validator));

                // Store first reporter for monitoring
                let reporter_config = mocks::reporter::Config {
                    namespace: namespace.clone(),
                    participants: participants.clone().into(),
                    scheme: schemes[idx].clone(),
                };
                let reporter =
                    mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_config);
                reporters.push(reporter.clone());
                if idx == 0 {
                    *monitor_reporter.lock().unwrap() = Some(reporter.clone());
                }

                // Configure application
                let application_cfg = mocks::application::Config {
                    hasher: Sha256::default(),
                    relay: relay.clone(),
                    me: validator.clone(),
                    propose_latency: (10.0, 5.0),
                    verify_latency: (10.0, 5.0),
                };
                let (actor, application) = mocks::application::Application::new(
                    context.with_label("application"),
                    application_cfg,
                );
                actor.start();
                let blocker = oracle.control(validator.clone());
                let cfg = config::Config {
                    scheme: schemes[idx].clone(),
                    blocker,
                    automaton: application.clone(),
                    relay: application.clone(),
                    reporter: reporter.clone(),
                    partition: validator.to_string(),
                    mailbox_size: 1024,
                    epoch: 333,
                    namespace: namespace.clone(),
                    leader_timeout: Duration::from_millis(100),
                    notarization_timeout: Duration::from_millis(200),
                    nullify_retry: Duration::from_millis(500),
                    fetch_timeout: Duration::from_millis(100),
                    activity_timeout,
                    skip_timeout,
                    max_fetch_count: 1,
                    fetch_rate_per_peer: Quota::per_second(NZU32!(10)),
                    fetch_concurrent: 1,
                    replay_buffer: NZUsize!(1024 * 1024),
                    write_buffer: NZUsize!(1024 * 1024),
                    buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                };
                let engine = Engine::new(context.with_label("engine"), cfg);

                // Start engine
                let (pending, recovered, resolver) = registrations
                    .remove(validator)
                    .expect("validator should be registered");
                engine_handlers.push(engine.start(pending, recovered, resolver));
            }

            // Prepare TLE test data
            let target = Round::new(333, 10); // Encrypt for view 10
            let message_content = b"Secret message for future view10"; // 32 bytes
            let message = Block::new(*message_content);

            // Encrypt message for future view using threshold public key
            let seed_namespace = seed_namespace(&namespace);
            let ciphertext = encrypt::<_, V>(
                &mut context,
                *schemes[0].identity(),
                (Some(&seed_namespace), &target.encode()),
                &message,
            );

            // Wait for consensus to reach the target view and then decrypt
            let reporter = monitor_reporter.lock().unwrap().clone().unwrap();
            loop {
                // Wait for notarization
                context.sleep(Duration::from_millis(100)).await;
                let notarizations = reporter.notarizations.lock().unwrap();
                let Some(notarization) = notarizations.get(&target.view()) else {
                    continue;
                };

                // Decrypt the message using the seed signature
                let seed_signature = notarization.certificate.seed_signature;
                let decrypted = decrypt::<V>(&seed_signature, &ciphertext)
                    .expect("Decryption should succeed with valid seed signature");
                assert_eq!(
                    message.as_ref(),
                    decrypted.as_ref(),
                    "Decrypted message should match original message"
                );
                break;
            }
        });
    }

    #[test_traced]
    fn test_tle() {
        tle::<MinPk>();
        tle::<MinSig>();
    }
}